Architecture protocols + base types
from loomflow import Architecture
from loomflow.architecture import AgentSession, Dependencies

The three pieces every architecture works with: the protocol every
architecture implements (Architecture), the mutable per-run state
(AgentSession), and the bundle of protocol implementations the
Agent hands the architecture each run (Dependencies).
For the conceptual overview see Architectures. For implementing a custom architecture see Custom architectures.
Architecture
from typing import Protocol, runtime_checkable
from collections.abc import AsyncIterator

@runtime_checkable
class Architecture(Protocol):
    name: str

    def declared_workers(self) -> dict[str, "Agent"]: ...

    async def run(
        self,
        session: AgentSession,
        deps: Dependencies,
        prompt: str,
    ) -> AsyncIterator[Event]:
        ...

The strategy that drives the agent loop. The Agent calls
architecture.run(session, deps, prompt) and consumes the events it
yields. Setup events (Event.started) and teardown events
(Event.completed) are emitted by the Agent, not the architecture;
architectures yield only the events that happen during iteration.
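As a rough illustration of what conforming to the protocol looks like, the sketch below defines a single-turn strategy and checks it structurally with `isinstance`. So that it runs without loomflow installed, `Event`, `AgentSession`, `Dependencies` and the `Architecture` protocol are redefined here as trimmed, hypothetical stand-ins, and the event kinds (`turn_started`, `turn_completed`) are invented for the example.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


# Hypothetical stand-ins so the sketch is self-contained.
@dataclass
class Event:
    kind: str
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass
class AgentSession:
    id: str
    instructions: str
    messages: list[Any] = field(default_factory=list)
    turns: int = 0
    output: str = ""


@dataclass
class Dependencies:
    max_turns: int = 8


@runtime_checkable
class Architecture(Protocol):
    name: str

    def declared_workers(self) -> dict[str, Any]: ...

    async def run(
        self, session: AgentSession, deps: Dependencies, prompt: str
    ) -> AsyncIterator[Event]: ...


class EchoArchitecture:
    """Single-turn strategy: echo the prompt back as the final answer."""

    name = "echo"

    def declared_workers(self) -> dict[str, Any]:
        return {}  # single-agent loop, so no sub-agents

    async def run(
        self, session: AgentSession, deps: Dependencies, prompt: str
    ) -> AsyncIterator[Event]:
        session.messages.append({"role": "user", "content": prompt})
        yield Event("turn_started", {"turn": session.turns})
        # Progress is written to the session, not returned.
        session.output = f"echo: {prompt}"
        session.messages.append({"role": "assistant", "content": session.output})
        session.turns += 1
        yield Event("turn_completed", {"turn": session.turns})


async def drive(arch: Architecture, prompt: str):
    """Run one architecture to completion and return (session, events)."""
    session = AgentSession(id="run-1", instructions="Be brief.")
    events = [event async for event in arch.run(session, Dependencies(), prompt)]
    return session, events
```

Because the protocol is `runtime_checkable`, `isinstance(EchoArchitecture(), Architecture)` only verifies that the `name`, `declared_workers` and `run` members exist; it does not validate their signatures.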
Attributes
| Attribute | Type | Description |
|---|---|---|
| name | str | Short identifier used in telemetry / logging (e.g. "react", "supervisor", "my-strategy"). |
Methods
declared_workers
def declared_workers(self) -> dict[str, Agent]: ...

Names of any sub-agents the architecture delegates to, mapped to
their Agent instances. Used by AgentGraph for visualization.
Empty dict for single-agent loops.
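To make the shape of the return value concrete, here is a minimal sketch of how a visualizer could walk `declared_workers()` recursively to collect graph edges. `StubAgent`, `SingleLoop`, `Supervisor` and `collect_edges` are all hypothetical names for this example; how AgentGraph actually traverses workers is not shown in this reference.

```python
class StubAgent:
    """Hypothetical stand-in for Agent: a display name plus an architecture."""

    def __init__(self, name, architecture):
        self.name = name
        self.architecture = architecture


class SingleLoop:
    name = "react"

    def declared_workers(self):
        return {}  # no delegation


class Supervisor:
    name = "supervisor"

    def __init__(self, workers):
        self._workers = dict(workers)

    def declared_workers(self):
        # Return a copy so callers cannot mutate the internal mapping.
        return dict(self._workers)


def collect_edges(agent):
    """Depth-first walk returning (parent, worker_name, child) tuples."""
    edges = []
    for worker_name, worker in agent.architecture.declared_workers().items():
        edges.append((agent.name, worker_name, worker.name))
        edges.extend(collect_edges(worker))
    return edges


researcher = StubAgent("researcher", SingleLoop())
writer = StubAgent("writer", SingleLoop())
root = StubAgent("root", Supervisor({"research": researcher, "write": writer}))
```

Calling `collect_edges(root)` yields one edge per declared worker, while a single-agent loop such as `researcher` contributes none.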
run
async def run(
    self,
    session: AgentSession,
    deps: Dependencies,
    prompt: str,
) -> AsyncIterator[Event]: ...

Async generator yielding Events as iteration progresses. Mutates
session in place (messages, turns, output, cumulative_usage,
interrupted, interruption_reason, metadata).
| Parameter | Type | Description |
|---|---|---|
| session | AgentSession | The mutable per-run state. Architecture writes its progress here. |
| deps | Dependencies | Bundled protocol implementations. Read-only struct. |
| prompt | str | The user’s opening message. |
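The division of labour around `run` can be sketched from the caller's side: the Agent brackets the architecture's events with its own setup and teardown events, then reads the final state off the session. This is only an illustration under assumptions; `Event`, `Session`, `OneTurn`, `agent_stream` and the event kinds are hypothetical stand-ins, not loomflow's real types.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class Event:  # hypothetical stand-in; the real Event type is richer
    kind: str


@dataclass
class Session:  # hypothetical, trimmed stand-in for AgentSession
    turns: int = 0
    output: str = ""


class OneTurn:
    name = "one-turn"

    async def run(self, session: Session, deps: object, prompt: str) -> AsyncIterator[Event]:
        # Only mid-iteration events are yielded here.
        yield Event("model_call")
        session.output = prompt.upper()
        session.turns += 1


async def agent_stream(arch, session: Session, prompt: str) -> AsyncIterator[Event]:
    """Agent-side wrapper: emits setup/teardown around the architecture's events."""
    yield Event("started")
    async for event in arch.run(session, None, prompt):
        yield event
    yield Event("completed")


async def collect(prompt: str):
    session = Session()
    kinds = [e.kind async for e in agent_stream(OneTurn(), session, prompt)]
    return kinds, session
```

Note that the session is fully updated only once the generator has been exhausted, which is why the caller inspects it after the loop rather than after the first event.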
AgentSession
@dataclass
class AgentSession:
    id: str
    instructions: str
    messages: list[Message] = field(default_factory=list)
    turns: int = 0
    output: str = ""
    cumulative_usage: Usage = field(default_factory=Usage)
    interrupted: bool = False
    interruption_reason: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

Mutable per-run state shared between Agent and the architecture.
The Agent constructs this once per run, the architecture mutates it
as iteration progresses, and the Agent reads the final state to
build a RunResult.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| id | str | required | Run id (a ULID). Used as the prefix for journal step names and event session ids. |
| instructions | str | required | The system prompt (already concatenated with any skill catalog). |
| messages | list[Message] | [] | The conversation so far. Architecture appends; Agent reads the seed messages from memory at run start. |
| turns | int | 0 | Increments as each iteration completes. Compared against Dependencies.max_turns. |
| output | str | "" | Final assistant answer. Architecture writes at termination. |
| cumulative_usage | Usage | Usage() | Aggregate token / cost usage across all model calls in this run. |
| interrupted | bool | False | Set to True when terminating early (budget, max_turns, cancellation). |
| interruption_reason | str \| None | None | Human-readable reason: "budget:max_tokens", "max_turns", "swarm_cycle", etc. |
| metadata | dict[str, Any] | {} | Free-form dict for architecture-specific state (planners stash plans, debate stashes round transcripts, etc.). |
Dependencies
@dataclass
class Dependencies:
    model: Model
    memory: Memory
    runtime: Runtime
    tools: ToolHost
    budget: Budget
    permissions: Permissions
    hooks: HookRegistry
    telemetry: Telemetry
    audit_log: AuditLog | None
    max_turns: int
    approval_handler: ApprovalHandler | None = None
    streaming: bool = False
    fast_audit: bool = True
    fast_telemetry: bool = True
    fast_permissions: bool = True
    fast_hooks: bool = True
    fast_runtime: bool = True

Bundled protocol implementations passed to every architecture.
Constructed once per run from the Agent’s configured backends.
Architectures treat this as read-only. They call methods on the
contained protocols but don’t mutate the struct.
Core fields
| Field | Type | Description |
|---|---|---|
| model | Model | The model adapter to call (already wrapped by RetryingModel if a retry policy is enabled). |
| memory | Memory | The memory backend for recall / remember. Already wrapped by AutoExtractMemory if auto_extract=True. |
| runtime | Runtime | Wrap external calls with runtime.step(name, fn, *args) for replay-correct journaling. |
| tools | ToolHost | Tool dispatch surface. |
| budget | Budget | Call budget.allows_step() before expensive work; budget.consume(...) after model calls. |
| permissions | Permissions | Per-tool gate. Architecture calls permissions.check(call, ...) before dispatch. |
| hooks | HookRegistry | hooks.pre_tool(...) / hooks.post_tool(...) dispatch. |
| telemetry | Telemetry | async with telemetry.trace(...); telemetry.emit_metric(...). |
| audit_log | AuditLog \| None | await audit_log.append(entry). None when not configured. |
| max_turns | int | Hard cap on iterations; check session.turns >= max_turns. |
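To show why external calls are routed through `runtime.step(name, fn, *args)`, the sketch below uses a toy journaling runtime: the first execution records each step's result under its name, and a replay against the same journal returns the recorded value instead of repeating the side effect. `JournalRuntime` is a hypothetical stand-in, and treating `step` as awaitable is an assumption; only the `(name, fn, *args)` shape and the session-id prefix for step names come from the tables above.

```python
import asyncio


class JournalRuntime:
    """Hypothetical stand-in: journals step results keyed by step name."""

    def __init__(self, journal=None):
        self.journal = {} if journal is None else journal

    async def step(self, name, fn, *args):
        if name in self.journal:
            return self.journal[name]  # replay: skip the side effect
        result = await fn(*args)
        self.journal[name] = result
        return result


calls = []  # records every real execution of fetch()


async def fetch(url):
    calls.append(url)
    return f"body of {url}"


async def run_once(runtime, session_id):
    # The session id prefixes the step name so journals from runs don't collide.
    return await runtime.step(f"{session_id}:fetch:0", fetch, "https://example.com")


first_runtime = JournalRuntime()
first = asyncio.run(run_once(first_runtime, "run-1"))

# Replaying with the recorded journal must not call fetch() again.
replayed = asyncio.run(run_once(JournalRuntime(first_runtime.journal), "run-1"))
```

The practical consequence is that step names need to be deterministic across runs; a name derived from a timestamp or random value would never match the journal on replay.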
Optional fields
| Field | Type | Default | Description |
|---|---|---|---|
| approval_handler | ApprovalHandler \| None | None | Async callable that resolves Decision.ask_(...) outcomes from the permissions layer. When unset, ask falls back to deny. |
| streaming | bool | False | True when the consumer reads from agent.stream(). Architectures should preserve real-time event arrival when set. When False (the default for agent.run()), batching events for fewer task-group / channel allocations is allowed on the hot path. |
Fast-mode flags
Auto-set by Agent._loop based on which protocol implementations
are no-op defaults. Hot paths skip integration points when their
layer is no-op:
| Field | Default | What it skips |
|---|---|---|
| fast_audit | True | _audit(...) calls when audit_log is None. |
| fast_telemetry | True | telemetry.trace(...) context managers and emit_metric calls when telemetry is NoTelemetry. |
| fast_permissions | True | Per-tool permissions.check(...) when permissions is the no-op AllowAll. |
| fast_hooks | True | hooks.pre_tool / hooks.post_tool dispatch when no hooks have been registered. |
| fast_runtime | True | runtime.step(...) indirection when runtime is InProcRuntime. |
The moment a user wires a real implementation, the corresponding
flag flips to False and the integration point becomes active. Same
Agent class, same API, no flags to set.
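One way to picture the auto-detection is a function that derives each flag from the configured backend. The sketch below follows the conditions listed in the table, but the detection mechanics (`isinstance` checks, a `registered` list on the hook registry) and the names `FastFlags` and `derive_flags` are assumptions for illustration; the actual logic in Agent._loop is not shown here.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the no-op defaults named in the table.
class NoTelemetry: ...
class AllowAll: ...
class InProcRuntime: ...


class HookRegistry:
    def __init__(self):
        self.registered = []  # assumed storage for registered hooks


@dataclass(frozen=True)
class FastFlags:
    fast_audit: bool
    fast_telemetry: bool
    fast_permissions: bool
    fast_hooks: bool
    fast_runtime: bool


def derive_flags(*, audit_log, telemetry, permissions, hooks, runtime) -> FastFlags:
    """Each flag is True only while its layer is still the no-op default."""
    return FastFlags(
        fast_audit=audit_log is None,
        fast_telemetry=isinstance(telemetry, NoTelemetry),
        fast_permissions=isinstance(permissions, AllowAll),
        fast_hooks=not hooks.registered,
        fast_runtime=isinstance(runtime, InProcRuntime),
    )


class OtelTelemetry:
    """Hypothetical real telemetry backend."""


defaults = derive_flags(
    audit_log=None, telemetry=NoTelemetry(), permissions=AllowAll(),
    hooks=HookRegistry(), runtime=InProcRuntime(),
)
wired = derive_flags(
    audit_log=None, telemetry=OtelTelemetry(), permissions=AllowAll(),
    hooks=HookRegistry(), runtime=InProcRuntime(),
)
```

Here `defaults` has every flag set, while `wired` differs only in `fast_telemetry`, so only the telemetry integration point would become active.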
ApprovalHandler (type alias)
ApprovalHandler = Callable[[ToolCall, str | None], Awaitable[bool]]

Async callable that resolves a Decision.ask_(...) from permissions.
| Parameter | Type | Description |
|---|---|---|
| call | ToolCall | The pending tool call awaiting approval. |
| user_id | str \| None | The active RunContext.user_id, so the handler can scope the approval to the right user / channel. |
Returns: bool. True to allow the call, False to deny.
Raising is treated as deny + warning logged. See Approval handlers.
Source
Reading further. ReAct is the canonical reference implementation
(~250 lines exercising every protocol method). Read react.py
once to see how the pieces fit together, then the multi-agent shapes
in supervisor.py and swarm.py.