Writing a new adapter

An adapter is one unit of pipeline work: consume a message, do something, emit zero or more messages. This guide walks from an empty package to a registered, config-driven, testable adapter.

Add a new adapter when:

  • The step doesn’t fit any existing workflow package’s purpose
  • The step is reusable across pipelines
  • A pure function inside an existing adapter isn’t enough (because you need DI, concurrency, or its own lifecycle)

If it’s a variation of an existing adapter, see if configuration or subclassing covers it first.

A new workflow package lives at `backend/packages/workflows/factflow-<name>/`:

```
backend/packages/workflows/factflow-myflow/
├── pyproject.toml
├── src/factflow_myflow/
│   ├── __init__.py     # re-exports public classes
│   ├── adapter.py      # implements PipelineAdapter
│   ├── models.py       # Pydantic models for input/output payloads
│   └── config.py       # Pydantic config model
└── tests/
    ├── conftest.py
    └── unit/test_adapter.py
```

`pyproject.toml`:

```toml
[project]
name = "factflow-myflow"
version = "0.7.0"
description = "Factflow my-flow pipeline adapters"
requires-python = ">=3.13"
dependencies = [
    "factflow-protocols",
    "factflow-foundation",
    "factflow-engine",
]

[tool.uv.sources]
factflow-protocols = { workspace = true }
factflow-foundation = { workspace = true }
factflow-engine = { workspace = true }

[build-system]
requires = ["hatchling>=1.26.3"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/factflow_myflow"]
```

Register the package in the workspace root (`backend/pyproject.toml`):

```toml
[tool.uv.workspace]
members = [
    # ... existing members
    "packages/workflows/factflow-myflow",
]
```

Then run `cd backend && uv sync --all-groups` to wire it up.

`src/factflow_myflow/config.py`:

```python
from pydantic import BaseModel, Field


class MyFlowConfig(BaseModel):
    max_items: int = Field(100, ge=1, le=10000)
    mode: str = Field(..., pattern="^(fast|thorough)$")
```
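To make those `Field` constraints concrete, here is a dependency-free sketch of the same rules (illustrative only; in the real package, Pydantic performs this validation for you):

```python
import re


def validate_my_flow_config(raw: dict) -> dict:
    """Mimic MyFlowConfig's constraints without Pydantic (illustration only)."""
    max_items = raw.get("max_items", 100)   # default is 100
    if not (1 <= max_items <= 10000):       # ge=1, le=10000
        raise ValueError("max_items must be between 1 and 10000")
    mode = raw["mode"]                      # required field (no default)
    if not re.fullmatch(r"fast|thorough", mode):
        raise ValueError("mode must be 'fast' or 'thorough'")
    return {"max_items": max_items, "mode": mode}
```

So `{"mode": "fast"}` is accepted with `max_items` defaulting to 100, while `{"mode": "slow"}` is rejected.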
`src/factflow_myflow/adapter.py`:

```python
from factflow_protocols import (
    PipelineAdapter,
    PipelineContext,
    AdapterResult,
)

from factflow_myflow.config import MyFlowConfig
from factflow_myflow.models import MyFlowInput, MyFlowOutput


class MyFlowAdapter(PipelineAdapter):
    type = "my_flow"
    config_class = MyFlowConfig

    def __init__(self, config: MyFlowConfig):
        self.config = config

    async def process(self, context: PipelineContext) -> AdapterResult | None:
        input_data = MyFlowInput.model_validate(context.message)
        # Your work here — must be cancel-safe
        output_items = await self._do_work(input_data)
        # Emit one message per output item (fan-out)
        return AdapterResult(
            emitted=[
                MyFlowOutput(item=item).model_dump()
                for item in output_items[: self.config.max_items]
            ],
        )

    async def _do_work(self, input_data: MyFlowInput) -> list:
        # ...
        return []
```
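The fan-out shape in `process()` reduces to a pure function, which keeps the truncation logic easy to unit-test. A sketch (here a plain dict stands in for `MyFlowOutput(...).model_dump()`):

```python
def fan_out(items: list, max_items: int) -> list[dict]:
    """Wrap each output item in its own emitted message, capped at max_items."""
    return [{"item": item} for item in items[:max_items]]
```

Extracting this kind of helper lets you test the cap and the per-item wrapping without constructing a `PipelineContext`.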

Re-export the public surface:

`src/factflow_myflow/__init__.py`:

```python
"""Factflow my-flow — one-line summary."""

from factflow_myflow.adapter import MyFlowAdapter
from factflow_myflow.config import MyFlowConfig
from factflow_myflow.models import MyFlowInput, MyFlowOutput

__all__ = [
    "MyFlowAdapter",
    "MyFlowConfig",
    "MyFlowInput",
    "MyFlowOutput",
]
```

The engine auto-discovers adapters at server startup. There are two mechanisms; either works.

The example above uses `type = "my_flow"` at the class level. Discovery scans every installed `factflow-*` package for classes that inherit `PipelineAdapter` and declare a `type` attribute.

If auto-discovery misses your adapter (e.g., you conditionally import it), register it explicitly:

```python
from factflow_engine import ValidatingAdapterRegistry

ValidatingAdapterRegistry.register("my_flow", MyFlowAdapter)
```

Call this during module import.
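The registry contract amounts to a name-to-class mapping with duplicate protection. A minimal dict-backed sketch (the real `ValidatingAdapterRegistry` also validates config schemas, hence its name):

```python
class AdapterRegistrySketch:
    """Toy registry mapping adapter type names to classes (illustration only)."""

    _adapters: dict[str, type] = {}

    @classmethod
    def register(cls, name: str, adapter_cls: type) -> None:
        if name in cls._adapters:
            raise ValueError(f"duplicate adapter type: {name}")
        cls._adapters[name] = adapter_cls

    @classmethod
    def resolve(cls, name: str) -> type:
        return cls._adapters[name]
```

Registering at module import time means the mapping is complete before any route config is resolved.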

Wire the adapter into a pipeline config:

```yaml
version: "1.0"
routes:
  my_route:
    inbound:
      queue: "/queue/my_route.in"
      subscription: "my-route-processors"
      concurrency: 5
    adapters:
      - type: "my_flow"
        config:
          max_items: 200
          mode: "fast"
init_message:
  route: "my_route"
  payload:
    example: "payload"
```

Validate:

```sh
factflow config validate my-pipeline.yaml
```

The command should succeed, with `my_flow` resolved.

Every adapter’s `process()` must tolerate `asyncio.CancelledError`. If cancelled mid-call:

  • Don’t leave partial storage writes
  • Don’t leave database transactions open
  • Don’t hold external resources (files, network sockets) past their `try`/`finally`

The simplest rule: if a resource offers `async with`, use it. If you allocate something manually, clean it up explicitly on `CancelledError`.
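A minimal sketch of the cleanup pattern: cancel a task mid-work and confirm the `finally` block still releases the resource (all names here are illustrative, not part of the factflow API):

```python
import asyncio


async def cancel_safe_work(log: list) -> None:
    """Acquire a resource, do slow work, always release on exit."""
    log.append("acquired")
    try:
        await asyncio.sleep(10)      # stands in for the real work
        log.append("committed")      # never reached if cancelled
    finally:
        log.append("released")       # runs even on CancelledError


async def main() -> list:
    log: list = []
    task = asyncio.create_task(cancel_safe_work(log))
    await asyncio.sleep(0)           # let the task run up to its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

# asyncio.run(main()) returns ["acquired", "released"]
```

Note that "committed" never appears in the log: the cancellation lands at the `await`, so partial-write markers like it are exactly what the bullet list above tells you to avoid outside a `finally`.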