Multi-execution topology

One server process runs many executions concurrently. Each execution uses the same route names from the same pipeline configs. Without isolation, execution A’s messages would land in execution B’s processors. ExecutionScopedQueue prevents that.

Consider two executions of the same pipeline with naive queue naming:

flowchart LR
subgraph Ex1[Execution A]
  P1A[Processor A-web] --> Q1[/queue/web_scraper.in/]
  Q1 --> P2A[Processor A-markdown]
end
subgraph Ex2[Execution B]
  P1B[Processor B-web] --> Q2[/queue/web_scraper.in/]
  Q2 --> P2B[Processor B-markdown]
end
Q1 -.SAME QUEUE.-> Q2
style Q1 fill:#fee2e2,stroke:#dc2626,color:#111
style Q2 fill:#fee2e2,stroke:#dc2626,color:#111
Without isolation, both executions publish to and subscribe from the same queue name. Data goes to the wrong processors. Lineage breaks. Storage keys collide.
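The collision is easy to reproduce with a toy in-memory broker. This sketch (the broker, `publish`, and `consume` are illustrative stand-ins, not the real provider API) shows execution B's consumer receiving execution A's message when both use the bare route name:

```python
from collections import defaultdict, deque

# Toy in-memory broker: one deque per queue name.
broker = defaultdict(deque)

def publish(queue_name, message):
    broker[queue_name].append(message)

def consume(queue_name):
    return broker[queue_name].popleft()

# Both executions naively publish to the same logical route name.
publish("web_scraper.in", {"exec": "A", "url": "https://a.example"})
publish("web_scraper.in", {"exec": "B", "url": "https://b.example"})

# Execution B's processor consumes next -- and receives execution A's message.
msg = consume("web_scraper.in")
assert msg["exec"] == "A"  # cross-talk: B's consumer got A's data
```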

Every publish and subscribe goes through ExecutionScopedQueue, which rewrites the queue name to include the execution id:

flowchart LR
subgraph Ex1[Execution A exec_id=aaa]
  P1A[Processor A-web] --> Q1A[/queue/aaa/web_scraper.in/]
  Q1A --> P2A[Processor A-markdown]
end
subgraph Ex2[Execution B exec_id=bbb]
  P1B[Processor B-web] --> Q2B[/queue/bbb/web_scraper.in/]
  Q2B --> P2B[Processor B-markdown]
end
style Q1A fill:#dcfce7,stroke:#16a34a,color:#111
style Q2B fill:#dcfce7,stroke:#16a34a,color:#111
With ExecutionScopedQueue, names are prefixed with the execution id. Distinct queues, zero cross-talk, single broker.

Implementation: factflow_execution.scoped_queue.ExecutionScopedQueue wraps a QueueProtocol instance. Every call translates names via _prefix(queue_name, exec_id). Callers never see the prefix; they work only with logical route names, and the wrapper deals in raw broker queue names on their behalf.
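The wrapper can be sketched as follows. The class and `_prefix` come from the source; the method names on the wrapped provider (`publish`, `subscribe`) and the exact prefix format are assumptions for illustration, not the actual QueueProtocol surface:

```python
class ExecutionScopedQueue:
    """Sketch: wraps a queue provider and prefixes every queue name
    with the execution id, so callers only ever use logical names."""

    def __init__(self, inner, exec_id):
        self._inner = inner      # underlying QueueProtocol instance
        self._exec_id = exec_id

    def _prefix(self, queue_name):
        # "web_scraper.in" -> "/queue/<exec_id>/web_scraper.in"
        # (hypothetical STOMP-style format, per the diagram above)
        return f"/queue/{self._exec_id}/{queue_name}"

    def publish(self, queue_name, message):
        self._inner.publish(self._prefix(queue_name), message)

    def subscribe(self, queue_name, handler):
        self._inner.subscribe(self._prefix(queue_name), handler)
```

Because the translation happens on every call, a processor written against logical route names runs unchanged in any execution.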

Point-to-point queues get scoped. Topics (broadcast) deliberately do not — topics are shared signalling channels:

flowchart TB
subgraph Executions[All running executions share...]
  T[/topic/system.pipeline.events/]
end
Ex1[Execution A] -->|publish| T
Ex2[Execution B] -->|publish| T
Ex3[Execution C] -->|publish| T
T --> Orch1[A's orchestrator<br/>filter on exec_id=A]
T --> Orch2[B's orchestrator<br/>filter on exec_id=B]
T --> Orch3[C's orchestrator<br/>filter on exec_id=C]
SYSTEM_PIPELINE_TOPIC is one shared broadcast. Subscribers filter by execution_id in their handler. Used for completion events, config-reload signals, etc.

Consumers receiving broadcast events must filter on the message’s execution_id themselves.
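A minimal subscriber-side filter might look like this. The event shape (a dict carrying `execution_id`) is an assumption based on the description above, not a documented schema:

```python
def make_event_handler(exec_id, on_event):
    """Sketch: wrap a handler so it drops any broadcast event whose
    execution_id does not match this execution's id."""
    def handle(event):
        if event.get("execution_id") != exec_id:
            return  # another execution's event -- ignore it
        on_event(event)
    return handle
```

Filtering in the handler keeps the topic itself shared: one broadcast, many subscribers, each seeing only its own execution's events.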

The isolation works uniformly across providers, but the name format differs:

flowchart LR
PR[ExecutionScopedQueue] --> ST[StompNamingStrategy]
PR --> AM[AmqpNamingStrategy]
PR --> PU[PulsarNamingStrategy]
ST -->|Artemis| AT["/queue/<exec>/route.in"]
AM -->|RabbitMQ| RQ["exec.ROUTE.exec_id"]
PU -->|Pulsar| PL["persistent://ns/exec_id/route.in"]
The strategy registry translates per-provider. Your YAML pipeline doesn't care which provider runs underneath.
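A registry of this shape can be sketched in a few lines. The three format strings mirror the diagram above; the function names, the registry dict, and the literal `ns` namespace are hypothetical:

```python
# Per-provider naming strategies (formats taken from the diagram above).
def stomp_name(route, exec_id):
    return f"/queue/{exec_id}/{route}"           # Artemis / STOMP

def amqp_name(route, exec_id):
    return f"exec.{route.upper()}.{exec_id}"     # RabbitMQ / AMQP

def pulsar_name(route, exec_id):
    return f"persistent://ns/{exec_id}/{route}"  # Pulsar ("ns" is a placeholder namespace)

STRATEGIES = {"stomp": stomp_name, "amqp": amqp_name, "pulsar": pulsar_name}

def scoped_name(provider, route, exec_id):
    """Translate a logical route into a provider-specific scoped name."""
    return STRATEGIES[provider](route, exec_id)
```

Swapping providers changes only which strategy the registry resolves; pipeline YAML keeps referring to the same logical routes.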

Completion detection in a multi-exec world

Each orchestrator’s completion logic is local to its execution:

flowchart TB
subgraph One["One server process"]
  Mgr[OrchestratorManager]
  Orch1[Orchestrator exec=A]
  Orch2[Orchestrator exec=B]
  Orch3[Orchestrator exec=C]
  Mgr --> Orch1
  Mgr --> Orch2
  Mgr --> Orch3
end
Orch1 -.checks only A's.-> QA[(A's scoped queues)]
Orch1 -.checks only A's.-> LA[(A's lineage rows)]
Orch2 -.checks only B's.-> QB[(B's scoped queues)]
Orch2 -.checks only B's.-> LB[(B's lineage rows)]
Orch3 -.checks only C's.-> QC[(C's scoped queues)]
Orch3 -.checks only C's.-> LC[(C's lineage rows)]
Each orchestrator tracks idle for its own execution only. Completion checks never cross execution boundaries.
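The boundary can be sketched as a per-execution idle check. The data shapes (a dict of queue depths keyed by scoped name, a dict of pending lineage counts) and the prefix format are illustrative assumptions:

```python
def execution_idle(exec_id, queue_depths, lineage_pending):
    """Sketch: an execution is idle only when *its own* scoped queues
    are empty and *its own* lineage rows are settled. Other executions'
    state is never consulted."""
    my_queues_empty = all(
        depth == 0
        for name, depth in queue_depths.items()
        if name.startswith(f"/queue/{exec_id}/")  # only this execution's queues
    )
    my_lineage_done = lineage_pending.get(exec_id, 0) == 0
    return my_queues_empty and my_lineage_done
```

Because the check filters on the execution's own prefix, execution A can complete while execution B is still mid-flight.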

Per-execution metrics (via GET /api/v1/executions/{id}/stats) scope to that execution only. Per-system metrics (via GET /api/v1/system/metrics) aggregate across executions.

  • execution.in_flight — messages being processed in this execution
  • system.in_flight — total across everything
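The relationship between the two metrics reduces to a sum. A minimal sketch, with illustrative counts standing in for real per-execution state:

```python
# Illustrative per-execution in-flight counts.
per_execution = {"aaa": 3, "bbb": 0, "ccc": 7}

def execution_in_flight(exec_id):
    """execution.in_flight: messages in flight for one execution."""
    return per_execution[exec_id]

def system_in_flight():
    """system.in_flight: total across every running execution."""
    return sum(per_execution.values())
```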

A processor crash in execution A does not affect execution B. The orchestrators are independent PipelineOrchestrator instances; a crash inside one doesn’t cascade.

Provider-level failures (broker restart, DB disconnect) affect all executions equally — they’re a global concern handled by provider reconnect logic.