
Explicit graph builder

For cases where the graph IS the artifact (compliance flows, BPMN-like approval chains), drop the sugar constructors and use the builder directly.

    from loomflow import Workflow, END

    wf = Workflow("triage")
    wf.add_node("classify", classify)
    wf.add_node("billing", billing_agent)
    wf.add_node("tech", tech_agent)
    wf.set_start("classify")
    wf.add_router(
        "classify",
        lambda r: r.lower(),
        {"billing": "billing", "tech": "tech"},
    )
    wf.add_edge("billing", END)
    wf.add_edge("tech", END)

Constructor

    class Workflow:
        def __init__(
            self,
            name: str = "workflow",
            *,
            telemetry: Telemetry | None = None,
            audit_log: AuditLog | None = None,
            max_steps: int = 100,
            max_visits_per_node: int = 25,
        ) -> None: ...

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | "workflow" | Used in telemetry spans and audit entries. |
| telemetry | Telemetry \| None | None | Wire OTelTelemetry(...) to emit loom.workflow.step spans. |
| audit_log | AuditLog \| None | None | Wire FileAuditLog(...) for per-step audit attribution. |
| max_steps | int | 100 | Total steps executed in one run. Linear flows visit each node once; cyclic flows draw on this budget with every iteration. Must be >= 1. |
| max_visits_per_node | int | 25 | Cap on any single node’s re-entries. Tighter than max_steps because most runaways are one node looping on itself. Must be >= 1. |

Both safety caps raise RuntimeError when exceeded, naming the offending node. That error is the canonical signal that your routing logic never picks the termination branch.
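To make the two caps concrete, here is a minimal standalone sketch of how a total-step budget and a per-node visit cap could be enforced. It is not loomflow's implementation: `run_with_caps` and `next_node` are hypothetical names, the error wording is illustrative, and whether the library counts visits or re-entries at the boundary is an assumption.

```python
from collections import Counter

def run_with_caps(start, next_node, max_steps=100, max_visits_per_node=25):
    """Walk a graph while enforcing a total-step budget and a per-node cap.

    `next_node` is a hypothetical callable: node name -> next node name,
    or None to stop. This is an illustration, not the library's runner.
    """
    if max_steps < 1 or max_visits_per_node < 1:
        raise ValueError("both caps must be >= 1")
    visits = Counter()
    visited = []
    node = start
    while node is not None:
        # Total budget: every executed step counts, whichever node it is.
        if len(visited) >= max_steps:
            raise RuntimeError(f"exceeded max_steps={max_steps} at node {node!r}")
        # Per-node budget: catches one node looping long before max_steps.
        if visits[node] >= max_visits_per_node:
            raise RuntimeError(
                f"entered {node!r} more than "
                f"max_visits_per_node={max_visits_per_node} times"
            )
        visits[node] += 1
        visited.append(node)
        node = next_node(node)
    return visited

# A linear flow finishes well inside both budgets.
linear = {"a": "b", "b": "c", "c": None}
trace = run_with_caps("a", linear.get)

# A node that always routes back to itself trips the per-node cap.
try:
    run_with_caps("loop", lambda n: "loop", max_visits_per_node=3)
except RuntimeError as exc:
    error_message = str(exc)
```

In this sketch the per-node cap fires after three visits, long before the default step budget of 100 would, which is why the tighter cap tends to be the one that reports a runaway loop.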


Builder methods

add_node(name, fn)

def add_node(self, name: str, fn: StepLike) -> Workflow: ...

Register a node. fn can be an async def, sync function, Agent, or nested Workflow. Returns self for chaining.

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Unique identifier within the workflow. Re-registering raises ValueError. |
| fn | StepLike | The work the node performs. |

    wf.add_node("extract", extract_fn)
    wf.add_node("score", scoring_agent)      # Agent works
    wf.add_node("subflow", another_workflow) # nested Workflow works
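Since a node can be either a sync function or an `async def`, a runner has to treat both the same way. The sketch below shows one common approach in plain Python: call the step, then await the result only if it is awaitable. `call_step` is a hypothetical helper, not part of loomflow, and Agent or nested Workflow handling is deliberately left out.

```python
import asyncio
import inspect

async def call_step(fn, value):
    """Call a step and await its result only when it is awaitable."""
    result = fn(value)
    if inspect.isawaitable(result):
        result = await result
    return result

def shout(text):
    # Plain synchronous step.
    return text.upper()

async def exclaim(text):
    # Asynchronous step; the sleep stands in for real I/O.
    await asyncio.sleep(0)
    return text + "!"

async def main():
    value = "hello"
    # Each step's output becomes the next step's input.
    for step in (shout, exclaim):
        value = await call_step(step, value)
    return value

result = asyncio.run(main())
```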

add_edge(source, target)

    def add_edge(
        self,
        source: str | _Sentinel,
        target: str | _Sentinel,
    ) -> Workflow: ...

Unconditional edge from source to target.

| Parameter | Type | Description |
| --- | --- | --- |
| source | str \| _Sentinel | A registered node name, or the START sentinel (alias for set_start(target)). |
| target | str \| _Sentinel | A registered node name or END. The framework doesn’t validate the string against _nodes until run time, so forward references work. |

    wf.add_edge("extract", "score")
    wf.add_edge("score", END)

add_edge(START, "node") (0.9.18+) is an alias for set_start("node"). It lets the graph read symmetrically with the END sentinel:

    from loomflow import Workflow, START, END

    wf.add_edge(START, "classify")  # alias for set_start("classify")
    wf.add_router(...)
    wf.add_edge("billing", END)

add_edge(START, END) and add_edge(END, ...) are rejected with messages pointing at the right method. set_start("node") continues to work. Pick whichever form reads better.

add_router(source, fn, routes, *, default=None)

    def add_router(
        self,
        source: str | _Sentinel,
        fn: Callable[[Any], Any] | Callable[[Any], Awaitable[Any]],
        routes: Mapping[str, str | _Sentinel],
        *,
        default: str | _Sentinel | None = None,
    ) -> Workflow: ...

Conditional edge. At run time fn(input_value) is called; the returned key is looked up in routes to pick the next node.

| Parameter | Type | Description |
| --- | --- | --- |
| source | str \| _Sentinel | A registered node, or the START sentinel for a router at the entry of the graph (0.9.20+). |
| fn | Callable[[Any], Any \| Awaitable[Any]] | Returns a routing key. The result is str()-cast before lookup. async def is supported (0.9.24+): the result is awaited automatically when the call returns a coroutine. |
| routes | Mapping[str, str \| _Sentinel] | Routing key → next-node name (or END). |
| default | str \| _Sentinel \| None | Fallback when no key matches. When None, an unmatched key raises RuntimeError at run time. |
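The lookup rules in the table (str()-cast, then `routes`, then `default`, else RuntimeError) can be sketched in a few lines. This is an illustration of the documented behavior, not the library's code; `resolve_route` and the error wording are hypothetical.

```python
from typing import Any, Mapping, Optional

def resolve_route(
    key: Any,
    routes: Mapping[str, str],
    default: Optional[str] = None,
) -> str:
    """Pick the next node for a routing key, following the documented rules."""
    lookup = str(key)              # keys are str()-cast before lookup
    if lookup in routes:
        return routes[lookup]
    if default is not None:        # fallback when no key matches
        return default
    raise RuntimeError(f"router returned {lookup!r}, which matches no route")

routes = {"high": "ship", "low": "revise", "1": "retry"}

matched = resolve_route("high", routes)
cast = resolve_route(1, routes)    # the int 1 is cast to "1" and matches
fallback = resolve_route("unknown", routes, default="revise")

try:
    resolve_route("unknown", routes)   # no default, so this raises
except RuntimeError as exc:
    unmatched_error = str(exc)
```

One practical consequence of the str() cast: a router that returns an enum member or a bool needs route keys that match its string form, so returning plain strings is usually the least surprising choice.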

Routers and edges are mutually exclusive on the same non-START source. Calling add_edge(s, ...) after add_router(s, ...) overwrites the router, and vice versa. set_start("node") and add_router(START, ...) are also mutually exclusive.

add_router(START, ...): branch at the entry of the graph (0.9.20+). This lets you classify the workflow’s input and dispatch to one of N first nodes without a synthetic passthrough “entry” step. It mirrors LangGraph’s add_conditional_edges(START, ...):

    from loomflow import Workflow, START, END

    wf = Workflow()
    wf.add_node("step_1", step_1)
    wf.add_node("step_3", step_3)
    wf.add_router(
        START,
        fn=lambda q: "step_1" if "work" in q else "step_3",
        routes={"step_1": "step_1", "step_3": "step_3"},
        default=END,
    )
    wf.add_edge("step_1", END)
    wf.add_edge("step_3", END)

Route targets are validated at build time when source=START, so typos raise a clear error before the run starts. The to_mermaid / to_dot renders show START -->|key| node directly (no synthetic entry node in the diagram).
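As a rough illustration of what build-time validation of route targets involves, the sketch below checks every target against the set of registered nodes. The names `check_route_targets` and the string `END` stand-in are hypothetical, and the exception type is an assumption: the text above only says a clear error is raised.

```python
END = "END"  # stand-in sentinel for this sketch only

def check_route_targets(routes, registered_nodes):
    """Raise early if a route target is neither a registered node nor END."""
    unknown = sorted(
        target
        for target in routes.values()
        if target != END and target not in registered_nodes
    )
    if unknown:
        # ValueError is an assumption; the real exception type may differ.
        raise ValueError(f"unknown route target(s): {', '.join(unknown)}")

nodes = {"step_1", "step_3"}

# All targets are registered, so this passes silently.
check_route_targets({"step_1": "step_1", "step_3": "step_3"}, nodes)

# A typo in a target is caught before anything runs.
try:
    check_route_targets({"step_1": "step_1", "step_3": "setp_3"}, nodes)
except ValueError as exc:
    typo_error = str(exc)
```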

    wf.add_router(
        "score",
        lambda r: "high" if r["score"] >= 0.8 else "low",
        {"high": "ship", "low": "revise"},
        default="revise",
    )

set_start(node)

def set_start(self, node: str) -> Workflow: ...

Mark a registered node as the entry point. Required before run / stream. Calling them without a start raises RuntimeError.

wf.set_start("extract")

Running

run(input, *, user_id=None, session_id=None, metadata=None)

    async def run(
        self,
        input: Any = None,
        *,
        user_id: str | None = None,
        session_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> WorkflowResult: ...

Execute the graph, return a WorkflowResult.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| input | Any | None | The first node’s input. |
| user_id | str \| None | None | Multi-tenant partition key. Threaded into the live RunContext; nested Agent runs inherit it. |
| session_id | str \| None | None | Conversation continuity / replay key. Auto-generated when not provided (a wf_session_<ulid> value). |
| metadata | dict[str, Any] \| None | None | Free-form bag merged into the RunContext.metadata. |

Returns: a WorkflowResult with output, visited, and per_step.

stream(input, *, user_id=None, session_id=None, metadata=None)

    async def stream(
        self,
        input: Any = None,
        *,
        user_id: str | None = None,
        session_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> AsyncIterator[Event]: ...

Same execution as run, exposed as an async generator of Events. Yields:

  • WORKFLOW_STARTED. Once at the start.
  • WORKFLOW_STEP_STARTED / WORKFLOW_STEP_COMPLETED. Per visited node.
  • WORKFLOW_STEP_FAILED. If a node raises (the run also raises after cleanup).
  • WORKFLOW_COMPLETED. Once at the end.

Consumers can break out of the iterator early to cancel the run.


WorkflowResult

    @dataclass
    class WorkflowResult:
        output: Any
        visited: list[str]
        per_step: dict[str, Any]

| Field | Description |
| --- | --- |
| output | The final node’s return value. Type matches the last step’s return type; users wanting stronger typing can use a Pydantic model as the per-step value. |
| visited | Node names in execution order, with repeats preserved. A linear flow visits each node once; a cyclic flow shows the full trace so callers can see iteration counts. Use set(result.visited) for “which nodes were touched”, Counter(result.visited) for per-node counts. |
| per_step | Mapping of node name → its last value. With cycles, intermediate values for revisits are NOT preserved here (the event stream from stream() has the full per-iteration history). |
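The `set` / `Counter` idioms mentioned for `visited` look like this on a hand-built result. The dataclass below is a local mirror of the documented fields so the snippet runs on its own, and the values are made up for illustration.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkflowResult:  # local mirror of the documented fields
    output: Any
    visited: list[str]
    per_step: dict[str, Any]

# Hypothetical result of a run that went through one revise cycle.
result = WorkflowResult(
    output="final draft",
    visited=["draft", "review", "judge", "revise", "review", "judge"],
    per_step={
        "draft": "v1",
        "review": "notes on v2",   # only the LAST review value survives
        "judge": {"score": 0.9},
        "revise": "v2",
    },
)

touched = set(result.visited)      # which nodes ran at all
counts = Counter(result.visited)   # how many times each node ran
```

Here `counts["review"]` is 2 while `per_step["review"]` holds a single value, which is the gap the event stream is meant to fill.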

Cycles

Loom’s workflow supports cycles. Feedback loops (A → B → classify → (C | D | END) → B and similar refinement patterns) are built in. The two safety caps (max_steps / max_visits_per_node) keep buggy routers from looping forever.

    from loomflow import Workflow, END

    wf = Workflow("refine_until_good", max_visits_per_node=5)
    wf.add_node("draft", draft_fn)
    wf.add_node("review", review_fn)
    wf.add_node("judge", judge_fn)
    wf.add_node("revise", revise_fn)
    wf.set_start("draft")
    wf.add_edge("draft", "review")
    wf.add_edge("review", "judge")
    wf.add_router(
        "judge",
        lambda r: "accept" if r["score"] >= 0.85 else "revise",
        {"accept": END, "revise": "revise"},
    )
    wf.add_edge("revise", "review")  # ← back-edge: closes the cycle

If the judge keeps picking “revise”, the workflow stops at max_visits_per_node=5 re-entries of review and raises:

RuntimeError: workflow 'refine_until_good' re-entered 'review' more than max_visits_per_node=5 times; the router controlling the loop probably never picks the termination branch

For cycle-handling examples see examples/08_workflow_loop.py (in the framework repo).


Composition with Agent

Pass an Agent to add_node; the framework calls .run(input).output and threads the live RunContext through. Pass another Workflow to nest. See Composition.


When to reach for the explicit builder

  • Compliance flows. Multi-stage approval chains, BPMN-like shapes, sign-off gates. The graph is the contract.
  • Cycles. Refinement loops, retry-with-feedback, escalation chains.
  • Non-linear branching beyond “one classifier, then dispatch”: multi-stage routing, conditional skips, parallel-then-merge inside a larger flow.

For linear pipelines use Workflow.chain. For one-shot classify-and-dispatch use Workflow.route. The explicit builder is worth its ceremony when the graph shape is the documentation.

The graph as code. A common pattern: keep the graph builder calls as the top-of-file declaration, paired with a small wf.to_mermaid() helper that emits the diagram for your README / runbook. The builder output IS the canonical description of the flow.
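To show the kind of output a diagram helper produces, here is a small standalone sketch that renders an edge list as Mermaid flowchart text. It is not the library's to_mermaid: `edges_to_mermaid` and the `(source, target, label)` triple format are hypothetical, and the real renderer's layout may differ.

```python
def edges_to_mermaid(edges):
    """Render (source, target, label) triples as Mermaid flowchart text."""
    lines = ["flowchart TD"]
    for source, target, label in edges:
        # Labeled arrows carry the routing key; plain arrows are fixed edges.
        arrow = f"-->|{label}|" if label else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)

# The triage graph from the top of this page, written out as edges.
edges = [
    ("START", "classify", None),
    ("classify", "billing", "billing"),
    ("classify", "tech", "tech"),
    ("billing", "END", None),
    ("tech", "END", None),
]

diagram = edges_to_mermaid(edges)
print(diagram)
```

Pasting the printed text into a Mermaid code block in a README gives a diagram that stays in step with the builder calls as long as it is regenerated whenever the graph changes.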
