Custom architectures
Architectures are pluggable. Satisfy the Architecture protocol ,
three methods. And pass an instance to Agent(architecture=...).
The protocol
from collections.abc import AsyncIterator
from typing import Protocol, runtime_checkable
from loomflow.architecture.base import AgentSession, Dependencies
from loomflow.core.types import Event
@runtime_checkable
class Architecture(Protocol):
@property
def name(self) -> str: ...
@property
def declared_workers(self) -> tuple[str, ...]: ...
async def run(
self,
session: AgentSession,
deps: Dependencies,
) -> AsyncIterator[Event]:
...name. Short identifier for telemetry / logging (e.g."react","my-strategy").declared_workers. Names of any sub-agents the architecture delegates to. Used for graph visualization. Empty tuple for single-agent loops.run(session, deps). Async generator yieldingEvents as iteration progresses.
What AgentSession and Dependencies give you
AgentSession is the mutable per-run state:
| Field | Purpose |
|---|---|
id | Run id (ULID) |
instructions | The agent’s system instructions |
messages | The model conversation so far |
turns | Increment as each iteration completes |
output | Final answer (write at termination) |
cumulative_usage | Aggregate Usage |
interrupted, interruption_reason | Set when terminating early |
metadata | Free-form dict for architecture-specific state |
Dependencies is the read-only protocol bundle:
| Field | Use |
|---|---|
model | The Model to call |
memory | Recall episodes / facts; remember new ones |
runtime | runtime.step(name, fn, *args) for replay-correct calls |
tools | The ToolHost to dispatch tools through |
budget | budget.check(...) before expensive work |
permissions | permissions.check(call) before tool dispatch |
hooks | hooks.pre_tool(...) / hooks.post_tool(...) |
telemetry | async with telemetry.trace("span_name"): |
audit_log | await audit_log.append(entry) |
max_turns | Hard cap on iterations |
approval_handler | Resolves Decision.ask_(...) |
Setup events (Event.started) and teardown events (Event.completed)
are emitted by Agent, NOT the architecture. Architectures yield the
events that happen during iteration. Per-turn, per-tool, per-step,
budget warnings, errors.
Minimal example: a “single-shot” architecture
from collections.abc import AsyncIterator
from loomflow import Agent
from loomflow.architecture.base import AgentSession, Architecture, Dependencies
from loomflow.core.types import Event
class SingleShot:
"""One model call, no tools, return the answer. For testing."""
name: str = "single-shot"
declared_workers: tuple[str, ...] = ()
async def run(
self,
session: AgentSession,
deps: Dependencies,
) -> AsyncIterator[Event]:
chunks = []
async for chunk in deps.model.stream(messages=session.messages):
chunks.append(chunk)
yield Event.model_chunk(session.id, chunk)
text = "".join(c.text or "" for c in chunks)
session.output = text
session.turns = 1
# No need to yield Event.completed — Agent emits that.
agent = Agent("...", model="claude-opus-4-7", architecture=SingleShot())Replay-correctness contract
If you make external calls inside run, wrap them with
runtime.step(...) so they’re cached on the journal:
result = await deps.runtime.step(
f"my_call_{session.turns}",
my_async_callable,
arg1, arg2,
)Without this, a crashed run can’t resume cleanly. The call would re-execute and produce a different result. See Runtime.
Telemetry-correctness contract
Open a span per logical “turn” so the OTel trace matches the iteration:
async with deps.telemetry.trace(
"myarch.turn",
attributes={"turn": session.turns, "session_id": session.id},
):
...ReAct opens loom.turn, loom.model.stream, loom.tool per
call. Match that pattern for compatible dashboards.
Reading existing architectures. loomflow/architecture/react.py
is the canonical reference. About 250 lines, exercises every
protocol method. For multi-agent shapes, read supervisor.py next.