Skip to main content

igon7 Engine - Orchestration

Guide complet de l'orchestration de workflows.


🎯 Stratégies d'Orchestration

1. Orchestration Centralisée

import { Orchestrator } from '@igon7/core'

const orchestrator = new Orchestrator({
strategy: 'centralized',
maxConcurrentWorkflows: 100,
taskQueue: 'igon7-orchestrator'
})

// Tous les workflows passent par l'orchestrateur central
await orchestrator.start()

2. Orchestration Distribuée

const orchestrator = new Orchestrator({
strategy: 'distributed',
nodes: ['node-1', 'node-2', 'node-3'],
loadBalancing: 'round-robin',
healthCheck: {
interval: 5000,
timeout: 3000
}
})

3. Orchestration Hiérarchique

const parent = new Orchestrator({ name: 'parent' })
const child1 = new Orchestrator({ name: 'child-1', parent })
const child2 = new Orchestrator({ name: 'child-2', parent })

// Parent délègue aux enfants

📡 Communication Inter-Workflow

Events

const workflow = new WorkflowBuilder('event-emitter')
.addNode('emit-event', async (ctx) => {
ctx.emitEvent('data-processed', {
itemId: '123',
timestamp: Date.now()
})
})
.build()

// Écouter les événements
const listener = new WorkflowBuilder('event-listener')
.onEvent('data-processed', async (event) => {
console.log('Event received:', event)
})
.addNode('process', async (ctx) => {
const event = ctx.getEvent('data-processed')
// ...
})
.build()

Signals

// Envoyer un signal
await executor.sendSignal(executionId, 'cancel', {
reason: 'User requested'
})

// Recevoir un signal
const workflow = new WorkflowBuilder('signal-handler')
.addNode('wait-for-signal', async (ctx) => {
const signal = await ctx.waitForSignal('continue', {
timeout: 60000
})
return signal
})
.build()

Shared State

const workflow = new WorkflowBuilder('shared-state')
.addNode('write-state', async (ctx) => {
await ctx.storage.set('counter', 42)
})
.addNode('read-state', async (ctx) => {
const counter = await ctx.storage.get('counter')
return { counter }
}, {
dependsOn: ['write-state']
})
.build()

🔄 Gestion des Erreurs

Retry Strategies

// Exponential backoff
const retryConfig = {
maxAttempts: 5,
initialInterval: 1000,
backoffCoefficient: 2,
maxInterval: 30000,
}

// Linear backoff
const linearRetry = {
maxAttempts: 3,
initialInterval: 5000,
backoffCoefficient: 1, // Pas d'augmentation
}

// Custom retry logic
const customRetry = {
maxAttempts: 3,
onRetry: (error, attempt, context) => {
if (error.code === 'RATE_LIMIT') {
return 60000 // Attendre 1 minute pour rate limit
}
return Math.pow(2, attempt) * 1000
}
}

Error Boundaries

const withBoundary = new WorkflowBuilder('error-boundary')
.addNode('risky-operation', async () => {
// Peut échouer
})
.addNode('error-handler', async (ctx, error) => {
// Gère l'erreur
return { recovered: true }
}, {
onError: true,
forNodes: ['risky-operation'],
errorTypes: [NetworkError, TimeoutError]
})
.build()

Fallback Patterns

const withFallback = new WorkflowBuilder('fallback')
.addNode('primary', primaryTask, {
onError: 'fallback',
fallback: 'secondary'
})
.addNode('secondary', secondaryTask)
.build()

// Fallback en cascade
const cascadeFallback = new WorkflowBuilder('cascade')
.addNode('primary', primaryTask, {
fallback: 'fallback-1'
})
.addNode('fallback-1', fallback1Task, {
fallback: 'fallback-2'
})
.addNode('fallback-2', fallback2Task)
.build()

⚡ Optimisation des Performances

Parallelization

// Maximize parallelism
const parallel = new WorkflowBuilder('max-parallel')
.parallel(
Array.from({ length: 100 }, (_, i) => `task-${i}`),
Array.from({ length: 100 }, (_, i) => createTask(i)),
{
concurrency: 10 // Max 10 en parallèle
}
)
.build()

Batching

const batched = new WorkflowBuilder('batched')
.addNode('process-batch', async (ctx) => {
const items = ctx.input.items
const batchSize = 10

const batches = chunk(items, batchSize)
const results = []

for (const batch of batches) {
const result = await processBatch(batch)
results.push(result)
}

return results
})
.build()

Caching

const cached = new WorkflowBuilder('cached')
.addNode('expensive-operation', async () => {
return await expensiveComputation()
}, {
cache: {
enabled: true,
ttl: 3600000,
keyFn: (input) => hash(input),
store: 'redis'
}
})
.build()

📊 Monitoring en Temps Réel

Dashboard

const monitor = new WorkflowMonitor({
cloudApiEndpoint: 'http://localhost:3000',
apiKey: process.env.MONITOR_API_KEY
})

// Dashboard URL
const dashboardUrl = monitor.getDashboardURL(executionId)
console.log('Dashboard:', dashboardUrl)

// Métriques en temps réel
monitor.on('node.completed', (event) => {
console.log(`Node ${event.nodeId} completed in ${event.duration}ms`)
})

monitor.on('workflow.failed', (event) => {
console.error(`Workflow failed:`, event.error)
sendAlert(event.error)
})

Alerting

const alerting = new AlertingSystem({
rules: [
{
name: 'High error rate',
condition: 'error_rate > 0.1',
window: '5m',
action: 'send_alert'
},
{
name: 'Long duration',
condition: 'duration > 300000',
action: 'send_warning'
},
{
name: 'Workflow stuck',
condition: 'no_progress > 10m',
action: 'escalate'
}
],
channels: {
send_alert: 'slack',
send_warning: 'email',
escalate: ['slack', 'pagerduty']
}
})

🔐 Sécurité

Permissions

const secureWorkflow = new WorkflowBuilder('secure')
.addNode('sensitive-operation', async () => {
// Nécessite des permissions
}, {
permissions: {
requiredRoles: ['admin', 'operator'],
allowedUsers: ['alice', 'bob'],
requireMFA: true
}
})
.build()

Secrets

const withSecrets = new WorkflowBuilder('with-secrets')
.addNode('use-secret', async (ctx) => {
const apiKey = await ctx.getSecret('api-key')
const dbPassword = await ctx.getSecret('db-password')

return await callAPI(apiKey, dbPassword)
})
.build()

Audit Logging

const auditedWorkflow = new WorkflowBuilder('audited')
.addNode('action', async (ctx) => {
ctx.audit({
action: 'DATA_ACCESS',
resource: 'user-data',
details: { userId: '123' }
})
})
.build()

Version : 1.0.0
Patterns : 15+
Stratégies : 3