Skip to main content

Genesis Temporal - Services

Documentation des services Temporal.


🏗️ Architecture des services


🌐 Frontend Service

// service/frontend/service.go
type Service struct {
config *Config
workflowHandler *WorkflowHandler
healthHandler *HealthHandler
nexusHandler *NexusHandler
server *grpc.Server
}

type Config struct {
GRPCPort int
HTTPPort int
MaxConnections int
TLS TLSConfig
KeepAlive KeepAliveConfig
}

func (s *Service) Start() error {
grpcServer := grpc.NewServer(
grpc.MaxConcurrentStreams(uint32(s.config.MaxConnections)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: s.config.KeepAlive.MaxConnectionIdle,
MaxConnectionAge: s.config.KeepAlive.MaxConnectionAge,
Time: s.config.KeepAlive.Time,
Timeout: s.config.KeepAlive.Timeout,
}),
)

workflowservice.RegisterWorkflowServiceServer(grpcServer, s.workflowHandler)
healthgrpc.RegisterHealthServer(grpcServer, s.healthHandler)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.config.GRPCPort))
if err != nil {
return err
}

s.server = grpcServer
return grpcServer.Serve(lis)
}

📜 History Service

// service/history/history_engine.go
type HistoryEngine struct {
shardController ShardController
stateMachine *StateMachine
eventStore *EventStore
chasmEngine *chasm.Engine
}

func (e *HistoryEngine) ScheduleWorkflowTask(
ctx context.Context,
request *ScheduleRequest,
) error {
shard, err := e.shardController.GetShard(request.WorkflowID)
if err != nil {
return err
}

if err := shard.AcquireLock(ctx); err != nil {
return err
}
defer shard.ReleaseLock()

mutableState, err := e.stateMachine.Load(ctx, shard.ID, request.WorkflowID)
if err != nil {
return err
}

event := &history.HistoryEvent{
EventId: mutableState.GetNextEventID(),
EventType: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
}

if err := e.eventStore.AppendEvents(ctx, shard.ID, event); err != nil {
return err
}

mutableState.AddEvent(event)
return e.stateMachine.Persist(ctx, mutableState)
}

📋 Matching Service

// service/matching/matching_engine.go
type MatchingEngine struct {
taskQueueManager TaskQueueManager
pollerRegistry *PollerRegistry
forwarder *TaskForwarder
}

func (e *MatchingEngine) PollWorkflowTaskQueue(
ctx context.Context,
request *workflowservice.PollWorkflowTaskQueueRequest,
) (*workflowservice.PollWorkflowTaskQueueResponse, error) {
queue, err := e.taskQueueManager.GetQueue(
request.GetNamespaceId(),
request.GetTaskQueue().GetName(),
enums.TASK_QUEUE_TYPE_WORKFLOW,
)
if err != nil {
return nil, err
}

task, err := queue.Poll(ctx, request.GetPollerId(), e.config.LongPollExpirationInterval)
if err != nil {
return nil, err
}

if task == nil {
return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
}

return &workflowservice.PollWorkflowTaskQueueResponse{
TaskToken: task.Token,
WorkflowExecution: task.WorkflowExecution,
WorkflowType: task.WorkflowType,
History: task.History,
}, nil
}

👷 Worker Service

// service/worker/worker.go
type Worker struct {
replicator *Replicator
batcher *Batcher
scanner *Scanner
}

func (w *Worker) Start() error {
go w.replicator.Run()
go w.batcher.Run()
go w.scanner.Run()
return nil
}

func (w *Worker) Stop() {
w.replicator.Stop()
w.batcher.Stop()
w.scanner.Stop()
}

📊 Métriques par service

ServiceMétriques clés
FrontendRequests/s, Latency P99, Connections
HistoryShard acquisitions, Events appended
MatchingTask queue size, Poller count
WorkerActive replicators, Batch size

Version : 1.0.0