Orchestration
The orchestrator is the runtime that reads a pipeline config and drives adapters to completion. It lives in factflow-engine; executions are owned by factflow-execution; the whole thing runs inside factflow-server.
The three layers

OrchestratorManager (one per server process — owns every running execution)
└── PipelineOrchestrator (one per running execution — owns the routes)
    └── ReactiveRouteProcessor (one per route — owns the queue subscription)
        └── PipelineAdapter (many per processor — invoked in sequence per message)

OrchestratorManager
Single-process. Caps concurrent executions (TooManyConcurrentExecutionsError is raised when the limit is exceeded). Holds references to PipelineOrchestrator instances keyed by execution id. Shutting down the manager gracefully stops every orchestrator.
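The cap-and-track behavior can be sketched as below. Only OrchestratorManager, TooManyConcurrentExecutionsError, the execution-id-keyed table, and graceful shutdown come from this page; the method names, the default limit, and the orchestrator interface are assumptions for illustration.

```python
class TooManyConcurrentExecutionsError(RuntimeError):
    pass


class OrchestratorManager:
    """One per server process: caps and tracks running executions (sketch)."""

    def __init__(self, max_concurrent: int = 4):
        self.max_concurrent = max_concurrent
        self._orchestrators: dict[str, object] = {}  # execution id -> orchestrator

    def register(self, execution_id: str, orchestrator) -> None:
        # Reject new executions once the cap is reached.
        if len(self._orchestrators) >= self.max_concurrent:
            raise TooManyConcurrentExecutionsError(
                f"limit of {self.max_concurrent} concurrent executions reached"
            )
        self._orchestrators[execution_id] = orchestrator

    def release(self, execution_id: str) -> None:
        self._orchestrators.pop(execution_id, None)

    def shutdown(self) -> None:
        # Gracefully stop every running orchestrator, then clear the table.
        for orchestrator in list(self._orchestrators.values()):
            orchestrator.stop()
        self._orchestrators.clear()
```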
PipelineOrchestrator
One per execution. Constructs a ReactiveRouteProcessor for each route in the config. Wires them together via shared queue providers (wrapped in ExecutionScopedQueue — see Queue isolation). Publishes the init message. Signals completion when every route drains.
ReactiveRouteProcessor
One per route. Subscribes to the route’s inbound queue. For each message: invokes every adapter listed under adapters: in order, propagates results, and acks after the chain returns. Respects the route’s concurrency (max in-flight messages) and prefetch (broker-side buffer) settings. Handles backpressure via BackpressureController.
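The consume loop can be sketched roughly as follows, assuming an async-iterable subscription and adapters as async callables. FakeQueue, the method names, and the semaphore-based concurrency limit are illustrative stand-ins, not the real factflow API.

```python
import asyncio


class FakeQueue:
    """Minimal stand-in for a broker subscription (illustration only)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.acked = []

    async def __aiter__(self):
        for m in self._messages:
            yield m

    async def ack(self, message):
        self.acked.append(message)


async def route_processor_loop(queue, adapters, concurrency=2):
    # At most `concurrency` messages in flight at once (the route's
    # concurrency setting); ack is sent only after the whole chain returns.
    sem = asyncio.Semaphore(concurrency)
    tasks = []

    async def handle(message):
        async with sem:
            ctx = message
            for adapter in adapters:
                ctx = await adapter(ctx)
                if ctx is None:        # a None result terminates the chain
                    break
            await queue.ack(message)   # ack only after the chain returns

    async for message in queue:
        tasks.append(asyncio.create_task(handle(message)))
    await asyncio.gather(*tasks)
```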
Message lifecycle
One message, one route, end-to-end:

- Broker delivers the message → processor receives it
- Processor constructs a PipelineContext (correlation id, lineage parent, storage context)
- For each adapter in the route:
  - Lineage row recorded with status pending
  - await adapter.process(context) is invoked
  - Returned AdapterResult becomes the next context.message; None terminates the chain
  - Lineage row updated to completed (or failed with the exception payload)
- Results (the emitted field on the final AdapterResult) are published to the outbound queues
- Message ack sent to broker
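The per-message lifecycle above can be sketched as a single chain-runner. The lineage store, adapter signatures, and status strings here are simplified stand-ins; only the pending/completed/failed progression, the result-feeds-next-context rule, and None-terminates-the-chain come from this page.

```python
import asyncio

lineage = []  # stand-in for the lineage store: (adapter name, status) rows


async def run_chain(adapters, message):
    """Run one message through a chain of (name, async adapter) pairs."""
    ctx = message
    emitted = None
    for name, adapter in adapters:
        lineage.append((name, "pending"))       # row recorded before the call
        try:
            result = await adapter(ctx)
        except Exception as exc:
            lineage[-1] = (name, f"failed: {exc!r}")
            raise
        lineage[-1] = (name, "completed")
        if result is None:                      # None terminates the chain
            return None
        ctx = result                            # result becomes next context.message
        emitted = result
    return emitted                              # published outbound, then ack
```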
Handler-return ack
The ack happens only after the whole adapter chain returns. Never fire-and-forget. If the processor is cancelled mid-chain, the message is redelivered.
Cancel safety
asyncio.CancelledError is expected during shutdown. Adapters must be written so that receiving cancellation mid-process() doesn’t leave partial state. See the adapter-patterns skill.
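One common shape for cancel safety, shown here as a hedged sketch rather than the factflow adapter API, is to do all cancellable work against staged state and make the visible write a single shielded step. The names transform, commit, and the staging dict are all illustrative.

```python
import asyncio

committed = {}   # stand-in for durable adapter output


async def transform(payload):
    await asyncio.sleep(0)       # stand-in for slow, cancellable work
    return payload.upper()


async def commit(staged):
    committed.update(staged)     # single atomic step: all-or-nothing


async def cancel_safe_process(key, payload):
    # All cancellable work happens before anything becomes visible;
    # the commit itself is shielded so cancellation cannot split it.
    staged = {key: await transform(payload)}
    await asyncio.shield(commit(staged))
```

If a CancelledError arrives during transform, nothing has been committed, so redelivery simply reruns the step from scratch.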
Completion detection
An execution is complete when every route has drained — no in-flight messages, no queued messages, no scheduled retries. The orchestrator watches drain signals from each processor and resolves the ExecutionWaiter when all routes have quiesced.
Reactive, not polling. Completion fires within milliseconds of the last message acking.
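The reactive quiesce logic can be built from per-route drain signals and an asyncio.Event. ExecutionWaiter's real interface may differ; only the name and the all-routes-drained condition come from this page.

```python
import asyncio


class ExecutionWaiter:
    """Resolves once every route has reported drained (sketch)."""

    def __init__(self, route_names):
        self._pending = set(route_names)
        self._done = asyncio.Event()

    def route_drained(self, name):
        # A processor signals its route has no in-flight or queued work.
        self._pending.discard(name)
        if not self._pending:
            self._done.set()

    def route_active(self, name):
        # A late message un-drains the route; completion is re-armed.
        self._pending.add(name)
        self._done.clear()

    def is_complete(self):
        return self._done.is_set()

    async def wait(self):
        # No polling: this resumes the moment the last route quiesces.
        await self._done.wait()
```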
Circuit breaker + backpressure
- Circuit breaker — per-adapter. Repeated failures open the breaker; subsequent messages short-circuit (CircuitOpenError) until the breaker half-opens. Prevents one flaky adapter from grinding through a queue only to fail everything.
- Backpressure — per-route. Adaptive concurrency based on in-flight count and recent latency. Prevents a fast upstream from overwhelming a slow downstream.
Both are configured in the route’s YAML (backpressure: and error_handling: blocks). Defaults are sane.
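A consecutive-failure breaker with a half-open probe can be sketched as below. Only CircuitOpenError and the open/half-open behavior come from this page; the threshold, cooldown, and method names are assumptions.

```python
import time


class CircuitOpenError(RuntimeError):
    pass


class CircuitBreaker:
    """Per-adapter breaker: opens after `threshold` consecutive
    failures, half-opens after `cooldown` seconds (sketch)."""

    def __init__(self, threshold=3, cooldown=30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def before_call(self, now=None):
        now = time.monotonic() if now is None else now
        if self.opened_at is not None:
            if now - self.opened_at < self.cooldown:
                # Short-circuit: don't even invoke the adapter.
                raise CircuitOpenError("breaker open; short-circuiting")
            # Cooldown elapsed: half-open, let one probe through.
            self.opened_at = None
            self.failures = self.threshold - 1

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now=None):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic() if now is None else now
```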
Discovery and registration
On server startup:

- Every installed factflow-* package is scanned for classes inheriting PipelineAdapter
- Each class registers itself against a type: name via the @register_adapter(name) decorator or by class-attribute convention
- The ValidatingAdapterRegistry verifies that every adapter’s declared config matches its Pydantic config class
- Config-load-time lookups resolve type: strings to adapter classes
If a config references an unknown type:, validation fails with error code E103.
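Decorator-based registration might be sketched like this. Only PipelineAdapter, register_adapter, the type:-to-class lookup, and error code E103 come from this page; the registry dict, the resolve helper, and the example adapter are illustrative.

```python
import asyncio

ADAPTER_REGISTRY: dict[str, type] = {}


class PipelineAdapter:
    """Base class that package scanning looks for (sketch)."""

    async def process(self, context):
        raise NotImplementedError


def register_adapter(name):
    """Map a config `type:` string to the decorated adapter class."""
    def decorator(cls):
        ADAPTER_REGISTRY[name] = cls
        return cls
    return decorator


def resolve(type_name):
    """Config-load-time lookup of a `type:` string."""
    try:
        return ADAPTER_REGISTRY[type_name]
    except KeyError:
        # An unknown type: surfaces as validation error E103.
        raise ValueError(f"E103: unknown adapter type {type_name!r}")


@register_adapter("uppercase")
class UppercaseAdapter(PipelineAdapter):
    async def process(self, context):
        return context.upper()
```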
Multi-execution concurrency
Multiple pipelines run concurrently in one server process:

- Each execution owns one PipelineOrchestrator
- Each orchestrator has its own route processors
- All processors share the single queue broker connection but use ExecutionScopedQueue for name isolation
- Cross-execution interference is impossible at the queue layer; resource contention (CPU, memory, LLM budget) is managed by OS scheduling plus each adapter’s rate limiting
Why reactive, not scheduled?
Traditional workflow engines poll: “which tasks are ready? run them.” Factflow doesn’t poll. The queue broker pushes messages to subscribed processors; processors consume-and-ack. Less latency, no central bottleneck, natural backpressure.
Related
- Pipelines — what the orchestrator reads
- Queue isolation — how multi-execution works
- Lineage — the independent record of what the orchestrator did
- factflow-engine reference
- factflow-execution reference