Workflow Execution Engine
This page is the authoritative specification of WorkflowExecutionService in libs/infra/execution/src/lib/workflow-execution.service.ts.
Scope
The engine consumes a validated DAG of WorkflowNode + WorkflowEdge plus a WorkflowExecutionContext and returns a finalised set of workflow_node_results. It is intentionally pure: side effects (DB writes, moderation, storage) are injected as callbacks or gateways.
Input types
interface WorkflowNode {
  id: string
  lens_id: string
  version_id: string | null
  label: string | null
  ordinal: number
  config: WorkflowNodeConfig
}
interface WorkflowEdge {
  id: string
  source_node_id: string
  target_node_id: string
  source_output_key: string // default 'output'
  target_param_label: string
  merge_strategy?: MergeStrategy // optional override; falls back to target node's config.merge
}
interface WorkflowExecutionContext {
  getLensTemplate(lensId: string, versionId: string | null): Promise<string>
  getVersionContracts(versionId: string): Promise<{ input: LensInputContract | null; output: LensOutputContract | null }>
  updateNodeResult(nodeId: string, patch: Partial<WorkflowNodeResultRecord>): Promise<void>
  onPartialOutput?(nodeId: string, partial: { text: string }): void
  moderation?: ModerationGateway
  signal?: AbortSignal
  rootInputs: Record<string, unknown>
}
Algorithm
- Cycle detection. Run Kahn's algorithm against edges. If the processed count is less than nodes.length, throw CycleDetected and abort before any side effect.
- In-degree map. Build indegree[nodeId] = number of incoming edges.
- Initial wave. Any node with indegree === 0 is scheduled.
- Wave execution. For each wave:
  a. Build each node's rendered inputs by resolving rootInputs and upstream results. Apply the target node's merge strategy when two or more edges share a target_param_label.
  b. Validate inputs against input_contract; on failure, mark failed with error_message: "input_contract_violation".
  c. Call moderation.check('input') if config.moderation ∈ {'input','both'}.
  d. Call the provider via IExecutionProvider.execute (or IStreamingExecutionProvider.stream when available) under Promise.race(provider, timeout(config.timeout_ms)), cancelled by ctx.signal.
  e. Call moderation.check('output') if config.moderation ∈ {'output','both'}.
  f. Validate output against output_contract. If validation fails, either retry or mark failed with error_message: "output_contract_violation" depending on config.retry.retry_on.
  g. Persist the envelope via updateNodeResult. Stream partials via onPartialOutput when the provider supports it.
- Next wave. After all wave members settle, decrement the in-degree of each dependent. A dependent becomes runnable when its in-degree hits zero and its own on_parent_failure policy permits running given the aggregated parent statuses.
- Termination. When no runnable nodes remain, compute the run-level status: completed if every terminal leaf is completed or skipped, cancelled if any node is cancelled and none are failed, failed otherwise.
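The cycle check, in-degree map, and wave scheduling described above can be sketched as a single pass of Kahn's algorithm. This is a minimal illustration, not the service's actual code: planWaves, PlanNode, and PlanEdge are hypothetical names, and the sketch only plans waves statically. It does not apply on_parent_failure, which the engine evaluates at run time.

```typescript
// Hypothetical minimal shapes: only the fields the planner needs.
interface PlanNode { id: string }
interface PlanEdge { source_node_id: string; target_node_id: string }

class CycleDetected extends Error {
  constructor() {
    super('workflow graph contains a cycle')
    this.name = 'CycleDetected'
  }
}

// Groups node ids into waves; throws CycleDetected if some node can never be scheduled.
function planWaves(nodes: PlanNode[], edges: PlanEdge[]): string[][] {
  const indegree = new Map<string, number>()
  const dependents = new Map<string, string[]>()
  for (const node of nodes) {
    indegree.set(node.id, 0)
    dependents.set(node.id, [])
  }
  for (const edge of edges) {
    indegree.set(edge.target_node_id, (indegree.get(edge.target_node_id) ?? 0) + 1)
    dependents.get(edge.source_node_id)?.push(edge.target_node_id)
  }

  const waves: string[][] = []
  let processed = 0
  // Initial wave: every node with no incoming edges.
  let wave = nodes.filter((n) => indegree.get(n.id) === 0).map((n) => n.id)
  while (wave.length > 0) {
    waves.push(wave)
    processed += wave.length
    const next: string[] = []
    for (const id of wave) {
      for (const dep of dependents.get(id) ?? []) {
        const remaining = (indegree.get(dep) ?? 0) - 1
        indegree.set(dep, remaining)
        if (remaining === 0) next.push(dep) // all parents settled
      }
    }
    wave = next
  }
  // Nodes on a cycle never reach in-degree zero, so they are never processed.
  if (processed < nodes.length) throw new CycleDetected()
  return waves
}
```

For a diamond graph (a feeds b and c, which both feed d) the sketch yields three waves: a, then b and c together, then d.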
Node status lifecycle
Failure propagation policy
Each node's config.on_parent_failure controls what happens when any upstream parent is failed or cancelled:
| Value | Behaviour |
|---|---|
| skip | Node becomes skipped. Its dependents see a parent status of skipped and follow their own policy. |
| propagate | Node becomes failed with error_message: "upstream_failure" and no provider call is made. |
| substitute_default | Missing upstream substitutions default to ''. Node runs as if the parent returned an empty string. This reproduces pre-hardening behaviour and should be used only for legacy compatibility. |
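The table above can be read as a small decision function. The sketch below is illustrative only: decideOnParentFailure and the Decision type are hypothetical names, and it assumes that a skipped parent triggers the policy in the same way as a failed or cancelled one, which the text implies but does not state outright.

```typescript
type NodeStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | 'skipped'
type OnParentFailure = 'skip' | 'propagate' | 'substitute_default'

// Hypothetical result shape describing what the scheduler should do with the node.
type Decision =
  | { action: 'run'; substituteMissingWith?: string }
  | { action: 'skip' }
  | { action: 'fail'; error_message: 'upstream_failure' }

function decideOnParentFailure(policy: OnParentFailure, parentStatuses: NodeStatus[]): Decision {
  // Assumption: `skipped` parents count as unavailable, like `failed` and `cancelled`.
  const parentUnavailable = parentStatuses.some(
    (s) => s === 'failed' || s === 'cancelled' || s === 'skipped',
  )
  if (!parentUnavailable) return { action: 'run' }

  switch (policy) {
    case 'skip':
      return { action: 'skip' }
    case 'propagate':
      // No provider call is made for this node.
      return { action: 'fail', error_message: 'upstream_failure' }
    case 'substitute_default':
      // Legacy behaviour: missing upstream values render as ''.
      return { action: 'run', substituteMissingWith: '' }
  }
}
```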
Retry and backoff
Per-node:
config.retry = {
  attempts: number // total attempts including the first; default 1 (no retry)
  backoff_ms: number // base delay; default 500
  max_backoff_ms?: number // cap; default 8000
  retry_on: RetryCause[] // ['timeout', 'provider_error', 'rate_limit', 'contract_violated']
}
Backoff formula (jittered exponential):
delay = min(max_backoff_ms, backoff_ms * 2^(attempt - 1)) * (0.5 + Math.random() * 0.5)
Retries reset the AbortController subscription but respect the parent ctx.signal: if it aborts mid-retry, the node becomes cancelled.
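As a worked illustration of the backoff formula, the following sketch computes the delay for a given attempt. The function name backoffDelayMs and the injectable random parameter are assumptions added here so the jitter can be pinned in tests; attempt is taken to be the 1-based number of the attempt that just failed.

```typescript
interface RetryConfig {
  attempts: number
  backoff_ms: number
  max_backoff_ms?: number
  retry_on: string[]
}

// delay = min(max_backoff_ms, backoff_ms * 2^(attempt - 1)) * (0.5 + random * 0.5)
function backoffDelayMs(
  retry: RetryConfig,
  attempt: number,
  random: () => number = Math.random, // injectable for deterministic tests (assumption)
): number {
  const cap = retry.max_backoff_ms ?? 8000 // documented default cap
  const exponential = retry.backoff_ms * 2 ** (attempt - 1)
  const jitter = 0.5 + random() * 0.5 // scales the delay into [50%, 100%)
  return Math.min(cap, exponential) * jitter
}
```

With the documented defaults (backoff_ms 500, cap 8000), the un-jittered delay doubles from 500 ms and reaches the cap at the fifth attempt; jitter then scales each value to between half and all of that figure.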
Timeout
config.timeout_ms wraps the provider call in Promise.race. Timeout is a retryable cause (retry_on: ['timeout']).
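A minimal sketch of the Promise.race wrapper, assuming the provider call accepts an AbortSignal and rejects when that signal aborts. withTimeout and TimeoutError are hypothetical names, not the service's own API.

```typescript
class TimeoutError extends Error {
  constructor(timeoutMs: number) {
    super(`provider call timed out after ${timeoutMs} ms`)
    this.name = 'TimeoutError'
  }
}

// Races a provider call against a timer; aborts the call on timeout or parent abort.
async function withTimeout<T>(
  call: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number,
  parent?: AbortSignal,
): Promise<T> {
  const controller = new AbortController()
  const onParentAbort = () => controller.abort()
  if (parent?.aborted) controller.abort()
  parent?.addEventListener('abort', onParentAbort, { once: true })

  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      controller.abort() // stop the in-flight call
      reject(new TimeoutError(timeoutMs))
    }, timeoutMs)
  })

  try {
    return await Promise.race([call(controller.signal), timeout])
  } finally {
    // Always release the timer and the parent listener, whichever side won.
    clearTimeout(timer)
    parent?.removeEventListener('abort', onParentAbort)
  }
}
```

A retry loop could inspect the rejection's name to decide whether 'timeout' is listed in retry_on before scheduling another attempt.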
Merge strategies
Merge strategy is resolved in this priority order for each inbound edge group:
- The edge's own merge_strategy (if set).
- The target node's config.merge.
- Default last_write_wins.

| Strategy | Semantics |
|---|---|
| last_write_wins | The last edge in edges array order replaces the value. Template sees a single string. |
| concat | All values joined with \n\n in edge-array order. |
| array | Values wrapped as JSON.stringify(values) and rendered into the template. |
| json_object | Values collected as { [sourceNodeLabel]: value } and JSON-stringified. |
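The resolution order and the four strategies can be sketched as two small functions. resolveMergeStrategy, mergeInbound, and InboundValue are hypothetical names; the sketch assumes upstream values are already strings, that inbound is given in edge-array order, and that an empty group renders as an empty string.

```typescript
type MergeStrategy = 'last_write_wins' | 'concat' | 'array' | 'json_object'

// Hypothetical shape: one upstream value per inbound edge, in edge-array order.
interface InboundValue { sourceNodeLabel: string; value: string }

// Edge override first, then the target node's config.merge, then the default.
function resolveMergeStrategy(edgeStrategy?: MergeStrategy, nodeMerge?: MergeStrategy): MergeStrategy {
  return edgeStrategy ?? nodeMerge ?? 'last_write_wins'
}

function mergeInbound(strategy: MergeStrategy, inbound: InboundValue[]): string {
  const values = inbound.map((item) => item.value)
  switch (strategy) {
    case 'last_write_wins':
      return values[values.length - 1] ?? '' // assumption: empty group renders as ''
    case 'concat':
      return values.join('\n\n')
    case 'array':
      return JSON.stringify(values)
    case 'json_object': {
      const byLabel: Record<string, string> = {}
      for (const item of inbound) byLabel[item.sourceNodeLabel] = item.value
      return JSON.stringify(byLabel)
    }
  }
}
```

Note that under json_object two sources with the same label collapse into one key in this sketch, with the later edge winning.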
Cancellation
- UI triggers stopExecution → AbortController.abort().
- All in-flight fetch calls abort.
- Pending nodes never enter running; markRemainingCancelled sets their status to cancelled.
- Coordinated with stopRun which flips workflow_runs.status = 'cancelled'.
Streaming
Providers that implement IStreamingExecutionProvider:
interface IStreamingExecutionProvider {
  stream(modelId: string, input: ExecutionInput, signal?: AbortSignal): AsyncIterable<StreamChunk>
}
type StreamChunk =
  | { type: 'partial'; text: string }
  | { type: 'media'; url: string; mime: string }
  | { type: 'final'; envelope: NodeOutputEnvelope }
The engine calls onPartialOutput on every partial chunk (throttled client-side) so WorkflowProgressView in libs/features/workflows/src/lib/components/WorkflowProgressView.tsx can reflect live progress.
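One way the engine side of this contract might look is sketched below. consumeStream is a hypothetical helper, and the NodeOutputEnvelope declared here is a stand-in with a single text field, since the real envelope type is defined elsewhere. Media chunks are ignored in this sketch.

```typescript
// Stand-in for the real NodeOutputEnvelope, which this page does not define.
interface NodeOutputEnvelope { text: string }

type StreamChunk =
  | { type: 'partial'; text: string }
  | { type: 'media'; url: string; mime: string }
  | { type: 'final'; envelope: NodeOutputEnvelope }

// Forwards partial chunks to the callback and resolves with the final envelope.
async function consumeStream(
  nodeId: string,
  chunks: AsyncIterable<StreamChunk>,
  onPartialOutput?: (nodeId: string, partial: { text: string }) => void,
): Promise<NodeOutputEnvelope> {
  let final: NodeOutputEnvelope | undefined
  for await (const chunk of chunks) {
    if (chunk.type === 'partial') {
      onPartialOutput?.(nodeId, { text: chunk.text })
    } else if (chunk.type === 'final') {
      final = chunk.envelope
    }
    // 'media' chunks are not handled in this sketch.
  }
  if (!final) throw new Error(`stream for node ${nodeId} ended without a final chunk`)
  return final
}
```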
Canonical SSE event taxonomy
The enum WorkflowEventType in libs/types/src/lib/workflow-events.types.ts is the single source of truth for every event that crosses the engine / transport / client boundary. Engine-side snake_case names (node_completed, node_started, …) are translated through mapEngineEventToSse() before persistence; never emit a raw string.
Run-scoped events: run.started, run.status.changed, run.completed, run.failed, run.cancelled, run.timed_out, run.recovered, heartbeat. Node-scoped events: node.queued, node.started, node.stream.delta, node.log, node.retried, node.completed, node.failed, node.cancelled, node.skipped, node.timed_out, node.blocked, node.invalidated. Gate events: moderation.flagged, contract.violated.
Every frame is wrapped in a WorkflowSseEventEnvelope:
interface WorkflowSseEventEnvelope<TPayload = Record<string, unknown>> {
  eventId: number // monotonic per runId (advisory lock in fn_append_workflow_run_event)
  sequence?: number // mirror of eventId for transport-agnostic consumers
  type: WorkflowEventType
  runId: string
  workflowId?: string
  timestamp: string // ISO 8601
  correlation?: { traceId?: string; parentEventId?: number; wave?: number; phase?: 'engine' | 'transport' | 'client' }
  payload: TPayload
}
node.stream.delta ordering
Every partial output emits a node.stream.delta event with a deltaIndex that is monotonic per nodeId. The client reducer in useWorkflowRun.ts keeps a per-(runId, nodeId) high-water-mark and drops any delta whose deltaIndex is <= seen, so duplicate frames delivered after a reconnect are idempotent. A missing delta index (i.e. a gap in the sequence) is a signal the client should tear down the connection and re-open with afterEventId=<last_event_id>.
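The high-water-mark rule can be sketched as a small tracker. This is not the reducer in useWorkflowRun.ts: DeltaTracker and its verdict names are hypothetical, and the sketch assumes consecutive deltas increase deltaIndex by exactly one. It accepts whatever index arrives first for a node, so a lost first delta would go unnoticed here.

```typescript
interface StreamDelta { runId: string; nodeId: string; deltaIndex: number; text: string }

type DeltaVerdict = 'apply' | 'duplicate' | 'gap'

class DeltaTracker {
  // High-water-mark per (runId, nodeId).
  private readonly seen = new Map<string, number>()

  accept(delta: StreamDelta): DeltaVerdict {
    const key = `${delta.runId}:${delta.nodeId}`
    const last = this.seen.get(key)
    // Replayed frames (for example after a reconnect) are dropped.
    if (last !== undefined && delta.deltaIndex <= last) return 'duplicate'
    // Assumption: indices are contiguous, so skipping ahead means a lost frame.
    if (last !== undefined && delta.deltaIndex > last + 1) return 'gap'
    this.seen.set(key, delta.deltaIndex)
    return 'apply'
  }
}
```

On a 'gap' verdict the caller would close the connection and reopen it with afterEventId set to the last event id it processed.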
Reconnection & Last-Event-ID fallback
The SSE route (deployed as a Cloudflare Worker) resumes from a cursor in two ways, in this order of precedence:
- Query string: GET /execute/workflows/:runId/events?afterEventId=<n>
- Request header: Last-Event-ID: <n>
EventSource implementations (browser native EventSource and most polyfills) automatically set Last-Event-ID from the last id: frame they received. The ?afterEventId= form is provided for fetch()-based clients that do not control the header, and for explicit manual resumes from the UI.
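The precedence between the two cursor sources can be sketched as follows. resolveCursor is a hypothetical helper; treating a malformed query value as absent and falling through to the header is an assumption, as is returning null when neither source yields a cursor.

```typescript
// Returns the resume cursor, preferring ?afterEventId= over the Last-Event-ID header.
function resolveCursor(requestUrl: string, lastEventIdHeader: string | null): number | null {
  const fromQuery = new URL(requestUrl).searchParams.get('afterEventId')
  for (const raw of [fromQuery, lastEventIdHeader]) {
    if (raw === null) continue
    const trimmed = raw.trim()
    // Assumption: only non-negative integers are valid cursors.
    if (/^\d+$/.test(trimmed)) return Number(trimmed)
  }
  return null // no cursor supplied
}
```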
Headers written on every SSE response:
Content-Type: text/event-stream; charset=utf-8
Cache-Control: no-cache, no-transform
Connection: keep-alive
X-Accel-Buffering: no
X-Accel-Buffering: no defeats reverse-proxy / CDN buffering that would otherwise stall frames. Transfer-Encoding: chunked is implicit when the response body is a ReadableStream; never set it explicitly, because Cloudflare Workers rejects it.
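For illustration, the headers and a single frame could be produced as sketched below. SSE_HEADERS and formatSseFrame are hypothetical names, the envelope shape is trimmed to its required fields, and emitting an event: line alongside id: and data: is an assumption about the route; only the id: line is implied by the reconnection behaviour described above.

```typescript
// Trimmed, hypothetical view of WorkflowSseEventEnvelope (required fields only).
interface SseEnvelope {
  eventId: number
  type: string
  runId: string
  timestamp: string
  payload: Record<string, unknown>
}

// Transfer-Encoding is deliberately absent: it is implicit for streamed bodies.
const SSE_HEADERS: Record<string, string> = {
  'Content-Type': 'text/event-stream; charset=utf-8',
  'Cache-Control': 'no-cache, no-transform',
  'Connection': 'keep-alive',
  'X-Accel-Buffering': 'no',
}

// Serialises one envelope as an SSE frame terminated by a blank line.
function formatSseFrame(envelope: SseEnvelope): string {
  // JSON.stringify never emits raw newlines, so a single data: line is enough.
  return `id: ${envelope.eventId}\nevent: ${envelope.type}\ndata: ${JSON.stringify(envelope)}\n\n`
}
```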
Observability
Every status transition emits an execution.execution_tags row:
INSERT INTO execution.execution_tags (run_id, node_id, tag, metadata)
VALUES ($1, $2, $3, $4);
Canonical tags:
| Tag | Context metadata |
|---|---|
| node_started | { attempt, wave } |
| node_retried | { attempt, cause, delay_ms } |
| moderation_flagged | { phase: 'input' \| 'output', … } |
| contract_violated | { phase, errors[] } |
| timed_out | { timeout_ms } |
| node_failed | { error_code, error_message } |
| node_completed | { duration_ms, input_tokens, output_tokens } |
| node_cancelled | {} |
Idempotency
workflow_runs carries idempotency_key (Phase 6). The client derives it from sha256(workflow_id || rootInputsCanonicalJson) so that retrying the same submission does not double-charge or double-write.
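A minimal sketch of the key derivation, under several assumptions: || is read as string concatenation, canonical JSON means recursively sorted object keys with no whitespace, the digest is hex-encoded, and Node's crypto module is available (a browser client would need the Web Crypto API instead). canonicalJson and deriveIdempotencyKey are hypothetical names.

```typescript
import { createHash } from 'node:crypto'

// Serialises a value with object keys sorted, so key order never changes the hash.
function canonicalJson(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value) ?? 'null' // undefined serialises as null, as in arrays
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => canonicalJson(item)).join(',')}]`
  }
  const record = value as Record<string, unknown>
  const keys = Object.keys(record)
    .filter((key) => record[key] !== undefined) // JSON drops undefined properties
    .sort()
  return `{${keys.map((key) => `${JSON.stringify(key)}:${canonicalJson(record[key])}`).join(',')}}`
}

function deriveIdempotencyKey(workflowId: string, rootInputs: Record<string, unknown>): string {
  return createHash('sha256')
    .update(workflowId + canonicalJson(rootInputs))
    .digest('hex')
}
```

Two submissions whose rootInputs differ only in property order produce the same key, which is what lets a retried submission be recognised as a duplicate.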