
Telemetry

Every Loom primitive emits typed spans and metrics through the Telemetry protocol. Four built-in sinks let you see those events without deploying a collector; OTelTelemetry ships them to any OpenTelemetry-compatible backend when you’re ready for production.

from loomflow import Agent
from loomflow.observability import ConsoleTelemetry

agent = Agent(
    "...",
    model="claude-opus-4-7",
    telemetry=ConsoleTelemetry(),
)

Run the agent and span lines print to stderr as they happen. No collector, no SDK, no extras to install.

The four built-in sinks

| Sink | What it does | Best for |
| --- | --- | --- |
| InMemoryTelemetry | Accumulates spans + metrics in lists. Inspect via .spans() / .metrics(). | Unit tests, exploration |
| ConsoleTelemetry | Prints each span + metric to a stream (sys.stderr by default) with nested-trace indentation. | "Tail my agent in dev" |
| FileTelemetry | Append-only JSONL on disk. One JSON record per span / metric. Parseable by jq, Splunk, Datadog log pipelines. | Long-running dev / staging, post-mortems |
| MultiTelemetry | Fan-out. Forwards every span + metric to every sink in declaration order. Trace IDs are shared. | Watch live AND inspect after |

from loomflow.observability import (
    InMemoryTelemetry,
    ConsoleTelemetry,
    FileTelemetry,
    MultiTelemetry,
)

All four share the same Telemetry protocol as OTelTelemetry, so swapping sinks never touches agent code.

Spans

| Span | Opens | Attributes |
| --- | --- | --- |
| loom.run | agent.run() | session_id, user_id, architecture |
| loom.turn | each iteration of the loop | turn, session_id |
| loom.model.stream | every model call | model, tokens_in, tokens_out, retries |
| loom.tool | every tool dispatch | tool, ok, duration_ms |
| loom.workflow.step | every node in a Workflow | step, node, pattern |

For multi-agent architectures, additional spans nest naturally:

loom.run
├── loom.turn (turn=0)
│   └── loom.model.stream
└── loom.turn (turn=1)
    ├── loom.tool (tool=delegate, worker=researcher)
    │   └── loom.run (sub-agent)
    │       └── loom.turn ... (recursive)
    └── loom.tool (tool=delegate, worker=writer)
        └── loom.run

The nested loom.run spans link sub-agent traces to their parent supervisor, and the parent_span_id linkage is preserved across every sink, including FileTelemetry’s JSONL output.

Metrics

| Metric | Type | Tags |
| --- | --- | --- |
| loom.tokens.input | counter | model, user_id |
| loom.tokens.output | counter | model, user_id |
| loom.cost.usd | counter | model, user_id |
| loom.tool.duration_ms | histogram | tool, ok, user_id |
| loom.session.duration_ms | histogram | architecture, user_id |
| loom.budget.exceeded | counter | kind (max_tokens / max_cost_usd / max_wall_clock) |
| loom.auto_extract.duration_ms | histogram | user_id, status |
| loom.auto_extract.invocations | counter | user_id, status |

user_id is added as a tag whenever a RunContext is active. Every metric is automatically attributed without manual plumbing.

Histogram-vs-counter dispatch is automatic by metric name suffix: _ms / _seconds / _bytes → histogram; everything else → counter. One emit_metric() API, the right instrument under the hood. And the in-memory / console / file sinks tag each captured metric with the same instrument_kind so assertions match what OTLP would have emitted.
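The suffix rule can be sketched in a few lines. This illustrates the naming convention only and is not Loom's actual implementation; the helper name instrument_kind_for and the HISTOGRAM_SUFFIXES constant are hypothetical.

```python
# Hypothetical helper illustrating the suffix rule; not part of loomflow.
HISTOGRAM_SUFFIXES = ("_ms", "_seconds", "_bytes")


def instrument_kind_for(metric_name: str) -> str:
    """Return "histogram" for duration/size-style names, else "counter"."""
    if metric_name.endswith(HISTOGRAM_SUFFIXES):
        return "histogram"
    return "counter"


for name in ("loom.tool.duration_ms", "loom.tokens.input", "loom.cost.usd"):
    print(f"{name} -> {instrument_kind_for(name)}")
```

Under this rule, a new metric picks its instrument purely from its name, so choosing a name such as queue_wait_ms is what makes it a histogram.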

NoTelemetry: the default

from loomflow import Agent, NoTelemetry

agent = Agent("...", model="...", telemetry=NoTelemetry())  # explicit
agent = Agent("...", model="...")                           # implicit (same)

NoTelemetry makes every span / metric call a no-op. The agent loop detects this at construction and skips the async with telemetry.trace(...) context managers entirely on the hot path.
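One way such a fast path could look is sketched below. Both classes are hypothetical stand-ins written for this illustration (NoOpTelemetry for NoTelemetry, TinyLoop for the agent loop); the real detection logic inside Loom may differ.

```python
import asyncio
from contextlib import asynccontextmanager


class NoOpTelemetry:
    """Hypothetical stand-in for NoTelemetry: both methods do nothing."""

    @asynccontextmanager
    async def trace(self, name, **attrs):
        yield

    async def emit_metric(self, name, value, **attrs):
        return None


class TinyLoop:
    """Hypothetical loop that decides once, at construction, whether to trace."""

    def __init__(self, telemetry):
        self._telemetry = telemetry
        self._tracing = not isinstance(telemetry, NoOpTelemetry)

    async def turn(self, work):
        if not self._tracing:
            # Fast path: no context manager is entered at all.
            return await work()
        async with self._telemetry.trace("loom.turn"):
            return await work()


async def answer():
    return 42


result = asyncio.run(TinyLoop(NoOpTelemetry()).turn(answer))
print(result)
```

The check happens once in the constructor, so each turn pays only for a boolean test when telemetry is disabled.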

Worked example: inspect spans + metrics in-process

The simplest way to see what an agent emits is InMemoryTelemetry. No collector, no SDK, no log file. Just the captured records as plain dataclasses:

import asyncio

from loomflow import Agent, ScriptedModel, ScriptedTurn, ToolCall, tool
from loomflow.observability import InMemoryTelemetry


@tool
async def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


async def main():
    telemetry = InMemoryTelemetry()

    # ScriptedModel keeps the run deterministic: no network call
    model = ScriptedModel([
        ScriptedTurn(tool_calls=[
            ToolCall(id="c1", tool="add", args={"a": 2, "b": 3})
        ]),
        ScriptedTurn(text="The sum is 5."),
    ])

    agent = Agent(
        "Arithmetic assistant.",
        model=model,
        tools=[add],
        telemetry=telemetry,
    )
    await agent.run("What is 2 + 3?", user_id="alice")

    # Spans carry timing + parent-span linkage
    for s in telemetry.spans():
        attrs = ", ".join(f"{k}={v}" for k, v in s.attributes.items())
        print(f"{s.name:<22} ({s.duration_ms:5.1f}ms) [{attrs}]")

    # Metrics carry their auto-detected instrument kind
    for m in telemetry.metrics():
        print(f"{m.name:<28} {m.value:<8} ({m.instrument_kind})")


asyncio.run(main())

Output looks like:

loom.run               (  4.2ms) [session_id=run_..., user_id=alice]
loom.turn              (  2.1ms) [turn=0, session_id=run_...]
loom.model.stream      (  1.8ms) [model=ScriptedModel, ...]
loom.tool              (  0.2ms) [tool=add, ok=true]
loom.turn              (  0.8ms) [turn=1, session_id=run_...]
loom.model.stream      (  0.7ms) [model=ScriptedModel, ...]
loom.tokens.input            42       (counter)
loom.tokens.output           12       (counter)
loom.cost.usd                0.00012  (counter)
loom.session.duration_ms     4.2      (histogram)

CapturedSpan carries name, trace_id, span_id, parent_span_id, started_at, ended_at, duration_ms, attributes, and an exception repr if the body raised. CapturedMetric carries name, value, instrument_kind, attributes, and emitted_at. Both are frozen dataclasses. Assert on them directly in tests.

Tail in stderr with ConsoleTelemetry

Swap the sink to see spans appear live as the agent runs, with nested-trace indentation:

import sys

from loomflow import Agent
from loomflow.observability import ConsoleTelemetry

agent = Agent(
    "...",
    model="...",
    telemetry=ConsoleTelemetry(stream=sys.stderr, show_metrics=False),
)

Output looks like:

• loom.run (4.2ms) session_id=run_xyz, user_id=alice
  • loom.turn (2.1ms) turn=0, session_id=run_xyz
    • loom.model.stream (1.8ms) model=ScriptedModel
    • loom.tool (0.2ms) tool=add, ok=true
  • loom.turn (0.8ms) turn=1
    • loom.model.stream (0.7ms)

Indentation tracks the active parent span automatically, even when spans open inside anyio.create_task_group() (the framework’s parallel-tool-dispatch path).

Write JSONL with FileTelemetry

from loomflow import Agent
from loomflow.observability import FileTelemetry

agent = Agent(
    "...",
    model="...",
    telemetry=FileTelemetry("./traces.jsonl"),
)

Each line is a structured JSON record with a "kind": "span" or "kind": "metric" discriminator. Span records carry the parent_span_id linkage needed to reconstruct the trace tree offline. Writes go through anyio.to_thread.run_sync so the event loop never blocks on disk I/O; an internal lock serialises concurrent writes from parallel tool dispatches.

Query offline with jq:

# Spans that took longer than 1s
jq -c 'select(.kind=="span" and .duration_ms > 1000)' traces.jsonl

# One user's session
jq -c 'select(.attributes.user_id=="alice")' traces.jsonl

# All cost metrics
jq -c 'select(.kind=="metric" and .name=="loom.cost.usd")' traces.jsonl
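The parent_span_id linkage can also be used to rebuild the trace tree from the file. The sketch below is one possible approach, not a tool shipped with the framework. It assumes the JSON keys span_id, parent_span_id, name, and duration_ms mirror the CapturedSpan field names; the sample records and the render_tree helper are hypothetical.

```python
import json
from collections import defaultdict

# Hypothetical sample lines; real records may carry more fields.
SAMPLE_JSONL = """\
{"kind": "span", "name": "loom.run", "span_id": "a", "parent_span_id": null, "duration_ms": 4.2}
{"kind": "span", "name": "loom.turn", "span_id": "b", "parent_span_id": "a", "duration_ms": 2.1}
{"kind": "metric", "name": "loom.cost.usd", "value": 0.00012}
{"kind": "span", "name": "loom.tool", "span_id": "c", "parent_span_id": "b", "duration_ms": 0.2}
"""


def render_tree(lines):
    """Return indented 'name (duration)' lines, one per span, parents first."""
    records = (json.loads(line) for line in lines if line.strip())
    spans = [r for r in records if r.get("kind") == "span"]
    known_ids = {s["span_id"] for s in spans}

    children = defaultdict(list)
    for span in spans:
        parent = span.get("parent_span_id")
        # Spans whose parent is absent from the file are treated as roots.
        children[parent if parent in known_ids else None].append(span)

    out = []

    def walk(parent_id, depth):
        for span in children.get(parent_id, []):
            out.append(f"{'  ' * depth}{span['name']} ({span['duration_ms']}ms)")
            walk(span["span_id"], depth + 1)

    walk(None, 0)
    return out


tree = render_tree(SAMPLE_JSONL.splitlines())
print("\n".join(tree))
```

Because the parent map is built before walking, the order in which spans were written to the file does not matter. To run it against a real trace, pass an open file handle instead of the sample lines.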

No rotation built in. Use logrotate / journald / your platform’s log management to cap file size. The framework deliberately stays out of that policy decision.

FileTelemetry ≠ FileAuditLog. They capture different things: audit log = business events for compliance (“did Alice’s refund go through?”); telemetry = performance / diagnostic spans (“why was this run slow?”). Run both together in production.

Fan out with MultiTelemetry

Watch live AND assert in tests by stacking sinks:

from loomflow import Agent
from loomflow.observability import (
    ConsoleTelemetry,
    InMemoryTelemetry,
    MultiTelemetry,
)

in_mem = InMemoryTelemetry()
telemetry = MultiTelemetry([ConsoleTelemetry(), in_mem])

agent = Agent(..., telemetry=telemetry)
await agent.run("...")

# Watched live in stderr, and still assertable
assert any(s.name == "loom.tool" for s in in_mem.spans())

Span IDs are minted by the first sink and shared with the others, so trace hierarchy stays consistent across every capture. Exceptions inside one sink’s trace propagate after every other sink has had a chance to record its cleanup (finally blocks fire even on exceptional exit thanks to AsyncExitStack).
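The cleanup guarantee can be illustrated with a small fan-out built on contextlib.AsyncExitStack. This is a sketch of the general technique under simplified assumptions, not MultiTelemetry's source: RecordingSink and FanOut are hypothetical, only the trace path is shown, and shared span IDs are not modelled.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


class RecordingSink:
    """Hypothetical sink that logs span open/close events to a shared list."""

    def __init__(self, label, log):
        self.label = label
        self.log = log

    @asynccontextmanager
    async def trace(self, name, **attrs):
        self.log.append(f"{self.label} open {name}")
        try:
            yield
        finally:
            self.log.append(f"{self.label} close {name}")


class FanOut:
    """Hypothetical fan-out: enters every sink's span in declaration order."""

    def __init__(self, sinks):
        self.sinks = list(sinks)

    @asynccontextmanager
    async def trace(self, name, **attrs):
        async with AsyncExitStack() as stack:
            for sink in self.sinks:
                await stack.enter_async_context(sink.trace(name, **attrs))
            yield


async def main():
    log = []
    fan = FanOut([RecordingSink("A", log), RecordingSink("B", log)])
    try:
        async with fan.trace("loom.tool"):
            raise ValueError("tool failed")
    except ValueError:
        log.append("caller saw ValueError")
    return log


events = asyncio.run(main())
print(events)
```

The exit stack unwinds in reverse order, so both sinks record their close event before the caller sees the exception.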

The framework ships examples/13_telemetry.py running all four sinks end-to-end against a ScriptedModel. No API key required.

Production: OTelTelemetry

For Honeycomb / Datadog / Grafana Tempo / Jaeger / any OTLP-compatible backend, wire the OpenTelemetry SDK and pass it to OTelTelemetry:

from loomflow import Agent
from loomflow.observability import OTelTelemetry
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Traces: batch spans and export them over OTLP/gRPC
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="https://otlp.your-vendor"))
)

# Metrics: export periodically over OTLP/gRPC
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="https://otlp.your-vendor")
)
meter_provider = MeterProvider(metric_readers=[metric_reader])

telemetry = OTelTelemetry(
    tracer_provider=tracer_provider,
    meter_provider=meter_provider,
)

agent = Agent("...", model="...", telemetry=telemetry)

Install the OTel extras:

pip install 'loomflow[otel]'

The adapter implements the same Telemetry protocol as the built-in sinks, so swapping it in never touches agent code.

Custom telemetry

Any class with two methods satisfies the Telemetry protocol:

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class MyTelemetry:
    @asynccontextmanager
    async def trace(self, name: str, **attrs) -> AsyncIterator[None]:
        # Span open
        try:
            yield
        finally:
            # Span close
            ...

    async def emit_metric(self, name: str, value: float, **attrs):
        ...

Useful when you have a custom metric backend not covered by OTel (e.g. a homegrown StatsD or Prometheus pushgateway). Wrap it in MultiTelemetry alongside one of the built-in sinks to keep the live-tail / file-on-disk affordances while still sending to your backend.
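As a concrete illustration of the two-method shape, here is a minimal sketch of a sink that formats StatsD-style lines. The class name StatsdLineTelemetry is hypothetical, and the sink buffers lines in a list instead of sending them over the network so the example stays self-contained. It also ignores attributes and emits every metric as a counter, which a real backend adapter would likely refine.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class StatsdLineTelemetry:
    """Hypothetical sink that buffers StatsD-style lines instead of sending them."""

    def __init__(self):
        self.lines = []

    @asynccontextmanager
    async def trace(self, name, **attrs):
        started = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - started) * 1000
            # "|ms" is the StatsD timer type.
            self.lines.append(f"{name}:{elapsed_ms:.3f}|ms")

    async def emit_metric(self, name, value, **attrs):
        # "|c" is the StatsD counter type; attrs are ignored in this sketch.
        self.lines.append(f"{name}:{value}|c")


async def main():
    sink = StatsdLineTelemetry()
    async with sink.trace("loom.tool", tool="add"):
        await asyncio.sleep(0)
    await sink.emit_metric("loom.tokens.input", 42, model="demo")
    return sink.lines


lines = asyncio.run(main())
print(lines)
```

The timing is recorded in the finally block, so a span that raises still produces a line.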

Telemetry vs the audit log. Spans are for operational observability: latency, error rates, dependencies. The audit log is for compliance observability: who did what, with HMAC signatures. Different lifecycles, different retention, different consumers. Wire both; they don’t overlap.
