
Pipeline YAML

A pipeline config is a YAML file. The schema below is validated at create time (factflow config create), at pre-flight (factflow config validate), and at server startup (strict by default).

version: "1.0"
routes:
  <route_id>:
    ...
init_message:
  route: <route_id>
  payload: {...}
error_handling: # optional, inherits to all routes
  ...
backpressure: # optional, inherits to all routes
  ...

There is a single version: key, currently "1.0". Future major changes go behind version gates.

<route_id>:
  name: "Human-readable name"
  description: "What this route does"
  inbound:
    queue: "/queue/route.inbound" # broker-specific destination
    subscription: "processors-name" # shared subscription across processors
    concurrency: 5 # max in-flight messages per processor
    prefetch: 10 # broker-side buffer
  adapters:
    - type: "adapter_name"
      config:
        ...adapter-specific fields...
      condition: # optional; gate on message fields
        field: "some_field"
        operator: "matches"
        value: "^https://"
  error_handling: # optional override
    max_retries: 3
    retry_delay_seconds: 10
    dead_letter_queue: "/queue/dlq"
  backpressure: # optional override
    target_latency_ms: 500
    max_concurrency: 20
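
The relationship between the top-level error_handling / backpressure sections and a route's override can be pictured with a small Python sketch. The effective_settings helper is hypothetical, and the key-by-key merge is an assumption: this page only says that the top-level sections inherit to all routes and that a route may override them.

```python
def effective_settings(top_level: dict, route: dict, section: str) -> dict:
    """Combine a top-level section with a route's override of the same section.

    Assumption: route keys win key by key. The real merge rule is not
    documented on this page and may differ (e.g. whole-section replacement).
    """
    merged = dict(top_level.get(section, {}))
    merged.update(route.get(section, {}))
    return merged


# Example: the route overrides max_retries and inherits the retry delay.
top_level = {"error_handling": {"max_retries": 3, "retry_delay_seconds": 10}}
route = {"error_handling": {"max_retries": 5}}
settings = effective_settings(top_level, route, "error_handling")
```

Under that assumption, settings holds max_retries 5 from the route and retry_delay_seconds 10 from the top level.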

Queue names are provider-specific:

  • Artemis (STOMP) — /queue/... or /topic/...
  • RabbitMQ (AMQP) — simple names like route.inbound
  • Pulsar — persistent://tenant/namespace/topic

The server’s NamingStrategyRegistry picks up the configured provider and validates names against its rules. Invalid names fail at validate-time, not at first message.
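
As a rough sketch of what per-provider name validation could look like, the snippet below checks a queue name against one regular expression per broker. The patterns and the validate_queue_name helper are assumptions made for illustration, inferred only from the three name shapes listed above; the real NamingStrategyRegistry rules may be stricter or looser.

```python
import re

# Hypothetical patterns, inferred from the name shapes listed above.
# The actual NamingStrategyRegistry rules are not shown on this page.
NAME_PATTERNS = {
    "artemis": re.compile(r"/(queue|topic)/[^\s/]+"),
    "rabbitmq": re.compile(r"[A-Za-z0-9._-]+"),
    "pulsar": re.compile(r"persistent://[^/\s]+/[^/\s]+/[^/\s]+"),
}


def validate_queue_name(provider: str, name: str) -> bool:
    """Return True if `name` has the shape expected for `provider`."""
    pattern = NAME_PATTERNS.get(provider)
    if pattern is None:
        raise ValueError(f"unknown provider: {provider!r}")
    # fullmatch requires the whole name to fit the pattern.
    return pattern.fullmatch(name) is not None
```

With these patterns, "/queue/route.inbound" is accepted for Artemis but rejected for RabbitMQ, which is the kind of mismatch that would surface at validate time rather than at the first message.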

Every adapter in the adapters: list runs on every message the route consumes, in order. Each adapter’s type: must be registered in the adapter registry; unknown types fail with E103.

config: is validated against the adapter’s Pydantic config class. Unknown fields fail with E301; missing required fields fail with E302.
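
The unknown-field and missing-field checks can be illustrated without the real adapter classes. The sketch below uses a plain dataclass as a stand-in for an adapter's Pydantic config class; WebScraperConfig, its fields, and check_config are hypothetical, and only the E301 / E302 codes come from this page.

```python
from dataclasses import MISSING, dataclass, fields


@dataclass
class WebScraperConfig:
    """Hypothetical config class; stands in for a Pydantic model."""

    user_agent: str  # required (no default)
    timeout_seconds: int = 30  # optional


def check_config(config_cls, raw: dict) -> list[str]:
    """Return E301/E302-style messages for a raw `config:` mapping."""
    known = {f.name: f for f in fields(config_cls)}
    # E301: keys present in the YAML that the config class does not declare.
    errors = [f"E301 unknown config field: {key}" for key in raw if key not in known]
    # E302: declared fields without a default that the YAML omits.
    for name, f in known.items():
        required = f.default is MISSING and f.default_factory is MISSING
        if required and name not in raw:
            errors.append(f"E302 required config field missing: {name}")
    return errors
```

For example, check_config(WebScraperConfig, {"timeout": 5}) reports both an unknown field (timeout) and a missing required field (user_agent).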

See the adapter catalog for every shipped adapter’s config shape.

Skip an adapter if the incoming message doesn’t match:

- type: "web_scraper"
  condition:
    field: "url"
    operator: "matches"
    value: "^https://.*\\.dnb\\.no/.*"

Available operators, from factflow_engine.condition_operators.OperatorRegistry:

Operator             Purpose
eq / equals          Equality
neq / not_equals     Inequality
in                   Value is in a list
not_in               Value is not in a list
contains             String / list contains substring / item
starts_with          String starts with prefix
ends_with            String ends with suffix
gt / greater_than    Numeric greater than
lt / less_than       Numeric less than
matches              Regex match
exists               Field is present (non-null)

Custom operators register via OperatorRegistry.register_function(name, fn).
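
To make the condition gate concrete, here is a minimal Python sketch of how a condition might be evaluated against a message. It is not the factflow_engine implementation: the OPERATORS table, should_run, and the module-level register_function are hypothetical stand-ins, only a few operators are shown, and two behaviours are assumptions (matches uses re.search, and a missing field skips the adapter).

```python
import re
from typing import Any, Callable

# Hypothetical stand-in for the operator registry; only a few operators shown.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda actual, expected: actual == expected,
    "in": lambda actual, expected: actual in expected,
    "starts_with": lambda actual, expected: str(actual).startswith(expected),
    # Assumption: regex matching uses search semantics.
    "matches": lambda actual, expected: re.search(expected, str(actual)) is not None,
}


def register_function(name: str, fn: Callable[[Any, Any], bool]) -> None:
    """Add a custom operator under `name` (mirrors the call named above)."""
    OPERATORS[name] = fn


def should_run(condition: dict, message: dict) -> bool:
    """Return True if the adapter should run for this message."""
    field = condition["field"]
    if field not in message:
        return False  # assumption: a missing field means the adapter is skipped
    operator = OPERATORS[condition["operator"]]
    return operator(message[field], condition.get("value"))


# The web_scraper condition from the example above, as a Python dict.
condition = {"field": "url", "operator": "matches", "value": r"^https://.*\.dnb\.no/.*"}
runs = should_run(condition, {"url": "https://www.dnb.no/lan"})
```

A custom operator would be added with a call such as register_function("longer_than", lambda actual, expected: len(actual) > expected), after which "longer_than" can be used as an operator name in a condition.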

init_message:
  route: "sitemap_scraper"
  payload:
    sitemap_url: "https://example.com/sitemap.xml"
    # any JSON-serialisable fields your first adapter expects

There is one init message per execution. It is published to the named route’s inbound queue when the execution starts. It is required: without it, no work ever happens.

Validation error codes follow a stable taxonomy produced by the validator. Format: <severity><numeric>; E = error (fails validation), W = warning (validation succeeds with log output).

  • E001 — YAML parsing error
  • E002 — Unknown top-level key
  • E003 — Required top-level key missing
  • E004 — Type mismatch
  • E501 — Version unsupported
  • E101 — Unknown route field
  • E102 — Route id invalid (naming rules)
  • E103 — Unknown adapter type
  • E104 — Adapter list empty
  • E105 — Duplicate route id
  • E106 — Duplicate queue across routes
  • E107–E109 — Inbound queue / subscription / concurrency errors
  • E110–E111 — Adapter position errors
  • E201–E209 — Condition errors (unknown operator, bad value, missing field)
  • E301 — Unknown config field for this adapter type
  • E302 — Required config field missing
  • E303–E399 — Adapter-specific validation
  • E401 — Missing init_message
  • E402 — Route referenced by init message doesn’t exist
  • E403 — Payload fails the first adapter’s input validation
  • W501 — Deprecated field
  • W502 — Suboptimal concurrency setting
  • W503 — Queue-name prefix convention not followed
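
Tooling that consumes validator output can rely on the <severity><numeric> format. The sketch below splits a code into its parts and decides whether a run passes; parse_code and validation_passes are hypothetical helpers, and the three-digit width is an assumption based on the codes listed above.

```python
import re

# Assumption: one severity letter followed by exactly three digits.
CODE_RE = re.compile(r"(?P<severity>[EW])(?P<number>\d{3})")


def parse_code(code: str) -> tuple[str, int]:
    """Split a code such as 'E103' into ('E', 103)."""
    match = CODE_RE.fullmatch(code)
    if match is None:
        raise ValueError(f"not a validator code: {code!r}")
    return match["severity"], int(match["number"])


def validation_passes(codes: list[str]) -> bool:
    """Validation fails if any code has severity E; warnings alone pass."""
    return all(parse_code(code)[0] != "E" for code in codes)
```

A result containing only W501 and W503 would pass under this rule, while adding E401 would fail it.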

When an execution starts, the full YAML config is frozen into config_snapshot in the execution row. Subsequent edits to the stored config don’t affect in-flight executions. Replay resolves route → queue from the parent execution’s snapshot.
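
The snapshot behaviour can be sketched in a few lines of Python. The dict-based execution record, the start_execution and resolve_queue helpers, and the queue names are all hypothetical; only the config_snapshot field name comes from this page.

```python
import copy

# Hypothetical stored config, already parsed from YAML into a dict.
stored_config = {
    "routes": {"sitemap_scraper": {"inbound": {"queue": "/queue/sitemap.inbound"}}}
}


def start_execution(config: dict) -> dict:
    """Create an execution record holding a frozen deep copy of the config."""
    return {"config_snapshot": copy.deepcopy(config)}


def resolve_queue(execution: dict, route_id: str) -> str:
    """Resolve route -> queue from the execution's snapshot, not the stored config."""
    return execution["config_snapshot"]["routes"][route_id]["inbound"]["queue"]


execution = start_execution(stored_config)
# A later edit to the stored config does not reach the running execution.
stored_config["routes"]["sitemap_scraper"]["inbound"]["queue"] = "/queue/renamed"
```

After the edit, resolve_queue(execution, "sitemap_scraper") still returns the original queue name, which is why a replay based on the parent execution's snapshot targets the same queues as the original run.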