factflow-infra

Queue and storage provider implementations — concrete classes behind the QueueProtocol and StorageProtocol contracts.

The server and factflow-execution wire these into the orchestrator. Adapter authors never import providers directly — they receive QueueProtocol / StorageProtocol instances via dependency injection and code against the protocol.

Providers are plug-replaceable because they share a protocol. Swapping FilesystemStorageProvider for MinioStorageProvider in a deployment is a config change, not a code change.
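The injection pattern described above can be sketched roughly as follows. This is an illustration only: `StorageLike`, its `put`/`get` methods, `InMemoryStorage`, and `UppercaseAdapter` are hypothetical stand-ins, not the real `StorageProtocol` surface, which may use different method names and signatures.

```python
import asyncio
from typing import Protocol


class StorageLike(Protocol):
    """Hypothetical stand-in for StorageProtocol (real method names may differ)."""

    async def put(self, key: str, data: bytes) -> None: ...

    async def get(self, key: str) -> bytes: ...


class InMemoryStorage:
    """Toy provider used only for this illustration."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    async def put(self, key: str, data: bytes) -> None:
        self._blobs[key] = data

    async def get(self, key: str) -> bytes:
        return self._blobs[key]


class UppercaseAdapter:
    """Example adapter: depends on the protocol, never on a concrete provider."""

    def __init__(self, storage: StorageLike) -> None:
        self._storage = storage

    async def run(self, src: str, dst: str) -> None:
        # Read, transform, and write back through the injected storage only.
        text = (await self._storage.get(src)).decode("utf-8")
        await self._storage.put(dst, text.upper().encode("utf-8"))


async def demo() -> bytes:
    storage = InMemoryStorage()  # any object satisfying StorageLike would work here
    await storage.put("in.txt", b"hello")
    await UppercaseAdapter(storage).run("in.txt", "out.txt")
    return await storage.get("out.txt")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the adapter only sees the protocol, replacing `InMemoryStorage` with another conforming provider requires no change to the adapter itself.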

Queue providers have more variance than storage — STOMP (Artemis), AMQP (RabbitMQ), and Pulsar protocols have different semantics for queue naming, ordering, and ack. Each provider package handles its own adaptation to QueueProtocol.
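As one illustration of the queue-naming variance, a provider might translate a logical queue name into a broker-specific destination. The sketch below is an assumption, not the package's actual adaptation logic: `NamingStrategy`, `PulsarNaming`, and `PrefixedNaming` are hypothetical names. The only broker fact relied on is Pulsar's `persistent://tenant/namespace/topic` topic format, with `public/default` as its stock tenant and namespace.

```python
from typing import Protocol


class NamingStrategy(Protocol):
    """Hypothetical interface: map a logical queue name to a broker destination."""

    def destination(self, logical_name: str) -> str: ...


class PulsarNaming:
    """Builds fully qualified Pulsar topic names."""

    def __init__(self, tenant: str = "public", namespace: str = "default") -> None:
        self._tenant = tenant
        self._namespace = namespace

    def destination(self, logical_name: str) -> str:
        return f"persistent://{self._tenant}/{self._namespace}/{logical_name}"


class PrefixedNaming:
    """Flat dotted names with a deployment prefix (an assumed convention
    for brokers such as RabbitMQ or Artemis)."""

    def __init__(self, prefix: str) -> None:
        self._prefix = prefix

    def destination(self, logical_name: str) -> str:
        return f"{self._prefix}.{logical_name}"


if __name__ == "__main__":
    for strategy in (PulsarNaming(), PrefixedNaming("factflow")):
        print(strategy.destination("ingest.chunks"))
```

Callers keep using the logical name; only the provider knows which concrete destination string the broker expects.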

Only two re-exports at the root:

from factflow_infra import create_storage_provider

Most callers import from the subpackages directly.

from factflow_infra.storage import (
    create_storage_provider,    # URI → provider (file:// / minio:// / s3://)
    StorageProviderFactory,     # factory class (used when DI manages lifecycle)
    FilesystemStorageProvider,
    MinioStorageProvider,
)
from factflow_infra.storage.settings import StorageSettings

settings = StorageSettings(uri="file://./output/storage")
provider = create_storage_provider(settings)

URI dispatch:

  • file://<path> → FilesystemStorageProvider
  • minio://<bucket> or s3://<bucket> → MinioStorageProvider
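The scheme-to-provider mapping listed above could be sketched as a small lookup. This is not the actual body of `create_storage_provider`; `provider_name_for` is a hypothetical helper that returns only the provider class name, so the example runs without the package installed.

```python
from urllib.parse import urlparse

# Scheme → provider class name, mirroring the dispatch list above.
_SCHEME_TO_PROVIDER = {
    "file": "FilesystemStorageProvider",
    "minio": "MinioStorageProvider",
    "s3": "MinioStorageProvider",
}


def provider_name_for(uri: str) -> str:
    """Return the provider class name that a storage URI would select."""
    scheme = urlparse(uri).scheme
    try:
        return _SCHEME_TO_PROVIDER[scheme]
    except KeyError:
        raise ValueError(f"unsupported storage URI scheme: {scheme!r}") from None


if __name__ == "__main__":
    print(provider_name_for("file://./output/storage"))
    print(provider_name_for("s3://my-bucket"))
```

An unknown scheme raises `ValueError` here; how the real factory reports an unsupported URI is not stated in this document.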

from factflow_infra.queue import BatchHandle  # handle returned by the subscribe + ack API

Provider-specific clients live in factflow_infra.queue.artemis, .rabbitmq, .pulsar. The server wires one at startup based on config.

  • Runtime: stompman (Artemis STOMP), aio-pika (RabbitMQ AMQP), pulsar-client (Pulsar native), boto3 (S3), aiofiles (filesystem)
  • Workspace: factflow-protocols, factflow-foundation
  • External services: one queue broker (Artemis / RabbitMQ / Pulsar) + one storage backend (filesystem / MinIO / S3) per deployment

Tests at backend/packages/factflow-infra/tests/. dev-mode-dirs includes testing/ so provider fixtures are available to downstream packages without a separate published -testing package. See .claude/skills/backend/queue-testing/ for queue provider test patterns.

  • factflow-protocols: QueueProtocol, StorageProtocol, naming strategies
  • factflow-execution: wraps QueueProtocol with ExecutionScopedQueue
  • Rule: .claude/rules/queue-provider-conventions.md
  • Rule: .claude/rules/storage-conventions.md