Writing a new adapter
An adapter is one unit of pipeline work: consume a message, do something, emit zero or more messages. This guide walks from an empty package to a registered, config-driven, testable adapter.
Decide whether you need a new adapter
Section titled “Decide whether you need a new adapter”Add a new adapter when:
- The step doesn’t fit any existing workflow package’s purpose
- The step is reusable across pipelines
- A pure function inside an existing adapter isn’t enough (because you need DI, concurrency, or its own lifecycle)
If it’s a variation of an existing adapter, see if configuration or subclassing covers it first.
Package layout
Section titled “Package layout”New workflow package lives at backend/packages/workflows/factflow-<name>/:
backend/packages/workflows/factflow-myflow/├── pyproject.toml├── src/factflow_myflow/│ ├── __init__.py # re-exports public classes│ ├── adapter.py # implements PipelineAdapter│ ├── models.py # Pydantic models for input/output payloads│ └── config.py # Pydantic config model└── tests/ ├── conftest.py └── unit/test_adapter.pypyproject.toml:
[project]name = "factflow-myflow"version = "0.7.0"description = "Factflow my-flow pipeline adapters"requires-python = ">=3.13"dependencies = [ "factflow-protocols", "factflow-foundation", "factflow-engine",]
[tool.uv.sources]factflow-protocols = { workspace = true }factflow-foundation = { workspace = true }factflow-engine = { workspace = true }
[build-system]requires = ["hatchling>=1.26.3"]build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]packages = ["src/factflow_myflow"]Register the package in the workspace root (backend/pyproject.toml):
[tool.uv.workspace]members = [ # ... existing members "packages/workflows/factflow-myflow",]Then cd backend && uv sync --all-groups to wire it up.
Implement the adapter
Section titled “Implement the adapter”Config model
Section titled “Config model”from pydantic import BaseModel, Field
class MyFlowConfig(BaseModel): max_items: int = Field(100, ge=1, le=10000) mode: str = Field(..., pattern="^(fast|thorough)$")Adapter
Section titled “Adapter”from factflow_protocols import ( PipelineAdapter, PipelineContext, AdapterResult,)from factflow_myflow.config import MyFlowConfigfrom factflow_myflow.models import MyFlowInput, MyFlowOutput
class MyFlowAdapter(PipelineAdapter): type = "my_flow" config_class = MyFlowConfig
def __init__(self, config: MyFlowConfig): self.config = config
async def process(self, context: PipelineContext) -> AdapterResult | None: input_data = MyFlowInput.model_validate(context.message)
# Your work here — must be cancel-safe output_items = await self._do_work(input_data)
# Emit one message per output item (fan-out) return AdapterResult( emitted=[ MyFlowOutput(item=item).model_dump() for item in output_items[: self.config.max_items] ], )
async def _do_work(self, input: MyFlowInput) -> list: # ... return []Package __init__.py
Section titled “Package __init__.py”Re-export the public surface:
"""Factflow my-flow — one-line summary."""
from factflow_myflow.adapter import MyFlowAdapterfrom factflow_myflow.config import MyFlowConfigfrom factflow_myflow.models import MyFlowInput, MyFlowOutput
__all__ = [ "MyFlowAdapter", "MyFlowConfig", "MyFlowInput", "MyFlowOutput",]Registration
Section titled “Registration”The engine auto-discovers adapters on server startup. Two mechanisms, both work:
Class-attribute convention
Section titled “Class-attribute convention”The example above uses type = "my_flow" at the class level. Discovery scans every installed factflow-* package for classes inheriting PipelineAdapter with a type attribute.
Explicit register
Section titled “Explicit register”If auto-discovery misses your adapter (e.g., you conditionally import it), explicitly register:
from factflow_engine import ValidatingAdapterRegistry
ValidatingAdapterRegistry.register("my_flow", MyFlowAdapter)Call this during module import.
Use it in a pipeline
Section titled “Use it in a pipeline”version: "1.0"
routes: my_route: inbound: queue: "/queue/my_route.in" subscription: "my-route-processors" concurrency: 5
adapters: - type: "my_flow" config: max_items: 200 mode: "fast"
init_message: route: "my_route" payload: example: "payload"Validate:
factflow config validate my-pipeline.yamlShould succeed with my_flow resolved.
Cancel safety
Section titled “Cancel safety”Every adapter’s process() must tolerate asyncio.CancelledError. If cancelled mid-call:
- Don’t leave partial storage writes
- Don’t leave database transactions open
- Don’t hold external resources (files, network sockets) past the
try/finally
Simplest rule: if you have async with, use it. If you allocate something, clean up on CancelledError explicitly.
Related
Section titled “Related”- Testing an adapter — unit + integration test patterns
- factflow-protocols reference — the full
PipelineAdaptercontract - Reference: Adapter catalog — every shipped adapter
- Concept: Pipelines