
Architecture protocols + base types

from loomflow import Architecture
from loomflow.architecture import AgentSession, Dependencies

The three pieces every architecture works with: the protocol every architecture implements (Architecture), the mutable per-run state (AgentSession), and the bundle of protocol implementations the Agent hands the architecture each run (Dependencies).

For the conceptual overview see Architectures. For implementing a custom architecture see Custom architectures.


Architecture

from typing import Protocol, runtime_checkable
from collections.abc import AsyncIterator

@runtime_checkable
class Architecture(Protocol):
    name: str

    def declared_workers(self) -> dict[str, "Agent"]: ...

    async def run(
        self,
        session: AgentSession,
        deps: Dependencies,
        prompt: str,
    ) -> AsyncIterator[Event]: ...

The strategy that drives the agent loop. The Agent calls architecture.run(session, deps, prompt) and consumes the events it yields. Setup events (Event.started) and teardown events (Event.completed) are emitted by the Agent, not the architecture; architectures yield only the events that occur during iteration.
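Because Architecture is a runtime_checkable Protocol, conformance is structural: a class needs the right members, not a base class. The sketch below uses a local stand-in for the protocol so that it runs without loomflow installed; NoopArchitecture and MissingRun are hypothetical names. Note that isinstance only checks that the members exist, not their signatures.

```python
from collections.abc import AsyncIterator
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Architecture(Protocol):
    """Local stand-in mirroring the protocol above (not the loomflow import)."""

    name: str

    def declared_workers(self) -> dict[str, Any]: ...

    async def run(self, session: Any, deps: Any, prompt: str) -> AsyncIterator[Any]: ...


class NoopArchitecture:
    """Hypothetical architecture: no inheritance, just the required members."""

    name = "noop"

    def declared_workers(self) -> dict[str, Any]:
        return {}  # single-agent loop, so no sub-agents

    async def run(self, session: Any, deps: Any, prompt: str) -> AsyncIterator[Any]:
        return
        yield  # unreachable; its presence makes this an async generator


class MissingRun:
    """Hypothetical class that lacks run() and therefore does not conform."""

    name = "broken"

    def declared_workers(self) -> dict[str, Any]:
        return {}


conforms = isinstance(NoopArchitecture(), Architecture)
rejects = isinstance(MissingRun(), Architecture)
```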

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| name | str | Short identifier used in telemetry / logging (e.g. "react", "supervisor", "my-strategy"). |

Methods

declared_workers

def declared_workers(self) -> dict[str, Agent]: ...

Names of any sub-agents the architecture delegates to, mapped to their Agent instances. Used by AgentGraph for visualization. Empty dict for single-agent loops.

run

async def run(
    self,
    session: AgentSession,
    deps: Dependencies,
    prompt: str,
) -> AsyncIterator[Event]: ...

Async generator yielding Events as iteration progresses. Mutates session in place (messages, turns, output, cumulative_usage, interrupted, interruption_reason, metadata).

| Parameter | Type | Description |
| --- | --- | --- |
| session | AgentSession | The mutable per-run state. Architecture writes its progress here. |
| deps | Dependencies | Bundled protocol implementations. Read-only struct. |
| prompt | str | The user's opening message. |
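A minimal sketch of the run contract described above: an async generator that mutates the session in place and yields events as it goes. Everything here is a stand-in so the example runs on its own. The Event shape, the event kinds ("thinking", "answer"), and EchoArchitecture are hypothetical, and AgentSession is trimmed to a few of its documented fields.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Hypothetical stand-in; the real Event type is not shown on this page."""

    kind: str
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass
class AgentSession:
    """Trimmed stand-in carrying a subset of the documented fields."""

    id: str
    instructions: str
    messages: list[dict[str, str]] = field(default_factory=list)
    turns: int = 0
    output: str = ""


class EchoArchitecture:
    name = "echo"

    def declared_workers(self) -> dict[str, Any]:
        return {}

    async def run(self, session: AgentSession, deps: Any, prompt: str) -> AsyncIterator[Event]:
        session.messages.append({"role": "user", "content": prompt})
        yield Event("thinking", {"turn": session.turns})
        answer = f"echo: {prompt}"  # a real architecture would call deps.model here
        session.messages.append({"role": "assistant", "content": answer})
        session.turns += 1
        session.output = answer  # written at termination
        yield Event("answer", {"text": answer})


async def drive(prompt: str) -> tuple[AgentSession, list[str]]:
    """Plays the Agent's role: build the session, consume events, read final state."""
    session = AgentSession(id="run-1", instructions="You are terse.")
    kinds = [event.kind async for event in EchoArchitecture().run(session, None, prompt)]
    return session, kinds
```

Calling asyncio.run(drive("hi")) returns the mutated session together with the kinds of the events that were yielded, in order.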

AgentSession

@dataclass
class AgentSession:
    id: str
    instructions: str
    messages: list[Message] = field(default_factory=list)
    turns: int = 0
    output: str = ""
    cumulative_usage: Usage = field(default_factory=Usage)
    interrupted: bool = False
    interruption_reason: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

Mutable per-run state shared between Agent and the architecture. The Agent constructs this once per run, the architecture mutates it as iteration progresses, and the Agent reads the final state to build a RunResult.

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| id | str | required | Run id (a ULID). Used as the prefix for journal step names and event session ids. |
| instructions | str | required | The system prompt (already concatenated with any skill catalog). |
| messages | list[Message] | [] | The conversation so far. Architecture appends; Agent reads the seed messages from memory at run start. |
| turns | int | 0 | Increments as each iteration completes. Compared against Dependencies.max_turns. |
| output | str | "" | Final assistant answer. Architecture writes at termination. |
| cumulative_usage | Usage | Usage() | Aggregate token / cost usage across all model calls in this run. |
| interrupted | bool | False | Set to True when terminating early (budget, max_turns, cancellation). |
| interruption_reason | str \| None | None | Human-readable reason: "budget:max_tokens", "max_turns", "swarm_cycle", etc. |
| metadata | dict[str, Any] | {} | Free-form dict for architecture-specific state (planners stash plans, debate stashes round transcripts, etc.). |
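To illustrate how turns, interrupted, and interruption_reason interact, here is a minimal sketch of a loop that stops at a turn cap. The dataclass is a local copy of the definition above with placeholder types. Usage.total_tokens, the iterate helper, and the 10-tokens-per-turn figure are assumptions made for the example, not part of loomflow.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Usage:
    """Hypothetical stand-in; the real Usage fields are not listed on this page."""

    total_tokens: int = 0


@dataclass
class AgentSession:
    id: str
    instructions: str
    messages: list[Any] = field(default_factory=list)
    turns: int = 0
    output: str = ""
    cumulative_usage: Usage = field(default_factory=Usage)
    interrupted: bool = False
    interruption_reason: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)


def iterate(session: AgentSession, max_turns: int, turns_needed: int) -> None:
    """Pretend each turn costs 10 tokens and the answer arrives after turns_needed turns."""
    while True:
        if session.turns >= max_turns:
            # Early termination: record why, leave output empty.
            session.interrupted = True
            session.interruption_reason = "max_turns"
            return
        session.cumulative_usage.total_tokens += 10
        session.turns += 1
        if session.turns == turns_needed:
            session.output = "done"  # normal termination
            return


finished = AgentSession(id="a", instructions="")
iterate(finished, max_turns=5, turns_needed=2)

capped = AgentSession(id="b", instructions="")
iterate(capped, max_turns=3, turns_needed=10)
```

After the two calls, finished holds an answer with interrupted left at False, while capped has interrupted set, a reason of "max_turns", and an empty output.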

Dependencies

@dataclass
class Dependencies:
    model: Model
    memory: Memory
    runtime: Runtime
    tools: ToolHost
    budget: Budget
    permissions: Permissions
    hooks: HookRegistry
    telemetry: Telemetry
    audit_log: AuditLog | None
    max_turns: int
    approval_handler: ApprovalHandler | None = None
    streaming: bool = False
    fast_audit: bool = True
    fast_telemetry: bool = True
    fast_permissions: bool = True
    fast_hooks: bool = True
    fast_runtime: bool = True

Bundled protocol implementations passed to every architecture. Constructed once per run from the Agent’s configured backends. Architectures treat this as read-only. They call methods on the contained protocols but don’t mutate the struct.

Core fields

| Field | Type | Description |
| --- | --- | --- |
| model | Model | The model adapter to call (already wrapped by RetryingModel if a retry policy is enabled). |
| memory | Memory | The memory backend for recall / remember. Already wrapped by AutoExtractMemory if auto_extract=True. |
| runtime | Runtime | Wrap external calls with runtime.step(name, fn, *args) for replay-correct journaling. |
| tools | ToolHost | Tool dispatch surface. |
| budget | Budget | Call budget.allows_step() before expensive work; budget.consume(...) after model calls. |
| permissions | Permissions | Per-tool gate. Architecture calls permissions.check(call, ...) before dispatch. |
| hooks | HookRegistry | hooks.pre_tool(...) / hooks.post_tool(...) dispatch. |
| telemetry | Telemetry | async with telemetry.trace(...); telemetry.emit_metric(...). |
| audit_log | AuditLog \| None | await audit_log.append(entry). None when not configured. |
| max_turns | int | Hard cap on iterations; check session.turns >= max_turns. |
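The budget and runtime rows describe a call pattern: gate on budget.allows_step(), wrap the external call in runtime.step(name, fn, *args), then consume. The sketch below shows that ordering with stand-in objects. RecordingRuntime, TokenBudget, fake_model_call, and the step-name format are hypothetical, and it is an assumption that step is awaitable and that consume takes a token count; the real signatures are not given on this page.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class RecordingRuntime:
    """Hypothetical stand-in: records the step name, then awaits fn(*args)."""

    def __init__(self) -> None:
        self.journal: list[str] = []

    async def step(self, name: str, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        self.journal.append(name)
        return await fn(*args)


class TokenBudget:
    """Hypothetical stand-in; the real consume(...) arguments are an assumption."""

    def __init__(self, max_tokens: int) -> None:
        self.remaining = max_tokens

    def allows_step(self) -> bool:
        return self.remaining > 0

    def consume(self, tokens: int) -> None:
        self.remaining -= tokens


async def fake_model_call(prompt: str) -> tuple[str, int]:
    """Pretend model call returning (text, tokens used)."""
    return f"reply to {prompt}", 60


async def one_turn(
    session_id: str, turn: int, runtime: RecordingRuntime, budget: TokenBudget, prompt: str
) -> Optional[str]:
    if not budget.allows_step():  # gate before the expensive work
        return None
    # The run id prefixes the journal step name; the rest of the format is made up.
    text, tokens = await runtime.step(f"{session_id}:model:{turn}", fake_model_call, prompt)
    budget.consume(tokens)  # account for usage after the model call
    return text


async def demo() -> tuple[list[Optional[str]], list[str]]:
    runtime, budget = RecordingRuntime(), TokenBudget(max_tokens=100)
    replies = [await one_turn("run-1", i, runtime, budget, p) for i, p in enumerate("abc")]
    return replies, runtime.journal
```

With a 100-token budget and 60 tokens per call, the third turn is refused before any model call is made, so only two steps reach the journal.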

Optional fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| approval_handler | ApprovalHandler \| None | None | Async callable that resolves Decision.ask_(...) outcomes from the permissions layer. When unset, ask falls back to deny. |
| streaming | bool | False | True when the consumer reads from agent.stream(). Architectures should preserve real-time event arrival when set. When False (the default for agent.run()), batching events for fewer task-group / channel allocations is allowed on the hot path. |

Fast-mode flags

Auto-set by Agent._loop based on which protocol implementations are no-op defaults. Hot paths skip integration points when their layer is no-op:

| Field | Default | What it skips |
| --- | --- | --- |
| fast_audit | True | _audit(...) calls when audit_log is None. |
| fast_telemetry | True | telemetry.trace(...) context managers + emit_metric calls when telemetry is NoTelemetry. |
| fast_permissions | True | Per-tool permissions.check(...) when permissions is the no-op AllowAll. |
| fast_hooks | True | hooks.pre_tool / hooks.post_tool dispatch when no hooks have been registered. |
| fast_runtime | True | runtime.step(...) indirection when runtime is InProcRuntime. |

The moment a user wires in a real implementation, the corresponding flag flips to False and the integration point becomes active. Same Agent class, same API, no flags to set by hand.
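One way such flags could be derived is sketched below. This is a guess at the shape of the logic, not the code in Agent._loop: the classes are empty stand-ins named after the no-op defaults mentioned above, and FastFlags, derive_fast_flags, hook_count, and ConsoleTelemetry are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


class NoTelemetry:
    """Stand-in for the no-op telemetry default."""


class AllowAll:
    """Stand-in for the no-op permissions default."""


class InProcRuntime:
    """Stand-in for the default in-process runtime."""


class ConsoleTelemetry:
    """Hypothetical user-supplied telemetry implementation."""


@dataclass(frozen=True)
class FastFlags:
    fast_audit: bool
    fast_telemetry: bool
    fast_permissions: bool
    fast_hooks: bool
    fast_runtime: bool


def derive_fast_flags(
    *,
    audit_log: Optional[Any],
    telemetry: Any,
    permissions: Any,
    runtime: Any,
    hook_count: int,
) -> FastFlags:
    """Each flag is True only while its layer is still the no-op default."""
    return FastFlags(
        fast_audit=audit_log is None,
        fast_telemetry=isinstance(telemetry, NoTelemetry),
        fast_permissions=isinstance(permissions, AllowAll),
        fast_hooks=hook_count == 0,
        fast_runtime=isinstance(runtime, InProcRuntime),
    )


defaults = derive_fast_flags(
    audit_log=None, telemetry=NoTelemetry(), permissions=AllowAll(),
    runtime=InProcRuntime(), hook_count=0,
)
wired = derive_fast_flags(
    audit_log=None, telemetry=ConsoleTelemetry(), permissions=AllowAll(),
    runtime=InProcRuntime(), hook_count=0,
)
```

Swapping in ConsoleTelemetry changes only fast_telemetry; the other layers stay on their fast paths.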


ApprovalHandler (type alias)

ApprovalHandler = Callable[[ToolCall, str | None], Awaitable[bool]]

Async callable that resolves a Decision.ask_(...) from permissions.

| Parameter | Type | Description |
| --- | --- | --- |
| call | ToolCall | The pending tool call awaiting approval. |
| user_id | str \| None | The active RunContext.user_id, so the handler can scope the approval to the right user / channel. |

Returns: bool. True to allow the call, False to deny. If the handler raises, the call is treated as denied and a warning is logged. See Approval handlers.
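A minimal sketch of a handler matching this alias, plus a wrapper that reproduces the documented fallbacks (an unset handler denies; a raising handler denies with a warning). ToolCall here is a stand-in with made-up fields, and approve, broken, resolve_ask, SAFE_TOOLS, and TRUSTED_USERS are hypothetical names; the wrapper illustrates the described behavior and is not loomflow's own code.

```python
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToolCall:
    """Hypothetical stand-in; the real ToolCall fields are not shown on this page."""

    name: str
    arguments: dict[str, Any] = field(default_factory=dict)


ApprovalHandler = Callable[[ToolCall, Optional[str]], Awaitable[bool]]

SAFE_TOOLS = {"search"}
TRUSTED_USERS = {"alice"}


async def approve(call: ToolCall, user_id: Optional[str]) -> bool:
    """Allow safe tools for anyone; other tools only for trusted users."""
    if call.name in SAFE_TOOLS:
        return True
    return user_id in TRUSTED_USERS


async def broken(call: ToolCall, user_id: Optional[str]) -> bool:
    raise RuntimeError("approval backend unreachable")


async def resolve_ask(
    handler: Optional[ApprovalHandler], call: ToolCall, user_id: Optional[str]
) -> bool:
    """Unset handler falls back to deny; a raising handler denies and logs a warning."""
    if handler is None:
        return False
    try:
        return await handler(call, user_id)
    except Exception:
        logging.getLogger(__name__).warning("approval handler raised; denying %s", call.name)
        return False


async def demo() -> list[bool]:
    delete = ToolCall("delete_file", {"path": "/tmp/x"})
    return [
        await resolve_ask(approve, ToolCall("search"), None),  # safe tool
        await resolve_ask(approve, delete, "alice"),           # trusted user
        await resolve_ask(approve, delete, "bob"),             # untrusted user
        await resolve_ask(None, delete, "alice"),              # no handler set
        await resolve_ask(broken, delete, "alice"),            # handler raises
    ]
```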


Source

loomflow/architecture/base.py

Reading further. ReAct is the canonical reference implementation: about 250 lines exercising every protocol method. Read react.py once to see how the pieces fit together, then the multi-agent shapes in supervisor.py and swarm.py.
