Agent
from loomflow import Agent
The fully-async, MCP-native, model-agnostic agent harness. One
Agent instance is a configured loop driver. It bundles instructions,
a model, memory, runtime, tools, budget, permissions, telemetry, and
audit log into a single callable. The same instance serves N users
via user_id= on each agent.run().
For the conceptual overview see What is an Agent.
Class signature
class Agent:
    def __init__(
        self,
        instructions: str,
        *,
        model: Model | str | Mapping[str, Any] | None = None,
        memory: Memory | str | Mapping[str, Any] | None = None,
        runtime: Runtime | None = None,
        budget: Budget | None = None,
        permissions: Permissions | None = None,
        hooks: HookRegistry | None = None,
        tools: list[Tool | Callable[..., object]]
        | ToolHost
        | Tool
        | Callable[..., object]
        | None = None,
        telemetry: Telemetry | None = None,
        audit_log: AuditLog | Mapping[str, Any] | None = None,
        max_turns: int = 50,
        architecture: Architecture | str | None = None,
        skills: list[Any] | None = None,
        auto_extract: bool | None = None,
        approval_handler: ApprovalHandler | None = None,
        output_schema: type[BaseModel] | None = None,
        effort: str | None = None,
        strict_effort: bool = False,
        prompt_caching: bool | Mapping[str, Any] | None = None,
        workspace: Workspace | WorkspaceMembership | str | Mapping[str, Any] | None = None,
        living_plan: bool | LivingPlan | None = None,
        tuning: Tuning | None = None,  # rarely-touched knobs (0.10.24+)
    ) -> None: ...
tuning (0.10.24+) groups the rarely-touched knobs (retry_policy, secrets, auto_consolidate, response_tone, stop_hooks, max_stop_hook_iterations, tool_result_summary_threshold, tool_transcript_max_bytes, auto_compact_summariser, auto_compact_keep_recent_turns) into one Tuning dataclass. Pass tuning=Tuning(...). The old flat kwargs still work but emit a DeprecationWarning.
Constructor parameters
instructions
| Type | str |
| Default | required (positional) |
The system prompt the model sees. When skills= is non-empty, the
framework appends a per-skill catalog (~50 tokens / skill) to this
string so the model can decide which skill to load on demand.
agent = Agent("You are a careful research assistant. Cite your sources.")
model
| Type | Model | str | Mapping[str, Any] | None |
| Default | None |
Three accepted shapes. A Model protocol implementation, a string
the resolver maps to one, or a dict config. Required at runtime.
None raises ConfigError on the first agent.run(). The resolver
fails fast on purpose rather than silently falling back to
EchoModel.
String prefixes:
| Prefix | Resolves to | Env var |
|---|---|---|
| claude-* | AnthropicModel | ANTHROPIC_API_KEY |
| gpt-*, o1-*, o3-*, o4-* | OpenAIModel | OPENAI_API_KEY |
| mistral-*, command-*, bedrock/*, vertex_ai/*, ollama/*, groq/*, litellm/* | LiteLLMModel | provider-specific |
| echo | EchoModel | none |
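The prefix table above can be read as an ordered glob match that fails fast on unknown strings. The following is a minimal sketch under that assumption; `PREFIX_TABLE` and `resolve_adapter_name` are hypothetical names used for illustration, not loomflow's actual resolver.

```python
from fnmatch import fnmatchcase

# Hypothetical table mirroring the documented prefixes; not loomflow internals.
PREFIX_TABLE = [
    (("claude-*",), "AnthropicModel"),
    (("gpt-*", "o1-*", "o3-*", "o4-*"), "OpenAIModel"),
    (("mistral-*", "command-*", "bedrock/*", "vertex_ai/*",
      "ollama/*", "groq/*", "litellm/*"), "LiteLLMModel"),
    (("echo",), "EchoModel"),
]


def resolve_adapter_name(spec: str) -> str:
    """Return the adapter class name for a model string, or raise ValueError."""
    for patterns, adapter in PREFIX_TABLE:
        if any(fnmatchcase(spec, pattern) for pattern in patterns):
            return adapter
    # Fail fast instead of silently falling back to EchoModel.
    raise ValueError(f"unknown model spec: {spec!r}")
```

Raising on an unknown string, rather than returning a default, mirrors the fail-fast behaviour the section describes.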
agent = Agent("...", model="claude-opus-4-7")
agent = Agent("...", model=AnthropicModel("claude-opus-4-7", max_tokens=8192))
Dict-config form (0.9.36+). When you want the model spec and its related dials in one place:
agent = Agent(
    "...",
    model={
        "name": "claude-opus-4-7",
        "effort": "high",         # see Reasoning effort
        "strict_effort": True,    # fail loudly if model can't honour effort
    },
)
Same shape philosophy as audit_log={...}. Top-level kwargs win over
matching keys in the dict, so you can layer environment overrides on
top of a shared config. See Reasoning effort
for the dict’s effort / strict_effort keys.
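The "top-level kwargs win" rule can be pictured as a plain dictionary merge. This is a sketch only: `merge_model_config` is a hypothetical helper, and it assumes `None` means "not passed", which may not match how the framework detects explicit kwargs.

```python
from typing import Any, Mapping


def merge_model_config(cfg: Mapping[str, Any], **top_level: Any) -> dict[str, Any]:
    """Layer explicitly-set top-level kwargs over a shared dict config."""
    merged = dict(cfg)  # copy, so the shared config is never mutated
    for key, value in top_level.items():
        if value is not None:  # assumption: None means "not passed"
            merged[key] = value
    return merged
```

A shared config can then be reused across environments, with each environment overriding only the keys it cares about.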
memory
| Type | Memory | str | Mapping[str, Any] | None |
| Default | None (resolves to InMemoryMemory()) |
Conversational state. Episodes, working blocks, bi-temporal facts. The resolver accepts:
- None → InMemoryMemory() (lost on process exit).
- "inmemory" → InMemoryMemory().
- "sqlite:./bot.db" → SqliteMemory.connect(...) (lazy).
- "chroma" / "chroma:./path" → ChromaMemory.local(...).
- "postgres://user:pw@host/db" → PostgresMemory.connect(...) (lazy).
- "redis://localhost:6379/0" → RedisMemory.connect(...) (lazy).
- A Mapping (dict): {"backend": ..., "path": ..., "namespace": ..., "embedder": ..., "with_facts": ...}.
- An explicit Memory instance.
For async backends (Postgres / Redis) the resolver returns a
LazyMemory proxy; the connection opens on first use so Agent(...)
stays synchronous. See Memory backends.
runtime
| Type | Runtime | None |
| Default | None (resolves to InProcRuntime()) |
The journaling layer. With a durable runtime
(SqliteRuntime("./journal.db") or PostgresRuntime), every model
call and tool dispatch is keyed by (session_id, step_name) and
cached. A crashed run resumes via agent.resume(session_id, prompt).
InProcRuntime (the default) makes runtime.step(...) collapse to
await fn(...) directly. Zero hot-path overhead. See Runtime.
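The journaling idea, where each step is keyed by (session_id, step_name) and replayed from cache on resume, can be sketched in a few lines. `JournalSketch` is a hypothetical in-memory stand-in; a durable runtime would persist the results instead of holding them in a dict, and the real `runtime.step(...)` signature may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable


class JournalSketch:
    """Hypothetical in-memory stand-in for a durable journal."""

    def __init__(self) -> None:
        self._done: dict[tuple[str, str], Any] = {}

    async def step(self, session_id: str, step_name: str,
                   fn: Callable[[], Awaitable[Any]]) -> Any:
        key = (session_id, step_name)
        if key in self._done:        # completed earlier: replay the result
            return self._done[key]
        result = await fn()          # first time: actually execute
        self._done[key] = result     # a durable runtime would persist this
        return result


async def demo() -> tuple[list[str], int]:
    journal = JournalSketch()
    calls = 0

    async def expensive() -> str:
        nonlocal calls
        calls += 1
        return "model-output"

    first = await journal.step("s1", "turn-0/model", expensive)
    second = await journal.step("s1", "turn-0/model", expensive)  # replayed
    return [first, second], calls
```

In `demo()`, the second call with the same key returns the cached result without invoking `expensive` again, which is the property that makes resuming a crashed run cheap.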
budget
| Type | Budget | None |
| Default | None (resolves to NoBudget()) |
Hard caps on tokens / cost / wall-clock per run AND per user_id.
StandardBudget(BudgetConfig(...)) is the production choice; see
Per-user budget caps.
When the budget is exceeded, the run terminates cleanly with
RunResult.interrupted = True, RunResult.interruption_reason = "budget:max_tokens" (or matching field).
permissions
| Type | Permissions | None |
| Default | None (resolves to AllowAll()) |
Decides allow / deny / ask per tool call. Production usually wants
StandardPermissions(mode=Mode.DEFAULT) paired with an
approval_handler. For multi-tenant policy routing, use
PerUserPermissions(policies=, default=). See Permissions.
hooks
| Type | HookRegistry | None |
| Default | None (a fresh empty HookRegistry()) |
Registry that holds before_tool / after_tool callbacks. Usually
you don’t pass this directly. Use the @agent.before_tool /
@agent.after_tool decorator sugar after construction. See Hooks.
tools
| Type | list[Tool | Callable] | ToolHost | Tool | Callable | None |
| Default | None (a fresh empty InProcessToolHost()) |
The tool surface. Five accepted shapes:
# A list (the common case)
agent = Agent("...", tools=[get_weather, send_email])
# A single tool
agent = Agent("...", tools=get_weather)
# A pre-built host (lets you swap the dispatch implementation)
agent = Agent("...", tools=InProcessToolHost([get_weather]))
# An MCP registry
agent = Agent("...", tools=MCPRegistry([git_server, fs_server]))
# A sandbox-wrapped host
agent = Agent("...", tools=FilesystemSandbox(host, roots=["/work"]))
Bare callables are auto-wrapped via @tool. See Tools.
telemetry
| Type | Telemetry | None |
| Default | None (resolves to NoTelemetry()) |
Telemetry sink for spans + metrics. Choose InMemoryTelemetry /
ConsoleTelemetry / FileTelemetry / MultiTelemetry for
collector-free dev, or OTelTelemetry(tracer_provider=...) for
production OTLP. The loop emits loom.run / loom.turn /
loom.model.stream / loom.tool spans plus loom.tokens.* /
loom.cost.usd / loom.tool.duration_ms metrics, all tagged with
user_id. See Telemetry.
audit_log
| Type | AuditLog | Mapping[str, Any] | None |
| Default | None |
InMemoryAuditLog() for tests, FileAuditLog(path, secret="...")
for production. Every run_started / tool_call / tool_result /
run_completed entry is HMAC-signed and attributed to the active
user_id. See Audit log attribution.
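To make "HMAC-signed" concrete, here is a minimal sketch of signing and verifying one entry with the standard library. The entry layout, the canonical-JSON encoding, and the choice of SHA-256 are assumptions for illustration; the source states only that entries are HMAC-signed.

```python
import hashlib
import hmac
import json


def sign_entry(entry: dict, secret: str) -> dict:
    """Return a copy of entry with an HMAC-SHA256 signature over its canonical JSON."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":")).encode()
    sig = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return {**entry, "sig": sig}


def verify_entry(signed: dict, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    body = {k: v for k, v in signed.items() if k != "sig"}
    expected = sign_entry(body, secret)["sig"]
    return hmac.compare_digest(expected, signed.get("sig", ""))
```

Any change to a signed field, such as rewriting `user_id`, makes verification fail, which is what lets an auditor trust the attribution.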
Dict-config form (0.9.36+). Hand the resolver a dict and it builds the right backend for you:
agent = Agent(
    "...",
    audit_log={
        "name": "./audit.jsonl",   # path; omit this for in-memory
        "scope_full": True,        # capture prompts + outputs verbatim
        "secret": "my-org-key",    # optional; HMAC-signs every entry
    },
)
The default audit log is compliance-friendly. Prompts get truncated
at 500 characters, the model’s output isn’t recorded, and tool
results carry only ok / denied / error / reason. Flip
scope_full: True to opt into verbatim capture for debugging. The
resolver wraps the backend in FullTranscriptAuditLog. Signatures
still verify. The same dict shape works on Workflow.
max_turns
| Type | int |
| Default | 50 |
Hard cap on iterations of the agent loop. Hitting it terminates
cleanly with RunResult.interrupted = True,
RunResult.interruption_reason = "max_turns". Lower for expensive
tools or runaway prevention; raise for deep multi-step tasks.
auto_consolidate
| Type | bool |
| Default | False |
When True, calls await self.consolidate() after every agent.run()
to extract facts from the new episode. For most production cases
prefer auto_extract=True (the default for real models). Same
extraction, but on the write path with proper telemetry. The
auto_consolidate flag exists for callers wanting an explicit
post-run consolidation pass.
(0.10.24+) Pass via tuning=Tuning(auto_consolidate=True). The flat Agent(auto_consolidate=...) form is deprecated and warns.
architecture
| Type | Architecture | str | None |
| Default | None (resolves to ReAct()) |
The loop strategy. Pass an instance (Supervisor(workers={...}),
PlanAndExecute(...), Reflexion(base=...)) or a string spec
("react", "plan-and-execute", "rewoo", "reflexion",
"self-refine", "tree-of-thoughts"). Multi-agent shapes
(Supervisor / Router / Swarm / Debate / ActorCritic / Blackboard)
require an instance. The string resolver only knows single-agent
loops. See Architectures.
skills
| Type | list[str | Path | Skill | tuple[str | Path, str]] | None |
| Default | None |
Anthropic-format SKILL.md packages. Accepts paths to skill
directories, in-line Skill instances, or (path, label) tuples
for source labelling. Multi-source layering applies last-source-wins
override semantics; tools auto-namespace with skill_name__ prefix.
See Skills.
retry_policy
| Type | RetryPolicy | None |
| Default | None (auto-picked per model) |
When None, the framework picks RetryPolicy.default() for
network-backed models (AnthropicModel / OpenAIModel /
LiteLLMModel) and RetryPolicy.disabled() for in-process fakes
(EchoModel / ScriptedModel). Pass RetryPolicy.aggressive(),
RetryPolicy.disabled(), or a custom RetryPolicy(max_attempts=..., base_delay_s=..., max_delay_s=..., jitter=..., honor_retry_after=...)
to override. See RetryPolicy.
(0.10.24+) Pass via tuning=Tuning(retry_policy=...). The flat Agent(retry_policy=...) form is deprecated and warns.
auto_extract
| Type | bool | None |
| Default | None (auto-picked per model) |
When None, the framework picks True for real network adapters
(OpenAIModel / AnthropicModel / LiteLLMModel) and False for
in-process fakes (EchoModel / ScriptedModel). When True, every
agent.run() ends with a small Consolidator pass that pulls
structured (subject, predicate, object) facts into the bi-temporal
fact store. Telemetry signals: loom.auto_extract.duration_ms,
loom.auto_extract.invocations. See Bi-temporal facts.
approval_handler
| Type | Callable[[ToolCall, str | None], Awaitable[bool]] | None |
| Default | None |
Async callable that resolves Decision.ask_(...) outcomes from the
permissions layer. Receives the pending ToolCall plus the live
user_id; returns True to allow, False to deny.
When unset, ask falls back to deny so the agent never silently
bypasses the gate. Failure-mode contract:
- Returns False → tool result is denied with reason="approval declined".
- Not wired (None) → denied with reason="approval required; no approver".
- Raises → treated as deny + warning logged.
See Approval handlers.
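The failure-mode contract above can be sketched as a small resolver. `resolve_ask` and the `Handler` alias are hypothetical; a plain tool name stands in for the real ToolCall, and the reason string used when the handler raises is an assumption, since the source does not specify it.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger("approval_sketch")

# Hypothetical handler shape: (tool_name, user_id) -> bool.
Handler = Callable[[str, Optional[str]], Awaitable[bool]]


async def resolve_ask(handler: Optional[Handler], tool: str,
                      user_id: Optional[str]) -> tuple[bool, Optional[str]]:
    """Return (allowed, reason) for an 'ask' decision."""
    if handler is None:
        # No approver wired: deny, never silently allow.
        return False, "approval required; no approver"
    try:
        approved = await handler(tool, user_id)
    except Exception:
        logger.warning("approval handler raised; denying %s", tool, exc_info=True)
        return False, "approval handler raised"  # assumed reason string
    if approved:
        return True, None
    return False, "approval declined"
```

Every path other than an explicit `True` ends in a deny, so a missing or broken approver cannot open the gate.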
secrets
| Type | Secrets | None |
| Default | None (resolves to EnvSecrets()) |
API key resolver. Resolution order inside model adapters:
- Explicit api_key= argument on the adapter.
- secrets.lookup_sync(<ENV_VAR_NAME>).
- os.environ[<ENV_VAR_NAME>].
For Vault / AWS Secrets Manager / 1Password, write a custom adapter
satisfying the Secrets protocol. See Secrets resolution.
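The resolution order reads naturally as a chain of fallbacks. In this sketch `resolve_api_key` is a hypothetical helper, a plain mapping stands in for a Secrets implementation (its `.get` replaces `secrets.lookup_sync`), and the `environ` parameter exists only so the example can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(env_var: str,
                    explicit: Optional[str] = None,
                    secrets: Optional[Mapping[str, str]] = None,
                    environ: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Resolve a key: explicit argument, then secrets store, then environment."""
    environ = os.environ if environ is None else environ
    if explicit:
        return explicit
    if secrets is not None:
        found = secrets.get(env_var)  # stands in for secrets.lookup_sync(env_var)
        if found:
            return found
    return environ.get(env_var)
```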
(0.10.24+) Pass via
tuning=Tuning(secrets=...). The flatAgent(secrets=...)form is deprecated and warns.
output_schema (0.9.10+)
| Type | type[BaseModel] | None |
| Default | None |
Agent-bound default schema. When set, every agent.run() /
agent.stream() call applies it to validate the final answer; a
per-call output_schema= override on run() still takes precedence
for one-off shapes. Mirrors Pydantic AI’s output_type= ergonomics.
from pydantic import BaseModel
from loomflow import Agent
class Invoice(BaseModel):
    amount_cents: int
    vendor: str
# Once on construction:
extractor = Agent(
    "Extract invoice fields.",
    model="gpt-4o",
    output_schema=Invoice,
)
# Every run uses Invoice automatically:
result = await extractor.run("From: alice@acme...")
invoice: Invoice = result.parsed
Tagged-union schemas (0.9.11+). Pass output_schema=A | B
(or Union[A, B]) and the framework tries each member in declaration
order and accepts the first that fits. Lets you model “valid result
vs structured error” without a discriminator field:
class Found(BaseModel):
    invoice_no: str
    total_cents: int
class NotFound(BaseModel):
    reason: str
agent = Agent("Look up the invoice.", model="gpt-4o",
              output_schema=Found | NotFound)
Native structured-output support (0.9.9+). When an
output_schema is set, model adapters translate it into the
provider’s native idiom so the model is constrained at decode time:
- OpenAI. response_format={"type": "json_schema", ..., "strict": True} on chat.completions.create.
- Anthropic. Synthetic __output__ tool with the schema as input_schema, plus tool_choice forcing the model to call it.
- LiteLLM. Passthrough where the underlying provider supports it.
Adapters without native support fall back to the prompt-augmentation
plus retry-with-validation path. Either way the user-facing API is
identical: pass output_schema=, get a validated result.parsed.
Cost optimization (0.9.30+). When an adapter declares
supports_native_structured_output = True, the framework skips
appending the JSON-schema directive to the system prompt (native
constraint at decode time + in-prompt schema were
belt-and-suspenders that bloated cost without adding reliability).
The retry path still injects the schema if the model produces
invalid JSON, so the safety net is preserved.
Concrete impact on the framework’s bench (gpt-4.1-mini, RAG +
Pydantic PdfSummary): structured-output input tokens 3091 → ~1100
(~64% reduction). OpenAI / Anthropic / LiteLLM adapters all
ship with the flag enabled; custom user-supplied adapters default
to False so the prompt-augmentation safety net stays on for
unknown models.
response_tone (0.9.32+)
| Type | str | None |
| Default | None (no tone directive; no behaviour change) |
Steers how the agent phrases its output. Not what it answers. The framework appends a one-line style directive to the system prompt, after any schema directive (late-system-prompt instructions empirically get the most weight).
Three orthogonal levers. Don’t conflate them:
| Lever | Controls | Where |
|---|---|---|
| instructions= | What the agent does | Agent("...") positional |
| Persona (part of instructions) | Who the agent is | Agent("You are a tax lawyer...") |
| response_tone= | How the agent phrases output | Agent("...", tuning=Tuning(response_tone=...)) |
Shipped presets (one tight sentence each, intentionally short, because longer prompt fragments dilute the effect):
| Preset | Effect |
|---|---|
| casual | Warm, conversational, plain language, contractions. |
| professional | Neutral, polished, structured; no slang. |
| technical | Precise terminology; step-by-step reasoning; specificity over generality. |
| legal | Formal legal tone; precise terminology; explicit qualifications. |
| finance | Numbers / percentages / timeframes; distinguish data from estimates. |
| executive | Brief, decision-oriented; lead with the recommendation. |
| academic | Citation-aware, hedged; reasoning before claims. |
Free-form passthrough. Anything that’s not a preset is sent verbatim. The preset map is convenience, not a gatekeeper:
from loomflow import Tuning
agent = Agent("...", model="gpt-4.1-mini", tuning=Tuning(response_tone="legal"))
agent = Agent("...", tuning=Tuning(response_tone="warm but precise, like a doctor explaining a diagnosis"))
Resolution order (highest priority first):
- Per-call agent.run(..., response_tone=...) override.
- Agent(tuning=Tuning(response_tone=...)) default.
- Workflow ambient. Workflow(response_tone=...) propagates to every nested Agent step that didn’t bring its own. Same contextvar pattern as Workflow(memory=...).
- None. No tone directive, no behaviour change.
The contextvar resets in finally so tones do not leak across
workflow runs.
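The resolution order and the reset-in-finally behaviour can be sketched with the standard `contextvars` module. The names `_ambient_tone`, `resolve_tone`, and `run_in_workflow` are hypothetical; they only illustrate the pattern the section describes.

```python
import contextvars
from typing import Optional

# Hypothetical ambient variable; stands in for whatever Workflow sets internally.
_ambient_tone: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "ambient_tone", default=None
)


def resolve_tone(per_call: Optional[str], agent_default: Optional[str]) -> Optional[str]:
    """Per-call beats the agent default, which beats the workflow ambient."""
    if per_call is not None:
        return per_call
    if agent_default is not None:
        return agent_default
    return _ambient_tone.get()


def run_in_workflow(workflow_tone: Optional[str],
                    per_call: Optional[str] = None,
                    agent_default: Optional[str] = None) -> Optional[str]:
    """Set the ambient tone for one workflow run and always restore it."""
    token = _ambient_tone.set(workflow_tone)
    try:
        return resolve_tone(per_call, agent_default)
    finally:
        _ambient_tone.reset(token)  # the tone never leaks into the next run
```

Because the reset sits in `finally`, the ambient value is restored even when the wrapped step raises.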
effort (0.9.36+)
| Type | str | None |
| Default | None (provider default) |
Unified reasoning-effort dial across providers. Pass one of
"minimal", "low", "medium", "high", "xhigh", "max"; the
framework translates to the right native shape for the model:
OpenAI’s reasoning_effort, Anthropic’s adaptive thinking +
output_config.effort (Opus 4.7 honours the full enum, 4.6 clamps
xhigh/max to high), legacy Sonnets’ thinking.budget_tokens
integer, or LiteLLM’s normalized passthrough.
agent = Agent("...", model="claude-opus-4-7", effort="high")
Resolution order (highest priority first):
- Per-call agent.run(..., effort=...) override.
- Agent(effort=...) default.
- None. Provider’s own default.
Models that can’t honour the dial (Haiku, base GPT-4, older
Claudes) drop the kwarg and emit a one-time UserWarning per
(model, effort) pair. Opt into hard-fail with strict_effort=True.
See Reasoning effort for the full cross-provider mapping table and the Anthropic three-regime breakdown.
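The drop-and-warn-once behaviour, and its strict counterpart, can be sketched as follows. `apply_effort`, the `supported` flag, and the local `EffortNotSupported` exception are hypothetical stand-ins; how an adapter really reports capability is not shown in the source.

```python
from __future__ import annotations

import warnings

_warned: set[tuple[str, str]] = set()


class EffortNotSupported(Exception):
    """Hypothetical stand-in for the documented EffortNotSupportedError."""


def apply_effort(model: str, effort: str | None, supported: bool,
                 strict: bool = False) -> dict:
    """Return the kwargs to send; drop effort when the model can't honour it."""
    if effort is None:
        return {}                      # provider default
    if supported:
        return {"effort": effort}
    if strict:
        raise EffortNotSupported(f"{model} cannot honour effort={effort!r}")
    if (model, effort) not in _warned:  # warn once per (model, effort) pair
        _warned.add((model, effort))
        warnings.warn(f"{model} ignores effort={effort!r}", UserWarning, stacklevel=2)
    return {}
```

Keying the warning on the (model, effort) pair keeps logs quiet on hot paths while still surfacing each distinct mismatch once.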
strict_effort (0.9.36+)
| Type | bool |
| Default | False |
When True, wiring effort= to a model that can’t honour it
raises EffortNotSupportedError instead of dropping the kwarg with
a warning:
from loomflow import Agent
from loomflow.model._effort import EffortNotSupportedError
agent = Agent(
    "...",
    model="claude-haiku-3-5",   # doesn't support thinking
    effort="high",
    strict_effort=True,
)
try:
    await agent.run("hi")
except EffortNotSupportedError as exc:
    ...  # caught: wiring was wrong
Use in CI / pre-prod to surface model-mismatch typos immediately.
Leave at the default False in production so a vendor outage that
falls back onto a non-reasoning model doesn’t crash the agent.
strict_effort is agent-level only. There’s no per-call
override. Whether a model can honour effort is a property of the
adapter, not of any single call.
prompt_caching (0.9.41+)
| Type | bool | Mapping[str, Any] | None |
| Default | None (caching off) |
Per-provider prompt caching. True enables it with a 5-minute TTL.
A dict gives per-field control:
agent = Agent(
    LARGE_SYSTEM_PROMPT,
    model="claude-opus-4-7",
    prompt_caching={"enabled": True, "ttl": "1h", "cache_key": "session_42"},
)
On Anthropic the framework injects cache_control markers on the
last system block and last tool definition. On OpenAI it parses
cached_tokens for accurate accounting and forwards cache_key as
the routing hint. Read tokens land in RunResult.cached_tokens_in;
cost_usd already reflects the discount. See Prompt caching.
workspace (0.9.39+)
| Type | Workspace | WorkspaceMembership | str | Mapping[str, Any] | None |
| Default | None (no shared notebook) |
Wires a shared notebook and installs the five
notebook tools (note, read_note, list_notes, search_notes,
update_note) on the agent’s tool host. Accepts:
- A WorkspaceMembership via ws.member("name", teammates=[...]): the usual path, carries the notebook plus this agent’s identity.
- A bare Workspace instance: shared notebook, generic agent attribution.
- A string: "temp", "memory", or a filesystem path.
- A dict: {"backend": ..., "author": ..., "teammates": [...]}.
from loomflow import Agent, LocalDiskWorkspace
ws = LocalDiskWorkspace.temp()
agent = Agent(
    "...",
    workspace=ws.member("researcher", teammates=["analyst", "writer"]),
)
Propagates through Workflow and Team the same way memory=
does. See Workspace.
living_plan (0.9.42+)
| Type | bool | LivingPlan | None |
| Default | None (no plan tools) |
Enables the TodoWrite-style living plan.
True installs plan_write and plan_read and augments the
system prompt with the plan discipline. Pass a constructed
LivingPlan to pre-seed the run with a plan already in place.
agent = Agent("...", model="claude-opus-4-7", living_plan=True)
When a workspace= is also wired, a third tool appears,
recall_past_plans, and every plan_write mirrors to a
kind="plan" note so future runs can bootstrap from prior plans.
Methods
from_config (0.9.37+)
@classmethod
def from_config(
    cls,
    path: str | Path,
    *,
    model: ... | None = None,  # kwargs override matching cfg
    memory: ... | None = None,
    runtime: ... | None = None,
    telemetry: ... | None = None,
    audit_log: ... | None = None,
    permissions: ... | None = None,
    tools: list[Tool] | ToolHost | None = None,
    secrets: Any | None = None,
    hooks: HookRegistry | None = None,
    retry_policy: RetryPolicy | None = None,
    approval_handler: Any | None = None,
) -> Agent: ...
Reads a TOML file and builds an Agent from it. Designed for ops / SRE / compliance who want declarative config separate from code. Every backend the framework can build sync (model, memory, runtime, telemetry, audit log, permissions, budget, architecture, effort, skills, MCP servers) goes in the TOML. Things TOML can’t naturally express (real callables, custom hook objects, secret stores, retry policies) come in through kwargs that override matching cfg entries.
from loomflow import Agent
agent = Agent.from_config("./agent.toml")
Requires Python 3.11+ for tomllib. See Config file
for the full TOML schema reference.
from_dict (0.9.37+)
@classmethod
def from_dict(
    cls,
    cfg: dict[str, Any],
    *,
    # same kwargs as from_config; they override matching cfg keys
) -> Agent: ...
Same shape as from_config but skips the file read. Useful when the
config already lives somewhere structured: a Pydantic BaseSettings,
a YAML file you’ve already parsed, a service-config response, env-var
overrides, anything that hands you a dict.
agent = Agent.from_dict({
    "instructions": "You are a helpful assistant.",
    "model": "echo",
    "memory": {"backend": "sqlite", "path": "./m.db"},
    "budget": {"max_tokens": 10_000},
})
run
async def run(
    self,
    prompt: str,
    *,
    user_id: str | None = None,
    session_id: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    context: RunContext | None = None,
    extra_tools: list[Tool] | None = None,
    emit: Callable[[Event], Awaitable[None]] | None = None,
    output_schema: type[BaseModel] | None = None,
    output_validation_retries: int = 1,
    response_tone: str | None = None,
    effort: str | None = None,
) -> RunResult: ...
Run the agent to completion and return its RunResult.
| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str | required | The user message that opens this run. |
| user_id | str | None | None | Multi-tenant partition key. Episodes / facts / budget / audit / permissions all scope by it. None is the anonymous bucket. See Multi-tenancy. |
| session_id | str | None | None | Conversation continuity key. Same id rehydrates prior turns from memory; with a durable runtime, completed steps replay from the journal. |
| metadata | Mapping[str, Any] | None | None | Free-form bag the framework does not interpret. Tools and hooks read it via get_run_context().metadata. |
| context | RunContext | None | None | A pre-built RunContext instead of the individual kwargs. When both are provided, the explicit kwargs override the matching fields on context. Useful for forwarding parent context across multi-agent boundaries. |
| extra_tools | list[Tool] | None | None | Tools added for this run only. The host is wrapped so the model sees the extras alongside the configured tools, without permanently mutating the agent’s static configuration. Used by Swarm / Supervisor to inject handoff / delegate. |
| emit | Callable[[Event], Awaitable[None]] | None | None | Per-event callback. None (default) drops events. Used by stream() and by multi-agent forwarders. |
| output_schema | type[BaseModel] | None | None | A Pydantic model the final answer must match. The framework appends a JSON-schema directive to the system prompt, parses the assistant text, and populates RunResult.parsed. |
| output_validation_retries | int | 1 | Extra turns spent recovering from a parse failure (the model is given the validation error as feedback). After exhausting them, raises OutputValidationError. Set to 0 to fail fast. |
| response_tone (0.9.32+) | str | None | None | Per-call override of the agent’s tone. Preset name ("casual", "professional", "technical", "legal", "finance", "executive", "academic") or any free-form string. Resolution: per-call > Agent(tuning=Tuning(response_tone=)) default > workflow ambient > none. See response_tone. |
| effort (0.9.36+) | str | None | None | Per-call override of the agent’s reasoning-effort dial. One of "minimal", "low", "medium", "high", "xhigh", "max". Resolution: per-call > Agent(effort=) default > provider default. See effort and the Reasoning effort cross-provider reference. |
Returns a RunResult with output, parsed, value, turns,
tokens_in, tokens_out, total_tokens, cached_tokens_in,
cache_write_tokens, cost_usd, started_at, finished_at, duration,
interrupted, interruption_reason, and id.
| Field | Type | When populated |
|---|---|---|
| output | str | Always. The raw (cleaned) assistant text. Useful for logging / audit even when a schema is also set. |
| parsed | Any | None | Set when output_schema= validated. None otherwise. |
| value (0.9.11+) | Any | Smart accessor: returns parsed when a schema validated, else the raw output string. Removes the “did the schema even fire?” footgun. Always “the answer” in the shape the caller expects. |
| turns | int | Number of agent-loop iterations. |
| tokens_in / tokens_out / total_tokens | int | Aggregate Usage. tokens_in is prompt tokens billed at the full rate (cache misses). |
| cached_tokens_in (0.9.41+) | int | Prompt tokens served from the provider’s prompt cache. Zero when caching is off or the model doesn’t support it. See Prompt caching. |
| cache_write_tokens (0.9.41+) | int | Prompt tokens written to cache on this run (Anthropic only). |
| cost_usd | float | Estimated cost. Already reflects any cache discount. |
| interrupted / interruption_reason | bool / str | None | Set when the run terminated early (budget, max_turns, cancellation). |
| id | str | The run id (ULID). |
| started_at / finished_at / duration | datetimes / timedelta | Wall-clock instrumentation. |
Example. Typed structured output with retry:
from datetime import date
from pydantic import BaseModel
from loomflow import Agent
class Invoice(BaseModel):
    amount_cents: int
    vendor: str
    issued_on: date
result = await agent.run(
    "Extract the invoice fields from this email body: ...",
    user_id="alice",
    session_id="invoice_extract_2026_05_09",
    output_schema=Invoice,
    output_validation_retries=2,
)
invoice: Invoice = result.parsed  # validated typed instance
# or use the smart accessor: result.value behaves the same here
Example. result.value smart accessor:
# Without a schema:
r = await agent.run("Hello")
r.value # → str (same as r.output)
# With a schema:
r = await agent.run("Extract...", output_schema=Invoice)
r.value # → Invoice (same as r.parsed)
# Code that handles both paths uniformly:
def show(answer):
    print(f"Got: {answer}")
show(r.value)  # works regardless of whether a schema fired
resume
async def resume(
    self,
    session_id: str,
    prompt: str,
    *,
    user_id: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    context: RunContext | None = None,
    extra_tools: list[Tool] | None = None,
    emit: Callable[[Event], Awaitable[None]] | None = None,
    output_schema: type[BaseModel] | None = None,
    output_validation_retries: int = 1,
) -> RunResult: ...
Resume a previously-interrupted run from its journal. Equivalent to
agent.run(prompt, session_id=session_id, ...). Exists as a separate
method so the intent is explicit at the call site.
Same kwargs as run() minus the standalone session_id (which is
positional here). With a durable Runtime (SqliteRuntime /
PostgresRuntime), already-completed steps replay from the journal
instead of re-executing.
agent = Agent("...", model="...", runtime=SqliteRuntime("./journal.db"))
# First attempt — interrupted by Ctrl-C / OOM / power outage.
result = await agent.run("complex task", session_id="task-2026-05-09")
# Later, after the process restarted:
result = await agent.resume("task-2026-05-09", "complex task")
See Replay and resume.
stream
async def stream(
    self,
    prompt: str,
    *,
    user_id: str | None = None,
    session_id: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    context: RunContext | None = None,
    extra_tools: list[Tool] | None = None,
    output_schema: type[BaseModel] | None = None,
    output_validation_retries: int = 1,
) -> AsyncIterator[Event]: ...
Same loop as run(), exposed as an async generator of Events.
The loop runs as a background task. Events flow through a bounded memory stream so a slow consumer applies backpressure. Breaking out of the iteration cancels the producer cleanly. Even if a tool call is mid-flight, it’ll be cancelled within the cancel scope.
Event.kind values: STARTED, MODEL_CHUNK, TOOL_CALL,
TOOL_RESULT, BUDGET_WARNING, BUDGET_EXCEEDED, ERROR,
COMPLETED.
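The backpressure and cancel-on-break behaviour can be pictured with a bounded queue between a background producer and the consumer. This sketch uses `asyncio.Queue` as a stand-in for the bounded memory stream; `stream_events` and `demo` are hypothetical, and integers stand in for Event objects.

```python
import asyncio
from typing import AsyncIterator


async def stream_events(n: int, buffer: int = 2) -> AsyncIterator[int]:
    """Yield n items produced by a background task through a bounded queue."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=buffer)
    done = object()  # sentinel marking the end of the stream

    async def producer() -> None:
        for i in range(n):
            await queue.put(i)  # blocks when the queue is full: backpressure
        await queue.put(done)

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is done:
                return
            yield item
    finally:
        task.cancel()  # closing the generator cancels the producer
        try:
            await task
        except asyncio.CancelledError:
            pass


async def demo() -> list[int]:
    seen: list[int] = []
    gen = stream_events(1000)
    try:
        async for item in gen:
            seen.append(item)
            if item == 2:
                break  # stop early; the producer has not finished
    finally:
        await gen.aclose()  # run the generator's cleanup deterministically
    return seen
```

The producer can never run more than `buffer` items ahead of the consumer, and closing the generator after an early `break` stops it without draining the remaining items.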
async for event in agent.stream("plan a Tokyo trip"):
    if event.kind == "model_chunk":
        chunk = event.payload["chunk"]
        if chunk["kind"] == "text":
            print(chunk["text"], end="", flush=True)
    elif event.kind == "tool_call":
        print(f"\n[calling {event.payload['call']['tool']}]")
recall
async def recall(
    self,
    query: str,
    *,
    kind: str = "episodes",
    limit: int = 5,
    user_id: str | None = None,
) -> list[Any]: ...
Convenience wrapper around agent.memory.recall(...). Returns the
top-limit items most similar to query, scoped by user_id.
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | required | Free-text query; embedded and matched against the store. |
| kind | str | "episodes" | What to recall: "episodes" (chat history) or "facts" (when the memory exposes a fact store). |
| limit | int | 5 | Max items returned. |
| user_id | str | None | None | Partition scope. When None, returns from the anonymous bucket. |
recent = await agent.recall("payment refund flow", kind="episodes", limit=10, user_id="alice")
consolidate
async def consolidate(self) -> int: ...
Run a one-shot Consolidator pass over the most recent episodes for
the active user, extracting structured facts into the bi-temporal
store. Returns the count of new facts added.
When auto_extract=True (the default for real models) this runs
automatically after every agent.run(). Call manually when you’ve
batched many episodes through auto_extract=False and want to
process them in one go.
n = await agent.consolidate()
print(f"extracted {n} new facts")
add_tool
def add_tool(self, item: Tool | Callable[..., object]) -> Tool: ...
Register a tool after construction. Returns the resulting Tool
instance (after wrapping a bare callable with @tool). Raises
ConfigError if the host doesn’t support dynamic registration
(MCP, custom hosts). Wrap with InProcessToolHost first.
agent.add_tool(my_new_tool)
remove_tool
def remove_tool(self, name: str) -> bool: ...
Unregister a tool by name. Returns True if removed, False if
no such tool was registered. Same host-support contract as
add_tool.
removed = agent.remove_tool("delete_file")
tools_list
def tools_list(self) -> list[str]: ...
Return the names of all registered tools, in registration order.
before_tool
def before_tool(self, fn: PreToolHook) -> PreToolHook: ...
Decorator that registers a pre-tool hook. The hook receives a
ToolCall; returning a PermissionDecision.deny_(...) short-circuits
the dispatch. Returning None falls through to the permissions
layer.
Multiple hooks fire in registration order; the first non-None
decision wins.
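The "first non-None decision wins" rule amounts to a short-circuiting loop over the hooks in registration order. In this sketch `run_pre_hooks` and the `Hook` alias are hypothetical, and plain strings stand in for the tool call and the decision objects.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical shapes: a hook takes the tool name and returns a decision or None.
Hook = Callable[[str], Awaitable[Optional[str]]]


async def run_pre_hooks(hooks: list[Hook], tool: str) -> Optional[str]:
    """Fire hooks in registration order; return the first non-None decision."""
    for hook in hooks:
        decision = await hook(tool)
        if decision is not None:
            return decision  # later hooks are not consulted
    return None  # fall through to the permissions layer
```

A consequence worth noting is that registration order matters: a permissive early hook that returns a decision prevents stricter later hooks from running.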
@agent.before_tool
async def review(call):
    if call.tool == "send_email" and "@enemy.com" in str(call.args):
        return PermissionDecision.deny_("blocked by reviewer")
    return None
after_tool
def after_tool(self, fn: PostToolHook) -> PostToolHook: ...
Decorator that registers a post-tool callback. The callback receives
the ToolCall and the resulting ToolResult (whether ok or error).
Best-effort. Exceptions raised in the callback are logged, not
propagated.
@agent.after_tool
async def log(call, result):
    print(f"{call.tool} → ok={result.ok}, took={result.duration_ms}ms")
Properties
| Property | Type | Description |
|---|---|---|
| model | Model | The configured model adapter (the un-retry-wrapped instance, for introspection). |
| memory | Memory | The configured memory backend (the un-auto-extract-wrapped instance). |
| runtime | Runtime | The configured runtime. |
| tool_host | ToolHost | The tool host the loop dispatches through. |
| budget | Budget | The configured budget. |
| permissions | Permissions | The configured permission policy. |
| hooks | HookRegistry | The hook registry. |
These properties are the supported access path for introspection.
Earlier versions exposed _model / _memory / etc.; the public
properties landed in 0.2+ and the underscored versions remain as
implementation details.
Special methods
__repr__
def __repr__(self) -> str: ...
Returns a one-line summary for dev-time inspection:
>>> agent
Agent(model='claude-opus-4-7', memory=PostgresMemory, runtime=SqliteRuntime,
      tools=InProcessToolHost, max_turns=50)
Concurrency model
Agent is safe to share across concurrent calls. Each run() /
stream() constructs its own AgentSession; there’s no
cross-call state on the Agent instance except for hook
registrations and the add_tool / remove_tool mutations. Those
mutations are not protected by a lock: call them from a single
thread, not mid-run from a tool.
Per-user state (memory partitions, budget buckets) lives on the
underlying primitives, not on the Agent.
Style note. This page is the reference shape. Every public class will get one of these. For the conceptual model, see What is an Agent. For end-to-end usage, see Quickstart.