Extension points

Factflow is designed to be extended. Five plug-in points, each behind a Protocol in factflow-protocols.

flowchart TB
subgraph Plugins[Your code]
  P1[PipelineAdapter<br/>for a new data source]
  P2[QueueProtocol<br/>for a new broker]
  P3[StorageProtocol<br/>for a new backend]
  P4[LLMClientProtocol<br/>for a new model provider]
  P5[RateLimitStrategy<br/>for per-domain quotas]
end
subgraph Factflow
  E[factflow-engine]
  I[factflow-infra]
  L[factflow-llm]
  W[factflow-webscraper]
end
P1 --> E
P2 --> I
P3 --> I
P4 --> L
P5 --> W
style Plugins fill:#fef3c7,stroke:#d97706,color:#111
Five extension seams. Each is a Protocol contract your implementation fulfils; factory-style registration hooks your class into the running system.

PipelineAdapter is the most common extension: use it when you want a new pipeline step for a specific domain.

flowchart LR
C["PipelineAdapter<br/>(factflow-protocols)"] --> My[YourAdapter]
My -->|@register_adapter_type| Reg[Discovery registry]
YAML["type: your_adapter"] -.matches.-> Reg
Reg -->|instantiate| My
Subclass PipelineAdapter, decorate for discovery, reference by `type:` in YAML.

Lives in: a new workflow package (backend/packages/workflows/factflow-<name>/).

Walkthrough: Guides / Workflow Adapters / Writing.

QueueProtocol covers the case where you need a broker Factflow doesn’t support (Redis Streams, Kafka, SQS).

flowchart LR
C["QueueProtocol<br/>(factflow-protocols)"] --> My[YourProvider]
My -->|register| F[factflow-infra.queue.factory]
My -->|register naming rules| NS[NamingStrategyRegistry]
Config["queue.provider: your_provider"] -.selects.-> F
Implement QueueProtocol, register in the factory, register a naming strategy compatible with the broker's rules.

Lives in: backend/packages/factflow-infra/src/factflow_infra/queue/<your_provider>/.

Key methods to implement: publish, subscribe, publish_batch, stop. All return MessageStatus. Handler-return ack is mandatory; there is no explicit ack/nack.
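
To illustrate the handler-return ack idea, here is a minimal in-memory sketch. It is not the real QueueProtocol: the method signatures, the synchronous style, the `MessageStatus` members, the retry limit, and the rule that a truthy handler return means ack are all assumptions made for the example. Naming-strategy registration is left out.

```python
from collections import defaultdict, deque
from enum import Enum
from typing import Callable, Iterable


class MessageStatus(Enum):
    """Hypothetical members; the real enum may differ."""
    OK = "ok"
    FAILED = "failed"


# Assumed convention: truthy return = ack, falsy return or exception = nack.
Handler = Callable[[bytes], bool]


class InMemoryQueueProvider:
    def __init__(self, max_attempts: int = 3) -> None:
        self._pending: dict[str, deque] = defaultdict(deque)
        self._handlers: dict[str, Handler] = {}
        self._max_attempts = max_attempts
        self._stopped = False
        self.dead_letters: list[bytes] = []

    def publish(self, queue: str, body: bytes) -> MessageStatus:
        if self._stopped:
            return MessageStatus.FAILED
        self._pending[queue].append((body, 1))  # (payload, attempt number)
        self._drain(queue)
        return MessageStatus.OK

    def publish_batch(self, queue: str, bodies: Iterable[bytes]) -> MessageStatus:
        statuses = [self.publish(queue, body) for body in bodies]
        ok = all(status is MessageStatus.OK for status in statuses)
        return MessageStatus.OK if ok else MessageStatus.FAILED

    def subscribe(self, queue: str, handler: Handler) -> MessageStatus:
        if self._stopped:
            return MessageStatus.FAILED
        self._handlers[queue] = handler
        self._drain(queue)  # deliver anything published before subscribing
        return MessageStatus.OK

    def stop(self) -> MessageStatus:
        self._stopped = True
        self._handlers.clear()
        return MessageStatus.OK

    def _drain(self, queue: str) -> None:
        handler = self._handlers.get(queue)
        pending = self._pending[queue]
        while handler is not None and pending:
            body, attempt = pending.popleft()
            try:
                acked = bool(handler(body))  # the return value IS the ack
            except Exception:
                acked = False
            if acked:
                continue
            if attempt < self._max_attempts:
                pending.append((body, attempt + 1))  # nack: redeliver
            else:
                self.dead_letters.append(body)


seen: list[bytes] = []


def handler(body: bytes) -> bool:
    seen.append(body)
    return body != b"bad"


provider = InMemoryQueueProvider(max_attempts=2)
provider.subscribe("jobs", handler)
status = provider.publish_batch("jobs", [b"good", b"bad"])
```

The handler never receives an ack handle, so the provider alone decides between redelivery and dead-lettering based on what the handler returns.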

StorageProtocol covers the case where you want a storage system Factflow doesn’t ship (Google Cloud Storage, Azure Blob, a custom HTTP endpoint).

flowchart LR
C["StorageProtocol<br/>(factflow-protocols)"] --> My[YourStorageProvider]
My -->|register| F[factflow-infra.storage.factory]
URI["URI: mybackend://bucket"] -.URI scheme dispatch.-> F
Implement StorageProtocol, register a URI scheme. `create_storage_provider(uri)` dispatches.

Lives in: backend/packages/factflow-infra/src/factflow_infra/storage/<your_backend>/.

Key methods: write, read, exists, list, delete. Plus sidecar metadata handling.
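
URI scheme dispatch can be sketched as a small registry keyed by scheme. The code below is a self-contained stand-in, not Factflow's implementation: the method signatures, the `register_scheme` helper, the `read_metadata` method, and the in-memory sidecar dict are assumptions, and the local `create_storage_provider` only mimics the dispatch behaviour described above.

```python
from __future__ import annotations

from typing import Callable, Optional
from urllib.parse import urlparse


class InMemoryStorageProvider:
    """Toy backend with the five key methods plus sidecar metadata."""

    def __init__(self, bucket: str) -> None:
        self.bucket = bucket
        self._objects: dict[str, bytes] = {}
        self._sidecars: dict[str, dict[str, str]] = {}

    def write(self, key: str, data: bytes,
              metadata: Optional[dict[str, str]] = None) -> None:
        self._objects[key] = data
        self._sidecars[key] = dict(metadata or {})  # stored next to the object

    def read(self, key: str) -> bytes:
        return self._objects[key]

    def read_metadata(self, key: str) -> dict[str, str]:
        return dict(self._sidecars[key])

    def exists(self, key: str) -> bool:
        return key in self._objects

    def list(self, prefix: str = "") -> list[str]:
        return sorted(k for k in self._objects if k.startswith(prefix))

    def delete(self, key: str) -> None:
        self._objects.pop(key, None)
        self._sidecars.pop(key, None)  # metadata goes with the object


# Hypothetical scheme registry: URI scheme -> factory taking the bucket name.
_SCHEMES: dict[str, Callable[[str], InMemoryStorageProvider]] = {}


def register_scheme(scheme: str,
                    factory: Callable[[str], InMemoryStorageProvider]) -> None:
    _SCHEMES[scheme] = factory


def create_storage_provider(uri: str) -> InMemoryStorageProvider:
    """Pick a provider by URI scheme; the netloc is treated as the bucket."""
    parsed = urlparse(uri)
    factory = _SCHEMES.get(parsed.scheme)
    if factory is None:
        raise ValueError(f"no storage provider for scheme {parsed.scheme!r}")
    return factory(parsed.netloc)


register_scheme("mybackend", InMemoryStorageProvider)
storage = create_storage_provider("mybackend://bucket")
storage.write("docs/a.md", b"# A", metadata={"content_type": "text/markdown"})
```

Deleting the sidecar together with the object keeps `exists` and the metadata view consistent, which is the part a real backend is most likely to get wrong.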

LLMClientProtocol covers the case where you need a model provider Factflow doesn’t ship (Cohere, local vLLM, custom inference).

flowchart LR
C1["LLMClientProtocol<br/>(factflow-protocols)"] --> My[YourLLMClient]
C2["EmbeddingClientProtocol"] --> My2[YourEmbedClient]
My -->|register| F[LLMClientFactory]
My2 -->|register| F
Profile["provider_type: your_llm"] -.selects.-> F
Implement LLMClientProtocol (chat) and/or EmbeddingClientProtocol. Factory picks by provider_type in the profile config.

Lives in: backend/packages/factflow-llm/src/factflow_llm/<your_provider>_client.py.

Common pattern: wrap the vendor SDK, translate Message / Role types, gate with AdaptiveRateLimiter.
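
A rough sketch of that wrap, translate, and gate pattern follows. None of these definitions come from factflow-llm: `Role`, `Message`, the `chat` signature, `FakeVendorSDK`, and `CountingLimiter` (standing in for AdaptiveRateLimiter) are hypothetical, and the real protocol is likely asynchronous.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass(frozen=True)
class Message:
    role: Role
    content: str


class FakeVendorSDK:
    """Stands in for a vendor SDK that uses its own role vocabulary."""

    def generate(self, turns: list[dict[str, str]]) -> dict[str, str]:
        return {"speaker": "bot", "text": f"echo: {turns[-1]['text']}"}


class CountingLimiter:
    """Stand-in for AdaptiveRateLimiter; it only counts acquisitions."""

    def __init__(self) -> None:
        self.calls = 0

    def acquire(self) -> None:
        self.calls += 1


# Translation tables between the shared Role type and the vendor's names.
_TO_VENDOR = {Role.SYSTEM: "preamble", Role.USER: "human", Role.ASSISTANT: "bot"}
_FROM_VENDOR = {vendor: role for role, vendor in _TO_VENDOR.items()}


class YourLLMClient:
    def __init__(self, sdk: FakeVendorSDK, limiter: CountingLimiter) -> None:
        self._sdk = sdk
        self._limiter = limiter

    def chat(self, messages: list[Message]) -> Message:
        # 1. Translate shared types into the vendor's request shape.
        turns = [{"speaker": _TO_VENDOR[m.role], "text": m.content}
                 for m in messages]
        # 2. Gate the outbound call on the rate limiter.
        self._limiter.acquire()
        # 3. Call the SDK and translate the response back.
        raw = self._sdk.generate(turns)
        return Message(role=_FROM_VENDOR[raw["speaker"]], content=raw["text"])


limiter = CountingLimiter()
client = YourLLMClient(FakeVendorSDK(), limiter)
reply = client.chat([Message(Role.SYSTEM, "be brief"), Message(Role.USER, "hi")])
```

Keeping the translation tables at module level makes a missing role mapping fail loudly with a `KeyError` instead of silently sending the wrong role to the vendor.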

RateLimitStrategy covers the case where you have per-endpoint quotas the default per-domain bucket doesn’t express.

flowchart LR
Base[RateLimitStrategy] --> My[YourStrategy]
My -->|register| Reg[RateLimitStrategyRegistry]
YAML["rate_limit_strategy: your_strategy"] -.selects.-> Reg
Register in the strategy registry, reference by name in the webscraper adapter config.

Lives in: anywhere your adapter can import. Registration typically happens at import time:

from factflow_webscraper import RateLimitStrategyRegistry


class MyStrategy:
    def acquire(self, url: str) -> float:
        """Seconds to wait before this request."""
        ...


RateLimitStrategyRegistry.register("my_strategy", MyStrategy())
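
As one possible body for `acquire`, the sketch below enforces a minimum interval per host and path prefix. The class name, constructor parameters, injectable clock, and the choice to reserve a slot on every call are assumptions for illustration; only the `acquire(url) -> float` shape comes from the snippet above.

```python
import time
from typing import Callable
from urllib.parse import urlparse


class PerEndpointInterval:
    """Minimum spacing between requests to the same host + path prefix."""

    def __init__(self, intervals: dict[str, float], default: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._intervals = intervals  # path prefix -> seconds between requests
        self._default = default      # spacing for paths with no matching prefix
        self._clock = clock          # injectable so tests can control time
        self._next_slot: dict[tuple[str, str], float] = {}

    def _match(self, path: str) -> tuple[str, float]:
        # The longest matching prefix wins; unmatched paths share one bucket.
        matches = [p for p in self._intervals if path.startswith(p)]
        if not matches:
            return "", self._default
        best = max(matches, key=len)
        return best, self._intervals[best]

    def acquire(self, url: str) -> float:
        """Seconds to wait before this request."""
        parsed = urlparse(url)
        prefix, interval = self._match(parsed.path)
        key = (parsed.netloc, prefix)
        now = self._clock()
        slot = max(now, self._next_slot.get(key, now))
        self._next_slot[key] = slot + interval  # reserve the following slot
        return slot - now


# A fake clock keeps the example deterministic.
now = [100.0]
strategy = PerEndpointInterval({"/api/search": 5.0}, clock=lambda: now[0])

first = strategy.acquire("https://example.com/api/search?q=a")
second = strategy.acquire("https://example.com/api/search?q=b")
other = strategy.acquire("https://example.com/about")

# Registration would follow the same call as in the snippet above, e.g.:
# RateLimitStrategyRegistry.register("per_endpoint", strategy)
```

Here `first` and `other` come back as zero wait because they are the first requests in their buckets, while `second` has to wait the full five seconds configured for `/api/search`.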

Some parts of Factflow are deliberately not extension points:

  • The handler-return ack pattern: baked into QueueProtocol and the processor. Explicit ack/nack would fracture the cancel-safety guarantees.
  • Lineage schema: the row shape is fixed. Adapters extend via the metadata JSON column, not by adding columns.
  • Config validation rules: the validator’s E-code taxonomy is stable. New rules can be added server-side, but error codes are append-only.
  • ExecutionScopedQueue semantics: the scoping mechanism itself is non-negotiable. A new provider just implements QueueProtocol; scoping still wraps it.