igon7 Engine - Orchestration
Guide complet de l'orchestration de workflows.
🎯 Stratégies d'Orchestration
1. Orchestration Centralisée
import { Orchestrator } from '@igon7/core'
const orchestrator = new Orchestrator({
strategy: 'centralized',
maxConcurrentWorkflows: 100,
taskQueue: 'igon7-orchestrator'
})
// Tous les workflows passent par l'orchestrateur central
await orchestrator.start()
2. Orchestration Distribuée
const orchestrator = new Orchestrator({
strategy: 'distributed',
nodes: ['node-1', 'node-2', 'node-3'],
loadBalancing: 'round-robin',
healthCheck: {
interval: 5000,
timeout: 3000
}
})
3. Orchestration Hiérarchique
const parent = new Orchestrator({ name: 'parent' })
const child1 = new Orchestrator({ name: 'child-1', parent })
const child2 = new Orchestrator({ name: 'child-2', parent })
// Parent délègue aux enfants
📡 Communication Inter-Workflow
Events
const workflow = new WorkflowBuilder('event-emitter')
.addNode('emit-event', async (ctx) => {
ctx.emitEvent('data-processed', {
itemId: '123',
timestamp: Date.now()
})
})
.build()
// Écouter les événements
const listener = new WorkflowBuilder('event-listener')
.onEvent('data-processed', async (event) => {
console.log('Event received:', event)
})
.addNode('process', async (ctx) => {
const event = ctx.getEvent('data-processed')
// ...
})
.build()
Signals
// Envoyer un signal
await executor.sendSignal(executionId, 'cancel', {
reason: 'User requested'
})
// Recevoir un signal
const workflow = new WorkflowBuilder('signal-handler')
.addNode('wait-for-signal', async (ctx) => {
const signal = await ctx.waitForSignal('continue', {
timeout: 60000
})
return signal
})
.build()
Shared State
const workflow = new WorkflowBuilder('shared-state')
.addNode('write-state', async (ctx) => {
await ctx.storage.set('counter', 42)
})
.addNode('read-state', async (ctx) => {
const counter = await ctx.storage.get('counter')
return { counter }
}, {
dependsOn: ['write-state']
})
.build()
🔄 Gestion des Erreurs
Retry Strategies
// Exponential backoff
const retryConfig = {
maxAttempts: 5,
initialInterval: 1000,
backoffCoefficient: 2,
maxInterval: 30000,
}
// Linear backoff
const linearRetry = {
maxAttempts: 3,
initialInterval: 5000,
backoffCoefficient: 1, // Pas d'augmentation
}
// Custom retry logic
const customRetry = {
maxAttempts: 3,
onRetry: (error, attempt, context) => {
if (error.code === 'RATE_LIMIT') {
return 60000 // Attendre 1 minute pour rate limit
}
return Math.pow(2, attempt) * 1000
}
}
Error Boundaries
const withBoundary = new WorkflowBuilder('error-boundary')
.addNode('risky-operation', async () => {
// Peut échouer
})
.addNode('error-handler', async (ctx, error) => {
// Gère l'erreur
return { recovered: true }
}, {
onError: true,
forNodes: ['risky-operation'],
errorTypes: [NetworkError, TimeoutError]
})
.build()
Fallback Patterns
const withFallback = new WorkflowBuilder('fallback')
.addNode('primary', primaryTask, {
onError: 'fallback',
fallback: 'secondary'
})
.addNode('secondary', secondaryTask)
.build()
// Fallback en cascade
const cascadeFallback = new WorkflowBuilder('cascade')
.addNode('primary', primaryTask, {
fallback: 'fallback-1'
})
.addNode('fallback-1', fallback1Task, {
fallback: 'fallback-2'
})
.addNode('fallback-2', fallback2Task)
.build()
⚡ Optimisation des Performances
Parallelization
// Maximize parallelism
const parallel = new WorkflowBuilder('max-parallel')
.parallel(
Array.from({ length: 100 }, (_, i) => `task-${i}`),
Array.from({ length: 100 }, (_, i) => createTask(i)),
{
concurrency: 10 // Max 10 en parallèle
}
)
.build()
Batching
const batched = new WorkflowBuilder('batched')
.addNode('process-batch', async (ctx) => {
const items = ctx.input.items
const batchSize = 10
const batches = chunk(items, batchSize)
const results = []
for (const batch of batches) {
const result = await processBatch(batch)
results.push(result)
}
return results
})
.build()
Caching
const cached = new WorkflowBuilder('cached')
.addNode('expensive-operation', async () => {
return await expensiveComputation()
}, {
cache: {
enabled: true,
ttl: 3600000,
keyFn: (input) => hash(input),
store: 'redis'
}
})
.build()
📊 Monitoring en Temps Réel
Dashboard
const monitor = new WorkflowMonitor({
cloudApiEndpoint: 'http://localhost:3000',
apiKey: process.env.MONITOR_API_KEY
})
// Dashboard URL
const dashboardUrl = monitor.getDashboardURL(executionId)
console.log('Dashboard:', dashboardUrl)
// Métriques en temps réel
monitor.on('node.completed', (event) => {
console.log(`Node ${event.nodeId} completed in ${event.duration}ms`)
})
monitor.on('workflow.failed', (event) => {
console.error(`Workflow failed:`, event.error)
sendAlert(event.error)
})
Alerting
const alerting = new AlertingSystem({
rules: [
{
name: 'High error rate',
condition: 'error_rate > 0.1',
window: '5m',
action: 'send_alert'
},
{
name: 'Long duration',
condition: 'duration > 300000',
action: 'send_warning'
},
{
name: 'Workflow stuck',
condition: 'no_progress > 10m',
action: 'escalate'
}
],
channels: {
send_alert: 'slack',
send_warning: 'email',
escalate: ['slack', 'pagerduty']
}
})
🔐 Sécurité
Permissions
const secureWorkflow = new WorkflowBuilder('secure')
.addNode('sensitive-operation', async () => {
// Nécessite des permissions
}, {
permissions: {
requiredRoles: ['admin', 'operator'],
allowedUsers: ['alice', 'bob'],
requireMFA: true
}
})
.build()
Secrets
const withSecrets = new WorkflowBuilder('with-secrets')
.addNode('use-secret', async (ctx) => {
const apiKey = await ctx.getSecret('api-key')
const dbPassword = await ctx.getSecret('db-password')
return await callAPI(apiKey, dbPassword)
})
.build()
Audit Logging
const auditedWorkflow = new WorkflowBuilder('audited')
.addNode('action', async (ctx) => {
ctx.audit({
action: 'DATA_ACCESS',
resource: 'user-data',
details: { userId: '123' }
})
})
.build()
Version : 1.0.0
Patterns : 15+
Stratégies : 3