Skip to Content
DocsRecipesStreaming UI integration

Streaming UI integration

The stream() API yields events with backpressure. Wire it into a WebSocket / SSE / Server-Sent Events handler:

from fastapi import FastAPI from sse_starlette.sse import EventSourceResponse from loomflow import Agent app = FastAPI() agent = Agent("...", model="claude-opus-4-7") @app.get("/chat") async def chat(prompt: str): async def event_source(): async for event in agent.stream(prompt): yield { "event": event.kind.value, "data": event.model_dump_json(), } return EventSourceResponse(event_source())

Breaking out of the iteration cancels the producer cleanly. Even if a tool call is mid-flight, it’ll be cancelled within the cancel scope.

Last updated on