factflow-engine
Pipeline orchestration core. Loads YAML configs, instantiates adapters, wires routes through queue providers, and drives execution. The server runs this; users don’t import it directly.
Tier and role
- Tier: shared service (complex; reads from protocols + foundation + lineage)
- Import name: factflow_engine
- Source: backend/packages/factflow-engine/
Server-internal. Consumers see factflow-engine through the API (POST /executions, GET /pipelines) and CLI (factflow config run, factflow execution). Adapter authors interact with one piece of this package — the PipelineAdapter protocol — via factflow-protocols.
Context
The engine turns a declarative pipeline YAML into a running graph of adapters that consume and emit messages through queues. Seven concerns:
- Config loading + validation: YAML → typed config models with a stable error taxonomy
- Adapter discovery and registration: finds adapter classes across installed workflow packages
- Orchestration: one PipelineOrchestrator per execution, wires routes and starts processors
- Reactive route processing: ReactiveRouteProcessor owns one route’s lifecycle (subscribe → process → ack)
- Backpressure: adaptive concurrency per route
- Circuit breaker: per-adapter fail-fast when an external dependency degrades
- Stage storage: schema-driven writes to StorageProtocol with sidecar metadata
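The subscribe → process → ack lifecycle named in the list can be pictured with a small standalone sketch. It is not the ReactiveRouteProcessor implementation: run_route, the None sentinel, and the use of asyncio.Queue.task_done() as a stand-in for an ack are all assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable


async def run_route(
    inbound: asyncio.Queue,
    handler: Callable[[dict], Awaitable[dict]],
    outbound: asyncio.Queue,
) -> int:
    """Hypothetical route loop: take a message, process it, emit, then ack."""
    processed = 0
    while True:
        message = await inbound.get()      # subscribe: wait for the next message
        if message is None:                # assumed sentinel: upstream is finished
            inbound.task_done()
            return processed
        result = await handler(message)    # process
        await outbound.put(result)         # emit downstream
        inbound.task_done()                # ack only after the handler has returned
        processed += 1


async def demo() -> list:
    # Queues are created inside the running loop to stay portable across versions.
    inbound, outbound = asyncio.Queue(), asyncio.Queue()
    for text in ("a", "b"):
        inbound.put_nowait({"text": text})
    inbound.put_nowait(None)

    async def upper(message: dict) -> dict:
        return {"text": message["text"].upper()}

    count = await run_route(inbound, upper, outbound)
    return [count] + [outbound.get_nowait()["text"] for _ in range(count)]
```

Acking after the handler returns means a handler that raises leaves its message unacknowledged, which is the property a real queue provider would need in order to redeliver it.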
Public API
All symbols listed below are in factflow_engine.__all__.
Orchestration + lifecycle
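ExecutionScopedQueue is described in this section as a queue wrapper that prefixes names per execution. A rough sketch of that idea follows. InMemoryBroker, ScopedQueueSketch, and the exec-<id>. naming scheme are hypothetical and are not taken from the package.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy broker: named FIFO queues held in memory (hypothetical)."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def publish(self, name, message):
        self.queues[name].append(message)

    def consume(self, name):
        return self.queues[name].popleft()


class ScopedQueueSketch:
    """Prefixes every queue name with an execution id before delegating."""

    def __init__(self, broker, execution_id):
        self._broker = broker
        self._prefix = f"exec-{execution_id}."  # naming scheme is an assumption

    def publish(self, name, message):
        self._broker.publish(self._prefix + name, message)

    def consume(self, name):
        return self._broker.consume(self._prefix + name)
```

Two executions that both use a route queue called scrape then read and write separate broker queues, so their messages cannot mix.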
    from factflow_engine import (
        OrchestratorManager,  # one-process manager; owns multiple executions
        PipelineOrchestrator,  # one execution end-to-end
        ReactiveRouteProcessor,  # one route inside an orchestrator
        ExecutionScopedQueue,  # queue wrapper that prefixes names per execution
        TooManyConcurrentExecutionsError,
    )

Config loading + validation
    from factflow_engine import (
        PipelineConfigLoader,
        ConfigurationError,
        validate_route_config,
        configure_validation,  # global toggle: strict (default) vs warn-only
    )

Config models
    from factflow_engine import (
        PipelineGlobalConfig,  # root
        OrchestratorConfig,
        ProcessorConfig,
        RouteConfig,
        AdapterConfig,
        ConditionConfig,
        InboundConfig,
        OutboundConfig,
        BackpressureConfig,
        ErrorHandlingConfig,
    )

Discovery + registry
    from factflow_engine import (
        AdapterDiscovery,
        auto_discover_and_register,
        ValidatingAdapterRegistry,
    )

Conditions
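As a rough illustration of what a condition operator does, the built-in operator names in this section (eq, neq, gt, lt, in, not_in, contains, matches, exists) can be modelled as a name-to-predicate table. The semantics below are assumptions, not the OperatorRegistry API: matches is taken to be a regex search, exists is taken to mean the field is present, and OPERATORS and evaluate are hypothetical names.

```python
import re
from typing import Any, Callable, Dict

# Hypothetical operator table; the real OperatorRegistry may behave differently.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda actual, expected: actual == expected,
    "neq": lambda actual, expected: actual != expected,
    "gt": lambda actual, expected: actual > expected,
    "lt": lambda actual, expected: actual < expected,
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
    "contains": lambda actual, expected: expected in actual,
    "matches": lambda actual, expected: re.search(expected, actual) is not None,
}

_MISSING = object()  # distinguishes "field absent" from "field is None"


def evaluate(message: dict, field: str, operator: str, value: Any = None) -> bool:
    """Apply one named operator to one field of a message."""
    actual = message.get(field, _MISSING)
    if operator == "exists":
        return actual is not _MISSING
    if actual is _MISSING:
        return False  # assumed: any comparison against a missing field fails
    return OPERATORS[operator](actual, value)
```

An unknown operator name raises KeyError in this sketch; a registry with validation would presumably reject it at config-load time instead.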
    from factflow_engine import (
        ConditionOperator,  # base class
        OperatorRegistry,  # built-in operators (eq / neq / gt / lt / in / not_in / contains / matches / exists)
    )

Backpressure + circuit breaker
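The circuit breaker is described earlier as per-adapter fail-fast when an external dependency degrades. The sketch below shows the generic closed / open / half-open pattern under assumed parameters. SketchBreaker, BreakerOpen, State, failure_threshold, and reset_after are hypothetical names and do not reflect the fields of CircuitBreaker or CircuitBreakerConfig.

```python
import time
from enum import Enum


class State(Enum):  # hypothetical; the usual three breaker states
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerOpen(RuntimeError):
    """Raised instead of calling the dependency while the breaker is open."""


class SketchBreaker:
    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state is State.OPEN:
            if self.clock() - self.opened_at < self.reset_after:
                raise BreakerOpen("dependency marked unhealthy; failing fast")
            self.state = State.HALF_OPEN  # cool-down elapsed: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
                self.state = State.OPEN
                self.opened_at = self.clock()
            raise
        self.state = State.CLOSED  # a success closes the breaker and resets the count
        self.failures = 0
        return result
```

Passing the clock in as a parameter keeps the cool-down testable without sleeping.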
    from factflow_engine import (
        BackpressureController,
        CircuitBreaker,
        CircuitBreakerConfig,
        CircuitOpenError,
        CircuitState,
        CircuitStats,
        AdapterCircuitMetrics,
        DomainMetrics,
    )

Stage storage + metadata
    from factflow_engine import (
        StageStorageAdapter,
        StorageBackendAdapter,
        StageStorageResult,
        StorageMetadataConfig,
        StorageMetadataField,
        build_embedding_metadata,
    )

Protocol re-exports
    from factflow_engine import (
        PipelineAdapter,
        PipelineContext,
        AdapterResult,
    )

Re-exported from factflow-protocols for convenience: adapter-authoring examples can import everything they need from factflow_engine without a second line.
Dependencies
- Runtime: jsonschema, msgpack, async-task-worker
- Workspace: factflow-protocols, factflow-foundation, factflow-lineage
- External services: relies on the server to provide QueueProtocol and StorageProtocol implementations via dependency injection (DI). The engine doesn’t bind to a specific provider.
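The DI arrangement can be sketched with structural typing. Everything here is hypothetical: QueueLike and StorageLike stand in for QueueProtocol and StorageProtocol, whose real method signatures are not shown in this document, and EngineSketch only illustrates depending on protocols rather than on a concrete provider.

```python
from typing import Dict, List, Protocol


class QueueLike(Protocol):  # hypothetical stand-in for QueueProtocol
    def publish(self, queue: str, body: bytes) -> None: ...


class StorageLike(Protocol):  # hypothetical stand-in for StorageProtocol
    def write(self, key: str, data: bytes) -> None: ...


class InMemoryQueue:
    """One possible provider; satisfies QueueLike structurally."""

    def __init__(self) -> None:
        self.messages: Dict[str, List[bytes]] = {}

    def publish(self, queue: str, body: bytes) -> None:
        self.messages.setdefault(queue, []).append(body)


class InMemoryStorage:
    """One possible provider; satisfies StorageLike structurally."""

    def __init__(self) -> None:
        self.blobs: Dict[str, bytes] = {}

    def write(self, key: str, data: bytes) -> None:
        self.blobs[key] = data


class EngineSketch:
    """Depends only on the two protocols; the caller chooses the providers."""

    def __init__(self, queue: QueueLike, storage: StorageLike) -> None:
        self.queue = queue
        self.storage = storage

    def emit(self, route: str, key: str, payload: bytes) -> None:
        self.storage.write(key, payload)    # persist the stage output
        self.queue.publish(route, payload)  # then hand it to the next route
```

Swapping the in-memory classes for provider-backed ones would require no change to EngineSketch, which is the point of injecting the dependencies.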
Testing
Tests live at backend/packages/factflow-engine/tests/, split into unit/, e2e/, and performance/. The tests use validating_registry to ensure every adapter registration is schema-compliant before the orchestrator runs.
See .claude/skills/backend/pipeline-testing/ for harness patterns and .claude/skills/backend/pipeline-config/ for the YAML spec.
Related
- factflow-execution: per-execution scoping layer built on top
- factflow-server: mounts OrchestratorManager into the FastAPI app
- Rule: .claude/rules/async-patterns.md (handler-return ack, cancel safety, fan-out/in)
- Rule: .claude/rules/pipeline-yaml.md (config validator invariants)