Pipelines

A pipeline is a YAML file that describes how messages flow through a sequence of adapter chains. One pipeline maps to one stored config. One config runs zero or more executions.

```yaml
version: "1.0"
routes:
  my_route:
    name: "Human-readable name"
    description: "What this route does"
    inbound:
      queue: "/queue/my_route.in"
      subscription: "my-route-processors"
      concurrency: 5
      prefetch: 10
    adapters:
      - type: "some_adapter"
        config:
          some_field: 42
      - type: "another_adapter"
init_message:
  route: "my_route"
  payload:
    seed: "hello"
```

Three top-level concepts: routes, adapters, init message.

A route is one named “lane” in the pipeline. Each route owns:

  • An inbound queue (where it reads messages from)
  • A subscription name (so multiple processors share the load)
  • Concurrency and prefetch tuning
  • An ordered list of adapters to run on every message

Routes don’t know about each other directly. They communicate through queues: one adapter in route A emits a message, another route’s processor subscribes to that queue and picks it up.
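For instance, two routes can be chained purely through a shared queue name. A sketch (the route names, queue paths, and adapter types below are illustrative; how an adapter addresses its output queue depends on that adapter's own config):

```yaml
routes:
  scraper:
    inbound:
      queue: "/queue/scraper.in"
      subscription: "scraper-processors"
    adapters:
      # Hypothetical adapter that publishes its results to /queue/markdown.in
      - type: "web_scraper"
  markdown:
    inbound:
      # Same queue the scraper route's adapter emits to -- this queue name
      # is the only coupling between the two routes
      queue: "/queue/markdown.in"
      subscription: "markdown-processors"
    adapters:
      - type: "html_to_markdown"
```

Neither route references the other by name; deleting one leaves the other valid, it just stops receiving (or producing) messages on that queue.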

An adapter is one unit of work — take one message, do something, emit zero or more messages. Each adapter is a Python class implementing PipelineAdapter from factflow-protocols.

Adapter declaration in YAML:

```yaml
- type: "html_to_markdown"
  config:
    preserve_images: true
    max_tokens: 4096
```

  • type: is the registered name — matched against the adapter registry at startup
  • config: is the adapter’s Pydantic config model — validated at config-load time, not at first message

See the adapter catalog for every shipped adapter and its config shape.

Every execution needs a starting message. The init_message: block defines what gets published to which route when the execution starts.

```yaml
init_message:
  route: "sitemap_scraper"
  payload:
    sitemap_url: "https://example.com/sitemap.xml"
```

Without an init message, the execution would start but no work would ever begin.

Adapters can be gated by conditions — run only if the incoming message matches:

```yaml
adapters:
  - type: "web_scraper"
    condition:
      field: "url"
      operator: "matches"
      value: "^https://.*\\.dnb\\.no/.*"
```

Supported operators: eq, neq, gt, lt, gte, lte, in, not_in, contains, matches, exists.
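As another example, a membership check with the in operator might look like this (the field name and the list-valued value: shape are assumptions for illustration, not taken from the reference):

```yaml
adapters:
  - type: "html_to_markdown"
    condition:
      field: "content_type"   # assumed message field, for illustration
      operator: "in"
      value: ["text/html", "application/xhtml+xml"]
```

When the condition doesn't match, the adapter is skipped for that message rather than failing it.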

When an execution starts, the full YAML config is frozen into config_snapshot in the execution row. Subsequent edits to the config don’t affect in-flight executions.

Replay honours the snapshot. A replay of execution X resolves route → queue from X’s snapshot, not from the current global directory. This prevents a class of subtle bugs where two configs define the same route name with different queues.

Every config is validated:

  • On factflow config create — the server rejects invalid YAML with a stable error taxonomy (E001–E501, W501–W503).
  • Pre-flight with the CLI: factflow config validate <file> runs the same validator without touching the server.
  • On server startup — every shipped YAML under backend/config/pipelines/ is validated; by default (FACTFLOW_CONFIG_VALIDATOR_STRICT=1) startup aborts if any YAML fails.

See the pipeline YAML reference for the full schema and error codes.

Why YAML configs at all? Two reasons, both practical:

  • Operators iterate without deploy. Edit YAML, factflow config edit, factflow config run. No Python redeploys.
  • Replay, audit, diff. A config snapshot is a complete description of what ran. Comparing snapshots reveals what changed between executions.