
Execution flow

The complete path of control and data when you kick off a pipeline. Every arrow here is a real await point in the code.

sequenceDiagram
actor Op as Operator
participant CLI as factflow CLI
participant SRV as factflow-server
participant ES as ExecutionService
participant DB as PostgreSQL
participant Mgr as OrchestratorManager
participant Orch as PipelineOrchestrator
participant Proc as ReactiveRouteProcessor
participant Q as ExecutionScopedQueue
Op->>CLI: factflow config run CFG_ID
CLI->>SRV: POST /api/v1/executions
SRV->>ES: create(config_id)
ES->>DB: INSERT pipeline_execution (status=running, config_snapshot)
ES->>Mgr: start(execution)
Mgr->>Orch: new PipelineOrchestrator(snapshot)
Orch->>Proc: for each route in snapshot.routes
Proc->>Q: subscribe(exec:EXEC_ID:route.in)
Proc-->>Orch: processor ready
Orch->>Orch: wait all processors ready
Orch->>Q: publish init_message to first route
SRV-->>CLI: 201 execution_id + status=running
From `factflow config run` to first message published. Note: processors must subscribe BEFORE init_message publishes, because a message published to a destination with no subscriber can be lost.
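The ordering constraint can be illustrated with a small in-memory stand-in. This is a sketch only: `InMemoryTopic` and `start` are hypothetical names, and the class models topic-style delivery (nothing is buffered for late subscribers) rather than the real ExecutionScopedQueue.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


class InMemoryTopic:
    """Hypothetical stand-in for a topic-style destination."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)
        self.dropped: list[str] = []

    def subscribe(self, destination: str, handler: Handler) -> None:
        self._subscribers[destination].append(handler)

    async def publish(self, destination: str, message: str) -> None:
        handlers = self._subscribers.get(destination, [])
        if not handlers:
            # Topic semantics: with no subscriber the message is simply gone.
            self.dropped.append(message)
            return
        for handler in handlers:
            await handler(message)


async def start(subscribe_first: bool) -> tuple[list[str], list[str]]:
    """Run the startup steps in either order; return (received, dropped)."""
    bus = InMemoryTopic()
    received: list[str] = []

    async def on_message(message: str) -> None:
        received.append(message)

    destination = "exec:EXEC_ID:route.in"
    if subscribe_first:
        bus.subscribe(destination, on_message)
        await bus.publish(destination, "init_message")
    else:
        await bus.publish(destination, "init_message")
        bus.subscribe(destination, on_message)
    return received, bus.dropped
```

Calling `asyncio.run(start(subscribe_first=False))` leaves the init message in `dropped`, which is the situation the orchestrator avoids by waiting for every processor to report ready.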

Key invariants in this phase:

  • config_snapshot is frozen. The row in PostgreSQL captures the full YAML at this moment. Subsequent config edits don’t affect this execution.
  • Processors subscribe before init. Ordering matters: if you publish to a destination nobody is subscribed to yet, the message is dropped (for topics) or only buffered temporarily (for queues).
  • Manager caps concurrency. OrchestratorManager enforces max_concurrent_executions. Over-limit raises TooManyConcurrentExecutionsError.
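A minimal sketch of the snapshot and concurrency invariants above. It assumes the manager tracks running executions in a dict and that the snapshot is a deep copy of the parsed config. `ManagerSketch` and its methods are hypothetical; only `max_concurrent_executions` and `TooManyConcurrentExecutionsError` come from the text.

```python
import copy
from dataclasses import dataclass, field


class TooManyConcurrentExecutionsError(RuntimeError):
    """Name taken from the text; the real class may carry more context."""


@dataclass
class ManagerSketch:
    """Hypothetical stand-in for OrchestratorManager."""

    max_concurrent_executions: int
    running: dict[str, dict] = field(default_factory=dict)

    def start(self, execution_id: str, config: dict) -> dict:
        if len(self.running) >= self.max_concurrent_executions:
            raise TooManyConcurrentExecutionsError(
                f"limit of {self.max_concurrent_executions} reached"
            )
        # Freeze the config: later edits to `config` must not leak in.
        snapshot = copy.deepcopy(config)
        self.running[execution_id] = snapshot
        return snapshot

    def finish(self, execution_id: str) -> None:
        # Releasing the slot lets the next execution start.
        self.running.pop(execution_id, None)
```

Editing the original config after `start()` returns leaves the snapshot untouched, and a second `start()` at the limit raises until `finish()` frees a slot.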

Once routes are subscribed and the init message is out, messages flow until every route drains.

sequenceDiagram
participant Q as ExecutionScopedQueue
participant Proc as ReactiveRouteProcessor
participant Adapter as PipelineAdapter
participant Store as StorageProtocol
participant Lin as LineageService
participant Q2 as Next-route queue
Q->>Proc: deliver message
Proc->>Proc: build PipelineContext (correlation_id, ...)
Proc->>Lin: record pending lineage row
Proc->>Adapter: await process(ctx)
Adapter->>Store: write artefact (optional)
Adapter->>Lin: record completion/failure
Adapter-->>Proc: AdapterResult or raise
alt adapter succeeded
  Proc->>Q2: publish emitted messages
  Proc-->>Q: return ACKNOWLEDGED
else adapter raised
  Lin-->>Lin: mark row failed
  Proc-->>Q: return FAILED (to DLQ or retry)
end
One message through one route. The adapter's return — not a separate ack call — drives message acknowledgement. Cancellation during process() yields redelivery.
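The handler-return pattern in the diagram can be sketched as below. `MessageStatus` is re-declared here so the example is self-contained, and the `handle`, `process`, and `publish` signatures are assumptions, not factflow's actual API.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable


class MessageStatus(Enum):
    ACKNOWLEDGED = "acknowledged"
    FAILED = "failed"


async def handle(
    message: dict,
    process: Callable[[dict], Awaitable[list[dict]]],
    publish: Callable[[dict], Awaitable[None]],
) -> MessageStatus:
    """Process one message; the return value is the acknowledgement."""
    try:
        emitted = await process(message)
    except Exception:
        # The queue provider decides between DLQ and retry.
        return MessageStatus.FAILED
    for out in emitted:
        await publish(out)
    return MessageStatus.ACKNOWLEDGED
```

Since Python 3.8 `asyncio.CancelledError` derives from `BaseException`, so `except Exception` does not swallow it. A cancelled handler therefore returns no status at all, which fits the redelivery behaviour described above.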

Critical points to internalise:

  • Handler-return ack. There’s no explicit ack() / nack() method on QueueProtocol. The processor returns MessageStatus.ACKNOWLEDGED or FAILED; the queue provider acts on that.
  • Lineage commits independently. Lineage writes happen on a separate connection. A lineage write failure does NOT fail the pipeline; a pipeline failure still records lineage.
  • Pre-register children before publish. Fan-out adapters (publish_batch_immediate) must call record_pending_children() before any publish call. This prevents a completion-detection race in which the parent is marked done before its children register.
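The pre-registration race can be reproduced with a toy lineage tracker. This is a sketch under assumptions: `LineageSketch` and `fan_out` are hypothetical, and the `record_pending_children()` signature shown is a guess at its shape, not the real one.

```python
from dataclasses import dataclass, field


@dataclass
class LineageSketch:
    """Hypothetical tracker: a row is open until it completes."""

    open_rows: set[str] = field(default_factory=set)

    def record_pending(self, row_id: str) -> None:
        self.open_rows.add(row_id)

    def record_pending_children(self, child_ids: list[str]) -> None:
        self.open_rows.update(child_ids)

    def record_completion(self, row_id: str) -> None:
        self.open_rows.discard(row_id)

    def is_empty(self) -> bool:
        return not self.open_rows


def fan_out(
    lineage: LineageSketch,
    parent_id: str,
    child_ids: list[str],
    pre_register: bool,
) -> tuple[list[str], bool]:
    """Return (published ids, whether lineage looks empty once the parent completes)."""
    lineage.record_pending(parent_id)
    if pre_register:
        # Children become visible to completion detection before any publish.
        lineage.record_pending_children(child_ids)
    published: list[str] = []
    for child_id in child_ids:
        # Stands in for a queue publish; the child has not been delivered yet.
        published.append(child_id)
    lineage.record_completion(parent_id)
    return published, lineage.is_empty()
```

With `pre_register=False` the tracker reports empty while two children are still waiting in the queue, which is exactly the window in which a completion check could fire too early.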

An execution is complete when two conditions are simultaneously true:

flowchart LR
A["All routes idle<br/>(in_flight=0, fanout_pending=0)"] --> C{Both<br/>true?}
B["Lineage empty<br/>(pending + initiated + processing = 0)"] --> C
C -->|yes| D[Orchestrator.stop]
C -->|no| A
D --> E[ExecutionService.mark_completed]
E --> F[(PostgreSQL)]
E --> G[SSE event to clients]
Two-condition completion gate. Both must be true at the same polling moment — hence the pending-children pre-registration.
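Expressed as a predicate, the gate might look like the following sketch. The field names mirror the labels in the flowchart; the dataclasses and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RouteStats:
    in_flight: int
    fanout_pending: int


@dataclass(frozen=True)
class LineageCounts:
    pending: int
    initiated: int
    processing: int


def routes_idle(routes: list[RouteStats]) -> bool:
    # Every route must have nothing in flight and no fan-out outstanding.
    return all(r.in_flight == 0 and r.fanout_pending == 0 for r in routes)


def lineage_empty(counts: LineageCounts) -> bool:
    return counts.pending + counts.initiated + counts.processing == 0


def is_complete(routes: list[RouteStats], counts: LineageCounts) -> bool:
    """Both conditions must hold in the same polling observation."""
    return routes_idle(routes) and lineage_empty(counts)
```

Evaluating both halves against one observation is the point: idle routes with one pending lineage row, or empty lineage with one message in flight, both keep the execution running.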

On completion the orchestrator runs a strict shutdown sequence:

  1. Stop subscriptions first. Frees subscription names so a follow-up execution can reuse them without ActiveMQ warnings.
  2. Then DB callback. ExecutionService.mark_completed updates the row.
  3. Then SSE event. Any client on GET /executions/{id}/events receives status=completed.

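The ordering above can be sketched as three awaited steps. The callables are placeholders supplied by the caller; none of these names are factflow's own.

```python
import asyncio
from typing import Awaitable, Callable

Step = Callable[[], Awaitable[None]]


async def shutdown(
    stop_subscriptions: Step,
    mark_completed: Step,
    emit_sse_event: Step,
) -> None:
    """Run the three steps strictly in order; each finishes before the next starts."""
    await stop_subscriptions()  # 1. free subscription names
    await mark_completed()      # 2. update the execution row
    await emit_sse_event()      # 3. notify connected clients


async def demo() -> list[str]:
    order: list[str] = []

    async def stop_subscriptions() -> None:
        order.append("subscriptions stopped")

    async def mark_completed() -> None:
        order.append("row marked completed")

    async def emit_sse_event() -> None:
        order.append("sse event sent")

    await shutdown(stop_subscriptions, mark_completed, emit_sse_event)
    return order
```

Awaiting the steps sequentially, rather than gathering them, means a client that sees status=completed over SSE can assume the row is already updated.
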
flowchart LR
A[Op runs CLI] --> B[POST /executions]
B --> C[ExecutionService.create]
C --> D[Orchestrator.start]
D --> E[Processors subscribe]
E --> F[Publish init]
F --> G[Message loop]
G -->|all routes drain| H[Completion detection]
H --> I[Orchestrator.stop]
I --> J[Mark row completed]
J --> K[SSE event]
K --> L[CLI exits 0]

Three kinds of failure, three different flows:

| Failure | What happens | Where to look |
| --- | --- | --- |
| Adapter raises non-fatal | Message marked FAILED, routed to DLQ or retried per config | Lineage |
| Adapter raises metadata["fatal"]=True | Orchestrator aborts execution, row → failed | lineage failure row + exec.error |
| Processor cancelled mid-message | Message redelivered by broker | no lineage row (incomplete) |

The should_retry() logic inside the processor treats ValueError, TypeError, KeyError, AttributeError, and NotImplementedError as terminal: these go to the DLQ and are not retried.
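A sketch of that classification, assuming the check is a plain `isinstance` test against the listed types. The real should_retry() may take other arguments or apply further rules, and `TERMINAL_ERRORS` is a hypothetical name.

```python
import json

# Exception types the text lists as terminal (DLQ, no retry).
TERMINAL_ERRORS = (
    ValueError,
    TypeError,
    KeyError,
    AttributeError,
    NotImplementedError,
)


def should_retry(exc: BaseException) -> bool:
    """Retry anything that is not an instance of a terminal error type."""
    return not isinstance(exc, TERMINAL_ERRORS)


def demo() -> dict[str, bool]:
    return {
        "ConnectionError": should_retry(ConnectionError("broker down")),
        "KeyError": should_retry(KeyError("missing field")),
        # json.JSONDecodeError subclasses ValueError, so it is terminal too.
        "JSONDecodeError": should_retry(json.JSONDecodeError("bad", "{", 0)),
    }
```

One consequence of an `isinstance` check is that subclasses inherit the terminal treatment, so a malformed JSON payload would go straight to the DLQ under this assumption.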