Queue payloads
Messages on queues are JSON-shaped objects serialised with msgpack for compact binary transport. Each message wraps an adapter-specific payload in a common envelope.
Common envelope
Every message on every queue has these fields:

    {
      "message_id": "msg-uuid",
      "correlation_id": "lineage-parent-uuid",
      "execution_id": "exec-uuid",
      "route_id": "the_route_name",
      "stage": "last_adapter_that_emitted",
      "timestamp": "2026-04-22T18:30:00.123456+00:00",
      "payload": { ... },    // adapter-specific
      "metadata": { ... }    // adapter/pipeline-specific extras
    }

| Field | Purpose |
|---|---|
| `message_id` | Unique id for this message. Used as lineage row id. |
| `correlation_id` | The parent lineage id; lets lineage chain queries walk upstream. |
| `execution_id` | Scopes to one execution. Used by `ExecutionScopedQueue` for name prefixing. |
| `route_id` | Which route this message belongs to. |
| `stage` | Name of the last adapter that emitted this message. |
| `timestamp` | ISO-8601 UTC with microseconds. |
| `payload` | The actual data; shape depends on the emitting adapter. |
| `metadata` | Freeform dict for pipeline-specific context (origin, config hash, user tags). |
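As a rough illustration of the envelope fields, the sketch below builds an envelope as a plain dict and derives a child message from a parent. The helper names `make_envelope` and `emit_child` are hypothetical and not part of any documented API. It also assumes that a child's `correlation_id` is set to the parent's `message_id`, which is one plausible reading of "parent lineage id" rather than confirmed behaviour.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def make_envelope(
    payload: dict[str, Any],
    *,
    execution_id: str,
    route_id: str,
    stage: str,
    correlation_id: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build an envelope dict with the eight documented fields (hypothetical helper)."""
    return {
        "message_id": str(uuid.uuid4()),
        "correlation_id": correlation_id,
        "execution_id": execution_id,
        "route_id": route_id,
        "stage": stage,
        # ISO-8601 UTC with microseconds, e.g. 2026-04-22T18:30:00.123456+00:00
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="microseconds"),
        "payload": payload,
        "metadata": metadata if metadata is not None else {},
    }


def emit_child(parent: dict[str, Any], payload: dict[str, Any], *, stage: str) -> dict[str, Any]:
    """Derive a successor message that stays in the parent's execution and route."""
    return make_envelope(
        payload,
        execution_id=parent["execution_id"],
        route_id=parent["route_id"],
        stage=stage,
        # Assumption: the child's correlation_id points at the parent's message_id.
        correlation_id=parent["message_id"],
        metadata=dict(parent["metadata"]),
    )
```

Under that assumption, following `correlation_id` from any message back to a message whose `correlation_id` is empty walks the lineage chain upstream.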
Payload shapes per adapter
Each adapter’s `PipelineContext.message` is the payload from the prior queue message. Each adapter emits a new payload shape for its successor:
Webscraper
- `sitemap_parser` emits: `{"urls": ["...", "..."]}`
- `url_expander` emits: `{"url": "..."}` (one message per URL)
- `web_scraper` emits: `{"url": "...", "html": "...", "status": 200, "headers": {...}}`
- `web_content_storage` emits: `{"storage_key": "...", "url": "...", "content_length": N}`
Markdown
- `storage_retriever` emits: `{"html": "...", "source_key": "...", "source_url": "..."}`
- `html_to_markdown` emits: `{"markdown": "...", "front_matter": {...}}`
- `smart_segmenter` emits: `{"segments": [{"text": "...", "token_count": N, "heading_path": [...]}, ...]}`
- `segment_publisher` emits: one message per segment, each with `{"segment": {...}, "segment_index": i, "total": N}`
- `markdown_storage_writer` emits: `{"storage_key": "...", "segment_count": N}`
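The fan-out from `smart_segmenter` to `segment_publisher` can be sketched as a small generator. The function name `publish_segments` is hypothetical, and the sketch assumes `segment_index` is zero-based, which the payload shape above does not specify.

```python
from typing import Any, Iterator


def publish_segments(payload: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield one segment_publisher-style payload per segment in a smart_segmenter payload."""
    segments = payload["segments"]
    total = len(segments)
    # Assumption: segment_index counts from 0.
    for index, segment in enumerate(segments):
        yield {"segment": segment, "segment_index": index, "total": total}
```

Carrying `total` on every message lets a downstream consumer tell when it has seen the last segment without a separate end-of-stream marker.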
Embeddings
- `embedding_generator` emits: `{"segment_id": "...", "model": "text-embedding-3-small", "vector_storage_key": "..."}`
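When debugging a consumer, it can help to check an incoming payload against the shape its emitting adapter is documented to produce. The sketch below is one possible way to do that. The `REQUIRED_KEYS` table and the `missing_keys` helper are hypothetical and simply transcribe the key names listed above. It assumes the envelope's `stage` field carries exactly these adapter names.

```python
from typing import Any

# Top-level payload keys per emitting adapter, transcribed from the lists above.
REQUIRED_KEYS: dict[str, frozenset[str]] = {
    "sitemap_parser": frozenset({"urls"}),
    "url_expander": frozenset({"url"}),
    "web_scraper": frozenset({"url", "html", "status", "headers"}),
    "web_content_storage": frozenset({"storage_key", "url", "content_length"}),
    "storage_retriever": frozenset({"html", "source_key", "source_url"}),
    "html_to_markdown": frozenset({"markdown", "front_matter"}),
    "smart_segmenter": frozenset({"segments"}),
    "segment_publisher": frozenset({"segment", "segment_index", "total"}),
    "markdown_storage_writer": frozenset({"storage_key", "segment_count"}),
    "embedding_generator": frozenset({"segment_id", "model", "vector_storage_key"}),
}


def missing_keys(envelope: dict[str, Any]) -> list[str]:
    """Return the documented payload keys that are absent, sorted by name."""
    required = REQUIRED_KEYS.get(envelope["stage"])
    if required is None:
        # Unknown stage: nothing documented to check against.
        return []
    return sorted(required - set(envelope["payload"]))
```

This only checks for the presence of top-level keys. It does not validate value types or nested structures such as the entries of `segments`.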
Batch messages
Batching adapters use a `BatchMessage` wrapper:

    {
      "batch_id": "batch-uuid",
      "messages": [
        {envelope + payload},
        {envelope + payload},
        ...
      ]
    }

The adapter processes the batch and emits either one message per input (each with a matching `batch_id`) or one aggregated message.
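The two emission modes can be sketched as follows. All function names here are hypothetical. For brevity the outputs carry only `batch_id` and `payload`, whereas a real adapter would presumably wrap each output in a full envelope. Where exactly `batch_id` lives on an emitted message is not specified above, so its placement here is an assumption.

```python
import uuid
from typing import Any, Callable


def make_batch(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Wrap messages in a BatchMessage-shaped dict with a fresh batch_id."""
    return {"batch_id": str(uuid.uuid4()), "messages": list(messages)}


def emit_per_input(
    batch: dict[str, Any],
    transform: Callable[[dict[str, Any]], dict[str, Any]],
) -> list[dict[str, Any]]:
    """Mode 1: one output per input message, each tagged with the batch's id."""
    return [
        {"batch_id": batch["batch_id"], "payload": transform(message["payload"])}
        for message in batch["messages"]
    ]


def emit_aggregated(
    batch: dict[str, Any],
    combine: Callable[[list[dict[str, Any]]], dict[str, Any]],
) -> dict[str, Any]:
    """Mode 2: a single output computed from all input payloads."""
    payloads = [message["payload"] for message in batch["messages"]]
    return {"batch_id": batch["batch_id"], "payload": combine(payloads)}
```

Per-input emission keeps downstream lineage one-to-one, while aggregation suits steps that only make sense over the whole batch, such as totals.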
Subscription semantics
- Point-to-point queues: each message is consumed by exactly one processor (work distribution)
- Topics: each message is consumed by every subscriber (broadcast; used for cross-execution signals)
See the queue isolation concept for the point-to-point vs topic distinction.
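The difference between the two delivery modes can be shown with a toy in-memory model. The classes below are illustrative only and are not the project's queue implementation. They ignore acknowledgements, persistence, and concurrency.

```python
from collections import deque
from typing import Any, Callable, Optional


class PointToPointQueue:
    """Each message is handed to exactly one receiver."""

    def __init__(self) -> None:
        self._pending: deque = deque()

    def send(self, message: Any) -> None:
        self._pending.append(message)

    def receive(self) -> Optional[Any]:
        # Whichever worker calls receive() first takes the message; it is then gone.
        return self._pending.popleft() if self._pending else None


class Topic:
    """Each message is delivered to every current subscriber."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        self._subscribers.append(handler)

    def publish(self, message: Any) -> None:
        for handler in self._subscribers:
            handler(message)
```

With two workers calling `receive()` on the same queue, three messages are split between them. With two subscribers on the same topic, each subscriber sees all three.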
Serialisation
- Wire format: `msgpack` binary
- Bodies under 16KB travel inline; larger bodies get a storage reference (`{"__ref__": "storage-key"}`) and are rehydrated on consumption, which keeps broker memory bounded
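The inline-versus-reference rule might look roughly like the sketch below. Several things here are assumptions: that "16KB" means 16 × 1024 bytes of encoded body, that the whole encoded body is offloaded, and the key naming scheme. To keep the example standard-library only, `json` stands in for the encoder. With the real wire format, `encode` and `decode` would be replaced by `msgpack.packb` and `msgpack.unpackb`. A plain dict stands in for the storage backend.

```python
import json
import uuid
from typing import Any

INLINE_LIMIT_BYTES = 16 * 1024  # assumption: "16KB" measured on the encoded body


def encode(obj: Any) -> bytes:
    # Stand-in encoder; the documented wire format is msgpack.
    return json.dumps(obj).encode("utf-8")


def decode(data: bytes) -> Any:
    return json.loads(data.decode("utf-8"))


def to_wire(body: dict[str, Any], store: dict[str, bytes]) -> bytes:
    """Return the bytes to hand to the broker, offloading large bodies to `store`."""
    encoded = encode(body)
    if len(encoded) < INLINE_LIMIT_BYTES:
        return encoded
    key = f"mq-body/{uuid.uuid4()}"  # hypothetical key scheme
    store[key] = encoded
    return encode({"__ref__": key})


def from_wire(data: bytes, store: dict[str, bytes]) -> Any:
    """Decode broker bytes, rehydrating the body if it is a storage reference."""
    body = decode(data)
    if isinstance(body, dict) and set(body) == {"__ref__"}:
        return decode(store[body["__ref__"]])
    return body
```

Because only the small reference passes through the broker, a 5 MB scraped page costs the broker a few dozen bytes, and the consumer pays the storage read when it rehydrates.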
Diagnostic
`POST /api/v1/system/mq-diagnostic` round-trips a synthetic envelope against every configured queue to verify the pipeline. Use it when a message seems stuck.
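A minimal way to call the endpoint from a script, using only the standard library, is sketched below. The base URL is a placeholder, the request is assumed to need no body, and authentication is not covered. Since the response format is not described here, the sketch returns the status code and raw body without interpreting them.

```python
import urllib.request

DIAGNOSTIC_PATH = "/api/v1/system/mq-diagnostic"


def build_diagnostic_request(base_url: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for the diagnostic endpoint."""
    url = base_url.rstrip("/") + DIAGNOSTIC_PATH
    # Assumption: the endpoint accepts an empty request body.
    return urllib.request.Request(url, data=b"", method="POST")


def run_diagnostic(base_url: str, timeout: float = 10.0) -> tuple[int, bytes]:
    """Send the request and return (HTTP status, raw response body)."""
    request = build_diagnostic_request(base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, response.read()
```

For example, `run_diagnostic("http://localhost:8000")` would target a locally running instance, if one is listening on that hypothetical address.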
Related
- Concept: Queue isolation
- Queue diagnostics guide
- factflow-protocols reference: `QueueMessage`, `BatchMessage`