Workflow Engine Architecture
Current Implementation Status
The workflow engine supports Lens-type nodes via the execution provider registry (OpenAI, Anthropic, Google, Mistral, Ollama, Fal.ai). DAG validation (cycle detection, binding completeness, wave calculation) is fully implemented. Multi-node orchestration with inter-node data flow is available for Lens-to-Lens chains. Non-Lens node types (Logic, Data, Storage, Communication, etc.) are defined in the UI catalog but do not yet have runtime execution implementations.
DAG Execution Model
Every workflow is a directed acyclic graph (DAG). Nodes represent discrete units of work; edges encode execution order and data dependencies. WorkflowExecutionService (libs/infra/execution/src/lib/workflow-execution.service.ts) is the central orchestrator — it resolves the traversal order, fans out to execution providers, and aggregates status.
Edges carry optional condition predicates. When a condition evaluates to false, the downstream node transitions to skipped rather than pending.
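The traversal order described above can be illustrated with Kahn's algorithm, which yields both cycle detection and a wave grouping in a single pass. This is a minimal sketch under assumptions: the `Edge` shape and the `computeWaves` name are hypothetical and are not taken from WorkflowExecutionService.

```typescript
// Hypothetical edge shape for illustration; the engine's real types may differ.
type Edge = { from: string; to: string };

// Groups nodes into "waves": every node in a wave depends only on nodes in
// earlier waves, so nodes inside one wave could run in parallel.
// Throws if the graph contains a cycle.
function computeWaves(nodeIds: string[], edges: Edge[]): string[][] {
  const inDegree = new Map<string, number>();
  const children = new Map<string, string[]>();
  for (const id of nodeIds) {
    inDegree.set(id, 0);
    children.set(id, []);
  }
  for (const { from, to } of edges) {
    if (!inDegree.has(from) || !inDegree.has(to)) {
      throw new Error(`edge references unknown node: ${from} -> ${to}`);
    }
    inDegree.set(to, inDegree.get(to)! + 1);
    children.get(from)!.push(to);
  }

  const waves: string[][] = [];
  let current = nodeIds.filter((id) => inDegree.get(id) === 0);
  let visited = 0;
  while (current.length > 0) {
    waves.push(current);
    visited += current.length;
    const next: string[] = [];
    for (const id of current) {
      for (const child of children.get(id)!) {
        const remaining = inDegree.get(child)! - 1;
        inDegree.set(child, remaining);
        if (remaining === 0) next.push(child);
      }
    }
    current = next;
  }
  // A node that never reaches in-degree 0 sits on, or downstream of, a cycle.
  if (visited !== nodeIds.length) {
    throw new Error("workflow graph contains a cycle");
  }
  return waves;
}
```

Condition predicates are left out of this sketch; they would decide at run time whether a node in a later wave runs or is marked skipped.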
Data flow and source_output_key
Bindings merge upstream output_data / envelope fields into the target lens template labels. source_output_key supports dotted paths into structured outputs (for example data.summary on a research node). Resolution is shared with the prompt resolver via resolveMappedOutputValue in libs/infra/execution/src/lib/output-path.ts.
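Dotted-path resolution of this kind can be sketched in a few lines. The function below is illustrative only: `resolveDottedPath` is a hypothetical name, and the sketch makes no claim about how resolveMappedOutputValue is actually implemented.

```typescript
// Illustrative only; not the actual resolveMappedOutputValue implementation.
// Walks a dotted path such as "data.summary" through a structured output and
// returns undefined as soon as a segment cannot be followed.
function resolveDottedPath(output: unknown, path: string): unknown {
  let current: unknown = output;
  for (const segment of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}
```

With an upstream output of `{ data: { summary: "..." } }`, the path `data.summary` resolves to the summary string, while a path with a missing segment resolves to `undefined` instead of throwing.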
When a rendered label value looks like an image URL, the engine attaches it as a vision ExecutionInput.attachments entry so text providers that support multimodal messages can consume upstream image nodes without ad-hoc string hacks.
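The exact "looks like an image URL" heuristic is not specified here. As a rough sketch under assumptions, one plausible check is an http(s) URL whose path ends in a common image extension. The `VisionAttachment` shape, the extension list, and both function names are hypothetical.

```typescript
// Hypothetical attachment shape; the real ExecutionInput.attachments type may differ.
type VisionAttachment = { kind: "image"; url: string };

// Assumed heuristic: the whole value is an http(s) URL whose path ends in a
// common image extension, optionally followed by a query string or fragment.
const IMAGE_URL = /^https?:\/\/[^\s?#]+\.(png|jpe?g|gif|webp)(\?[^\s#]*)?(#\S*)?$/i;

function looksLikeImageUrl(value: string): boolean {
  return IMAGE_URL.test(value.trim());
}

// Collects one attachment per rendered label value that passes the heuristic.
function collectImageAttachments(labels: Record<string, string>): VisionAttachment[] {
  return Object.values(labels)
    .filter(looksLikeImageUrl)
    .map((value) => ({ kind: "image" as const, url: value.trim() }));
}
```

Requiring the whole value to be a URL keeps prose that merely mentions a link from being attached as an image.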
Per-node providers (mixed DAGs)
WorkflowExecutionContext.resolveExecutionProvider selects an IExecutionProvider per node (for example Claude for research, Fal for fal-ai/... image models). The browser hook caches providers by model key; the scheduled worker resolves from workflow_nodes.config.model_id with the same mechanism. If the hook is omitted, the service falls back to the single provider passed to new WorkflowExecutionService(provider).
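The per-node selection with a model-keyed cache and a single-provider fallback can be sketched as follows. All names here (`ExecutionProviderLike`, `WorkflowNode`, `createCachingResolver`, `providerFor`) are hypothetical stand-ins, not the engine's real interfaces.

```typescript
// Hypothetical, simplified shapes; the engine's real interfaces may differ.
interface ExecutionProviderLike {
  readonly label: string;
}
type WorkflowNode = { id: string; modelId: string };
type ResolveHook = (node: WorkflowNode) => ExecutionProviderLike;

// Builds a hook that creates one provider per model key and reuses it afterwards.
function createCachingResolver(
  create: (modelId: string) => ExecutionProviderLike,
): ResolveHook {
  const cache = new Map<string, ExecutionProviderLike>();
  return (node) => {
    let provider = cache.get(node.modelId);
    if (provider === undefined) {
      provider = create(node.modelId);
      cache.set(node.modelId, provider);
    }
    return provider;
  };
}

// Uses the hook when one is supplied, otherwise the single default provider.
function providerFor(
  node: WorkflowNode,
  defaultProvider: ExecutionProviderLike,
  hook?: ResolveHook,
): ExecutionProviderLike {
  return hook ? hook(node) : defaultProvider;
}
```

Caching by model key means a mixed DAG with many nodes on the same model constructs that provider only once.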
Node Status State Machine
pending → running → completed
→ failed
→ skipped
- pending: node is queued; upstream deps not yet resolved.
- running: node has been claimed by the engine.
- completed: execution resolved without error.
- failed: execution threw or returned an error result; eligible for retry.
- skipped: upstream condition was false; treated as a no-op terminal.
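One way to make these states enforceable is a small transition table. The sketch below rests on two assumptions that the text does not confirm: that skipped is entered from pending, and that a retry moves a failed node back to running. The `ALLOWED` table and `canTransition` helper are hypothetical.

```typescript
type NodeStatus = "pending" | "running" | "completed" | "failed" | "skipped";

// Assumed transition table; the engine's actual rules may differ.
const ALLOWED: Record<NodeStatus, readonly NodeStatus[]> = {
  pending: ["running", "skipped"], // assumption: skipped is entered from pending
  running: ["completed", "failed"],
  failed: ["running"], // assumption: a retry re-enters running
  completed: [], // terminal
  skipped: [], // terminal no-op
};

function canTransition(from: NodeStatus, to: NodeStatus): boolean {
  return ALLOWED[from].includes(to);
}
```

Checking transitions in one place makes an out-of-order status update easier to catch than scattered status assignments.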
Execution Provider Registry
Node type → implementation is resolved at runtime via getExecutionProvider() from libs/infra/execution/src/lib/execution.registry.ts. Each provider implements IExecutionProvider and is registered once at bootstrap. Unknown node types cause an immediate failed transition with a registry-miss error.
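A register-once registry with an explicit miss result can be sketched as below. This is not the contents of execution.registry.ts: `ProviderRegistry`, `ProviderLike`, and the `Resolution` shape are hypothetical, and the real getExecutionProvider() signature is not shown in this document.

```typescript
// Hypothetical shapes; not the real IExecutionProvider or registry API.
interface ProviderLike {
  readonly id: string;
}
type Resolution =
  | { kind: "hit"; provider: ProviderLike }
  | { kind: "miss"; error: string };

class ProviderRegistry {
  private readonly providers = new Map<string, ProviderLike>();

  // Registers a provider once; a second registration for the same type is a bug.
  register(nodeType: string, provider: ProviderLike): void {
    if (this.providers.has(nodeType)) {
      throw new Error(`provider already registered for node type "${nodeType}"`);
    }
    this.providers.set(nodeType, provider);
  }

  // Returns a miss result instead of throwing, so the caller can move the
  // node straight to failed with the error attached.
  resolve(nodeType: string): Resolution {
    const provider = this.providers.get(nodeType);
    if (provider === undefined) {
      return { kind: "miss", error: `registry miss: no provider for node type "${nodeType}"` };
    }
    return { kind: "hit", provider };
  }
}
```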
onNodeStatusChange Contract
The orchestrator emits progress events through this callback on every state transition:
type NodeStatusChange = {
  nodeId: string
  status: NodeStatus
  error?: string
  attemptNumber: number
}
onNodeStatusChange: (change: NodeStatusChange) => void
Consumers use this to stream progress to the UI or persist audit trails into lenses.workflow_run_events.
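A consumer of this callback might look like the following sketch, which buffers events in memory as a stand-in for an audit trail. The `createAuditTrail` helper and the `at` timestamp field are hypothetical; writing to lenses.workflow_run_events is out of scope here.

```typescript
type NodeStatus = "pending" | "running" | "completed" | "failed" | "skipped";
type NodeStatusChange = {
  nodeId: string;
  status: NodeStatus;
  error?: string;
  attemptNumber: number;
};

// Hypothetical consumer: records each transition with a timestamp. A real
// consumer could forward the same records to the UI or to a database table.
function createAuditTrail() {
  const events: Array<NodeStatusChange & { at: string }> = [];
  const onNodeStatusChange = (change: NodeStatusChange): void => {
    events.push({ ...change, at: new Date().toISOString() });
  };
  return { events, onNodeStatusChange };
}
```

Because the callback returns `void`, a consumer that does slow work (such as a network write) would need its own queueing so it does not hold up the orchestrator.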
Retry Policy
Retry behaviour is declared per node in NodeRuntime (libs/infra/execution/src/lib/node-runtime.ts):
interface RetryPolicy {
  maxRetries: number // 0 = no retry
  backoffMs: number // flat delay between attempts
}
WARNING
There is no exponential backoff by default. If a node hits a rate-limited external API, set backoffMs conservatively or implement jitter inside the provider.
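A flat-backoff retry loop with optional jitter could look like the sketch below. It is not the NodeRuntime implementation: `nextDelayMs`, `runWithRetry`, and the `jitterRatio` parameter are hypothetical, and the sketch assumes attempt numbers start at 1.

```typescript
interface RetryPolicy {
  maxRetries: number; // 0 = no retry
  backoffMs: number; // flat delay between attempts
}

// Delay before the next attempt: the flat backoff plus optional random jitter.
// jitterRatio is a hypothetical knob and is not part of RetryPolicy.
function nextDelayMs(
  policy: RetryPolicy,
  jitterRatio = 0,
  random: () => number = Math.random,
): number {
  return Math.round(policy.backoffMs + policy.backoffMs * jitterRatio * random());
}

// Runs the task once, then retries up to policy.maxRetries times.
async function runWithRetry<T>(
  task: (attemptNumber: number) => Promise<T>,
  policy: RetryPolicy,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.maxRetries + 1; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      lastError = err;
      // Only wait if another attempt will follow.
      if (attempt <= policy.maxRetries) await sleep(nextDelayMs(policy));
    }
  }
  throw lastError;
}
```

With `jitterRatio` set to 0.5 and `backoffMs` set to 1000, each delay falls between 1000 and 1500 ms, which spreads out retries from nodes that failed at the same moment.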
fn_dispatch_scheduled_workflows_with_approval — Critical Path
Invoked by pg_cron every minute. Execution order inside the dispatch loop:
1. Kill switch: reads platform.system_flags.autonomy_dispatch_enabled; aborts immediately if false.
2. Budget check: checks agents.quota_snapshots.credits_spent vs agents.policies.spending_limit_credits; sets last_dispatch_status = 'budget_exceeded' and skips if exceeded.
3. Cycle check: calls lenses.fn_workflow_has_cycle(); sets last_dispatch_status = 'validation_failed' if cyclic.
4. Overlap check: ensures no in-flight run exists for the same schedule.
5. Dispatch: inserts lenses.workflow_runs + lenses.workflow_node_results, optionally an agents.team_runs row with approval gate.
TIP
Steps 1–4 are guard clauses that leave no partial state. A schedule either fully dispatches or produces a structured rejection in last_dispatch_status.
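The guard ordering can be modelled as a pure decision function. The real logic lives in a database function, so the sketch below is only a TypeScript model of the order of checks. The `ScheduleSnapshot` and `DispatchDecision` shapes are hypothetical, the strict `>` budget comparison is an assumption, and the source does not say which status (if any) an overlap records.

```typescript
// Hypothetical snapshot of the values the guards read.
type ScheduleSnapshot = {
  autonomyDispatchEnabled: boolean;
  creditsSpent: number;
  spendingLimitCredits: number;
  workflowHasCycle: boolean;
  hasInFlightRun: boolean;
};

type DispatchDecision =
  | { kind: "abort" } // kill switch is off
  | { kind: "reject"; lastDispatchStatus: "budget_exceeded" | "validation_failed" }
  | { kind: "skip_overlap" } // recorded status is not specified in the source
  | { kind: "dispatch" };

// Guards run in order and return early, so nothing is written before the
// final dispatch step.
function decideDispatch(s: ScheduleSnapshot): DispatchDecision {
  if (!s.autonomyDispatchEnabled) return { kind: "abort" };
  // Assumption: "exceeded" means strictly greater than the limit.
  if (s.creditsSpent > s.spendingLimitCredits) {
    return { kind: "reject", lastDispatchStatus: "budget_exceeded" };
  }
  if (s.workflowHasCycle) {
    return { kind: "reject", lastDispatchStatus: "validation_failed" };
  }
  if (s.hasInFlightRun) return { kind: "skip_overlap" };
  return { kind: "dispatch" };
}
```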
Worker Loop
apps/worker/src/worker/scheduled-workflow-worker.ts follows a claim → execute → complete cycle:
- Claim: calls fn_claim_scheduled_workflow_run; receives an exclusive run ID or null if nothing is ready.
- Execute: calls WorkflowExecutionService.execute() with the claimed run's context and node graph.
- Complete: calls fn_complete_scheduled_workflow_run with final status and output.
The worker polls at a configurable interval (WORKER_POLL_INTERVAL_MS) and exits cleanly on SIGTERM.
Dead-Letter / Error Handling
Unhandled throws inside the worker are caught at the top level. The worker calls fn_complete_scheduled_workflow_run with status: 'failed' and the serialised error message. The run lands in the failed partition of lenses.workflow_runs, queryable for ops triage.
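The claim → execute → complete cycle and the top-level catch described above can be sketched with injected dependencies. This is not the code in scheduled-workflow-worker.ts: `WorkerDeps` and `workerLoop` are hypothetical, and for brevity the sketch treats any non-throwing execution as completed.

```typescript
type RunStatus = "completed" | "failed";

// Hypothetical dependency bundle. In a real worker these would wrap
// fn_claim_scheduled_workflow_run, WorkflowExecutionService.execute(), and
// fn_complete_scheduled_workflow_run.
interface WorkerDeps {
  claim(): Promise<string | null>;
  execute(runId: string): Promise<unknown>;
  complete(runId: string, status: RunStatus, output: unknown): Promise<void>;
  sleep(ms: number): Promise<void>;
  shouldStop(): boolean; // for example, flipped by a SIGTERM handler
}

async function workerLoop(deps: WorkerDeps, pollIntervalMs: number): Promise<void> {
  while (!deps.shouldStop()) {
    const runId = await deps.claim();
    if (runId === null) {
      // Nothing ready: wait one poll interval before claiming again.
      await deps.sleep(pollIntervalMs);
      continue;
    }
    let status: RunStatus = "completed";
    let output: unknown;
    try {
      output = await deps.execute(runId);
    } catch (err) {
      // Unhandled throw: record the run as failed with the serialised message.
      status = "failed";
      output = err instanceof Error ? err.message : String(err);
    }
    await deps.complete(runId, status, output);
  }
}
```

Checking `shouldStop()` only at the top of the loop lets an in-flight run finish and be completed before the process exits.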
WARNING
A crashed worker process (OOM, SIGKILL) leaves the run in running. A background cleanup job reaps stale running rows older than the configured run timeout and marks them failed.
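The selection rule for the cleanup job can be expressed as a small predicate. The actual job most likely runs in the database, so this is only a model of the rule. The `RunRow` shape and `findStaleRuns` name are hypothetical.

```typescript
// Hypothetical row shape; the real lenses.workflow_runs columns may differ.
type RunRow = {
  id: string;
  status: "running" | "completed" | "failed";
  startedAtMs: number; // epoch milliseconds
};

// Returns runs still marked running whose age exceeds the run timeout.
function findStaleRuns(runs: RunRow[], nowMs: number, runTimeoutMs: number): RunRow[] {
  return runs.filter(
    (run) => run.status === "running" && nowMs - run.startedAtMs > runTimeoutMs,
  );
}
```

The timeout needs to be longer than the slowest legitimate run, otherwise the reaper would mark healthy runs as failed.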