Streaming UI integration
The stream() API yields events with backpressure. Wire it into a
WebSocket / SSE / Server-Sent Events handler:
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from loomflow import Agent
app = FastAPI()
agent = Agent("...", model="claude-opus-4-7")
@app.get("/chat")
async def chat(prompt: str):
async def event_source():
async for event in agent.stream(prompt):
yield {
"event": event.kind.value,
"data": event.model_dump_json(),
}
return EventSourceResponse(event_source())Breaking out of the iteration cancels the producer cleanly. Even if a tool call is mid-flight, it’ll be cancelled within the cancel scope.
Last updated on