Queue payloads

Queue messages are JSON-shaped objects serialized with msgpack for compact binary transport. Each message consists of a common envelope plus an adapter-specific payload.

Every message on every queue has these fields:

{
  "message_id": "msg-uuid",
  "correlation_id": "lineage-parent-uuid",
  "execution_id": "exec-uuid",
  "route_id": "the_route_name",
  "stage": "last_adapter_that_emitted",
  "timestamp": "2026-04-22T18:30:00.123456+00:00",
  "payload": { ... },  // adapter-specific
  "metadata": { ... }  // adapter/pipeline-specific extras
}
| Field | Purpose |
| --- | --- |
| `message_id` | Unique id for this message. Used as the lineage row id. |
| `correlation_id` | The parent lineage id — lets lineage chain queries walk upstream. |
| `execution_id` | Scopes the message to one execution. Used by ExecutionScopedQueue for name prefixing. |
| `route_id` | Which route this message belongs to. |
| `stage` | Name of the last adapter that emitted this message. |
| `timestamp` | ISO-8601 UTC with microseconds. |
| `payload` | The actual data — shape depends on the emitting adapter. |
| `metadata` | Freeform dict for pipeline-specific context (origin, config hash, user tags). |
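As a concrete illustration, here is a minimal stdlib-only sketch of constructing an envelope with these fields. The `make_envelope` helper is hypothetical, not part of the actual codebase; the real pipeline would serialize the resulting dict with msgpack before publishing.

```python
import uuid
from datetime import datetime, timezone


def make_envelope(route_id, stage, payload, correlation_id=None, execution_id=None):
    # Hypothetical helper: wraps an adapter-specific payload in the common envelope.
    return {
        "message_id": str(uuid.uuid4()),            # unique id, doubles as lineage row id
        "correlation_id": correlation_id,           # parent lineage id; None at the head of a chain
        "execution_id": execution_id or str(uuid.uuid4()),
        "route_id": route_id,
        "stage": stage,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # ISO-8601 UTC, microseconds included
        "payload": payload,
        "metadata": {},
    }


env = make_envelope("crawl_site", "sitemap_parser", {"urls": ["https://example.com/a"]})
# On the wire this dict would be serialized with msgpack (e.g. msgpack.packb(env)).
```

A head-of-chain message has no parent, so `correlation_id` stays `None`; every downstream emission would set it to the parent's `message_id`.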

Each adapter’s PipelineContext.message is the payload from the prior queue message. Each adapter emits a new payload shape for its successor:

  • sitemap_parser emits: {"urls": ["...", "..."]}
  • url_expander emits: {"url": "..."} (one message per URL)
  • web_scraper emits: {"url": "...", "html": "...", "status": 200, "headers": {...}}
  • web_content_storage emits: {"storage_key": "...", "url": "...", "content_length": N}
  • storage_retriever emits: {"html": "...", "source_key": "...", "source_url": "..."}
  • html_to_markdown emits: {"markdown": "...", "front_matter": {...}}
  • smart_segmenter emits: {"segments": [{"text": "...", "token_count": N, "heading_path": [...]}, ...]}
  • segment_publisher emits: one message per segment, each with {"segment": {...}, "segment_index": i, "total": N}
  • markdown_storage_writer emits: {"storage_key": "...", "segment_count": N}
  • embedding_generator emits: {"segment_id": "...", "model": "text-embedding-3-small", "vector_storage_key": "..."}
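To make the chaining concrete, here is a hedged sketch of a fan-out step in the style of url_expander: the `{"urls": [...]}` input becomes one message per URL, and each emitted message links back to its parent via `correlation_id`. The `fan_out` function is illustrative only, not the actual adapter code.

```python
import uuid


def fan_out(parent_envelope):
    # Illustrative url_expander-style fan-out: one successor message per URL,
    # each carrying the parent's message_id as its correlation_id (lineage link).
    out = []
    for url in parent_envelope["payload"]["urls"]:
        out.append({
            "message_id": str(uuid.uuid4()),
            "correlation_id": parent_envelope["message_id"],
            "execution_id": parent_envelope["execution_id"],
            "route_id": parent_envelope["route_id"],
            "stage": "url_expander",
            "payload": {"url": url},
            "metadata": {},
        })
    return out


parent = {
    "message_id": "msg-1", "execution_id": "exec-1", "route_id": "crawl_site",
    "payload": {"urls": ["https://example.com/a", "https://example.com/b"]},
}
children = fan_out(parent)  # two messages, both pointing upstream at msg-1
```

This is what lets lineage chain queries walk upstream: every child's `correlation_id` is its parent's `message_id`.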

Batching adapters use a BatchMessage wrapper:

{
  "batch_id": "batch-uuid",
  "messages": [
    {envelope + payload},
    {envelope + payload},
    ...
  ]
}

The adapter processes the batch and emits either one message per input (each carrying the matching batch_id) or a single aggregated message.
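A minimal sketch of the per-input case: unwrap the BatchMessage, run a handler on each inner payload, and tag every output with the same batch_id. Function and handler names are assumptions for illustration, not the real batching API.

```python
def process_batch(batch, handler):
    # Hypothetical sketch: unwrap a BatchMessage, apply a per-message handler,
    # and emit one output per input carrying the matching batch_id.
    return [
        {"batch_id": batch["batch_id"], "payload": handler(msg["payload"])}
        for msg in batch["messages"]
    ]


batch = {
    "batch_id": "batch-1",
    "messages": [
        {"payload": {"url": "https://example.com/a"}},
        {"payload": {"url": "https://example.com/b"}},
    ],
}
results = process_batch(batch, lambda p: {"url": p["url"], "status": 200})
```

The aggregated-message variant would instead reduce `batch["messages"]` into one output (for example, a list of results under a single envelope).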

Queues come in two delivery modes:

  • Point-to-point queues — each message is consumed by exactly one processor (work distribution)
  • Topics — each message is consumed by every subscriber (broadcast; used for cross-execution signals)

See the queue isolation concept for the point-to-point vs topic distinction.
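The two delivery modes can be sketched as in-memory toys. These classes are illustrative only, assuming nothing about the real broker beyond the consume-once vs broadcast semantics described above.

```python
from collections import deque


class PointToPointQueue:
    # Work distribution: each published message is handed to exactly one consumer.
    def __init__(self):
        self._items = deque()

    def publish(self, msg):
        self._items.append(msg)

    def consume(self):
        # Removing the item means no other consumer can see it again.
        return self._items.popleft() if self._items else None


class Topic:
    # Broadcast: every subscriber receives its own copy of each message.
    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, msg):
        for cb in self._subscribers:
            cb(msg)


q = PointToPointQueue()
q.publish("job-1")
first = q.consume()   # "job-1" — consumed exactly once
second = q.consume()  # None — nothing left for a second consumer

t = Topic()
seen_a, seen_b = [], []
t.subscribe(seen_a.append)
t.subscribe(seen_b.append)
t.publish("signal")   # both subscribers receive it
```

The difference is entirely in `publish`/`consume` ownership: the queue destroys the message on read, while the topic copies it to every subscriber.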

Transport details:

  • Wire format: msgpack binary
  • Bodies under 16 KB are sent inline; larger bodies are replaced with a storage reference ({"__ref__": "storage-key"}) and rehydrated on consumption, which keeps broker memory bounded
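A sketch of the externalize/rehydrate round trip, using a dict as a stand-in blob store and JSON length as a proxy for the serialized size (the real transport measures the msgpack body; the key scheme and function names here are assumptions):

```python
import json

INLINE_LIMIT = 16 * 1024  # bodies at or above this size get externalized

storage = {}  # stand-in for the real blob store


def externalize(envelope):
    # Producer side: if the serialized payload exceeds the inline limit,
    # park it in storage and replace it with a {"__ref__": key} pointer.
    body = json.dumps(envelope["payload"])
    if len(body.encode()) < INLINE_LIMIT:
        return envelope
    key = f"msg-body/{envelope['message_id']}"  # hypothetical key scheme
    storage[key] = body
    return {**envelope, "payload": {"__ref__": key}}


def rehydrate(envelope):
    # Consumer side: resolve the reference transparently before processing.
    payload = envelope["payload"]
    if isinstance(payload, dict) and "__ref__" in payload:
        return {**envelope, "payload": json.loads(storage[payload["__ref__"]])}
    return envelope


big = {"message_id": "m1", "payload": {"html": "x" * 20_000}}
sent = externalize(big)        # payload swapped for a storage reference
received = rehydrate(sent)     # consumer sees the original payload again
```

Only the small pointer crosses the broker, so a burst of oversized bodies cannot blow up broker memory.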

POST /api/v1/system/mq-diagnostic round-trips a synthetic envelope through every configured queue to verify the pipeline end to end. Use it when a message appears stuck.