Explicit graph builder
For cases where the graph IS the artifact (compliance flows, BPMN-like approval chains), drop the sugar constructors and use the builder directly.
from loomflow import Workflow, END
wf = Workflow("triage")
wf.add_node("classify", classify)
wf.add_node("billing", billing_agent)
wf.add_node("tech", tech_agent)
wf.set_start("classify")
wf.add_router(
"classify",
lambda r: r.lower(),
{"billing": "billing", "tech": "tech"},
)
wf.add_edge("billing", END)
wf.add_edge("tech", END)Constructor
class Workflow:
def __init__(
self,
name: str = "workflow",
*,
telemetry: Telemetry | None = None,
audit_log: AuditLog | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
) -> None: ...| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | "workflow" | Used in telemetry spans and audit entries. |
telemetry | Telemetry | None | None | Wire OTelTelemetry(...) to emit loom.workflow.step spans. |
audit_log | AuditLog | None | None | Wire FileAuditLog(...) for per-step audit attribution. |
max_steps | int | 100 | Total steps executed in one run. Linear flows visit each node once; cyclic flows pay this budget per iteration. Must be >= 1. |
max_visits_per_node | int | 25 | Cap on any single node’s re-entries. Tighter than max_steps because most runaways are one node looping on itself. Must be >= 1. |
Both safety caps raise RuntimeError when exceeded, naming the
offending node. The canonical signal that your routing logic never
picks the termination branch.
Builder methods
add_node(name, fn)
def add_node(self, name: str, fn: StepLike) -> Workflow: ...Register a node. fn can be an async def, sync function, Agent,
or nested Workflow. Returns self for chaining.
| Parameter | Type | Description |
|---|---|---|
name | str | Unique identifier within the workflow. Re-registering raises ValueError. |
fn | StepLike | The work the node performs. |
wf.add_node("extract", extract_fn)
wf.add_node("score", scoring_agent) # Agent works
wf.add_node("subflow", another_workflow) # nested Workflow worksadd_edge(source, target)
def add_edge(
self,
source: str | _Sentinel,
target: str | _Sentinel,
) -> Workflow: ...Unconditional edge from source to target.
| Parameter | Type | Description |
|---|---|---|
source | str | _Sentinel | A registered node name, or the START sentinel (alias for set_start(target)). |
target | str | _Sentinel | A registered node name or END. The framework doesn’t validate the string against _nodes until run time, so forward references work. |
wf.add_edge("extract", "score")
wf.add_edge("score", END)add_edge(START, "node") (0.9.18+) is an alias for
set_start("node"). Lets graphs read symmetrically with the END
sentinel:
from loomflow import Workflow, START, END
wf.add_edge(START, "classify") # alias for set_start("classify")
wf.add_router(...)
wf.add_edge("billing", END)add_edge(START, END) and add_edge(END, ...) are rejected with
messages pointing at the right method. set_start("node") continues
to work. Pick whichever form reads better.
add_router(source, fn, routes, *, default=None)
def add_router(
self,
source: str | _Sentinel,
fn: Callable[[Any], Any] | Callable[[Any], Awaitable[Any]],
routes: Mapping[str, str | _Sentinel],
*,
default: str | _Sentinel | None = None,
) -> Workflow: ...Conditional edge. At run time fn(input_value) is called; the
returned key is looked up in routes to pick the next node.
| Parameter | Type | Description |
|---|---|---|
source | str | _Sentinel | A registered node, OR the START sentinel for a router at the entry of the graph (0.9.20+). |
fn | Callable[[Any], Any | Awaitable[Any]] | Returns a routing key. The result is str()-cast before lookup. async def is supported (0.9.24+). Awaited automatically when the call returns a coroutine. |
routes | Mapping[str, str | _Sentinel] | Routing key → next-node name (or END). |
default | str | _Sentinel | None | Fallback when no key matches. When None, an unmatched key raises RuntimeError at run time. |
Routers and edges are mutually exclusive on the same non-START
source. Calling add_edge(s, ...) after add_router(s, ...)
overwrites the router, and vice versa. set_start("node") and
add_router(START, ...) are also mutually exclusive.
add_router(START, ...). Branch at the entry of the graph
(0.9.20+). Lets you classify the workflow’s input and dispatch
to one of N first nodes without a synthetic passthrough “entry”
step. Mirrors LangGraph’s add_conditional_edges(START, ...):
from loomflow import Workflow, START, END
wf = Workflow()
wf.add_node("step_1", step_1)
wf.add_node("step_3", step_3)
wf.add_router(
START,
fn=lambda q: "step_1" if "work" in q else "step_3",
routes={"step_1": "step_1", "step_3": "step_3"},
default=END,
)
wf.add_edge("step_1", END)
wf.add_edge("step_3", END)Route targets are validated at build time when source=START ,
typos raise a clear error before the run starts. The
to_mermaid / to_dot renders show START -->|key| node directly
(no synthetic entry node in the diagram).
wf.add_router(
"score",
lambda r: "high" if r["score"] >= 0.8 else "low",
{"high": "ship", "low": "revise"},
default="revise",
)set_start(node)
def set_start(self, node: str) -> Workflow: ...Mark a registered node as the entry point. Required before run /
stream. Calling them without a start raises RuntimeError.
wf.set_start("extract")Running
run(input, *, user_id=None, session_id=None, metadata=None)
async def run(
self,
input: Any = None,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> WorkflowResult: ...Execute the graph, return a WorkflowResult.
| Parameter | Type | Default | Description |
|---|---|---|---|
input | Any | None | The first node’s input. |
user_id | str | None | None | Multi-tenant partition key. Threaded into the live RunContext; nested Agent runs inherit it. |
session_id | str | None | None | Conversation continuity / replay key. Auto-generated when not provided (a wf_session_<ulid> value). |
metadata | dict[str, Any] | None | None | Free-form bag merged into the RunContext.metadata. |
Returns, WorkflowResult with output, visited, per_step.
stream(input, *, user_id=None, session_id=None, metadata=None)
async def stream(
self,
input: Any = None,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> AsyncIterator[Event]: ...Same execution as run, exposed as an async generator of Events.
Yields:
WORKFLOW_STARTED. Once at the start.WORKFLOW_STEP_STARTED/WORKFLOW_STEP_COMPLETED. Per visited node.WORKFLOW_STEP_FAILED. If a node raises (the run also raises after cleanup).WORKFLOW_COMPLETED. Once at the end.
Consumers can break out of the iterator early to cancel the run.
WorkflowResult
@dataclass
class WorkflowResult:
output: Any
visited: list[str]
per_step: dict[str, Any]| Field | Description |
|---|---|
output | The final node’s return value. Type matches the last step’s return type; users wanting stronger typing can use a Pydantic model as the per-step value. |
visited | Node names in execution order, with repeats preserved. A linear flow visits each node once; a cyclic flow shows the full trace so callers can see iteration counts. Use set(result.visited) for “which nodes were touched”, Counter(result.visited) for per-node counts. |
per_step | Mapping of node name → its last value. With cycles, intermediate values for revisits are NOT preserved here (the event stream from stream() has the full per-iteration history). |
Cycles
Loom’s workflow supports cycles. Feedback loops (A → B → classify → (C \| D \| END) → B and similar refinement patterns) are
built-in. The two safety caps (max_steps /
max_visits_per_node) keep buggy routers from looping forever.
from loomflow import Workflow, END
wf = Workflow("refine_until_good", max_visits_per_node=5)
wf.add_node("draft", draft_fn)
wf.add_node("review", review_fn)
wf.add_node("judge", judge_fn)
wf.add_node("revise", revise_fn)
wf.set_start("draft")
wf.add_edge("draft", "review")
wf.add_edge("review", "judge")
wf.add_router(
"judge",
lambda r: "accept" if r["score"] >= 0.85 else "revise",
{"accept": END, "revise": "revise"},
)
wf.add_edge("revise", "review") # ← back-edge: closes the cycleIf the judge keeps picking “revise”, the workflow stops at
max_visits_per_node=5 re-entries of review and raises:
RuntimeError: workflow 'refine_until_good' re-entered 'review'
more than max_visits_per_node=5 times; the router controlling the
loop probably never picks the termination branchFor cycle-handling examples see examples/08_workflow_loop.py (in
the framework repo).
Composition with Agent
Pass an Agent to add_node; the framework calls .run(input).output
and threads the live RunContext through. Pass another Workflow to
nest. See Composition.
When to reach for the explicit builder
- Compliance flows. Multi-stage approval chains, BPMN-like shapes, sign-off gates. The graph is the contract.
- Cycles. Refinement loops, retry-with-feedback, escalation chains.
- Non-linear branching beyond “one classifier, then dispatch” , multi-stage routing, conditional skips, parallel-then-merge inside a larger flow.
For linear pipelines use Workflow.chain.
For one-shot classify-and-dispatch use
Workflow.route. The explicit builder is
worth its ceremony when the graph shape is the documentation.
The graph as code. A common pattern: keep the graph builder calls
as the top-of-file declaration, paired with a small wf.to_mermaid()
helper that emits the diagram for your README / runbook. The builder
output IS the canonical description of the flow.