Execution flow
The complete path of control and data when you kick off a pipeline. Every arrow here is a real await point in the code.
Start-up sequence
```mermaid
sequenceDiagram
    actor Op as Operator
    participant CLI as factflow CLI
    participant SRV as factflow-server
    participant ES as ExecutionService
    participant DB as PostgreSQL
    participant Mgr as OrchestratorManager
    participant Orch as PipelineOrchestrator
    participant Proc as ReactiveRouteProcessor
    participant Q as ExecutionScopedQueue
    Op->>CLI: factflow config run CFG_ID
    CLI->>SRV: POST /api/v1/executions
    SRV->>ES: create(config_id)
    ES->>DB: INSERT pipeline_execution (status=running, config_snapshot)
    ES->>Mgr: start(execution)
    Mgr->>Orch: new PipelineOrchestrator(snapshot)
    Orch->>Proc: for each route in snapshot.routes
    Proc->>Q: subscribe(exec:EXEC_ID:route.in)
    Proc-->>Orch: processor ready
    Orch->>Orch: wait all processors ready
    Orch->>Q: publish init_message to first route
    SRV-->>CLI: 201 execution_id + status=running
```
Key invariants in this phase:
- **`config_snapshot` is frozen.** The row in PostgreSQL captures the full YAML at this moment. Subsequent config edits don't affect this execution.
- **Processors subscribe before init.** Ordering matters: if you publish to a queue nobody is subscribed to yet, the message is dropped (for topics) or buffered temporarily (for queues).
- **Manager caps concurrency.** `OrchestratorManager` enforces `max_concurrent_executions`; exceeding the limit raises `TooManyConcurrentExecutionsError`.
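The concurrency cap can be sketched as follows. `OrchestratorManager`, `max_concurrent_executions`, and `TooManyConcurrentExecutionsError` are the names from this page; the internals shown here are an illustrative assumption, not the real implementation.

```python
class TooManyConcurrentExecutionsError(RuntimeError):
    """Raised when start() would exceed the configured limit."""


class OrchestratorManager:
    def __init__(self, max_concurrent_executions: int) -> None:
        self.max_concurrent_executions = max_concurrent_executions
        self._running: dict[str, object] = {}  # execution_id -> orchestrator

    def start(self, execution_id: str) -> None:
        # Check the cap *before* allocating any orchestrator resources.
        if len(self._running) >= self.max_concurrent_executions:
            raise TooManyConcurrentExecutionsError(
                f"{len(self._running)} executions already running "
                f"(limit {self.max_concurrent_executions})"
            )
        self._running[execution_id] = object()  # placeholder orchestrator

    def finish(self, execution_id: str) -> None:
        self._running.pop(execution_id, None)
```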
Message flow phase
Once routes are subscribed and the init message is out, messages flow until every route drains.
```mermaid
sequenceDiagram
    participant Q as ExecutionScopedQueue
    participant Proc as ReactiveRouteProcessor
    participant Adapter as PipelineAdapter
    participant Store as StorageProtocol
    participant Lin as LineageService
    participant Q2 as Next-route queue
    Q->>Proc: deliver message
    Proc->>Proc: build PipelineContext (correlation_id, ...)
    Proc->>Lin: record pending lineage row
    Proc->>Adapter: await process(ctx)
    Adapter->>Store: write artefact (optional)
    Adapter->>Lin: record completion/failure
    Adapter-->>Proc: AdapterResult or raise
    alt adapter succeeded
        Proc->>Q2: publish emitted messages
        Proc-->>Q: return ACKNOWLEDGED
    else adapter raised
        Lin-->>Lin: mark row failed
        Proc-->>Q: return FAILED (to DLQ or retry)
    end
```
Critical points to internalise:
- **Handler-return ack.** There's no explicit `ack()`/`nack()` method on `QueueProtocol`. The processor returns `MessageStatus.ACKNOWLEDGED` or `FAILED`; the queue provider acts on that.
- **Lineage commits independently.** Lineage writes happen on a separate connection. A lineage write failure does NOT fail the pipeline; a pipeline failure still records lineage.
- **Pre-register children before publish.** Fan-out adapters (`publish_batch_immediate`) must call `record_pending_children()` before any publish call. This prevents a completion-detection race where the parent marks done before children register.
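The handler-return ack contract can be sketched like this. `MessageStatus` and the "queue provider acts on the return value" rule come from this page; the `deliver` loop is an illustrative assumption about what a provider does with that return value.

```python
from enum import Enum, auto


class MessageStatus(Enum):
    ACKNOWLEDGED = auto()
    FAILED = auto()


def deliver(handler, message, dlq: list) -> MessageStatus:
    """Sketch of a queue provider acting on the handler's return value."""
    try:
        status = handler(message)        # processor returns a status...
    except Exception:
        status = MessageStatus.FAILED    # ...or an escaped exception counts as FAILED
    if status is MessageStatus.FAILED:
        dlq.append(message)              # route to DLQ (or schedule a retry)
    return status                        # note: no explicit ack()/nack() call anywhere
```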
Completion detection
An execution is complete when two conditions are simultaneously true:
```mermaid
flowchart LR
    A["All routes idle<br/>(in_flight=0, fanout_pending=0)"] --> C{Both<br/>true?}
    B["Lineage empty<br/>(pending + initiated + processing = 0)"] --> C
    C -->|yes| D[Orchestrator.stop]
    C -->|no| A
    D --> E[ExecutionService.mark_completed]
    E --> F[(PostgreSQL)]
    E --> G[SSE event to clients]
```
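The two-condition check above can be sketched as a predicate. The field names (`in_flight`, `fanout_pending`, the lineage counts) mirror the diagram labels; the dataclass shapes are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass
class RouteStats:
    in_flight: int
    fanout_pending: int


@dataclass
class LineageStats:
    pending: int
    initiated: int
    processing: int


def execution_complete(routes: list[RouteStats], lineage: LineageStats) -> bool:
    routes_idle = all(r.in_flight == 0 and r.fanout_pending == 0 for r in routes)
    lineage_empty = (lineage.pending + lineage.initiated + lineage.processing) == 0
    return routes_idle and lineage_empty  # both must hold simultaneously
```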
On completion the orchestrator runs a strict shutdown sequence:
- **Stop subscriptions first.** Frees subscription names so a follow-up execution can reuse them without ActiveMQ warnings.
- **Then DB callback.** `ExecutionService.mark_completed` updates the row.
- **Then SSE event.** Any client on `GET /executions/{id}/events` receives `status=completed`.
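The ordering constraint above can be made explicit in code. The three callables stand in for the real components (subscription teardown, `ExecutionService.mark_completed`, the SSE broadcaster); the wiring shown is an illustrative assumption.

```python
from typing import Callable


def shutdown(stop_subscriptions: Callable[[], None],
             mark_completed: Callable[[], None],
             emit_sse: Callable[[], None]) -> None:
    """Sketch of the strict shutdown sequence: order is the contract."""
    stop_subscriptions()  # 1. free subscription names for the next execution
    mark_completed()      # 2. persist the completed status to the row
    emit_sse()            # 3. only now tell clients: status=completed
```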
The full happy path
```mermaid
flowchart LR
    A[Op runs CLI] --> B[POST /executions]
    B --> C[ExecutionService.create]
    C --> D[Orchestrator.start]
    D --> E[Processors subscribe]
    E --> F[Publish init]
    F --> G[Message loop]
    G -->|all routes drain| H[Completion detection]
    H --> I[Orchestrator.stop]
    I --> J[Mark row completed]
    J --> K[SSE event]
    K --> L[CLI exits 0]
```
Failure paths
Three kinds of failure, three different flows:
| Failure | What happens | Where to look |
|---|---|---|
| Adapter raises, non-fatal | Message marked FAILED, routed to DLQ or retried per config | Lineage |
| Adapter raises with `metadata["fatal"]=True` | Orchestrator aborts execution, row → failed | lineage failure row + `exec.error` |
| Processor cancelled mid-message | Message redelivered by broker | no lineage row (incomplete) |
The `should_retry()` logic inside the processor treats `ValueError`, `TypeError`, `KeyError`, `AttributeError`, and `NotImplementedError` as terminal → DLQ, not retry.
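That classification can be sketched as follows. The terminal exception set is taken from the text; the function shape is an illustrative assumption about the processor's `should_retry()` helper.

```python
# Exceptions that signal a programming error: redelivery won't fix them.
TERMINAL = (ValueError, TypeError, KeyError, AttributeError, NotImplementedError)


def should_retry(exc: Exception) -> bool:
    if isinstance(exc, TERMINAL):
        return False  # terminal -> straight to the DLQ
    return True       # anything else (timeouts, broker hiccups, ...) retries per config
```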
Related
- Message lifecycle — one message in even finer detail
- Multi-execution topology — how many of these run concurrently
- Guides / Orchestrator Engine — operator-facing controls
- Guides / Lineage / Debugging — forensic reading of a failed run