factflow-protocols

Abstract protocols (interfaces) for every infrastructure concern in Factflow — queue, storage, LLM, content resolution. Zero implementations live here; every other package depends on these contracts.

This is the root of the dependency DAG. Every shared service (factflow-infra, factflow-llm, factflow-engine, etc.) and every workflow package imports from here. Changes to these protocols ripple outward — review carefully.

Users writing a new queue provider, storage backend, or LLM client implement these protocols. Users writing a pipeline adapter implement PipelineAdapter. Users writing YAML configs don’t interact with this package directly.

The naming discipline is load-bearing:

  • Protocol — abstract contract (this package)
  • Provider — infrastructure implementation (factflow-infra)
  • Adapter — domain-specific pipeline step (workflow packages)

Keeping protocols in their own package forces the contract to be portable and implementation-free. It also lets tests use simple mock implementations without pulling heavy infrastructure dependencies into a test environment.
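To illustrate the testing benefit, here is a minimal sketch of the pattern (the `StorageLike` protocol and its methods are hypothetical stand-ins, not the real factflow contracts): a structural `typing.Protocol` lets a test supply an in-memory fake that type-checks against the same interface as a real provider, with no broker or disk involved.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class StorageLike(Protocol):
    """Hypothetical stand-in for a storage contract."""
    def get(self, key: str) -> bytes: ...
    def put(self, key: str, data: bytes) -> None: ...

class InMemoryStorage:
    """Test double: satisfies StorageLike structurally, no infrastructure needed."""
    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def get(self, key: str) -> bytes:
        return self._objects[key]

    def put(self, key: str, data: bytes) -> None:
        self._objects[key] = data

store: StorageLike = InMemoryStorage()
store.put("report.txt", b"hello")
assert isinstance(store, StorageLike)  # structural check via runtime_checkable
assert store.get("report.txt") == b"hello"
```

Because conformance is structural, the fake never imports the provider package at all.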

Seven modules, seven export groups.

from factflow_protocols import (
PipelineAdapter, # the base adapter contract
StatefulAdapter, # adapter variant that carries per-execution state
PipelineContext, # message context passed into process()
AdapterResult, # adapter return type
)

PipelineAdapter.process(context: PipelineContext) -> AdapterResult | None is the single method every workflow adapter implements. It must be cancel-safe: asyncio.CancelledError is expected during shutdown.

from factflow_protocols import (
QueueProtocol, # provider contract (Artemis / RabbitMQ / Pulsar implement)
QueueMessage, # single-message payload
BatchMessage, # batch payload
BatchResult,
BatchStatus,
BatchHandleProtocol, # handle for acking a batch
Subscription,
SubscriptionStatus,
MessageStatus,
)

QueueProtocol defines publish/subscribe/ack semantics. Handlers return an AdapterResult, and the message is acknowledged only after the handler returns. No fire-and-forget.
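The ack-after-return contract can be sketched like this (the message shape and consume loop are illustrative assumptions, not the real QueueProtocol surface):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class QueueMessage:      # hypothetical stand-in for the real message model
    body: str
    acked: bool = False

async def handler(msg: QueueMessage) -> dict:
    # The handler only returns a result; it never acks the message itself.
    return {"echo": msg.body}

async def consume(messages: list[QueueMessage]) -> list[dict]:
    results = []
    for msg in messages:
        result = await handler(msg)  # ack happens only AFTER this returns;
        msg.acked = True             # an exception above would leave the
        results.append(result)       # message un-acked for redelivery
    return results

msgs = [QueueMessage("a"), QueueMessage("b")]
out = asyncio.run(consume(msgs))
assert all(m.acked for m in msgs)
assert out[0] == {"echo": "a"}
```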

from factflow_protocols import (
QueueNamingStrategy, # base contract
AmqpNamingStrategy, # RabbitMQ / AMQP 0-9-1 rules
StompNamingStrategy, # Artemis / STOMP rules
PulsarNamingStrategy, # Pulsar topic rules
NamingStrategyRegistry,
register_builtin_strategies,
)

Each provider has different rules for valid queue / topic names. Strategies validate names before publish/subscribe so invalid characters fail fast with a clear error rather than being silently mangled by the broker.
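A sketch of the fail-fast idea (the character set and length limit below are assumed for illustration; they are not the actual broker rules):

```python
import re

class AmqpLikeNamingStrategy:
    """Reject names a broker would refuse or silently rewrite."""
    # Assumed rule set for illustration only.
    _PATTERN = re.compile(r"^[A-Za-z0-9._-]{1,255}$")

    def validate(self, name: str) -> str:
        if not self._PATTERN.match(name):
            raise ValueError(f"invalid queue name for AMQP provider: {name!r}")
        return name

strategy = AmqpLikeNamingStrategy()
assert strategy.validate("orders.created") == "orders.created"
try:
    strategy.validate("orders/created")  # '/' assumed invalid in this sketch
except ValueError as exc:
    print(exc)
```

Validating before the broker sees the name turns a hard-to-debug routing mismatch into an immediate, named error.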

from factflow_protocols import (
StorageProtocol, # provider contract (Filesystem / MinIO implement)
StorageInfo, # listing entry
StorageMetadata, # sidecar metadata
SourceMetadata, # origin metadata on an object
DataSensitivity, # enum: public / internal / restricted / ...
StorageError,
KeyNotFoundError,
)
from factflow_protocols import (
StorageEvent,
StorageEventPublisher,
EVENT_TYPE_OBJECT_CREATED,
)

These exports are used by adapters that subscribe to “object appeared in storage” signals — for example, triggering a replay when a missing artefact arrives.
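An in-process sketch of that subscription flow (the event fields and publisher API are assumptions, not the real StorageEventPublisher surface):

```python
from dataclasses import dataclass
from typing import Callable

EVENT_TYPE_OBJECT_CREATED = "object.created"  # assumed constant value

@dataclass
class StorageEvent:       # hypothetical stand-in for the real event model
    event_type: str
    key: str

class InProcessEventPublisher:
    """Toy publisher: delivers events to subscribers synchronously."""
    def __init__(self) -> None:
        self._subscribers: list[Callable[[StorageEvent], None]] = []

    def subscribe(self, callback: Callable[[StorageEvent], None]) -> None:
        self._subscribers.append(callback)

    def publish(self, event: StorageEvent) -> None:
        for callback in self._subscribers:
            callback(event)

replayed: list[str] = []

def on_event(event: StorageEvent) -> None:
    if event.event_type == EVENT_TYPE_OBJECT_CREATED:
        replayed.append(event.key)  # e.g. re-enqueue the waiting pipeline

publisher = InProcessEventPublisher()
publisher.subscribe(on_event)
publisher.publish(StorageEvent(EVENT_TYPE_OBJECT_CREATED, "artefacts/report.pdf"))
assert replayed == ["artefacts/report.pdf"]
```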

from factflow_protocols import (
LLMClientProtocol, # chat / completion
EmbeddingClientProtocol, # embeddings
)

Provider factories in factflow-llm return instances satisfying these protocols. Adapter authors depend only on the protocol, not the concrete client class.
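A sketch of that protocol-only dependency (the `complete()` method name and signature are assumptions; the real LLMClientProtocol surface may differ): the adapter accepts anything satisfying the protocol, so tests can pass a fake without any provider package.

```python
import asyncio
from typing import Protocol

class LLMClientLike(Protocol):
    """Hypothetical stand-in for an LLM client contract."""
    async def complete(self, prompt: str) -> str: ...

class SummariseAdapter:
    def __init__(self, llm: LLMClientLike) -> None:
        self._llm = llm  # depends only on the protocol, never a concrete class

    async def run(self, text: str) -> str:
        return await self._llm.complete(f"Summarise: {text}")

class FakeLLM:
    """Test double satisfying LLMClientLike structurally."""
    async def complete(self, prompt: str) -> str:
        return prompt.upper()

summary = asyncio.run(SummariseAdapter(FakeLLM()).run("hello"))
assert summary == "SUMMARISE: HELLO"
```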

from factflow_protocols import (
ContentResolver,
ContentItem,
ContentSummary,
)

Abstract resolver for loading content by reference — used where an adapter needs the bytes behind a lineage or storage reference without knowing which backend holds them.
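A minimal resolver sketch, assuming a single `resolve(reference) -> bytes` method (the method name, the reference scheme, and the backing dict are all illustrative):

```python
from typing import Protocol

class ContentResolverLike(Protocol):
    """Hypothetical stand-in for a content-resolver contract."""
    def resolve(self, reference: str) -> bytes: ...

class DictBackedResolver:
    """In-memory backend standing in for storage or lineage lookups."""
    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    def resolve(self, reference: str) -> bytes:
        try:
            return self._objects[reference]
        except KeyError:
            raise KeyError(f"no content behind reference {reference!r}") from None

# The caller never learns which backend held the bytes.
resolver: ContentResolverLike = DictBackedResolver({"lineage://run-1/page": b"<html/>"})
assert resolver.resolve("lineage://run-1/page") == b"<html/>"
```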

  • Runtime: pydantic>=2.12.5 — every protocol return type uses Pydantic models.
  • Workspace: none. This is the DAG root.
  • External services: none.

Protocol definitions are verified by implementations in factflow-infra, factflow-llm, and adapter packages. No dedicated test suite here — a broken protocol signature fails at type-check time (via ty / mypy) and at runtime (via every implementer’s test suite).

  • factflow-infra — queue + storage providers implementing these protocols
  • factflow-llm — LLM client factory returning LLMClientProtocol instances
  • factflow-engine — pipeline orchestration runtime consuming PipelineAdapter
  • Any workflow package (e.g. factflow-webscraper) — adapter implementations