Quickstart
Everything below runs as written. Copy any block into a Python file or a notebook and it works. Start at the top. Each section builds on the previous one.
Setup
Bare install
pip install loomflow

For a local-only, zero-key experience, you don’t need anything else.
The echo model selector exercises the full loop without an API key.
1. Hello, agent
No API keys, no infrastructure:
import asyncio
from loomflow import Agent

async def main():
    agent = Agent("You are a helpful assistant.", model="echo")
    result = await agent.run("Tell me a joke.")
    print(result.output)

asyncio.run(main())

model="echo" picks the EchoModel. It echoes the prompt back so
you can verify the loop runs without burning tokens.
model= is required. Forgetting it raises a ConfigError with a
list of suggested values; the harness no longer silently picks a
fake model.
result is a RunResult with output, turns, tokens_in,
tokens_out, cost_usd, started_at, finished_at, interrupted,
interruption_reason.
2. Real models
Strings dispatch by prefix:
from loomflow import Agent

agent = Agent("You are helpful.", model="claude-opus-4-7")  # → AnthropicModel
agent = Agent("You are helpful.", model="gpt-4o")           # → OpenAIModel
agent = Agent("You are helpful.", model="echo")             # → EchoModel

Or pass an instance for full control:

from loomflow.model.anthropic import AnthropicModel

agent = Agent(
    "You are helpful.",
    model=AnthropicModel(
        "claude-opus-4-7",
        api_key="...",
        max_tokens=8192,
    ),
)

Tone (0.9.32+)
Pass response_tone= to steer how the agent phrases its output
without rewriting instructions. Preset name or any free-form
string:
from loomflow import Agent, Tuning

agent = Agent("...", model="gpt-4.1-mini", tuning=Tuning(response_tone="legal"))
agent = Agent("...", tuning=Tuning(response_tone="warm but precise, like a doctor"))

# Per-call override beats the agent default
r = await agent.run("...", response_tone="casual")

Shipped presets: casual, professional, technical, legal,
finance, executive, academic. Anything else passes through
verbatim. The preset map is convenience, not a gatekeeper.
Workflow(response_tone=...) propagates ambient tone to every
nested agent step that didn’t bring its own. See
Agent reference: response_tone
for the resolution order and the full preset effects.
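The tone rules above can be sketched in plain Python. This is a minimal illustration, not loomflow's implementation: the function name resolve_tone, the preset expansion strings, and the exact precedence (per-call, then agent default, then workflow ambient) are assumptions drawn from the description above; the Agent reference remains the authority on the real resolution order.

```python
from typing import Optional

# Hypothetical preset map: the preset names come from the text above,
# but the expansion strings are placeholders, not loomflow's real wording.
PRESETS = {
    "casual": "Write in a relaxed, conversational voice.",
    "legal": "Write in precise, formal legal language.",
}


def resolve_tone(
    per_call: Optional[str] = None,
    agent_default: Optional[str] = None,
    workflow_ambient: Optional[str] = None,
) -> Optional[str]:
    """Pick the most specific tone that was set, then expand presets."""
    for candidate in (per_call, agent_default, workflow_ambient):
        if candidate is not None:
            # Unknown names pass through verbatim; the map is not a gatekeeper.
            return PRESETS.get(candidate, candidate)
    return None
```

Under these assumptions, resolve_tone("casual", "legal") yields the casual preset text, while a free-form string such as "warm but precise, like a doctor" is returned unchanged.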
3. Tools
The @tool decorator takes any Python callable, sync or async, and
derives its JSON schema from type hints.
from loomflow import Agent, tool

@tool
async def get_weather(city: str) -> str:
    """Look up the current weather for a city."""
    return f"Sunny, 72°F in {city}."

@tool(destructive=True)
def delete_file(path: str) -> str:
    """Delete a file. Marked destructive so default permissions ask first."""
    import os
    os.remove(path)
    return f"deleted {path}"

agent = Agent(
    "You are a productivity assistant.",
    model="claude-opus-4-7",
    tools=[get_weather, delete_file],
)

Sync functions are dispatched to a worker thread via
anyio.to_thread.run_sync, so they never block the event loop. Tool
calls in the same model turn run in parallel through an
anyio.create_task_group.
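The dispatch behaviour described above (blocking tools moved off the event loop, calls from one turn run concurrently) can be illustrated with the standard library alone. This sketch deliberately swaps anyio for its asyncio counterparts, asyncio.to_thread and asyncio.gather, so it runs without extra dependencies; the tool functions and dispatch_turn are hypothetical stand-ins, not loomflow code.

```python
import asyncio
import time


def slow_sync_tool(city: str) -> str:
    """A blocking tool; time.sleep stands in for blocking I/O."""
    time.sleep(0.3)
    return f"weather for {city}"


async def fast_async_tool(query: str) -> str:
    """A native coroutine tool that yields to the event loop while waiting."""
    await asyncio.sleep(0.3)
    return f"results for {query}"


async def dispatch_turn() -> list:
    # The sync tool runs in a worker thread so it cannot block the loop;
    # gather() runs both calls concurrently and preserves argument order.
    return await asyncio.gather(
        asyncio.to_thread(slow_sync_tool, "Tokyo"),
        fast_async_tool("ramen"),
    )


start = time.perf_counter()
results = asyncio.run(dispatch_turn())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Because the two calls overlap, the turn should take roughly as long as the slowest tool rather than the sum of both.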
4. Streaming events
agent.stream() yields events as they happen, with backpressure.
async for event in agent.stream("plan a Tokyo trip"):
    if event.kind == "model_chunk":
        chunk = event.payload["chunk"]
        if chunk["kind"] == "text":
            print(chunk["text"], end="", flush=True)
    elif event.kind == "tool_call":
        print(f"\n[calling {event.payload['call']['tool']}]")
    elif event.kind == "tool_result":
        print("[got result]")

Events: STARTED, MODEL_CHUNK, TOOL_CALL, TOOL_RESULT,
BUDGET_WARNING, BUDGET_EXCEEDED, ERROR, COMPLETED. Workflow
runs additionally emit WORKFLOW_STARTED / WORKFLOW_STEP_STARTED
/ WORKFLOW_STEP_COMPLETED / WORKFLOW_STEP_FAILED /
WORKFLOW_COMPLETED.
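To make "with backpressure" concrete, here is a small pull-based sketch: an async generator only produces the next event when the consumer asks for it, so a slow consumer naturally slows the producer. How loomflow implements this internally is not stated above; stream_events and the log list are illustrative assumptions, and only the event payload shape mirrors the example in this section.

```python
import asyncio
from typing import AsyncIterator


async def stream_events(n: int, log: list) -> AsyncIterator[dict]:
    """Yield n events; the generator advances only when the consumer pulls."""
    for i in range(n):
        log.append(f"produced {i}")
        yield {
            "kind": "model_chunk",
            "payload": {"chunk": {"kind": "text", "text": str(i)}},
        }


async def consume() -> list:
    log: list = []
    async for event in stream_events(3, log):
        await asyncio.sleep(0)  # stand-in for slow consumer work
        log.append(f"consumed {event['payload']['chunk']['text']}")
    return log


log = asyncio.run(consume())
print(log)
```

The log strictly alternates between produced and consumed entries, which shows that the producer never runs ahead of the consumer.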
5. Structured outputs
Pass any Pydantic BaseModel as output_schema= and the framework
constrains the model to produce valid JSON, parses it, and returns
the validated typed instance:
from pydantic import BaseModel
from loomflow import Agent

class WeatherReport(BaseModel):
    city: str
    temp_c: float
    conditions: str

agent = Agent("Be precise.", model="gpt-4o")
result = await agent.run(
    "Weather in Tokyo: sunny, 22°C. Extract.",
    output_schema=WeatherReport,
)
report: WeatherReport = result.parsed
print(report.city, report.temp_c)

Native provider support. On supported providers, the framework uses the provider’s native structured-output API to constrain the model at decode time:
- OpenAI. response_format={"type": "json_schema", ..., "strict": True}
- Anthropic. Synthetic __output__ tool with the schema as input_schema, plus tool_choice forcing the model to call it.
- LiteLLM. Passthrough where the underlying provider supports it.
Adapters without native support fall back to prompt augmentation + validate-with-retry. Either way the API is identical.
(0.9.30+) When the adapter has native support, the framework also skips appending the JSON-schema directive to the system prompt, for a ~64% input-token reduction on structured-output calls. The retry path still injects the schema for the rare invalid-JSON case, so the safety net stays on.
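The fallback path (prompt augmentation plus validate-with-retry) can be sketched as follows. This is an assumption-laden illustration, not the framework's code: it uses a stdlib dataclass in place of Pydantic so it runs anywhere, and run_with_retry, parse_report, the schema hint wording, and the call_model callable are all hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Callable


@dataclass
class WeatherReport:
    city: str
    temp_c: float
    conditions: str


def parse_report(raw: str) -> WeatherReport:
    """Parse JSON and coerce field types; raises ValueError on any mismatch."""
    data = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    try:
        return WeatherReport(
            city=str(data["city"]),
            temp_c=float(data["temp_c"]),
            conditions=str(data["conditions"]),
        )
    except (KeyError, TypeError) as exc:
        raise ValueError(f"schema mismatch: {exc!r}") from exc


def run_with_retry(
    call_model: Callable[[str], str], prompt: str, max_attempts: int = 3
) -> WeatherReport:
    hint = 'Reply with JSON only: {"city": str, "temp_c": number, "conditions": str}'
    message = f"{prompt}\n\n{hint}"  # prompt augmentation
    for _ in range(max_attempts):
        raw = call_model(message)
        try:
            return parse_report(raw)
        except ValueError as exc:
            # Feed the validation error back so the next attempt can correct it.
            message = f"{prompt}\n\n{hint}\nPrevious reply was invalid: {exc}"
    raise ValueError(f"no valid output after {max_attempts} attempts")
```

With a fake model that first returns "not json" and then a valid object, run_with_retry returns a WeatherReport on the second attempt.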
RunResult.value smart accessor. Returns parsed when a
schema validated, else the raw output string. Removes the
“did the schema even fire?” footgun:
r = await agent.run("...", output_schema=WeatherReport)
r.value  # → WeatherReport (same as r.parsed)
r = await agent.run("Hello")
r.value  # → str (same as r.output)

Agent-bound default schema. Pass output_schema= once on
construction and every agent.run() applies it automatically:
extractor = Agent(
    "Extract weather facts.",
    model="gpt-4o",
    output_schema=WeatherReport,  # default for every run
)
result = await extractor.run("Weather in Berlin: 12°C, light rain.")

Tagged-union schemas. output_schema=A | B lets a run return
one of multiple shapes. Validation tries each member in declaration
order:
class Found(BaseModel):
    invoice_no: str
    total_cents: int

class NotFound(BaseModel):
    reason: str

agent = Agent("Look up.", model="gpt-4o", output_schema=Found | NotFound)
r = await agent.run("Pull invoice #INV-2026-0042")
match r.value:
    case Found(invoice_no=no, total_cents=tot):
        ...
    case NotFound(reason=why):
        ...

For the full reference see Agent.
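The "tries each member in declaration order" rule for tagged-union schemas can be illustrated without Pydantic. The sketch below uses stdlib dataclasses and a hypothetical validate_union helper with deliberately strict matching (exact field names, isinstance checks); real Pydantic validation is more lenient, so treat this only as a picture of the ordering.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class Found:
    invoice_no: str
    total_cents: int


@dataclass
class NotFound:
    reason: str


def validate_union(data: dict, members: tuple) -> Any:
    """Return an instance of the first member whose fields match the payload."""
    for member in members:  # declaration order decides ties
        expected = {f.name: f.type for f in fields(member)}
        if set(data) != set(expected):
            continue
        if all(isinstance(data[name], tp) for name, tp in expected.items()):
            return member(**data)
    raise ValueError("payload matches no member of the union")
```

Order matters when two members could both accept a payload: the member listed first wins, so list the more specific shape first.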
6. MCP servers
Plug an MCP server in directly:
from loomflow import Agent
from loomflow.mcp import MCPRegistry, MCPServerSpec

registry = MCPRegistry([
    MCPServerSpec.stdio(
        name="git",
        command="uvx",
        args=["mcp-server-git", "--repo", "/Users/me/code/myrepo"],
    ),
    MCPServerSpec.http(
        name="hosted",
        url="https://example.com/mcp/",
        headers={"Authorization": "Bearer ..."},
    ),
])

agent = Agent(
    "You are a coding assistant.",
    model="claude-opus-4-7",
    tools=registry,
)

Tool name conflicts across servers are auto-disambiguated:
git.commit and github.commit if both servers expose commit;
just commit if only one does. Either form is accepted at call time.
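One way to picture the disambiguation rule is a lookup table in which the qualified name always resolves and the bare name resolves only when exactly one server exposes it. The helper build_tool_index below is a hypothetical sketch of that idea, not the MCPRegistry implementation.

```python
from collections import defaultdict


def build_tool_index(servers: dict) -> dict:
    """Map every accepted tool name to its (server, tool) pair."""
    owners = defaultdict(list)
    for server, tools in servers.items():
        for tool_name in tools:
            owners[tool_name].append(server)

    index = {}
    for tool_name, server_names in owners.items():
        for server in server_names:
            # The qualified form always resolves.
            index[f"{server}.{tool_name}"] = (server, tool_name)
        if len(server_names) == 1:
            # The bare form resolves only when it is unambiguous.
            index[tool_name] = (server_names[0], tool_name)
    return index


index = build_tool_index({"git": ["commit", "log"], "github": ["commit"]})
print(sorted(index))
```

Here commit is exposed by both servers, so only git.commit and github.commit resolve, while log resolves both as log and as git.log.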
7. Workflow: the peer primitive
from loomflow import Agent, Workflow

billing = Agent("Handle billing.", model="claude-opus-4-7", tools=[...])
tech = Agent("Handle tech.", model="claude-opus-4-7", tools=[...])

async def classify(text: str) -> str:
    return (await Agent(
        "Reply 'billing' or 'tech'.", model="claude-haiku-4-5",
    ).run(text)).output

support = Workflow.route(classify, {"billing": billing, "tech": tech})
result = await support.run("My card was charged twice.", user_id="alice")
print(result.visited)  # ['classify', 'route_billing']

Workflow is a developer-controlled DAG, the peer of Agent (which
is LLM-controlled). They share one observability spine; pass an
Agent as a workflow node, or call wf.as_tool() to expose a
workflow inside an agent. See Workflow for the
three sugar constructors (chain / route / parallel), the
explicit graph builder with cycle support, and the
Agent vs Workflow decision rubric.
8. Memory: pick a backend
The simplest way is the memory= resolver. Pass a URL and the
framework picks the backend:
from loomflow import Agent

agent = Agent("...", memory="inmemory")                             # default; lost on restart
agent = Agent("...", memory="sqlite:./bot.db")                      # single-file, persistent
agent = Agent("...", memory="chroma")                               # ephemeral
agent = Agent("...", memory="chroma:./chroma-db")                   # persistent
agent = Agent("...", memory="postgres://user:pw@localhost/jeeves")  # pgvector
agent = Agent("...", memory="redis://localhost:6379/0")             # optional RediSearch HNSW

What you get by default:
- Auto fact extraction. Every agent.run() runs a small Consolidator pass that pulls structured (subject, predicate, object) claims from the conversation into the fact store. Default ON for OpenAI / Anthropic / LiteLLM models.
- Auto-attached fact store. The resolver wires the bi-temporal fact store automatically (pass with_facts=False in the dict form to skip).
- Auto-picked embedder. OpenAIEmbedder("text-embedding-3-small") if OPENAI_API_KEY is set, HashEmbedder() otherwise.
- user_id partition. Every backend honours the multi-tenant contract.
- Lazy connect for async backends. Postgres / Redis URLs return a LazyMemory proxy; the connection opens on the first agent.run.
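As a rough picture of what a memory= resolver has to do, the sketch below splits the URL forms shown above into a backend name plus a location, and picks an embedder name from the environment. Both helpers (parse_memory_url, pick_embedder_name) and their return shapes are hypothetical; they only mirror the behaviour described in this section.

```python
import os


def parse_memory_url(url: str) -> dict:
    """Split a memory URL into a backend name and its location."""
    if "://" in url:
        # Network backends: keep the full URL as the connection string.
        scheme = url.split("://", 1)[0]
        return {"backend": scheme, "dsn": url}
    backend, sep, path = url.partition(":")
    spec = {"backend": backend}
    if sep:
        # File-backed forms such as "sqlite:./bot.db".
        spec["path"] = path
    return spec


def pick_embedder_name(env=os.environ) -> str:
    """Choose the OpenAI embedder only when an API key is available."""
    return "OpenAIEmbedder" if env.get("OPENAI_API_KEY") else "HashEmbedder"


print(parse_memory_url("sqlite:./bot.db"))
print(parse_memory_url("redis://localhost:6379/0"))
```

The env parameter is injectable so the selection logic can be tested without touching the real process environment.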
For non-default tweaks, use the dict form:
agent = Agent("...", memory={
    "backend": "chroma",
    "path": "./chroma-db",
    "namespace": "tenant_a",
    "embedder": "openai-large",
    "with_facts": True,
})

For full control, pass an explicit instance:

from loomflow.memory.chroma import ChromaMemory
from loomflow.memory.embedder import OpenAIEmbedder

memory = ChromaMemory.local(
    "./chroma-db", with_facts=True, embedder=OpenAIEmbedder()
)
agent = Agent("...", memory=memory)

9. Auto fact extraction
Every agent.run() against a real model auto-extracts structured
(subject, predicate, object) facts from the conversation into the
bi-temporal fact store, partitioned by user_id. No Consolidator
construction; no manual consolidate() call.
from loomflow import Agent

agent = Agent(
    "You are a personal assistant.",
    model="claude-opus-4-7",
    memory="sqlite:./bot.db",
)
await agent.run("Hi, I'm Alice and I live in Tokyo.", user_id="alice")
# Fact(user_id="alice", subject="alice", predicate="lives_in",
#      object="Tokyo") is now in memory.facts.
profile = await agent.memory.profile(user_id="alice")
print(profile.fact_count)    # > 0
print(profile.sample_facts)  # includes the lives_in fact

# Days later, fresh process, same db:
result = await agent.run("Where do I live?", user_id="alice")
# → "Tokyo": the fact gets recalled into the seed messages.

Defaults: ON for OpenAIModel / AnthropicModel / LiteLLMModel;
OFF for ScriptedModel / EchoModel / unrecognised custom Models.
Override with Agent(..., auto_extract=True/False).
Facts use bi-temporal validity: when a new claim contradicts an
existing one (same subject + predicate, different object), the old
fact’s valid_until is set to the new fact’s valid_from.
Historical facts aren’t deleted, just closed off.
from datetime import datetime, UTC

facts_at_jan_2026 = await agent.memory.facts.query(
    user_id="alice",
    subject="alice",
    valid_at=datetime(2026, 1, 1, tzinfo=UTC),
)

You can also write facts manually via agent.memory.facts.append(Fact(...)).
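The bi-temporal rule described in this section (a contradicting claim closes the old fact instead of deleting it) fits in a few lines. FactRecord and FactLog below are hypothetical stand-ins written for illustration; they are not loomflow's Fact or fact-store classes, and the half-open validity interval is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FactRecord:
    subject: str
    predicate: str
    object: str
    valid_from: datetime
    valid_until: Optional[datetime] = None  # None means still current


class FactLog:
    def __init__(self) -> None:
        self.facts = []

    def append(self, new: FactRecord) -> None:
        for old in self.facts:
            contradicts = (
                old.subject == new.subject
                and old.predicate == new.predicate
                and old.object != new.object
                and old.valid_until is None
            )
            if contradicts:
                old.valid_until = new.valid_from  # close off, never delete
        self.facts.append(new)

    def query(self, subject: str, valid_at: datetime) -> list:
        # A fact is valid on the half-open interval [valid_from, valid_until).
        return [
            f for f in self.facts
            if f.subject == subject
            and f.valid_from <= valid_at
            and (f.valid_until is None or valid_at < f.valid_until)
        ]


utc = timezone.utc
facts = FactLog()
facts.append(FactRecord("alice", "lives_in", "Tokyo", datetime(2025, 6, 1, tzinfo=utc)))
facts.append(FactRecord("alice", "lives_in", "Berlin", datetime(2026, 3, 1, tzinfo=utc)))
```

Querying at a date before the second claim still returns the Tokyo fact, which is what makes point-in-time questions answerable after the data has changed.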
10. Durable replay
from loomflow import Agent
from loomflow.runtime import SqliteRuntime

agent = Agent(
    "...",
    model="claude-opus-4-7",
    runtime=SqliteRuntime("./journal.db"),
)

The runtime journals every model call and tool dispatch by
(session_id, step_name). On a fresh SqliteRuntime against the same
DB file, replaying the same session returns cached results without
re-executing anything.
To resume an interrupted run explicitly:
# First run, interrupted by Ctrl-C / OOM / power outage:
result = await agent.run("complex task", session_id="my-task-2026-05-01")
# Later, after the process restarted: the same session_id picks up
# where the journal left off.
result = await agent.resume("my-task-2026-05-01", "complex task")

resume(session_id, prompt) is sugar for run(prompt, session_id=session_id).
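The journaling idea behind durable replay, caching each step's result under a (session_id, step_name) key, can be sketched with the stdlib sqlite3 module. The Journal class, its table layout, and the JSON serialization are assumptions made for illustration; SqliteRuntime's actual schema is not described here.

```python
import json
import os
import sqlite3
import tempfile
from typing import Any, Callable


class Journal:
    """Cache step results under (session_id, step_name) in SQLite."""

    def __init__(self, path: str) -> None:
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS steps ("
            "session_id TEXT, step_name TEXT, result TEXT, "
            "PRIMARY KEY (session_id, step_name))"
        )

    def step(self, session_id: str, step_name: str, fn: Callable[[], Any]) -> Any:
        row = self.db.execute(
            "SELECT result FROM steps WHERE session_id = ? AND step_name = ?",
            (session_id, step_name),
        ).fetchone()
        if row is not None:
            return json.loads(row[0])  # replay: skip re-execution
        result = fn()
        self.db.execute(
            "INSERT INTO steps VALUES (?, ?, ?)",
            (session_id, step_name, json.dumps(result)),
        )
        self.db.commit()
        return result


calls = []


def expensive_step() -> dict:
    calls.append(1)  # count real executions
    return {"answer": 42}


path = os.path.join(tempfile.mkdtemp(), "journal.db")
first = Journal(path).step("my-task", "model_call_1", expensive_step)
# A fresh Journal on the same file replays the cached result.
second = Journal(path).step("my-task", "model_call_1", expensive_step)
```

The second call goes through a brand-new Journal instance, yet expensive_step runs only once because the result is read back from the file.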
11. Telemetry
from loomflow import Agent
from loomflow.observability import ConsoleTelemetry

agent = Agent("...", telemetry=ConsoleTelemetry())
await agent.run("hi")
# Span lines print to stderr live, with nested-trace indentation.

Four built-in sinks ship with the framework, no collector required:
InMemoryTelemetry (lists for tests), ConsoleTelemetry (live in
stderr), FileTelemetry (JSONL on disk, jq-queryable),
MultiTelemetry (fan-out). Swap to OTelTelemetry for Honeycomb /
Datadog / OTLP without touching agent code.
Spans emitted: loom.run, loom.turn, loom.model.stream,
loom.tool, loom.workflow.step. Metrics: loom.tokens.input/output,
loom.cost.usd, loom.tool.duration_ms,
loom.session.duration_ms, loom.budget.exceeded.
See Telemetry for the full sink matrix.
12. Audit log
from loomflow import Agent
from loomflow.security import FileAuditLog

audit = FileAuditLog("./audit.jsonl", secret="prod-secret")
agent = Agent("...", audit_log=audit)
await agent.run("anything")
# audit.jsonl now has run_started + tool_call + tool_result +
# run_completed entries, each HMAC-signed.
entries = await audit.query(session_id="sess_...")

13. Permissions + hooks
from loomflow import Agent, Mode, StandardPermissions

agent = Agent(
    "...",
    permissions=StandardPermissions(
        mode=Mode.DEFAULT,
        denied_tools=["delete_file", "format_disk"],
    ),
)

@agent.before_tool
async def review(call):
    if call.tool == "send_email" and "@enemy.com" in str(call.args):
        from loomflow.core.types import PermissionDecision
        return PermissionDecision.deny_("blocked by reviewer")
    return None  # allow

@agent.after_tool
async def log(call, result):
    print(f"{call.tool} → ok={result.ok}")

For destructive tools (@tool(destructive=True)) the default
permissions policy returns Decision.ask_(...). Wire an approval
handler to route the decision to a human:
async def approve(call, user_id: str | None) -> bool:
    return await my_slack_app.request_approval(call.tool, user_id)

agent = Agent(
    "...",
    permissions=StandardPermissions(mode=Mode.DEFAULT),
    approval_handler=approve,
)

Without an approval handler, ask falls back to deny so the
agent never silently bypasses the gate. See Approval handlers.
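The fail-closed behaviour described above can be shown with a tiny decision resolver. The string decisions and the resolve_decision helper are hypothetical simplifications made for this sketch; loomflow's own decision types are not reproduced here.

```python
import asyncio
from typing import Awaitable, Callable, Optional

ApprovalHandler = Callable[[str, Optional[str]], Awaitable[bool]]


async def resolve_decision(
    decision: str,
    tool_name: str,
    user_id: Optional[str],
    approval_handler: Optional[ApprovalHandler] = None,
) -> str:
    """Collapse 'allow' / 'deny' / 'ask' into a final 'allow' or 'deny'."""
    if decision != "ask":
        return decision
    if approval_handler is None:
        return "deny"  # fail closed: nobody is available to approve
    approved = await approval_handler(tool_name, user_id)
    return "allow" if approved else "deny"


async def always_yes(tool_name: str, user_id: Optional[str]) -> bool:
    # Stand-in for a real human approval flow.
    return True


print(asyncio.run(resolve_decision("ask", "delete_file", "alice")))
print(asyncio.run(resolve_decision("ask", "delete_file", "alice", always_yes)))
```

Defaulting to deny means a missing handler is a visible failure rather than a silent permission bypass.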
14. Sandbox (filesystem)
from loomflow import Agent, tool
from loomflow.security import FilesystemSandbox
from loomflow.tools import InProcessToolHost

@tool
def read_file(path: str) -> str:
    """Read file contents."""
    # Use a context manager so the file handle is closed promptly.
    with open(path) as f:
        return f.read()

host = InProcessToolHost([read_file])
sandbox = FilesystemSandbox(host, roots=["/Users/me/safe-workspace"])
agent = Agent("...", tools=sandbox)
# Now any path arg outside ~/safe-workspace is denied (symlinks resolved).

15. Budget
from datetime import timedelta
from loomflow import Agent
from loomflow.governance.budget import BudgetConfig, StandardBudget

agent = Agent(
    "...",
    budget=StandardBudget(BudgetConfig(
        max_tokens=200_000,
        max_cost_usd=5.0,
        max_wall_clock=timedelta(minutes=10),
        soft_warning_at=0.8,
    )),
)

When the budget is exceeded, the run terminates cleanly with
result.interrupted = True and
interruption_reason = "budget:max_tokens".
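A budget check with a soft warning threshold might look like the sketch below. The Limits dataclass and check_budget function are hypothetical, wall-clock limits are omitted for brevity, and two details are assumptions: that a limit counts as exceeded once usage reaches it, and that reasons other than "budget:max_tokens" follow the same "budget:<limit>" pattern.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Limits:
    max_tokens: int
    max_cost_usd: float
    soft_warning_at: float = 0.8  # fraction of a limit that triggers a warning


def check_budget(limits: Limits, tokens: int, cost_usd: float) -> Tuple[str, Optional[str]]:
    """Return ('ok' | 'warning' | 'exceeded', reason)."""
    usage = {
        "max_tokens": tokens / limits.max_tokens,
        "max_cost_usd": cost_usd / limits.max_cost_usd,
    }
    # Hard limits take priority over soft warnings.
    for name, fraction in usage.items():
        if fraction >= 1.0:
            return "exceeded", f"budget:{name}"
    for name, fraction in usage.items():
        if fraction >= limits.soft_warning_at:
            return "warning", f"budget:{name}"
    return "ok", None


limits = Limits(max_tokens=200_000, max_cost_usd=5.0)
print(check_budget(limits, tokens=170_000, cost_usd=1.0))
```

Checking hard limits before soft ones ensures a run that has blown one limit is never reported as a mere warning on another.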
Putting it all together
import asyncio
from datetime import timedelta

from loomflow import Agent, Mode, StandardPermissions, tool
from loomflow.observability import OTelTelemetry
from loomflow.runtime import SqliteRuntime
from loomflow.security import FileAuditLog
from loomflow.governance.budget import BudgetConfig, StandardBudget
from loomflow.mcp import MCPRegistry, MCPServerSpec

@tool
async def web_search(query: str) -> str:
    """Search the web."""
    ...

mcp_servers = MCPRegistry([
    MCPServerSpec.stdio("git", "uvx", ["mcp-server-git", "--repo", "."]),
    MCPServerSpec.stdio("fs", "uvx", ["mcp-server-filesystem", "--root", "."]),
])

async def main():
    agent = Agent(
        "You are a research assistant. Cite your sources.",
        model="claude-opus-4-7",
        memory="postgres://user:pw@db.internal/loom",
        runtime=SqliteRuntime("./journal.db"),
        tools=[web_search, *mcp_servers.list_tools_sync()],
        permissions=StandardPermissions(mode=Mode.DEFAULT),
        budget=StandardBudget(BudgetConfig(
            max_tokens=200_000,
            max_cost_usd=5.0,
            max_wall_clock=timedelta(minutes=10),
        )),
        audit_log=FileAuditLog("./audit.jsonl", secret="prod-secret"),
        telemetry=OTelTelemetry(),
        auto_extract=True,
    )
    async for event in agent.stream(
        "research recent advances in agent harnesses",
        user_id="user_42",
        session_id="research_2026_05_08",
    ):
        print(f"[{event.kind}]", event.payload.get("chunk", {}).get("text", ""), end="")

asyncio.run(main())

A production-shaped agent in ~25 lines. Memory persists facts across
runs (auto-extracted from each conversation), the runtime can recover
from crashes, every step lands in the audit log, every span shows up
in your OTel exporter, and the budget enforces hard limits.
Multi-tenancy is built in via the user_id kwarg.
Next:
- Recipes. Copy-paste patterns for common shapes.
- Production hardening. Multi-tenancy, secrets, bounded state.
- Architecture. How the loop works under the hood.