
Testing an adapter

Once your adapter implements PipelineAdapter, testing has three layers — unit (in-process, mocked dependencies), integration (real queue / storage via Testcontainers), and pipeline E2E (full orchestrator, factflow config validate + factflow config run).

Construct the adapter directly. Pass a realistic PipelineContext. Assert on AdapterResult.

tests/unit/test_my_flow_adapter.py
```python
import pytest

from factflow_myflow.adapter import MyFlowAdapter
from factflow_myflow.config import MyFlowConfig
from factflow_protocols import PipelineContext


@pytest.mark.asyncio
async def test_my_flow_emits_per_item():
    adapter = MyFlowAdapter(
        config=MyFlowConfig(max_items=10, mode="fast"),
    )
    context = PipelineContext(
        message={"items": ["a", "b", "c"]},
        correlation_id="test-corr-id",
        # ... other fields as needed
    )

    result = await adapter.process(context)

    assert result is not None
    assert len(result.emitted) == 3
    assert result.emitted[0]["item"] == "a"
```

Run:

```sh
cd backend
uv run pytest packages/workflows/factflow-myflow/tests/unit/ -v
```

Never run bare pytest — always target a path.

When the adapter hits a real queue or real storage, use Testcontainers fixtures. The factflow-infra package exposes fixtures via dev-mode-dirs — import them directly from the testing/ directory.

tests/integration/test_my_flow_e2e.py
```python
import pytest

from factflow_infra.testing import postgres_container, artemis_container
from factflow_myflow.adapter import MyFlowAdapter


@pytest.mark.asyncio
async def test_my_flow_writes_to_storage(tmp_path, storage_provider):
    adapter = MyFlowAdapter(config=...)
    # ... realistic setup
```

Integration tests are slower — tag them so they can be skipped in fast local runs:

```python
@pytest.mark.integration
async def test_...:
    ...
```

And mark the target directory appropriately in pytest.ini / pyproject.toml.
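A minimal sketch of that marker registration, assuming your pytest configuration lives in `pyproject.toml` (the marker description is illustrative):

```toml
# pyproject.toml — register the marker so pytest doesn't warn about it
[tool.pytest.ini_options]
markers = [
    "integration: slow tests that need Testcontainers",
]
```

With the marker registered, fast local runs can exclude the slow tests via `uv run pytest -m "not integration" <path>`.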

Validate that the adapter composes correctly with the orchestrator. The test-cli harness at scripts/test-cli/run.sh runs canonical scenarios (s1 through s8). To add a new scenario for your adapter:

scripts/test-cli/fixtures/my-flow-test.yaml
```yaml
version: "1.0"
routes:
  test_route:
    inbound: ...
    adapters:
      - type: "my_flow"
        config: {...}
init_message:
  route: "test_route"
  payload: {...}
```

Then invoke:

```sh
scripts/test-cli/run.sh your-scenario
```

If your adapter calls an LLM, use unittest.mock directly — factflow-llm doesn’t ship a mock client:

```python
import pytest
from unittest.mock import AsyncMock

from factflow_protocols import LLMClientProtocol
# CompletionResponse must also be imported from wherever it lives
# in your codebase.


@pytest.fixture
def mock_llm():
    mock = AsyncMock(spec=LLMClientProtocol)
    mock.complete.return_value = CompletionResponse(
        content="mocked response",
        model="claude-sonnet-4-6",
        tokens_in=10,
        tokens_out=20,
    )
    return mock


@pytest.mark.asyncio
async def test_with_llm(mock_llm):
    adapter = MyFlowAdapter(config=..., llm_client=mock_llm)
    # ...
    mock_llm.complete.assert_awaited_once()
```

Reusable fixtures and patterns are provided by the factflow test skills in .claude/skills/backend/:

  • queue-testing — realistic queue provider fixtures (Artemis, RabbitMQ, Pulsar)
  • database-testing — PostgreSQL / pgvector with Testcontainers
  • lineage-testing — canonical lineage chains to assert against
  • pipeline-testing — full orchestrator for E2E scenarios
  • api-testing — FastAPI TestClient patterns
  • llm-unit-testing — canonical LLM mock patterns

Cancel-safety must be tested:

```python
import asyncio

import pytest


@pytest.mark.asyncio
async def test_my_flow_cancel_safe():
    adapter = MyFlowAdapter(config=...)
    context = PipelineContext(...)

    task = asyncio.create_task(adapter.process(context))
    await asyncio.sleep(0.01)  # let it start
    task.cancel()

    with pytest.raises(asyncio.CancelledError):
        await task

    # Assert no partial state: no files left behind, no DB transactions open
```
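The "no partial state" assertion only passes if the adapter cleans up after itself when cancelled. A minimal, self-contained sketch of that pattern — `CancelSafeAdapter` is a stand-in for a real factflow adapter, not part of the framework; the point is the `try/finally` that removes partial work on cancellation:

```python
import asyncio
import os
import tempfile
from pathlib import Path


class CancelSafeAdapter:
    """Stand-in adapter: writes incremental output, cleans up on cancel."""

    def __init__(self):
        self.tmp: Path | None = None

    async def process(self, items):
        fd, name = tempfile.mkstemp(suffix=".partial")
        os.close(fd)
        self.tmp = Path(name)
        try:
            for item in items:
                self.tmp.write_text(item)  # simulate incremental work
                await asyncio.sleep(0.05)
            return "done"
        finally:
            # Runs on CancelledError too, so no partial artifact survives.
            self.tmp.unlink(missing_ok=True)


async def main():
    adapter = CancelSafeAdapter()
    task = asyncio.create_task(adapter.process(["a", "b", "c"]))
    await asyncio.sleep(0.01)  # let it start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return adapter.tmp.exists()


leftover = asyncio.run(main())
print("partial file left behind:", leftover)  # → False
```

The same shape applies to database work: open the transaction inside the `try`, roll back in the `finally` (or use an async context manager, which gets the same guarantee).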
Before committing, run the full check suite:

```sh
cd backend
uv run ruff check .
uv run ruff format --check .
uv run pytest packages/workflows/factflow-myflow/tests/ -v
```

If your adapter touches public API types that frontend/CLI consume, check those too (run the frontend + CLI test suites — see CLAUDE.md).