Telemetry
Every Loom primitive emits typed spans and metrics through the
Telemetry protocol. Four built-in sinks let you see those events
without deploying a collector; OTelTelemetry ships them to any
OpenTelemetry-compatible backend when you’re ready for production.
from loomflow import Agent
from loomflow.observability import ConsoleTelemetry
agent = Agent(
    "...",
    model="claude-opus-4-7",
    telemetry=ConsoleTelemetry(),
)
Run the agent and span lines print to stderr as they happen. No collector, no SDK, no extras to install.
The four built-in sinks
| Sink | What it does | Best for |
|---|---|---|
| InMemoryTelemetry | Accumulates spans + metrics in lists. Inspect via .spans() / .metrics(). | Unit tests, exploration |
| ConsoleTelemetry | Prints each span + metric to a stream (sys.stderr by default) with nested-trace indentation. | “Tail my agent in dev” |
| FileTelemetry | Append-only JSONL on disk. One JSON record per span / metric. Parseable by jq, Splunk, Datadog log pipelines. | Long-running dev / staging, post-mortems |
| MultiTelemetry | Fan-out. Forwards every span + metric to every sink in declaration order. Trace IDs are shared. | Watch live AND inspect after |
from loomflow.observability import (
    InMemoryTelemetry,
    ConsoleTelemetry,
    FileTelemetry,
    MultiTelemetry,
)
All four share the same Telemetry protocol as OTelTelemetry, so
swapping sinks never touches agent code.
Spans
| Span | Opens | Attributes |
|---|---|---|
| loom.run | agent.run() | session_id, user_id, architecture |
| loom.turn | each iteration of the loop | turn, session_id |
| loom.model.stream | every model call | model, tokens_in, tokens_out, retries |
| loom.tool | every tool dispatch | tool, ok, duration_ms |
| loom.workflow.step | every node in a Workflow | step, node, pattern |
For multi-agent architectures, additional spans nest naturally:
loom.run
└── loom.turn (turn=0)
    └── loom.model.stream
└── loom.turn (turn=1)
    ├── loom.tool (tool=delegate, worker=researcher)
    │   └── loom.run (sub-agent)
    │       └── loom.turn ... (recursive)
    └── loom.tool (tool=delegate, worker=writer)
        └── loom.run
The nested loom.run spans link sub-agent traces to their parent
supervisor. The parent_span_id linkage is preserved across
every sink, including FileTelemetry’s JSONL output.
Metrics
| Metric | Type | Tags |
|---|---|---|
| loom.tokens.input | counter | model, user_id |
| loom.tokens.output | counter | model, user_id |
| loom.cost.usd | counter | model, user_id |
| loom.tool.duration_ms | histogram | tool, ok, user_id |
| loom.session.duration_ms | histogram | architecture, user_id |
| loom.budget.exceeded | counter | kind (max_tokens / max_cost_usd / max_wall_clock) |
| loom.auto_extract.duration_ms | histogram | user_id, status |
| loom.auto_extract.invocations | counter | user_id, status |
user_id is added as a tag whenever a RunContext is active. Every
metric is automatically attributed without manual plumbing.
Histogram-vs-counter dispatch is automatic by metric name suffix:
_ms / _seconds / _bytes → histogram; everything else →
counter. A single emit_metric() API picks the right instrument under
the hood. The in-memory, console, and file sinks tag each captured
metric with the same instrument_kind, so test assertions match what
OTLP would have emitted.
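The suffix rule can be sketched in a few lines. This is a minimal illustration of the documented behaviour, not loomflow's actual code; `instrument_kind_for` and `HISTOGRAM_SUFFIXES` are hypothetical names chosen for the example.

```python
# Hypothetical helper illustrating the documented suffix rule;
# these names are not part of loomflow's API.
HISTOGRAM_SUFFIXES = ("_ms", "_seconds", "_bytes")


def instrument_kind_for(metric_name: str) -> str:
    """Return "histogram" for duration/size-style names, else "counter"."""
    if metric_name.endswith(HISTOGRAM_SUFFIXES):
        return "histogram"
    return "counter"


for name in ("loom.tool.duration_ms", "loom.tokens.input", "loom.cost.usd"):
    print(f"{name:<24} -> {instrument_kind_for(name)}")
```

Under this rule, naming a new metric with one of the three suffixes is all it takes to have it recorded as a histogram.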
NoTelemetry: the default
from loomflow import Agent, NoTelemetry
agent = Agent("...", model="...", telemetry=NoTelemetry()) # explicit
agent = Agent("...", model="...") # implicit (same)
NoTelemetry makes every span / metric call a no-op. The agent loop
detects this at construction and skips the async with telemetry.trace(...)
context managers entirely on the hot path.
Worked example: inspect spans + metrics in-process
The simplest way to see what an agent emits is InMemoryTelemetry.
It needs no collector, SDK, or log file; the captured records are
plain dataclasses:
import asyncio
from loomflow import Agent, ScriptedModel, ScriptedTurn, ToolCall, tool
from loomflow.observability import InMemoryTelemetry

@tool
async def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

async def main():
    telemetry = InMemoryTelemetry()
    # ScriptedModel keeps the run deterministic (no network call)
    model = ScriptedModel([
        ScriptedTurn(tool_calls=[
            ToolCall(id="c1", tool="add", args={"a": 2, "b": 3})
        ]),
        ScriptedTurn(text="The sum is 5."),
    ])
    agent = Agent(
        "Arithmetic assistant.",
        model=model,
        tools=[add],
        telemetry=telemetry,
    )
    await agent.run("What is 2 + 3?", user_id="alice")
    # Spans carry timing + parent-span linkage
    for s in telemetry.spans():
        attrs = ", ".join(f"{k}={v}" for k, v in s.attributes.items())
        print(f"{s.name:<22} ({s.duration_ms:5.1f}ms) [{attrs}]")
    # Metrics carry their auto-detected instrument kind
    for m in telemetry.metrics():
        print(f"{m.name:<28} {m.value:<8} ({m.instrument_kind})")

asyncio.run(main())
Output looks like:
loom.run ( 4.2ms) [session_id=run_..., user_id=alice]
loom.turn ( 2.1ms) [turn=0, session_id=run_...]
loom.model.stream ( 1.8ms) [model=ScriptedModel, ...]
loom.tool ( 0.2ms) [tool=add, ok=true]
loom.turn ( 0.8ms) [turn=1, session_id=run_...]
loom.model.stream ( 0.7ms) [model=ScriptedModel, ...]
loom.tokens.input 42 (counter)
loom.tokens.output 12 (counter)
loom.cost.usd 0.00012 (counter)
loom.session.duration_ms 4.2 (histogram)
CapturedSpan carries name, trace_id, span_id,
parent_span_id, started_at, ended_at, duration_ms,
attributes, and an exception repr if the body raised.
CapturedMetric carries name, value, instrument_kind,
attributes, and emitted_at. Both are frozen dataclasses. Assert
on them directly in tests.
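A small filter helper keeps such assertions readable. The sketch below is self-contained, so it uses a stand-in `FakeSpan` dataclass with a subset of the fields listed above; both `FakeSpan` and `find_spans` are hypothetical names, and the real CapturedSpan may differ in detail.

```python
from dataclasses import dataclass, field
from typing import Optional


# Stand-in for CapturedSpan with a subset of the documented fields.
# Hypothetical: the real class lives in loomflow and may differ.
@dataclass(frozen=True)
class FakeSpan:
    name: str
    span_id: str
    parent_span_id: Optional[str]
    duration_ms: float
    attributes: dict = field(default_factory=dict)


def find_spans(spans, name, **attrs):
    """Return spans with the given name whose attributes include attrs."""
    return [
        s for s in spans
        if s.name == name
        and all(s.attributes.get(k) == v for k, v in attrs.items())
    ]


captured = [
    FakeSpan("loom.run", "s1", None, 4.2, {"user_id": "alice"}),
    FakeSpan("loom.turn", "s2", "s1", 2.1, {"turn": 0}),
    FakeSpan("loom.tool", "s3", "s2", 0.2, {"tool": "add", "ok": True}),
]

tool_spans = find_spans(captured, "loom.tool", tool="add", ok=True)
assert len(tool_spans) == 1
# Parent linkage: the tool span hangs off the first turn.
assert tool_spans[0].parent_span_id == "s2"
```

In a real test the list would presumably come from `telemetry.spans()` rather than hand-built records.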
Tail in stderr with ConsoleTelemetry
Swap the sink to see spans appear live as the agent runs, with nested-trace indentation:
import sys
from loomflow.observability import ConsoleTelemetry
agent = Agent(
    "...",
    model="...",
    telemetry=ConsoleTelemetry(stream=sys.stderr, show_metrics=False),
)
• loom.run (4.2ms) session_id=run_xyz, user_id=alice
  • loom.turn (2.1ms) turn=0, session_id=run_xyz
    • loom.model.stream (1.8ms) model=ScriptedModel
    • loom.tool (0.2ms) tool=add, ok=true
  • loom.turn (0.8ms) turn=1
    • loom.model.stream (0.7ms)
Indentation tracks the active parent span automatically, even when
spans open inside anyio.create_task_group() (the framework’s
parallel-tool-dispatch path).
Write JSONL with FileTelemetry
from loomflow.observability import FileTelemetry
agent = Agent(
    "...",
    model="...",
    telemetry=FileTelemetry("./traces.jsonl"),
)
Each line is a structured JSON record with a "kind" discriminator of
"span" or "metric". Span records carry the
parent_span_id linkage needed to reconstruct the trace tree
offline. Writes go through anyio.to_thread.run_sync so the event
loop never blocks on disk I/O; an internal lock serialises
concurrent writes from parallel tool dispatches.
Query offline with jq:
# Spans that took longer than 1s
jq -c 'select(.kind=="span" and .duration_ms > 1000)' traces.jsonl
# One user's session
jq -c 'select(.attributes.user_id=="alice")' traces.jsonl
# All cost metrics
jq -c 'select(.kind=="metric" and .name=="loom.cost.usd")' traces.jsonl
No rotation is built in. Use logrotate / journald / your platform’s
log management to cap file size. The framework deliberately stays
out of that policy decision.
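Because span records carry parent_span_id, the trace tree can also be rebuilt offline in a few lines of Python. The sketch below assumes the JSONL field names mirror the CapturedSpan fields (`span_id`, `parent_span_id`, `name`, `duration_ms`); that mapping is an assumption, and `load_spans` / `render_tree` are hypothetical helpers, not part of loomflow. An inline sample stands in for the contents of traces.jsonl.

```python
import json
from collections import defaultdict


def load_spans(lines):
    """Parse JSONL lines and keep only span records."""
    records = (json.loads(line) for line in lines if line.strip())
    return [r for r in records if r.get("kind") == "span"]


def render_tree(spans):
    """Return indented lines, one per span, children under their parent."""
    ids = {s["span_id"] for s in spans}
    children = defaultdict(list)
    for s in spans:
        parent = s.get("parent_span_id")
        # Spans whose parent is absent from the file are treated as roots.
        children[parent if parent in ids else None].append(s)

    out = []

    def walk(parent_id, depth):
        for s in children[parent_id]:
            out.append(f"{'  ' * depth}{s['name']} ({s['duration_ms']}ms)")
            walk(s["span_id"], depth + 1)

    walk(None, 0)
    return out


# Illustrative records only; real files would be read with open("traces.jsonl").
sample = [json.dumps(r) for r in [
    {"kind": "span", "name": "loom.model.stream", "span_id": "c",
     "parent_span_id": "b", "duration_ms": 1.8},
    {"kind": "span", "name": "loom.turn", "span_id": "b",
     "parent_span_id": "a", "duration_ms": 2.1},
    {"kind": "metric", "name": "loom.cost.usd", "value": 0.00012},
    {"kind": "span", "name": "loom.run", "span_id": "a",
     "parent_span_id": None, "duration_ms": 4.2},
]]

print("\n".join(render_tree(load_spans(sample))))
```

Siblings appear in file order here; sorting by a start timestamp would be a natural refinement if the records include one.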
FileTelemetry ≠ FileAuditLog. They capture different things: audit log = business events for compliance (“did Alice’s refund go through?”); telemetry = performance / diagnostic spans (“why was this run slow?”). Run both together in production.
Fan out with MultiTelemetry
Watch live AND assert in tests by stacking sinks:
from loomflow.observability import (
    ConsoleTelemetry, InMemoryTelemetry, MultiTelemetry,
)
in_mem = InMemoryTelemetry()
telemetry = MultiTelemetry([ConsoleTelemetry(), in_mem])
agent = Agent(..., telemetry=telemetry)
await agent.run("...")
# Watched live in stderr — and still assertable
assert any(s.name == "loom.tool" for s in in_mem.spans())
Span IDs are minted by the first sink and shared with the others, so
trace hierarchy stays consistent across every capture. Exceptions
inside one sink’s trace propagate after every other sink has had a
chance to record its cleanup (finally blocks fire even on
exceptional exit thanks to AsyncExitStack).
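The cleanup guarantee can be illustrated with a toy fan-out built on `contextlib.AsyncExitStack`. This is a sketch of the general technique, not MultiTelemetry's implementation: `RecordingSink` and `FanOut` are hypothetical classes, and span-ID sharing and `emit_metric` are left out.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


class RecordingSink:
    """Toy sink that logs span open/close events (hypothetical)."""

    def __init__(self, label, log):
        self.label, self.log = label, log

    @asynccontextmanager
    async def trace(self, name, **attrs):
        self.log.append(f"{self.label} open {name}")
        try:
            yield
        finally:
            # Runs on normal and exceptional exit alike.
            self.log.append(f"{self.label} close {name}")


class FanOut:
    """Enter every sink's trace in declaration order; unwind in reverse."""

    def __init__(self, sinks):
        self.sinks = list(sinks)

    @asynccontextmanager
    async def trace(self, name, **attrs):
        async with AsyncExitStack() as stack:
            for sink in self.sinks:
                await stack.enter_async_context(sink.trace(name, **attrs))
            yield


async def main():
    events = []
    fan = FanOut([RecordingSink("A", events), RecordingSink("B", events)])
    try:
        async with fan.trace("loom.tool"):
            raise RuntimeError("tool failed")
    except RuntimeError:
        events.append("error propagated")
    return events


fanout_events = asyncio.run(main())
print(fanout_events)
```

Both sinks record their close event before the exception reaches the caller, which is the property the paragraph above describes.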
The framework ships examples/13_telemetry.py, which runs all four
sinks end-to-end against a ScriptedModel. No API key required.
Production: OTelTelemetry
For Honeycomb / Datadog / Grafana Tempo / Jaeger / any OTLP-compatible
backend, wire the OpenTelemetry SDK and pass it to OTelTelemetry:
from loomflow.observability import OTelTelemetry
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="https://otlp.your-vendor"))
)
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="https://otlp.your-vendor")
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
telemetry = OTelTelemetry(
    tracer_provider=tracer_provider,
    meter_provider=meter_provider,
)
agent = Agent("...", model="...", telemetry=telemetry)
Install the OTel extras:
pip install 'loomflow[otel]'
The adapter implements the same Telemetry protocol, so swapping sinks
never touches agent code.
Custom telemetry
Any class with two methods satisfies the Telemetry protocol:
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
class MyTelemetry:
    @asynccontextmanager
    async def trace(self, name: str, **attrs) -> AsyncIterator[None]:
        # Span open
        try:
            yield
        finally:
            # Span close
            ...
    async def emit_metric(self, name: str, value: float, **attrs) -> None:
        ...
Useful when you have a custom metric backend not covered by OTel
(e.g. a homegrown StatsD or Prometheus pushgateway). Wrap it in
MultiTelemetry alongside one of the built-in sinks to keep the
live-tail / file-on-disk affordances while still sending to your
backend.
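As a concrete illustration of the two-method shape, here is a sketch of a sink that formats spans and metrics as plain StatsD lines (`name:value|type`). `StatsdLineTelemetry` and its `send` parameter are hypothetical; the class collects lines in a list by default so the example runs without a network, and it drops attributes because plain StatsD has no tags.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class StatsdLineTelemetry:
    """Formats spans and metrics as StatsD lines and hands them to `send`.

    Hypothetical example class. `send` is any callable taking one string,
    e.g. a thin wrapper around a UDP socket.
    """

    def __init__(self, send=None):
        self.lines = []
        self._send = send or self.lines.append

    @asynccontextmanager
    async def trace(self, name, **attrs):
        started = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - started) * 1000
            # StatsD timer line: <name>:<value>|ms
            self._send(f"{name}:{elapsed_ms:.3f}|ms")

    async def emit_metric(self, name, value, **attrs):
        # Simplification: every metric becomes a StatsD counter ("|c").
        self._send(f"{name}:{value}|c")


async def demo():
    telemetry = StatsdLineTelemetry()
    async with telemetry.trace("loom.tool", tool="add"):
        await telemetry.emit_metric("loom.tokens.input", 42, model="m")
    return telemetry.lines


statsd_lines = asyncio.run(demo())
print(statsd_lines)
```

The metric line is emitted first because the span's timer line is only written when the span closes.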
Telemetry vs the audit log. Spans are for operational observability: latency, error rates, dependencies. The audit log is for compliance observability: who did what, with HMAC signatures. They have different lifecycles, different retention, and different consumers. Wire both; they don’t overlap.