factflow-lineage

Message lineage tracking. Records one row per message per adapter invocation, decoupled from the main pipeline flow so a lineage write failure never fails the pipeline and a pipeline failure still records lineage.

  • Tier: shared service (cross-cutting — imported by engine and any adapter that records lineage)
  • Import name: factflow_lineage
  • Source: backend/packages/factflow-lineage/

The engine calls LineageService around every adapter invocation. Adapter authors don’t invoke lineage directly; recording it is the orchestrator’s concern.

Lineage answers two questions operators repeatedly need:

  1. What happened to this message? (forward trace — given an input, find every downstream message)
  2. Why did this message fail? (backward trace — given a failed message, find the full parent chain and the failing adapter)
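The two traces can be illustrated with a small in-memory sketch. It is not the package's implementation: the `Row` class, its field names, and both functions are hypothetical stand-ins, and it assumes one row per correlation id and an acyclic parent chain for simplicity.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Row:
    # Hypothetical stand-in for a lineage row; not the real PipelineLineageEntry.
    correlation_id: str
    parent_correlation_id: Optional[str]
    adapter: str
    status: str  # "pending" / "completed" / "failed" / "cancelled"


def forward_trace(rows, root_id):
    """Return every row downstream of root_id, breadth-first."""
    children = {}
    for row in rows:
        children.setdefault(row.parent_correlation_id, []).append(row)
    found, frontier = [], [root_id]
    while frontier:
        current = frontier.pop(0)
        for child in children.get(current, []):
            found.append(child)
            frontier.append(child.correlation_id)
    return found


def backward_trace(rows, failed_id):
    """Return the parent chain starting at failed_id and ending at the root."""
    by_id = {row.correlation_id: row for row in rows}
    chain, current = [], by_id.get(failed_id)
    while current is not None:
        chain.append(current)
        # Root rows have no parent, so the lookup returns None and the loop ends.
        current = by_id.get(current.parent_correlation_id)
    return chain


rows = [
    Row("a", None, "crawler", "completed"),
    Row("b", "a", "parser", "completed"),
    Row("c", "b", "embedder", "failed"),
    Row("d", "a", "archiver", "completed"),
]

downstream = [r.correlation_id for r in forward_trace(rows, "a")]
chain = [(r.correlation_id, r.adapter, r.status) for r in backward_trace(rows, "c")]
```

Here `downstream` lists everything produced from message `a`, and the first element of `chain` names the failing adapter. Against a real database the same walks would be lookups on the parent correlation id column rather than dictionary scans.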

Lineage is stored in PostgreSQL (via psycopg) and queried through the API’s /api/v1/lineage/* endpoints.

  • Commits independently. Lineage writes happen out-of-band. A lineage DB hiccup never fails the pipeline; a pipeline crash still records the failure.
  • Per-message, not per-batch. Batched adapters produce one lineage row per input message, with the batch id recorded alongside.
  • Correlation id is load-bearing. Every row carries the parent correlation id. Chain/children queries are index lookups on that column.
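The failure-isolation principle can be sketched as a wrapper around one adapter call. This is a minimal illustration under stated assumptions, not the engine's code: `InMemoryLineage`, `safe_record`, and `invoke_with_lineage` are hypothetical names, and a dictionary stands in for the PostgreSQL table.

```python
import logging

logger = logging.getLogger("lineage_sketch")


class InMemoryLineage:
    """Hypothetical stand-in for a lineage store (the real one is PostgreSQL)."""

    def __init__(self, fail_writes=False):
        self.rows = {}  # keyed by (correlation_id, adapter): one row per invocation
        self.fail_writes = fail_writes

    def record(self, correlation_id, parent_id, adapter, status):
        if self.fail_writes:
            raise ConnectionError("lineage store unavailable")
        self.rows[(correlation_id, adapter)] = {
            "parent_id": parent_id,
            "status": status,
        }


def safe_record(lineage, **fields):
    """Write a lineage row; log and swallow any error raised by the store."""
    try:
        lineage.record(**fields)
    except Exception:
        logger.warning("lineage write failed; pipeline continues", exc_info=True)


def invoke_with_lineage(lineage, adapter_name, adapter_fn, message, parent_id=None):
    """Run one adapter call with lineage recorded before and after it."""
    fields = {
        "correlation_id": message["id"],
        "parent_id": parent_id,
        "adapter": adapter_name,
    }
    safe_record(lineage, status="pending", **fields)
    try:
        result = adapter_fn(message)
    except Exception:
        # The failure is recorded first, then the original error propagates.
        safe_record(lineage, status="failed", **fields)
        raise
    safe_record(lineage, status="completed", **fields)
    return result
```

With a healthy store, an adapter that raises still leaves a `failed` row behind. With `fail_writes=True`, the adapter result is returned unchanged and only a warning is logged.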

from factflow_lineage import (
    LineageService,             # service-level facade used by the engine
    PipelineLineageRepository,  # raw repository for direct queries (API endpoints use this)
    PipelineLineageEntry,       # the row model
    StageStatus,                # enum: pending / completed / failed / cancelled
)

Four public exports. Three modules (service.py, repository.py, models.py).

LineageService is the canonical entry point at runtime. Most test code uses it too, with a test-scoped repository. API endpoints bypass the service and query the repository directly for read-heavy operations (list / chain / children / failures / stats).

  • Runtime: psycopg[pool]>=3.3.3
  • Workspace: factflow-protocols, factflow-foundation
  • External services: PostgreSQL 18 (pgvector optional — not used by lineage rows)

Tests live at backend/packages/factflow-lineage/tests/. Integration tests use Testcontainers. The lineage-testing skill captures the canonical patterns; see .claude/skills/backend/lineage-testing/.

  • factflow-engine — calls LineageService around every adapter invocation
  • factflow-server — exposes /api/v1/lineage/* endpoints that query the repository
  • Rule: .claude/rules/lineage-conventions.md — the invariants (independent commits, pending-children race fix, failure isolation)