
factflow-execution

Execution lifecycle — the per-run entity that binds stored config, lineage, storage, and status together into one observable unit.

Server-internal. The API endpoints under /api/v1/executions/ and /api/v1/configs/ map almost 1:1 onto ExecutionService methods. CLI commands (factflow execution …, factflow config run) call those endpoints.

An execution is a row plus a coordinated lifecycle. The row captures:

  • The config that ran (snapshotted — config_snapshot is immutable for that execution)
  • The status (running / completed / failed / cancelled / interrupted)
  • Timestamps, source (how the execution was started), replay source (if any)
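
The row described above can be sketched as a dataclass. This is an illustrative analogue only; the real PipelineExecution model in factflow_execution may name and type these fields differently:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

# Hypothetical sketch of the execution row; field names mirror the
# bullet list above, not the real ORM model.
@dataclass(frozen=True)  # frozen: the snapshot is immutable for this execution
class ExecutionRow:
    id: str
    config_snapshot: dict          # config captured at creation time
    status: str                    # running / completed / failed / cancelled / interrupted
    source: str                    # how the execution was started (cli / api / replay / ...)
    replay_source: Optional[str]   # parent execution id, if this is a replay
    started_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

row = ExecutionRow(
    id="exec-1",
    config_snapshot={"routes": ["a"]},
    status="running",
    source="cli",
    replay_source=None,
)
```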

The coordinated lifecycle covers:

  • Creation (from a config or replay request)
  • Route ownership — the OrchestratorManager owns PipelineOrchestrator instances keyed by execution id
  • Completion detection — ExecutionWaiter reactively resolves when all routes drain
  • Live streaming — ExecutionEventStreamer + StorageEventStreamer push updates via SSE to the frontend / CLI
  • Stats — derived from lineage data, materialised on demand for list views
Design notes:

  • Config snapshot. Subsequent edits to the stored config don’t affect in-flight executions. Replay resolves route → queue from the parent execution’s snapshot, not from the current global directory.
  • Single-process OrchestratorManager. One Python process owns all running executions. Scaling is horizontal at the request level, not at the orchestrator level.
  • Reactive waiter. No polling. Processor / adapter completion events fire callbacks that resolve waiters; waiters expose an async API for the CLI and API to block until done.
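
The snapshot rule in particular fits in a few lines: the execution keeps a deep copy taken at creation time, so later edits to the stored config are invisible to it (names here are illustrative):

```python
import copy

stored_config = {"routes": {"ingest": "queue-a"}}

# At creation time the execution deep-copies the config...
snapshot = copy.deepcopy(stored_config)

# ...so a later edit to the stored config does not leak into the
# in-flight execution; replay resolves routes from the snapshot.
stored_config["routes"]["ingest"] = "queue-b"
```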
from factflow_execution import (
    PipelineConfig,     # stored config row (versioned)
    PipelineExecution,  # running / completed execution row
    ConfigSourceType,   # enum: cli / api / replay / ...
    ReplaySource,       # where a replay execution came from
    ExecutionStatus,    # type alias
    ExecutionStats,     # derived from lineage
)
from factflow_execution import (
    DAGNode,
    DAGEdge,
    DAGResponse,  # returned by GET /executions/{id}/dag
)
from factflow_execution import (
    ExecutionService,             # high-level operations (create / start / cancel / list / replay)
    PipelineExecutionRepository,  # per-row CRUD
    PipelineConfigRepository,     # stored-config CRUD
)

Service methods are the canonical entry point; the repositories are used by read-heavy API endpoints and by tests.
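
A minimal in-memory sketch of that split — the service coordinates the lifecycle while the repository only does row CRUD. These are stand-in classes, not the real ExecutionService or PipelineExecutionRepository APIs:

```python
class InMemoryExecutionRepository:
    """Stand-in repository: pure row CRUD, no lifecycle logic."""

    def __init__(self):
        self._rows = {}

    def insert(self, row):
        self._rows[row["id"]] = row

    def get(self, execution_id):
        return self._rows[execution_id]

    def update_status(self, execution_id, status):
        self._rows[execution_id]["status"] = status


class SketchExecutionService:
    """Stand-in service: owns lifecycle decisions, delegates persistence."""

    def __init__(self, repo):
        self._repo = repo
        self._next_id = 0

    def create(self, config):
        self._next_id += 1
        row = {
            "id": f"exec-{self._next_id}",
            "config_snapshot": dict(config),  # snapshot taken at creation
            "status": "running",
        }
        self._repo.insert(row)
        return row["id"]

    def cancel(self, execution_id):
        self._repo.update_status(execution_id, "cancelled")


repo = InMemoryExecutionRepository()
service = SketchExecutionService(repo)
exec_id = service.create({"routes": ["a"]})
service.cancel(exec_id)
```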

from datetime import timedelta

from factflow_execution import ExecutionWaiter, ExecutionWaitResult

waiter = ExecutionWaiter(execution_id, ...)
result = await waiter.wait(timeout=timedelta(minutes=10))

ExecutionWaitResult carries the terminal status plus stats.
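
The no-polling design can be sketched with an asyncio.Event: completion callbacks set the event, and wait() simply awaits it. This is a hand-rolled analogue, not the real ExecutionWaiter:

```python
import asyncio
from datetime import timedelta


class SketchWaiter:
    """Hand-rolled analogue of a reactive waiter: no polling loop —
    a completion callback resolves the awaiting coroutine."""

    def __init__(self):
        self._done = asyncio.Event()
        self.result = None

    def on_complete(self, status):
        # Invoked by orchestrator completion callbacks.
        self.result = status
        self._done.set()

    async def wait(self, timeout: timedelta):
        await asyncio.wait_for(self._done.wait(), timeout.total_seconds())
        return self.result


async def main():
    waiter = SketchWaiter()
    # Simulate the orchestrator finishing shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, waiter.on_complete, "completed")
    return await waiter.wait(timeout=timedelta(seconds=1))


status = asyncio.run(main())
```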

from factflow_execution import (
    ExecutionEventStreamer,  # per-execution status / route / lineage events
    StorageEventStreamer,    # per-execution storage write events
)

Both yield async iterators of structured events. The server adapts them into SSE responses; the CLI’s factflow storage watch reads StorageEventStreamer.
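
Adapting such an iterator into SSE amounts to framing each structured event as a `data:` message. A minimal sketch with a fake event source standing in for the real streamers (the server presumably does this inside its web framework's streaming response):

```python
import asyncio
import json


async def fake_event_stream():
    # Stand-in for ExecutionEventStreamer: yields structured events.
    for event in ({"type": "status", "status": "running"},
                  {"type": "status", "status": "completed"}):
        yield event


async def to_sse(events):
    """Frame each structured event as a Server-Sent Events message."""
    async for event in events:
        yield f"data: {json.dumps(event)}\n\n"


async def main():
    return [chunk async for chunk in to_sse(fake_event_stream())]


frames = asyncio.run(main())
```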

from factflow_execution import ExecutionStatusCallbackAdapter

Hooks PipelineOrchestrator completion signals back to the ExecutionService so the row transitions to completed / failed atomically with orchestrator stop.
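
The adapter pattern here — orchestrator fires a completion signal, adapter translates it into a row status transition — can be sketched as follows. Names and the callback signature are illustrative, not the real ExecutionStatusCallbackAdapter interface:

```python
class SketchCallbackAdapter:
    """Illustrative analogue of ExecutionStatusCallbackAdapter:
    translates orchestrator stop signals into status updates."""

    def __init__(self, execution_id, set_status):
        self._execution_id = execution_id
        self._set_status = set_status  # e.g. a bound service method

    def on_orchestrator_stopped(self, error=None):
        # One transition per stop: completed on success, failed on error.
        status = "failed" if error else "completed"
        self._set_status(self._execution_id, status)


statuses = {}
adapter = SketchCallbackAdapter("exec-1", statuses.__setitem__)
adapter.on_orchestrator_stopped()
```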

  • Workspace: factflow-engine, factflow-infra, factflow-lineage, factflow-protocols, factflow-foundation
  • External services: PostgreSQL (execution + config rows)

Tests live at backend/packages/factflow-execution/tests/. They use Testcontainers for the database and a mock orchestrator for service-level tests. See .claude/skills/backend/pipeline-testing/ for patterns that exercise the full execution flow.