Skip to content

factflow-engine

Pipeline orchestration core. Loads YAML configs, instantiates adapters, wires routes through queue providers, and drives execution. The server runs this; users don’t import it directly.

Server-internal. Consumers see factflow-engine through the API (POST /executions, GET /pipelines) and CLI (factflow config run, factflow execution). Adapter authors interact with one piece of this package — the PipelineAdapter protocol — via factflow-protocols.

The engine turns a declarative pipeline YAML into a running graph of adapters that consume and emit messages through queues. Seven concerns:

  1. Config loading + validation — YAML → typed config models with a stable error taxonomy
  2. Adapter discovery and registration — finds adapter classes across installed workflow packages
  3. Orchestration — one PipelineOrchestrator per execution, wires routes and starts processors
  4. Reactive route processingReactiveRouteProcessor owns one route’s lifecycle (subscribe → process → ack)
  5. Backpressure — adaptive concurrency per route
  6. Circuit breaker — per-adapter fail-fast when an external dependency degrades
  7. Stage storage — schema-driven writes to StorageProtocol with sidecar metadata

All symbols listed below are in factflow_engine.__all__.

from factflow_engine import (
OrchestratorManager, # one-process manager — owns multiple executions
PipelineOrchestrator, # one execution end-to-end
ReactiveRouteProcessor, # one route inside an orchestrator
ExecutionScopedQueue, # queue wrapper that prefixes names per execution
TooManyConcurrentExecutionsError,
)
from factflow_engine import (
PipelineConfigLoader,
ConfigurationError,
validate_route_config,
configure_validation, # global toggle: strict (default) vs warn-only
)
from factflow_engine import (
PipelineGlobalConfig, # root
OrchestratorConfig,
ProcessorConfig,
RouteConfig,
AdapterConfig,
ConditionConfig,
InboundConfig,
OutboundConfig,
BackpressureConfig,
ErrorHandlingConfig,
)
from factflow_engine import (
AdapterDiscovery,
auto_discover_and_register,
ValidatingAdapterRegistry,
)
from factflow_engine import (
ConditionOperator, # base class
OperatorRegistry, # built-in operators (eq / neq / gt / lt / in / not_in / contains / matches / exists)
)
from factflow_engine import (
BackpressureController,
CircuitBreaker,
CircuitBreakerConfig,
CircuitOpenError,
CircuitState,
CircuitStats,
AdapterCircuitMetrics,
DomainMetrics,
)
from factflow_engine import (
StageStorageAdapter,
StorageBackendAdapter,
StageStorageResult,
StorageMetadataConfig,
StorageMetadataField,
build_embedding_metadata,
)
from factflow_engine import (
PipelineAdapter,
PipelineContext,
AdapterResult,
)

Re-exported from factflow-protocols for convenience — adapter-authoring examples can import everything they need from factflow_engine without a second line.

  • Runtime: jsonschema, msgpack, async-task-worker
  • Workspace: factflow-protocols, factflow-foundation, factflow-lineage
  • External services: relies on the server to provide a QueueProtocol + StorageProtocol via DI. Engine doesn’t bind to a specific provider.

Tests at backend/packages/factflow-engine/tests/unit/, e2e/, performance/. Uses the validating_registry in tests to ensure every adapter registration is schema-compliant before the orchestrator runs.

See .claude/skills/backend/pipeline-testing/ for harness patterns and .claude/skills/backend/pipeline-config/ for the YAML spec.

  • factflow-execution — per-execution scoping layer built on top
  • factflow-server — mounts OrchestratorManager into the FastAPI app
  • Rule: .claude/rules/async-patterns.md — handler-return ack, cancel safety, fan-out/in
  • Rule: .claude/rules/pipeline-yaml.md — config validator invariants