# Message lifecycle
The execution flow diagram shows the big picture. This page zooms in on one message traversing one route with multiple adapters — the most common case.
## Anatomy of a route

A route has one inbound queue, N adapters in sequence, and zero-or-more outbound queues:
```mermaid
flowchart LR
    Q["inbound<br/>/queue/route.in"] --> P[ReactiveRouteProcessor]
    P --> A1[Adapter 1]
    A1 --> A2[Adapter 2]
    A2 --> A3[Adapter 3]
    A3 --> OUT1["outbound<br/>/queue/next-route.in"]
    A3 --> OUT2["outbound<br/>/queue/another-route.in"]
    P -.reads from.-> Q
    P -.writes to.-> OUT1
    P -.writes to.-> OUT2
```
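For concreteness, a route of this shape might be declared roughly like so — the keys below are assumptions for illustration, not the framework's verbatim schema:

```yaml
# Hypothetical route declaration; key names are illustrative only.
routes:
  my-route:
    inbound:
      queue: /queue/route.in
      concurrency: 1
    adapters:
      - adapter_1
      - adapter_2
      - adapter_3
    outbound:
      - /queue/next-route.in
      - /queue/another-route.in
```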
## Inbound phase

When a message lands in the processor’s inbound queue:
```mermaid
sequenceDiagram
    participant Broker
    participant Proc as Processor
    participant Sem1 as Global semaphore
    participant Sem2 as Route semaphore
    participant BP as Backpressure
    Broker->>Proc: deliver message
    Proc->>Sem1: acquire global slot
    Sem1-->>Proc: granted
    Proc->>Sem2: acquire route slot
    Sem2-->>Proc: granted
    Proc->>BP: check high-water mark
    BP-->>Proc: OK / pause if saturated
    Proc->>Proc: build PipelineContext
```
Three layers of concurrency, acquired in fixed order:
| Layer | Scope | Default |
|---|---|---|
| Global | All routes in one execution (`max_handlers`) | 30 |
| Per-route | One route (`InboundConfig.concurrency`) | 1 |
| Fan-out | Per-route publishing (`max_fanout_pending`) | 500 |
Practical in-flight parallelism per route: `min(prefetch, concurrency)`.
## Adapter phase

Adapters run in the order declared in YAML. Each receives the previous one’s `AdapterResult.message` as its next `PipelineContext.message`:
```mermaid
flowchart LR
    CTX0[PipelineContext] --> A1[adapter 1 process]
    A1 -->|AdapterResult.message| CTX1[PipelineContext]
    CTX1 --> A2[adapter 2 process]
    A2 -->|AdapterResult.message| CTX2[PipelineContext]
    CTX2 --> A3[adapter 3 process]
    A3 --> OUT[AdapterResult final]
```
For every adapter invocation:
1. Lineage row created: `status=pending`, with a hash of the input.
2. `await adapter.process(ctx)` — the adapter does its work.
3. Lineage row updated: `status=completed` or `failed` with the exception payload.
4. The result becomes the input to the next adapter.
If any adapter raises, the chain stops; lineage marks the failing row; the processor returns `FAILED`.
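The loop above can be sketched as follows. `AdapterResult` is a simplified stand-in, and `record_row` is a hypothetical helper that appends where the real system would update the lineage row in place:

```python
import asyncio
import hashlib
from dataclasses import dataclass

@dataclass
class AdapterResult:
    message: dict

lineage: list[dict] = []

def record_row(adapter: str, status: str, detail: str = "") -> None:
    # Hypothetical stand-in for the lineage store.
    lineage.append({"adapter": adapter, "status": status, "detail": detail})

async def run_chain(adapters, message: dict) -> dict:
    for adapter in adapters:
        digest = hashlib.sha256(repr(message).encode()).hexdigest()
        record_row(adapter.__name__, "pending", digest)       # row created
        try:
            result = await adapter(message)                   # adapter works
        except Exception as exc:
            record_row(adapter.__name__, "failed", str(exc))  # chain stops
            raise
        record_row(adapter.__name__, "completed")             # row updated
        message = result.message                              # feeds next adapter
    return message

# Two toy adapters to exercise the chain.
async def upper(msg: dict) -> AdapterResult:
    return AdapterResult({"text": msg["text"].upper()})

async def shout(msg: dict) -> AdapterResult:
    return AdapterResult({"text": msg["text"] + "!"})

final = asyncio.run(run_chain([upper, shout], {"text": "hi"}))
```

Each adapter only ever sees the previous adapter's output message, which is exactly why order of declaration matters.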
## Fan-out

An adapter can emit multiple outbound messages via `AdapterResult.emitted: list[...]`:
```mermaid
sequenceDiagram
    participant A as Adapter
    participant Lin as Lineage
    participant Proc as Processor
    participant Sem as Fanout semaphore
    participant Q as Queue
    A->>Lin: record_pending_children(count=N)
    Note over Lin: CRITICAL: before publish
    A->>Proc: AdapterResult(emitted=[m1, m2, ..., mN])
    loop N messages
        Proc->>Sem: acquire fanout slot
        Sem-->>Proc: granted
        Proc->>Q: publish message
        Proc->>Sem: release (synchronous)
    end
    Proc-->>Broker: return ACKNOWLEDGED (parent done)
```
Why pre-register?
- The completion check asks “is `pending + initiated + processing == 0`?”
- Without pre-registration, the parent finishes → the count drops to zero → completion fires.
- Then the children publish → the count goes up again, but the orchestrator has already stopped.
With pre-registration, the parent’s “done” is contingent on all N children being accounted for.
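The race can be sketched with a toy counter — the names and single `pending` field are illustrative, not the real lineage schema:

```python
class CompletionTracker:
    def __init__(self) -> None:
        self.pending = 0       # stands in for pending + initiated + processing
        self.fired = False

    def record_pending_children(self, count: int) -> None:
        self.pending += count  # CRITICAL: called before any publish

    def finish_one(self) -> None:
        self.pending -= 1
        if self.pending == 0:
            self.fired = True  # completion check: all counts at zero

tracker = CompletionTracker()
tracker.record_pending_children(1)  # the parent message itself
tracker.record_pending_children(3)  # pre-register N=3 children BEFORE publish
tracker.finish_one()                # parent done; 3 children still pending
parent_only_fired = tracker.fired   # completion did not fire early
for _ in range(3):
    tracker.finish_one()            # children drain one by one
```

Remove the `record_pending_children(3)` call and the count hits zero the moment the parent finishes — the premature-completion bug described above.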
## Outbound phase

After all adapters succeed and fan-out publishes complete, the processor acks the original message. There is no explicit ack call. The processor’s handler returns `MessageStatus.ACKNOWLEDGED` and the queue provider interprets that:
```mermaid
flowchart TB
    A[Handler returns<br/>MessageStatus] --> B{Which?}
    B -->|ACKNOWLEDGED| C[Broker removes from queue]
    B -->|FAILED| D[Broker redelivers or DLQs]
    C --> E[Release semaphores]
    D --> E
    E --> F[Processor ready for next]
```
Key consequence: if the processor is cancelled mid-chain (server shutdown, timeout), the message is redelivered by the broker. Adapter `process()` methods must be cancel-safe.
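A hedged sketch of what cancel-safe means in practice: because the broker redelivers on cancellation, partial side effects must be undone (or be idempotent). Here `staged` is a stand-in for temp files, partial writes, and similar state:

```python
import asyncio

staged: list[str] = []  # stand-in for partial side effects

async def process(ctx) -> None:
    try:
        staged.append("tmp-artifact")   # partial side effect
        await asyncio.sleep(60)         # long-running work; may be cancelled
    except asyncio.CancelledError:
        staged.clear()                  # undo partial work...
        raise                           # ...and never swallow cancellation

async def demo() -> str:
    task = asyncio.create_task(process(None))
    await asyncio.sleep(0)   # let process() start and stage its work
    task.cancel()            # simulates shutdown/timeout mid-chain
    try:
        await task
    except asyncio.CancelledError:
        return "redelivered by broker"
    return "acknowledged"

outcome = asyncio.run(demo())
```

Re-raising `CancelledError` is the important part: swallowing it would make the handler look like it completed, while the broker-side redelivery still happens.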
## Stateful adapters (fan-in)

The above assumes a stateless adapter — every message is processed independently. For adapters that batch across messages (`StatefulAdapter`), the shape shifts:
```mermaid
flowchart LR
    M1[msg 1] --> A[StatefulAdapter]
    M2[msg 2] --> A
    M3[msg 3] --> A
    A -->|batch not ready| K[continue_pipeline=False<br/>ACK messages, no emit]
    A -->|batch ready| E[emit aggregated result]
    T[Timeout trigger<br/>every 5s] --> A
```
Use case: a `batch_embedding_generator` that collects 100 segments before one LLM batch call.
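An illustrative batching adapter in the spirit of `StatefulAdapter`. The result keys (`continue_pipeline`, `emitted`) echo the diagram, but the class shape is an assumption, not the framework's real API, and the timeout trigger is omitted:

```python
import asyncio

class BatchEmbeddingGenerator:
    # Hypothetical sketch, not the framework's StatefulAdapter API.
    def __init__(self, batch_size: int = 100) -> None:
        self.batch_size = batch_size
        self.buffer: list[dict] = []

    async def process(self, message: dict) -> dict:
        self.buffer.append(message)
        if len(self.buffer) < self.batch_size:
            # Batch not ready: ACK the message but emit nothing downstream.
            return {"continue_pipeline": False, "emitted": []}
        batch, self.buffer = self.buffer, []
        # Batch ready: one aggregated emit (e.g. a single LLM batch call).
        return {"continue_pipeline": True, "emitted": [{"segments": batch}]}

gen = BatchEmbeddingGenerator(batch_size=3)
results = [asyncio.run(gen.process({"segment": s})) for s in "abc"]
```

The first two messages are acknowledged with nothing emitted; the third releases a single aggregated result carrying all buffered segments.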
## Cross-references

- Execution flow — the big picture around this page
- Queue protocol — the scoped queue mechanics behind every `publish`
- Lineage — what those rows look like and how to query them
- Workflow adapters — how to write your own