factflow-replay

Storage replay and recovery — re-publishes stored artefacts back into pipeline queues so a downstream stage can be re-run without re-fetching upstream source data.

Two flavours: same-pipeline replay (rerun a stage within an existing execution, via an init message) and cross-pipeline replay (full coordinator, detached, via POST /api/v1/executions/{id}/replay with from_stage + to_route).
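
A minimal sketch of how a client might construct the cross-pipeline replay request, using only the standard library. The endpoint path and the from_stage / to_route parameter names come from the description above. The base URL, the JSON request-body shape, and the example execution, stage, and route values are assumptions for illustration. The request is built but not sent.

```python
import json
import urllib.request


def build_replay_request(base_url: str, execution_id: str,
                         from_stage: str, to_route: str) -> urllib.request.Request:
    """Build (but do not send) a POST to the detached replay endpoint."""
    url = f"{base_url.rstrip('/')}/api/v1/executions/{execution_id}/replay"
    # Assumption: the parameters travel as a JSON body; the real API may differ.
    body = json.dumps({"from_stage": from_stage, "to_route": to_route}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical values, purely for illustration.
req = build_replay_request("http://localhost:8000", "exec-123", "chunker", "embedding_route")
print(req.get_method(), req.full_url)
```

Sending it would be a matter of passing req to urllib.request.urlopen against a running backend.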

Storage is the replay source of truth. Replay reads objects under executions/<source-exec>/<route>/<stage>/* and publishes them to the target route’s queue. If an artefact is missing, replay fails — storage is not a cache.
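
The "fail rather than fall back" behaviour can be sketched as follows. This is not the package's implementation: an in-memory dict stands in for the storage provider, and replay_prefix, collect_artefacts, and MissingArtefactError are hypothetical names. Only the key layout executions/<source-exec>/<route>/<stage>/* is taken from the text above.

```python
class MissingArtefactError(RuntimeError):
    """Raised when the replay source has no stored objects (hypothetical name)."""


def replay_prefix(source_exec: str, route: str, stage: str) -> str:
    # Mirrors the documented layout: executions/<source-exec>/<route>/<stage>/*
    return f"executions/{source_exec}/{route}/{stage}/"


def collect_artefacts(store: dict, source_exec: str, route: str, stage: str) -> list:
    """Return the stored keys to replay, or fail loudly if there are none."""
    prefix = replay_prefix(source_exec, route, stage)
    keys = sorted(key for key in store if key.startswith(prefix))
    if not keys:
        # Storage is the source of truth, so there is nothing to fall back to.
        raise MissingArtefactError(f"no artefacts under {prefix}")
    return keys


# In-memory stand-in for the storage provider (assumption).
store = {
    "executions/exec-1/ingest/scraper/page-0001.json": b"{}",
    "executions/exec-1/ingest/scraper/page-0002.json": b"{}",
}
print(collect_artefacts(store, "exec-1", "ingest", "scraper"))
```

Asking for a stage that was never written, for example collect_artefacts(store, "exec-1", "ingest", "chunker"), raises MissingArtefactError instead of silently publishing nothing.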

Two subpackages:

factflow_replay.replay: cross-pipeline detached replay.

  • detached_replay_service.py — orchestrates a full new execution that reads from a parent execution’s storage
  • replayer.py — the publisher that walks storage and emits to queues
  • route_resolver.py — resolves route_id → queue_name from the parent execution’s config snapshot, not the current global directory (this is the fix that prevents collisions when two configs define the same route name with different queues)
  • models.py — replay request / response types
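
The collision that route_resolver.py avoids can be illustrated with a small sketch. The snapshot shape used here (a "routes" mapping with a "queue_name" field) and the function name resolve_queue are assumptions for illustration, not the package's real types.

```python
def resolve_queue(route_id: str, config_snapshot: dict) -> str:
    """Resolve route_id -> queue_name from the parent execution's snapshot only."""
    routes = config_snapshot.get("routes", {})  # assumed snapshot shape
    try:
        return routes[route_id]["queue_name"]
    except KeyError:
        raise LookupError(f"route {route_id!r} not in config snapshot") from None


# Two configs define the same route name with different queues.
parent_snapshot = {"routes": {"embed": {"queue_name": "embed.v1"}}}
global_directory = {"routes": {"embed": {"queue_name": "embed.v2"}}}

# Replay resolves against the parent's snapshot, so it targets embed.v1
# even though the current global directory maps "embed" to embed.v2.
print(resolve_queue("embed", parent_snapshot))  # embed.v1
```

Passing the snapshot explicitly, rather than reading a module-level directory, is what keeps the lookup independent of whichever config happens to be loaded now.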

Same-pipeline resumption after a mid-execution failure.

  • manager.py — inspects lineage for the last successful stage and publishes an init message to restart from there
  • Snapshot resolution. The config_snapshot invariant is load-bearing: resolving route → queue from the snapshot keeps replay behaviour stable even when the global config changes.
  • Publisher is a managed task. Under OrchestratorManager, the replay publisher is an asyncio task the manager owns; cancelling the execution cancels the publisher cleanly.
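
The "managed task" pattern can be sketched with plain asyncio. ExecutionHandle is a hypothetical stand-in for the owner of the task, not the real OrchestratorManager, and publish_all simulates a publisher with short sleeps instead of talking to a broker.

```python
import asyncio
from typing import Optional


async def publish_all(items: list, published: list) -> None:
    """Stand-in publisher: emits one item per loop iteration."""
    for item in items:
        await asyncio.sleep(0.01)  # simulates a broker round-trip
        published.append(item)


class ExecutionHandle:
    """Hypothetical owner of the publisher task (not the real OrchestratorManager)."""

    def __init__(self) -> None:
        self.task: Optional[asyncio.Task] = None

    def start(self, items: list, published: list) -> None:
        # The owner keeps the task reference so it can cancel it later.
        self.task = asyncio.create_task(publish_all(items, published))

    async def cancel(self) -> None:
        if self.task is None:
            return
        self.task.cancel()
        try:
            await self.task  # wait until the publisher has actually stopped
        except asyncio.CancelledError:
            pass  # expected: the publisher stopped at its next await


async def demo() -> tuple:
    published: list = []
    handle = ExecutionHandle()
    handle.start([f"artefact-{i}" for i in range(1000)], published)
    await asyncio.sleep(0.05)  # let a few items go out
    await handle.cancel()      # cancelling the execution cancels the publisher
    return handle.task.cancelled(), len(published)


cancelled, count = asyncio.run(demo())
print(cancelled, count < 1000)
```

Awaiting the task after cancel() is the part that makes the shutdown clean: the owner does not return until the publisher has unwound.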

Top-level factflow_replay/__init__.py uses pkgutil.extend_path for dev-mode fixtures but exports nothing. Consumers:

  • Call the API: POST /api/v1/replay or POST /api/v1/executions/{id}/replay
  • Use the CLI: factflow execution replay <id> --from-stage <stage> --to-route <route>
  • For direct programmatic use: from factflow_replay.replay.detached_replay_service import DetachedReplayService

Dependencies:

  • Workspace: factflow-protocols, factflow-foundation, factflow-engine, factflow-execution, factflow-lineage
  • External services: storage provider + queue broker (the same ones the parent execution used)

Tests at backend/packages/workflows/factflow-replay/tests/. The test-cli harness includes replay scenarios across multiple pipelines (s1s2 sequence, s4 for cross-pipeline).