
Quickstart

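Every block below is meant to run as written. Copy any block into a notebook and it works; in a plain Python file, wrap blocks that use a top-level await in an async function and call it with asyncio.run(), as section 1 does. Start at the top. Each section builds on the previous one.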

Setup

pip install loomflow

For a local-only zero-key experience, you don’t need anything else. The echo model selector exercises the full loop without an API key.

1. Hello, agent

No API keys, no infrastructure:

import asyncio

from loomflow import Agent


async def main():
    agent = Agent("You are a helpful assistant.", model="echo")
    result = await agent.run("Tell me a joke.")
    print(result.output)


asyncio.run(main())

model="echo" picks the EchoModel. It echoes the prompt back so you can verify the loop runs without burning tokens.

model= is required. Forgetting it raises a ConfigError with a list of suggested values; the harness no longer silently picks a fake model.

result is a RunResult with output, turns, tokens_in, tokens_out, cost_usd, started_at, finished_at, interrupted, interruption_reason.

2. Real models

Strings dispatch by prefix:

from loomflow import Agent

agent = Agent("You are helpful.", model="claude-opus-4-7")  # → AnthropicModel
agent = Agent("You are helpful.", model="gpt-4o")           # → OpenAIModel
agent = Agent("You are helpful.", model="echo")             # → EchoModel
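One way to picture prefix dispatch is a small lookup over known prefixes. The sketch below is illustrative only: the prefix table and the resolve_model_name helper are assumptions made for this example, not loomflow's actual resolver. Only the adapter names are taken from the comments above.

```python
# Hypothetical prefix table; the real resolver may know more prefixes.
PREFIXES = {
    "claude-": "AnthropicModel",
    "gpt-": "OpenAIModel",
}


def resolve_model_name(model: str) -> str:
    """Return the adapter class name that a model string would map to."""
    if model == "echo":
        return "EchoModel"
    for prefix, adapter in PREFIXES.items():
        if model.startswith(prefix):
            return adapter
    # Fail loudly instead of silently guessing a model.
    raise ValueError(f"unknown model {model!r}; try echo, claude-*, or gpt-*")


print(resolve_model_name("claude-opus-4-7"))
print(resolve_model_name("gpt-4o"))
```

Raising on an unknown string, rather than falling back to a default, matches the behaviour described in section 1 for a missing model=.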

Or pass an instance for full control:

from loomflow import Agent
from loomflow.model.anthropic import AnthropicModel

agent = Agent(
    "You are helpful.",
    model=AnthropicModel(
        "claude-opus-4-7",
        api_key="...",
        max_tokens=8192,
    ),
)

Tone (0.9.32+)

Set response_tone= (on the agent through Tuning, or per call on agent.run()) to steer how the agent phrases its output without rewriting instructions. It accepts a preset name or any free-form string:

from loomflow import Agent, Tuning

agent = Agent("...", model="gpt-4.1-mini", tuning=Tuning(response_tone="legal"))
agent = Agent(
    "...",
    model="gpt-4.1-mini",  # model= is required on every Agent
    tuning=Tuning(response_tone="warm but precise, like a doctor"),
)

# Per-call override beats the agent default
r = await agent.run("...", response_tone="casual")

Shipped presets: casual, professional, technical, legal, finance, executive, academic. Anything else passes through verbatim. The preset map is convenience, not a gatekeeper.

Workflow(response_tone=...) propagates ambient tone to every nested agent step that didn’t bring its own. See Agent reference: response_tone for the resolution order and the full preset effects.

3. Tools

The @tool decorator takes any Python callable, sync or async, and derives its JSON schema from type hints.

import os

from loomflow import Agent, tool


@tool
async def get_weather(city: str) -> str:
    """Look up the current weather for a city."""
    return f"Sunny, 72°F in {city}."


@tool(destructive=True)
def delete_file(path: str) -> str:
    """Delete a file. Marked destructive so default permissions ask first."""
    os.remove(path)
    return f"deleted {path}"


agent = Agent(
    "You are a productivity assistant.",
    model="claude-opus-4-7",
    tools=[get_weather, delete_file],
)
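To make "derives its JSON schema from type hints" concrete, here is a rough standard-library sketch of the idea. The schema_from_hints helper, the JSON_TYPES mapping, and the output shape are assumptions for illustration; the schema loomflow actually emits may look different.

```python
import inspect
from typing import get_type_hints

# Assumed mapping from Python annotations to JSON Schema type names.
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def schema_from_hints(fn) -> dict:
    """Build a minimal JSON-schema-like dict from a function's signature."""
    hints = get_type_hints(fn)
    hints.pop("return", None)  # the return annotation is not a parameter
    params = inspect.signature(fn).parameters
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {
            "type": "object",
            "properties": {
                name: {"type": JSON_TYPES.get(hint, "string")}
                for name, hint in hints.items()
            },
            # Parameters without a default value are required.
            "required": [
                name for name in hints
                if params[name].default is inspect.Parameter.empty
            ],
        },
    }


def get_weather(city: str, units: str = "F") -> str:
    """Look up the current weather for a city."""
    return f"Sunny in {city} ({units})"


print(schema_from_hints(get_weather))
```

This sketch only covers annotated parameters with simple scalar types; anything else falls back to "string".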

Sync functions are dispatched to a worker thread via anyio.to_thread.run_sync, so they never block the event loop. Tool calls in the same model turn run in parallel through an anyio.create_task_group.
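The same pattern can be sketched with the standard library alone. This is an analogue, not the framework's code: it uses asyncio.to_thread and asyncio.gather in place of anyio.to_thread.run_sync and an anyio task group, and the function names are made up for the example.

```python
import asyncio
import time


def slow_sync_tool(name: str) -> str:
    """A blocking tool; time.sleep stands in for real blocking I/O."""
    time.sleep(0.2)
    return f"{name} done"


async def dispatch(name: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_sync_tool, name)


async def run_turn(calls: list[str]) -> list[str]:
    # All tool calls from one model turn are awaited concurrently.
    return await asyncio.gather(*(dispatch(c) for c in calls))


start = time.perf_counter()
results = asyncio.run(run_turn(["a", "b", "c"]))
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Because the three calls overlap, the elapsed time should sit close to one sleep rather than three.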

4. Streaming events

agent.stream() yields events as they happen, with backpressure.

async for event in agent.stream("plan a Tokyo trip"):
    if event.kind == "model_chunk":
        chunk = event.payload["chunk"]
        if chunk["kind"] == "text":
            print(chunk["text"], end="", flush=True)
    elif event.kind == "tool_call":
        print(f"\n[calling {event.payload['call']['tool']}]")
    elif event.kind == "tool_result":
        print("[got result]")

Events: STARTED, MODEL_CHUNK, TOOL_CALL, TOOL_RESULT, BUDGET_WARNING, BUDGET_EXCEEDED, ERROR, COMPLETED. Workflow runs additionally emit WORKFLOW_STARTED / WORKFLOW_STEP_STARTED / WORKFLOW_STEP_COMPLETED / WORKFLOW_STEP_FAILED / WORKFLOW_COMPLETED.
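Backpressure here means a slow consumer slows the producer down instead of letting events pile up in memory. A bounded queue is one common way to get that effect. The sketch below assumes that design for illustration; it does not claim to show how agent.stream() is built, and the event dicts are simplified stand-ins.

```python
import asyncio


async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        # put() suspends while the queue is full, so the producer can
        # never run more than maxsize events ahead of the consumer.
        await queue.put({"kind": "model_chunk", "payload": {"text": f"tok{i} "}})
    await queue.put(None)  # sentinel: the stream is finished


async def stream(n: int, maxsize: int = 2):
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    task = asyncio.create_task(producer(queue, n))
    try:
        while (event := await queue.get()) is not None:
            yield event
    finally:
        task.cancel()  # stop producing if the consumer leaves early


async def collect(n: int) -> list[str]:
    return [event["payload"]["text"] async for event in stream(n)]


print(asyncio.run(collect(5)))
```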

5. Structured outputs

Pass any Pydantic BaseModel as output_schema= and the framework constrains the model to produce valid JSON, parses it, and returns the validated typed instance:

from pydantic import BaseModel

from loomflow import Agent


class WeatherReport(BaseModel):
    city: str
    temp_c: float
    conditions: str


agent = Agent("Be precise.", model="gpt-4o")
result = await agent.run(
    "Weather in Tokyo: sunny, 22°C. Extract.",
    output_schema=WeatherReport,
)
report: WeatherReport = result.parsed
print(report.city, report.temp_c)

Native provider support. On supported providers, the framework uses the provider’s native structured-output API to constrain the model at decode time:

  • OpenAI: response_format={"type": "json_schema", ..., "strict": True}
  • Anthropic: a synthetic __output__ tool with the schema as input_schema, plus a tool_choice that forces the model to call it.
  • LiteLLM: passthrough where the underlying provider supports it.

Adapters without native support fall back to prompt augmentation + validate-with-retry. Either way the API is identical.
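The validate-with-retry fallback can be pictured as a loop that parses the reply, checks it, and feeds the error back into the next attempt. The sketch below is a simplified stand-in: call_model is a hypothetical callable, and a plain dict of required field types replaces real Pydantic validation.

```python
import json


def validate(raw: str, required: dict[str, type]) -> dict:
    """Parse JSON and check that required keys exist with the right types."""
    data = json.loads(raw)  # JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    for key, typ in required.items():
        if not isinstance(data.get(key), typ):
            raise ValueError(f"field {key!r} missing or not {typ.__name__}")
    return data


def run_with_retry(call_model, prompt: str, required: dict[str, type], max_attempts: int = 3) -> dict:
    """call_model is a hypothetical callable: prompt text in, raw reply out."""
    last_error = None
    for _ in range(max_attempts):
        raw = call_model(prompt)
        try:
            return validate(raw, required)
        except ValueError as exc:
            last_error = exc
            # Feed the error back so the next attempt can correct itself.
            prompt = f"{prompt}\n\nYour last reply was invalid ({exc}). Return only JSON."
    raise RuntimeError(f"no valid output after {max_attempts} attempts") from last_error


# A fake model that fails once, then returns valid JSON.
replies = iter(["not json", '{"city": "Tokyo", "temp_c": 22.0}'])
result = run_with_retry(lambda p: next(replies), "Extract.", {"city": str, "temp_c": float})
print(result)
```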

(0.9.30+) When the adapter has native support, the framework also skips appending the JSON-schema directive to the system prompt, which cuts input tokens by roughly 64% on structured-output calls. The retry path still injects the schema for the rare invalid-JSON case, so the safety net stays on.

RunResult.value smart accessor. Returns parsed when a schema validated, else the raw output string. Removes the “did the schema even fire?” footgun:

r = await agent.run("...", output_schema=WeatherReport)
r.value  # → WeatherReport (same as r.parsed)

r = await agent.run("Hello")
r.value  # → str (same as r.output)
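The accessor's behaviour is easy to model with a property. SketchResult below is a stand-in written for this example with only two fields; it is not the real RunResult class.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchResult:
    """Stand-in for RunResult with only the fields this example needs."""
    output: str
    parsed: Any = None

    @property
    def value(self) -> Any:
        # Prefer the validated object; fall back to the raw text.
        return self.parsed if self.parsed is not None else self.output


plain = SketchResult(output="Hello")
typed = SketchResult(output='{"city": "Tokyo"}', parsed={"city": "Tokyo"})
print(plain.value)
print(typed.value)
```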

Agent-bound default schema. Pass output_schema= once on construction and every agent.run() applies it automatically:

extractor = Agent(
    "Extract weather facts.",
    model="gpt-4o",
    output_schema=WeatherReport,  # default for every run
)
result = await extractor.run("Weather in Berlin: 12°C, light rain.")

Tagged-union schemas. output_schema=A | B lets a run return one of several shapes. Validation tries each member in declaration order:

class Found(BaseModel):
    invoice_no: str
    total_cents: int


class NotFound(BaseModel):
    reason: str


agent = Agent("Look up.", model="gpt-4o", output_schema=Found | NotFound)
r = await agent.run("Pull invoice #INV-2026-0042")

match r.value:
    case Found(invoice_no=no, total_cents=tot):
        ...
    case NotFound(reason=why):
        ...
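"Tries each member in declaration order" can be sketched as a first-match loop. The example below uses plain dataclasses and an exact field check in place of Pydantic validation, and validate_union is a hypothetical helper, so treat it as a picture of the ordering rule only.

```python
from dataclasses import dataclass, fields


@dataclass
class Found:
    invoice_no: str
    total_cents: int


@dataclass
class NotFound:
    reason: str


def validate_union(data: dict, members: tuple[type, ...]):
    """Return an instance of the first member whose fields match the data."""
    for cls in members:
        expected = {f.name: f.type for f in fields(cls)}
        if data.keys() == expected.keys() and all(
            isinstance(data[name], typ) for name, typ in expected.items()
        ):
            return cls(**data)
    raise ValueError(f"data matches none of {[m.__name__ for m in members]}")


print(validate_union({"invoice_no": "INV-1", "total_cents": 1200}, (Found, NotFound)))
print(validate_union({"reason": "no such invoice"}, (Found, NotFound)))
```

Because the first match wins, the order of members matters whenever two shapes could both accept the same data.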

For the full reference see Agent.

6. MCP servers

Plug an MCP server in directly:

from loomflow import Agent
from loomflow.mcp import MCPRegistry, MCPServerSpec

registry = MCPRegistry([
    MCPServerSpec.stdio(
        name="git",
        command="uvx",
        args=["mcp-server-git", "--repo", "/Users/me/code/myrepo"],
    ),
    MCPServerSpec.http(
        name="hosted",
        url="https://example.com/mcp/",
        headers={"Authorization": "Bearer ..."},
    ),
])

agent = Agent(
    "You are a coding assistant.",
    model="claude-opus-4-7",
    tools=registry,
)

Tool name conflicts across servers are auto-disambiguated: git.commit and github.commit if both servers expose commit; just commit if only one does. Either form is accepted at call time.
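The naming rule can be expressed as an index that always accepts the qualified name and accepts the bare name only when exactly one server exposes it. build_tool_index is a hypothetical helper written for this illustration, not part of the MCP registry's API.

```python
from collections import Counter


def build_tool_index(servers: dict[str, list[str]]) -> dict[str, tuple[str, str]]:
    """Map every accepted tool name to its (server, tool) pair."""
    counts = Counter(tool for tools in servers.values() for tool in tools)
    index: dict[str, tuple[str, str]] = {}
    for server, tools in servers.items():
        for tool in tools:
            # The qualified form always resolves.
            index[f"{server}.{tool}"] = (server, tool)
            # The bare form resolves only when it is unambiguous.
            if counts[tool] == 1:
                index[tool] = (server, tool)
    return index


index = build_tool_index({"git": ["commit", "log"], "github": ["commit"]})
print(sorted(index))
```

In this example "log" and "git.log" both resolve, while a bare "commit" does not, because two servers expose it.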

7. Workflow: the peer primitive

from loomflow import Agent, Workflow

billing = Agent("Handle billing.", model="claude-opus-4-7", tools=[...])
tech = Agent("Handle tech.", model="claude-opus-4-7", tools=[...])


async def classify(text: str) -> str:
    return (await Agent(
        "Reply 'billing' or 'tech'.",
        model="claude-haiku-4-5",
    ).run(text)).output


support = Workflow.route(classify, {"billing": billing, "tech": tech})
result = await support.run("My card was charged twice.", user_id="alice")
print(result.visited)  # ['classify', 'route_billing']

Workflow is a developer-controlled DAG, the peer of Agent (which is LLM-controlled). They share one observability spine: pass an Agent as a workflow node, or call wf.as_tool() to expose a workflow inside an agent. See Workflow for the three sugar constructors (chain / route / parallel), the explicit graph builder with cycle support, and the Agent vs Workflow decision rubric.

8. Memory: pick a backend

The simplest way is the memory= resolver. Pass a URL and the framework picks the backend:

from loomflow import Agent

# model="echo" keeps these runnable without an API key; model= is required.
agent = Agent("...", model="echo", memory="inmemory")           # default; lost on restart
agent = Agent("...", model="echo", memory="sqlite:./bot.db")    # single-file, persistent
agent = Agent("...", model="echo", memory="chroma")             # ephemeral
agent = Agent("...", model="echo", memory="chroma:./chroma-db") # persistent
agent = Agent("...", model="echo", memory="postgres://user:pw@localhost/jeeves")  # pgvector
agent = Agent("...", model="echo", memory="redis://localhost:6379/0")  # optional RediSearch HNSW

What you get by default:

  • Auto fact extraction. Every agent.run() runs a small Consolidator pass that pulls structured (subject, predicate, object) claims from the conversation into the fact store. Default ON for OpenAI / Anthropic / LiteLLM models.
  • Auto-attached fact store. The resolver wires the bi-temporal fact store automatically (pass with_facts=False in the dict form to skip).
  • Auto-picked embedder. OpenAIEmbedder("text-embedding-3-small") if OPENAI_API_KEY is set, HashEmbedder() otherwise.
  • user_id partition. Every backend honours the multi-tenant contract.
  • Lazy connect for async backends. Postgres / Redis URLs return a LazyMemory proxy; the connection opens on the first agent.run.

For non-default tweaks, use the dict form:

agent = Agent(
    "...",
    model="echo",
    memory={
        "backend": "chroma",
        "path": "./chroma-db",
        "namespace": "tenant_a",
        "embedder": "openai-large",
        "with_facts": True,
    },
)

For full control, pass an explicit instance:

from loomflow import Agent
from loomflow.memory.chroma import ChromaMemory
from loomflow.memory.embedder import OpenAIEmbedder

memory = ChromaMemory.local(
    "./chroma-db", with_facts=True, embedder=OpenAIEmbedder()
)
agent = Agent("...", model="echo", memory=memory)

9. Auto fact extraction

Every agent.run() against a real model auto-extracts structured (subject, predicate, object) facts from the conversation into the bi-temporal fact store, partitioned by user_id. No Consolidator construction; no manual consolidate() call.

from loomflow import Agent

agent = Agent(
    "You are a personal assistant.",
    model="claude-opus-4-7",
    memory="sqlite:./bot.db",
)

await agent.run("Hi, I'm Alice and I live in Tokyo.", user_id="alice")
# Fact(user_id="alice", subject="alice", predicate="lives_in",
#      object="Tokyo") is now in memory.facts.

profile = await agent.memory.profile(user_id="alice")
print(profile.fact_count)    # > 0
print(profile.sample_facts)  # includes the lives_in fact

# Days later, fresh process, same db:
result = await agent.run("Where do I live?", user_id="alice")
# → "Tokyo": the fact gets recalled into the seed messages.

Defaults: ON for OpenAIModel / AnthropicModel / LiteLLMModel; OFF for ScriptedModel / EchoModel / unrecognised custom Models. Override with Agent(..., auto_extract=True/False).

Facts use bi-temporal validity: when a new claim contradicts an existing one (same subject + predicate, different object), the old fact’s valid_until is set to the new fact’s valid_from. Historical facts aren’t deleted, just closed off.

from datetime import datetime, UTC

facts_at_jan_2026 = await agent.memory.facts.query(
    user_id="alice",
    subject="alice",
    valid_at=datetime(2026, 1, 1, tzinfo=UTC),
)

You can also write facts manually via agent.memory.facts.append(Fact(...)).
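The closing-off rule can be modelled in a few lines. The sketch below is an in-memory illustration: SketchFact, append_fact, and query are hypothetical names, and treating validity as a half-open interval (valid_from inclusive, valid_until exclusive) is an assumption this page does not state.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class SketchFact:
    subject: str
    predicate: str
    object: str
    valid_from: datetime
    valid_until: Optional[datetime] = None  # None means "still valid"


def append_fact(store: list, new: SketchFact) -> None:
    for old in store:
        same_claim = (old.subject, old.predicate) == (new.subject, new.predicate)
        if same_claim and old.valid_until is None and old.object != new.object:
            old.valid_until = new.valid_from  # close the old fact, never delete it
    store.append(new)


def query(store: list, subject: str, valid_at: datetime) -> list:
    return [
        f for f in store
        if f.subject == subject
        and f.valid_from <= valid_at
        and (f.valid_until is None or valid_at < f.valid_until)
    ]


store: list = []
append_fact(store, SketchFact("alice", "lives_in", "Tokyo", datetime(2025, 6, 1, tzinfo=timezone.utc)))
append_fact(store, SketchFact("alice", "lives_in", "Berlin", datetime(2026, 3, 1, tzinfo=timezone.utc)))

jan = query(store, "alice", datetime(2026, 1, 1, tzinfo=timezone.utc))
print([f.object for f in jan])
```

Both facts remain in the store; only the point-in-time query decides which one is visible.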

10. Durable replay

from loomflow import Agent
from loomflow.runtime import SqliteRuntime

agent = Agent(
    "...",
    model="claude-opus-4-7",
    runtime=SqliteRuntime("./journal.db"),
)

The runtime journals every model call and tool dispatch by (session_id, step_name). On a fresh SqliteRuntime against the same DB file, replaying the same session returns cached results without re-executing anything.

To resume an interrupted run explicitly:

# First run, interrupted by Ctrl-C / OOM / power outage:
result = await agent.run("complex task", session_id="my-task-2026-05-01")

# Later, after the process restarted: the same session_id picks up
# where the journal left off.
result = await agent.resume("my-task-2026-05-01", "complex task")

resume(session_id, prompt) is sugar for run(prompt, session_id=session_id).
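Journaling by (session_id, step_name) amounts to a keyed cache that is consulted before each step runs. The sketch below shows that idea with the standard sqlite3 module. SketchJournal and its table layout are assumptions for illustration and say nothing about SqliteRuntime's real schema.

```python
import json
import sqlite3


class SketchJournal:
    """Caches step results keyed by (session_id, step_name)."""

    def __init__(self, path: str) -> None:
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS journal ("
            "session_id TEXT, step_name TEXT, result TEXT, "
            "PRIMARY KEY (session_id, step_name))"
        )

    def step(self, session_id: str, step_name: str, fn):
        row = self.db.execute(
            "SELECT result FROM journal WHERE session_id = ? AND step_name = ?",
            (session_id, step_name),
        ).fetchone()
        if row is not None:
            return json.loads(row[0])  # replay: skip re-execution
        result = fn()  # must be JSON-serialisable in this sketch
        self.db.execute(
            "INSERT INTO journal VALUES (?, ?, ?)",
            (session_id, step_name, json.dumps(result)),
        )
        self.db.commit()
        return result


calls = []


def expensive():
    calls.append(1)
    return {"answer": 42}


journal = SketchJournal(":memory:")
first = journal.step("my-task", "model_call_0", expensive)
second = journal.step("my-task", "model_call_0", expensive)
print(first == second, len(calls))
```

With a file path instead of ":memory:", a new process opening the same file would hit the cached row in the same way.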

11. Telemetry

from loomflow import Agent
from loomflow.observability import ConsoleTelemetry

agent = Agent("...", model="echo", telemetry=ConsoleTelemetry())
await agent.run("hi")
# Span lines print to stderr live, with nested-trace indentation.

Four built-in sinks ship with the framework, and none of them needs a collector: InMemoryTelemetry (lists for tests), ConsoleTelemetry (live on stderr), FileTelemetry (JSONL on disk, jq-queryable), MultiTelemetry (fan-out). Swap to OTelTelemetry for Honeycomb / Datadog / OTLP without touching agent code.

Spans emitted: loom.run, loom.turn, loom.model.stream, loom.tool, loom.workflow.step. Metrics: loom.tokens.input/output, loom.cost.usd, loom.tool.duration_ms, loom.session.duration_ms, loom.budget.exceeded.

See Telemetry for the full sink matrix.

12. Audit log

from loomflow import Agent
from loomflow.security import FileAuditLog

audit = FileAuditLog("./audit.jsonl", secret="prod-secret")
agent = Agent("...", model="echo", audit_log=audit)

await agent.run("anything")
# audit.jsonl now has run_started + tool_call + tool_result +
# run_completed entries, each HMAC-signed.

entries = await audit.query(session_id="sess_...")
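HMAC signing lets a reader holding the secret detect any later change to an entry. The sketch below shows the general technique with the standard hmac module. The entry layout, the "sig" field name, and the choice of SHA-256 are assumptions, so it will not verify real audit.jsonl lines.

```python
import hashlib
import hmac
import json


def sign_entry(entry: dict, secret: str) -> dict:
    """Return a copy of the entry with an HMAC-SHA256 signature attached."""
    # Canonical JSON so the same entry always produces the same bytes.
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":")).encode()
    sig = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return {**entry, "sig": sig}


def verify_entry(signed: dict, secret: str) -> bool:
    entry = {k: v for k, v in signed.items() if k != "sig"}
    expected = sign_entry(entry, secret)["sig"]
    # compare_digest avoids leaking information through timing.
    return hmac.compare_digest(expected, signed.get("sig", ""))


signed = sign_entry({"event": "tool_call", "tool": "get_weather"}, "prod-secret")
tampered = {**signed, "tool": "delete_file"}
print(verify_entry(signed, "prod-secret"), verify_entry(tampered, "prod-secret"))
```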

13. Permissions + hooks

from loomflow import Agent, Mode, StandardPermissions
from loomflow.core.types import PermissionDecision

agent = Agent(
    "...",
    model="echo",
    permissions=StandardPermissions(
        mode=Mode.DEFAULT,
        denied_tools=["delete_file", "format_disk"],
    ),
)


@agent.before_tool
async def review(call):
    if call.tool == "send_email" and "@enemy.com" in str(call.args):
        return PermissionDecision.deny_("blocked by reviewer")
    return None  # allow


@agent.after_tool
async def log(call, result):
    print(f"{call.tool} → ok={result.ok}")

For destructive tools (@tool(destructive=True)) the default permissions policy returns Decision.ask_(...). Wire an approval handler to route the decision to a human:

async def approve(call, user_id: str | None) -> bool:
    # my_slack_app is a placeholder for your own approval integration.
    return await my_slack_app.request_approval(call.tool, user_id)


agent = Agent(
    "...",
    model="echo",
    permissions=StandardPermissions(mode=Mode.DEFAULT),
    approval_handler=approve,
)

Without an approval handler, ask falls back to deny so the agent never silently bypasses the gate. See Approval handlers.

14. Sandbox (filesystem)

from loomflow import Agent, tool
from loomflow.security import FilesystemSandbox
from loomflow.tools import InProcessToolHost


@tool
def read_file(path: str) -> str:
    """Read file contents."""
    with open(path) as f:
        return f.read()


host = InProcessToolHost([read_file])
sandbox = FilesystemSandbox(host, roots=["/Users/me/safe-workspace"])

agent = Agent("...", model="echo", tools=sandbox)
# Now any path arg outside /Users/me/safe-workspace is denied (symlinks resolved).
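A root check with symlinks resolved usually comes down to resolving both paths and testing containment. The sketch below shows that check with pathlib (Python 3.9+ for is_relative_to). is_allowed is a hypothetical helper, not FilesystemSandbox's implementation.

```python
import tempfile
from pathlib import Path


def is_allowed(path: str, roots: list[str]) -> bool:
    """True when the fully resolved path sits under one of the roots."""
    # resolve() follows symlinks and collapses "..", so neither can be
    # used to step outside an allowed root.
    real = Path(path).resolve()
    return any(real.is_relative_to(Path(root).resolve()) for root in roots)


with tempfile.TemporaryDirectory() as root:
    inside_ok = is_allowed(str(Path(root) / "notes.txt"), [root])
    escape_ok = is_allowed(str(Path(root) / ".." / "elsewhere.txt"), [root])

print(inside_ok, escape_ok)
```

Comparing resolved paths rather than raw strings is what stops a "../" segment or a symlink from passing a naive prefix test.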

15. Budget

from datetime import timedelta

from loomflow import Agent
from loomflow.governance.budget import BudgetConfig, StandardBudget

agent = Agent(
    "...",
    model="echo",
    budget=StandardBudget(BudgetConfig(
        max_tokens=200_000,
        max_cost_usd=5.0,
        max_wall_clock=timedelta(minutes=10),
        soft_warning_at=0.8,
    )),
)

When the budget is exceeded, the run terminates cleanly with result.interrupted = True and interruption_reason = "budget:max_tokens".
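The soft-warning threshold and the hard limit can be pictured as two checks on a running total. The sketch below tracks tokens only, and SketchBudget is a made-up class for illustration; whether the real budget warns at or above the threshold is an assumption here.

```python
from dataclasses import dataclass


@dataclass
class SketchBudget:
    max_tokens: int
    soft_warning_at: float = 0.8  # fraction of the limit that triggers a warning
    used: int = 0

    def consume(self, tokens: int) -> str:
        """Record usage and return 'ok', 'warning', or 'exceeded'."""
        self.used += tokens
        if self.used > self.max_tokens:
            return "exceeded"
        if self.used >= self.soft_warning_at * self.max_tokens:
            return "warning"
        return "ok"


budget = SketchBudget(max_tokens=1000)
states = [budget.consume(n) for n in (500, 300, 300)]
print(states)
```

In the event stream from section 4, these two states would correspond to BUDGET_WARNING and BUDGET_EXCEEDED.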


Putting it all together

import asyncio
from datetime import timedelta

from loomflow import Agent, Mode, StandardPermissions, tool
from loomflow.observability import OTelTelemetry
from loomflow.runtime import SqliteRuntime
from loomflow.security import FileAuditLog
from loomflow.governance.budget import BudgetConfig, StandardBudget
from loomflow.mcp import MCPRegistry, MCPServerSpec


@tool
async def web_search(query: str) -> str:
    """Search the web."""
    ...


mcp_servers = MCPRegistry([
    MCPServerSpec.stdio("git", "uvx", ["mcp-server-git", "--repo", "."]),
    MCPServerSpec.stdio("fs", "uvx", ["mcp-server-filesystem", "--root", "."]),
])


async def main():
    agent = Agent(
        "You are a research assistant. Cite your sources.",
        model="claude-opus-4-7",
        memory="postgres://user:pw@db.internal/loom",
        runtime=SqliteRuntime("./journal.db"),
        tools=[web_search, *mcp_servers.list_tools_sync()],
        permissions=StandardPermissions(mode=Mode.DEFAULT),
        budget=StandardBudget(BudgetConfig(
            max_tokens=200_000,
            max_cost_usd=5.0,
            max_wall_clock=timedelta(minutes=10),
        )),
        audit_log=FileAuditLog("./audit.jsonl", secret="prod-secret"),
        telemetry=OTelTelemetry(),
        auto_extract=True,
    )

    async for event in agent.stream(
        "research recent advances in agent harnesses",
        user_id="user_42",
        session_id="research_2026_05_08",
    ):
        print(f"[{event.kind}]", event.payload.get("chunk", {}).get("text", ""), end="")


asyncio.run(main())

A production-shaped agent in ~25 lines. Memory persists facts across runs (auto-extracted from each conversation), the runtime can recover from crashes, every step lands in the audit log, every span shows up in your OTel exporter, and the budget enforces hard limits. Multi-tenancy is built in via the user_id kwarg.
