factflow-replay
Storage replay and recovery — re-publishes stored artefacts back into pipeline queues so a downstream stage can be re-run without re-fetching upstream source data.
Tier and role
- Tier: workflow
- Import name: factflow_replay
- Source: backend/packages/workflows/factflow-replay/
Two flavours: same-pipeline replay (rerun a stage within an existing execution, via an init message) and cross-pipeline replay (full coordinator, detached, via POST /api/v1/executions/{id}/replay with from_stage + to_route).
Context
Storage is the replay source of truth. Replay reads objects under executions/<source-exec>/<route>/<stage>/* and publishes them to the target route’s queue. If an artefact is missing, replay fails: storage is not a cache.
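The read-then-publish behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: InMemoryStorage, MissingArtefactError, replay_stage and the list used as a queue are hypothetical stand-ins for the real storage provider and queue broker. Only the key layout executions/<source-exec>/<route>/<stage>/ comes from the text.

```python
from dataclasses import dataclass, field


class MissingArtefactError(RuntimeError):
    """Raised when the replay source prefix holds no stored artefacts."""


@dataclass
class InMemoryStorage:
    # Hypothetical stand-in for the storage provider: key -> payload bytes.
    objects: dict = field(default_factory=dict)

    def list_keys(self, prefix: str) -> list:
        # Sorted so the publish order is deterministic.
        return sorted(k for k in self.objects if k.startswith(prefix))


def replay_stage(storage, queue, source_exec, route, stage):
    """Publish every artefact stored for one stage to the target queue."""
    prefix = f"executions/{source_exec}/{route}/{stage}/"
    keys = storage.list_keys(prefix)
    if not keys:
        # Storage is the source of truth, so a missing artefact is an error,
        # not a cache miss to be silently skipped.
        raise MissingArtefactError(f"no artefacts under {prefix}")
    for key in keys:
        queue.append((key, storage.objects[key]))
    return len(keys)
```

Failing loudly on an empty prefix mirrors the "storage is not a cache" rule: the sketch never falls back to re-fetching upstream data.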
Two subpackages:
replay
Cross-pipeline detached replay.
- detached_replay_service.py: orchestrates a full new execution that reads from a parent execution’s storage
- replayer.py: the publisher that walks storage and emits to queues
- route_resolver.py: resolves route_id → queue_name from the parent execution’s config snapshot, not the current global directory (this is the fix that prevents collisions when two configs define the same route name with different queues)
- models.py: replay request / response types
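To illustrate why resolving from the snapshot matters, here is a small sketch of snapshot-based route resolution. The snapshot layout (a "routes" mapping with a "queue_name" per route), the resolve_queue function and UnknownRouteError are assumptions made for the example. The real route_resolver.py may structure this differently.

```python
class UnknownRouteError(KeyError):
    """Raised when the snapshot does not define the requested route."""


def resolve_queue(route_id: str, config_snapshot: dict) -> str:
    """Look up route_id -> queue_name in the given config snapshot only."""
    routes = config_snapshot.get("routes", {})
    try:
        return routes[route_id]["queue_name"]
    except KeyError:
        raise UnknownRouteError(route_id) from None


# Two configs define the same route name with different queues.
parent_snapshot = {"routes": {"enrich": {"queue_name": "enrich.v1"}}}
global_directory = {"routes": {"enrich": {"queue_name": "enrich.v2"}}}

# Replay passes the parent execution's snapshot, so it keeps targeting the
# queue the parent actually used, regardless of the current global state.
target_queue = resolve_queue("enrich", parent_snapshot)
```

Because the function takes the snapshot as an explicit argument, there is no code path by which the global directory can leak into a replay.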
recovery
Same-pipeline resumption after a mid-execution failure.
- manager.py: inspects lineage for the last successful stage and publishes an init message to restart from there
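The recovery step can be pictured roughly as below. This is a sketch under stated assumptions: StageRecord, the "succeeded" status value, and the init message fields (type, execution_id, resume_from_stage) are hypothetical, and the lineage is assumed to arrive as a list in pipeline order.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StageRecord:
    stage: str
    status: str  # "succeeded" or "failed" in this sketch


def last_successful_stage(lineage: list) -> Optional[str]:
    """Return the last stage in the unbroken run of successes, if any."""
    last = None
    for record in lineage:
        if record.status != "succeeded":
            break  # stop at the first failure; later records are not trusted
        last = record.stage
    return last


def build_init_message(execution_id: str, lineage: list) -> dict:
    """Build the init message that restarts the pipeline after a failure."""
    stage = last_successful_stage(lineage)
    if stage is None:
        raise ValueError("no successful stage to resume from")
    return {
        "type": "init",
        "execution_id": execution_id,
        "resume_from_stage": stage,
    }
```

For a lineage of fetch (succeeded), parse (succeeded), enrich (failed), the message names parse as the stage to resume from.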
Rationale
- Snapshot resolution. The config_snapshot invariant is load-bearing: resolving route → queue from the snapshot keeps replay behaviour stable even when the global config changes.
- Publisher is a managed task. Under OrchestratorManager, the replay publisher is an asyncio task the manager owns; cancelling the execution cancels the publisher cleanly.
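The managed-task point can be illustrated with plain asyncio. ManagerSketch and publish_forever below are hypothetical and do not reflect the OrchestratorManager API. The sketch only shows the general pattern of a manager owning a task, cancelling it, and waiting for its cleanup to finish.

```python
import asyncio


async def publish_forever(published: list) -> None:
    """Stand-in publisher: emits one item per loop tick until cancelled."""
    n = 0
    try:
        while True:
            published.append(n)
            n += 1
            await asyncio.sleep(0)  # yield control; cancellation lands here
    finally:
        published.append("closed")  # cleanup runs when the task is cancelled


class ManagerSketch:
    def __init__(self) -> None:
        self._tasks = {}

    def start(self, execution_id: str, published: list) -> None:
        # The manager keeps the only reference to the publisher task.
        self._tasks[execution_id] = asyncio.create_task(publish_forever(published))

    async def cancel(self, execution_id: str) -> bool:
        task = self._tasks.pop(execution_id)
        task.cancel()
        try:
            await task  # wait until the publisher's cleanup has finished
        except asyncio.CancelledError:
            pass
        return task.cancelled()


async def demo():
    published = []
    manager = ManagerSketch()
    manager.start("exec-1", published)
    await asyncio.sleep(0)  # let the publisher run at least once
    cancelled = await manager.cancel("exec-1")
    return cancelled, published
```

Awaiting the task after cancel() is what makes the shutdown clean: the caller does not continue until the publisher's finally block has run.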
Public API
Top-level factflow_replay/__init__.py uses pkgutil.extend_path for dev-mode fixtures but exports nothing. Consumers:
- Call the API: POST /api/v1/replay or POST /api/v1/executions/{id}/replay
- Use the CLI: factflow execution replay <id> --from-stage <stage> --to-route <route>
- For direct programmatic use: from factflow_replay.replay.detached_replay_service import DetachedReplayService
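As a rough illustration of the API option, the snippet below builds (but does not send) a request for the per-execution endpoint using only the standard library. The base URL, the example identifiers, and the choice to send from_stage and to_route as a JSON body are assumptions. The server's actual request schema and authentication requirements are not specified here.

```python
import json
import urllib.request


def build_replay_request(base_url, execution_id, from_stage, to_route):
    """Build a POST request for /api/v1/executions/{id}/replay."""
    # Assumed payload shape: the two parameters as top-level JSON fields.
    body = json.dumps({"from_stage": from_stage, "to_route": to_route})
    return urllib.request.Request(
        url=f"{base_url}/api/v1/executions/{execution_id}/replay",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and identifiers, for illustration only.
request = build_replay_request("http://localhost:8000", "exec-1", "parse", "enrich")
```

Passing the request to urllib.request.urlopen would send it. The CLI form shown in the list above takes the same two parameters as flags.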
Dependencies
- Workspace: factflow-protocols, factflow-foundation, factflow-engine, factflow-execution, factflow-lineage
- External services: storage provider + queue broker (the same ones the parent execution used)
Testing
Tests live at backend/packages/workflows/factflow-replay/tests/. The test-cli harness includes replay scenarios across multiple pipelines (s1 → s2 sequence, s4 for cross-pipeline).
Related
- factflow-execution: records replay provenance (ReplaySource field)
- factflow-server: /api/v1/replay and /api/v1/executions/{id}/replay endpoints
- factflow-engine: config snapshot invariant is defined here