
Message lifecycle

The execution flow diagram shows the big picture. This page zooms in on one message traversing one route with multiple adapters — the most common case.

A route has one inbound queue, N adapters in sequence, and zero or more outbound queues:

```mermaid
flowchart LR
    Q["inbound<br/>/queue/route.in"] --> P[ReactiveRouteProcessor]
    P --> A1[Adapter 1]
    A1 --> A2[Adapter 2]
    A2 --> A3[Adapter 3]
    A3 --> OUT1["outbound<br/>/queue/next-route.in"]
    A3 --> OUT2["outbound<br/>/queue/another-route.in"]
    P -.reads from.-> Q
    P -.writes to.-> OUT1
    P -.writes to.-> OUT2
```

One processor per route. Adapters run in order on every message. A single adapter can emit multiple messages (fan-out).

When a message lands in the processor’s inbound queue:

```mermaid
sequenceDiagram
    participant Broker
    participant Proc as Processor
    participant Sem1 as Global semaphore
    participant Sem2 as Route semaphore
    participant BP as Backpressure
    Broker->>Proc: deliver message
    Proc->>Sem1: acquire global slot
    Sem1-->>Proc: granted
    Proc->>Sem2: acquire route slot
    Sem2-->>Proc: granted
    Proc->>BP: check high-water mark
    BP-->>Proc: OK / pause if saturated
    Proc->>Proc: build PipelineContext
```

Three layers of concurrency gating: the global slot (per execution), the route slot (per-route concurrency), and the backpressure watermark.
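The gating order can be sketched with asyncio primitives. A minimal sketch, assuming the semaphores are created elsewhere and passed in; `check_watermark`, `build_context`, and `run_pipeline` are illustrative names, not the actual API:

```python
import asyncio

async def handle(message, global_sem, route_sem,
                 check_watermark, build_context, run_pipeline):
    # Acquisition order is fixed: global slot first, then route slot,
    # then the backpressure check before any work begins.
    async with global_sem:            # max_handlers scope: all routes
        async with route_sem:         # InboundConfig.concurrency scope: this route
            await check_watermark()   # pauses while outbound is saturated
            ctx = build_context(message)
            return await run_pipeline(ctx)
```

Acquiring in a fixed order (global before route) is what makes the nested locks deadlock-free across routes.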

Three layers of concurrency, acquired in fixed order:

| Layer     | Scope                                        | Default |
|-----------|----------------------------------------------|---------|
| Global    | All routes in one execution (`max_handlers`) | 30      |
| Per-route | One route (`InboundConfig.concurrency`)      | 1       |
| Fan-out   | Per-route publishing (`max_fanout_pending`)  | 500     |

Practical in-flight parallelism per route: min(prefetch, concurrency). For example, with prefetch=10 but concurrency=1, only one message is processed at a time; the other nine sit prefetched in the consumer buffer.

Adapters run in the order declared in YAML. Each receives the previous one’s AdapterResult.message as its next PipelineContext.message:

```mermaid
flowchart LR
    CTX0[PipelineContext] --> A1[adapter 1 process]
    A1 -->|AdapterResult.message| CTX1[PipelineContext]
    CTX1 --> A2[adapter 2 process]
    A2 -->|AdapterResult.message| CTX2[PipelineContext]
    CTX2 --> A3[adapter 3 process]
    A3 --> OUT[AdapterResult final]
```

Each adapter's AdapterResult.message becomes the next adapter's input. Returning None terminates the chain (no outbound publish).
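The chaining loop can be sketched as below. This is a simplified model, assuming dataclass-style `PipelineContext` and `AdapterResult`; the real processor adds lineage tracking and error handling:

```python
from dataclasses import dataclass, replace
from typing import Any, Optional

@dataclass
class AdapterResult:
    message: Optional[Any] = None
    emitted: Optional[list] = None   # fan-out messages, if any

@dataclass
class PipelineContext:
    message: Any

async def run_chain(ctx: PipelineContext, adapters) -> Optional[AdapterResult]:
    result = None
    for adapter in adapters:
        result = await adapter.process(ctx)
        if result.message is None:
            return None   # chain terminated: nothing published outbound
        # Previous adapter's output becomes the next adapter's input.
        ctx = replace(ctx, message=result.message)
    return result
```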

For every adapter invocation:

  1. Lineage row created: `status=pending`, with a hash of the input.
  2. `await adapter.process(ctx)` — the adapter does its work.
  3. Lineage row updated: `status=completed`, or `failed` with the exception payload.
  4. The result becomes the input to the next adapter.

If any adapter raises, the chain stops; lineage marks the failing row; the processor returns FAILED.
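The four steps can be sketched as a wrapper around each invocation. The lineage methods shown (`create_row`, `complete_row`, `fail_row`) are hypothetical stand-ins for the real API:

```python
import hashlib

async def invoke_with_lineage(adapter, ctx, lineage):
    # 1. Pending row with a hash of the input, created before the adapter runs.
    input_hash = hashlib.sha256(repr(ctx.message).encode()).hexdigest()
    row_id = lineage.create_row(status="pending", input_hash=input_hash)
    try:
        # 2. The adapter does its work.
        result = await adapter.process(ctx)
    except Exception as exc:
        # 3b. Failure: record the exception; the chain stops here.
        lineage.fail_row(row_id, error=repr(exc))
        raise
    # 3a. Success path.
    lineage.complete_row(row_id)
    # 4. The caller feeds this result into the next adapter.
    return result
```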

An adapter can emit multiple outbound messages via AdapterResult.emitted: list[...]:

```mermaid
sequenceDiagram
    participant A as Adapter
    participant Lin as Lineage
    participant Proc as Processor
    participant Sem as Fanout semaphore
    participant Q as Queue
    A->>Lin: record_pending_children(count=N)
    Note over Lin: CRITICAL: before publish
    A->>Proc: AdapterResult(emitted=[m1, m2, ..., mN])
    loop N messages
        Proc->>Sem: acquire fanout slot
        Sem-->>Proc: granted
        Proc->>Q: publish message
        Proc->>Sem: release (synchronous)
    end
    Proc-->>Broker: return ACKNOWLEDGED (parent done)
```

Fan-out: the lineage child count is pre-registered before any publish. The pre-registration is load-bearing: without it, the completion-detection gate races.
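The publish loop can be sketched like this, mirroring the diagram's sequential acquire/publish/release; `publish_fanout` and its parameters are illustrative names, not the actual API:

```python
import asyncio

async def publish_fanout(parent_id, emitted, lineage, publish,
                         max_fanout_pending=500):
    # Children are registered in lineage BEFORE any message is published;
    # the ordering is what keeps completion detection race-free.
    lineage.record_pending_children(parent_id, count=len(emitted))
    sem = asyncio.Semaphore(max_fanout_pending)   # bounds in-flight publishes
    for msg in emitted:
        async with sem:
            await publish(msg)
```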

Why pre-register?

  • The completion check asks “is pending + initiated + processing == 0?”
  • Without pre-registration, the parent finishes → count drops to zero → completion fires
  • Then the children publish → count goes up again, but the orchestrator already stopped

With pre-registration, the parent’s “done” is contingent on all N children being accounted for.
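The race and its fix can be demonstrated with a bare counter. This is a toy model of the completion check, not the actual lineage implementation:

```python
class LineageCounter:
    """Tracks in-flight work; completion fires when the count hits zero."""
    def __init__(self):
        self.in_flight = 0        # models pending + initiated + processing
        self.completed = False

    def start(self, n=1):
        self.in_flight += n

    def finish(self, n=1):
        self.in_flight -= n
        if self.in_flight == 0:
            self.completed = True  # the orchestrator would stop here

# Without pre-registration: the parent finishes before children are counted.
racy = LineageCounter()
racy.start()     # parent
racy.finish()    # parent done -> count hits 0 -> completion fires too early
racy.start(2)    # children arrive after the orchestrator already stopped

# With pre-registration: children are counted before the parent finishes.
safe = LineageCounter()
safe.start()     # parent
safe.start(2)    # record_pending_children(count=2) BEFORE publish
safe.finish()    # parent done, but 2 children are still in flight
```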

After all adapters succeed and fan-out publishes complete, the processor acks the original message. There is no explicit ack call. The processor’s handler returns MessageStatus.ACKNOWLEDGED and the queue provider interprets that:

```mermaid
flowchart TB
    A[Handler returns<br/>MessageStatus] --> B{Which?}
    B -->|ACKNOWLEDGED| C[Broker removes from queue]
    B -->|FAILED| D[Broker redelivers or DLQs]
    C --> E[Release semaphores]
    D --> E
    E --> F[Processor ready for next]
```

Key consequence: if the processor is cancelled mid-chain (server shutdown, timeout), the message is redelivered by the broker. Adapter process() methods must be cancel-safe.
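One pattern for a cancel-safe `process()` is to release resources in a `finally` block, so cancellation landing mid-await cannot leak them. A sketch; `acquire_connection` and `do_work` are hypothetical helpers a concrete adapter would supply:

```python
class CancelSafeAdapter:
    async def process(self, ctx):
        conn = await self.acquire_connection()   # hypothetical resource helper
        try:
            # If cancellation lands inside this await, CancelledError
            # propagates, but the finally block still runs.
            return await self.do_work(conn, ctx.message)
        finally:
            # Released on success, failure, and cancellation alike.
            await conn.close()
```

Because the broker redelivers the message after cancellation, `do_work` should also be idempotent: re-processing the same message must not corrupt state.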

The above assumes a stateless adapter — every message processed independently. For adapters that batch across messages (StatefulAdapter), the shape shifts:

```mermaid
flowchart LR
    M1[msg 1] --> A[StatefulAdapter]
    M2[msg 2] --> A
    M3[msg 3] --> A
    A -->|batch not ready| K[continue_pipeline=False<br/>ACK messages, no emit]
    A -->|batch ready| E[emit aggregated result]
    T[Timeout trigger<br/>every 5s] --> A
```

A StatefulAdapter accumulates across messages and emits only when the batch is full or the background checker fires on timeout.

Use case: a batch_embedding_generator that collects 100 segments before one LLM batch call.
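The accumulate-and-flush shape can be sketched as below, with the batch size cut to 3 and the embedding call replaced by a stand-in; `flush` models what the timeout checker would invoke. The dict return is a simplification of the real result type:

```python
class BatchingAdapter:
    """Accumulates messages; emits one aggregated result per full batch."""
    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.buffer = []

    async def process(self, ctx):
        self.buffer.append(ctx.message)
        if len(self.buffer) < self.batch_size:
            # Batch not ready: ack the message, emit nothing.
            return {"continue_pipeline": False, "message": None}
        batch, self.buffer = self.buffer, []
        return {"continue_pipeline": True, "message": await self.embed(batch)}

    async def embed(self, segments):
        # Stand-in for one batched LLM/embedding call.
        return [f"vec({s})" for s in segments]

    async def flush(self):
        # Called by the timeout checker (e.g. every 5s) so a partial
        # batch is not stuck waiting for more messages.
        if not self.buffer:
            return None
        batch, self.buffer = self.buffer, []
        return await self.embed(batch)
```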