
Workflow

from loomflow import Workflow, WorkflowResult, step, START, END

Developer-controlled DAG. The peer primitive to Agent. Construct with the explicit graph builder (add_node / add_edge / set_start) for full control, or use one of the sugar classmethods for common shapes.

For the conceptual page see Workflow.


Class signature

class Workflow:
    name: str

    def __init__(
        self,
        name: str = "workflow",
        *,
        memory: Memory | None = None,
        telemetry: Telemetry | None = None,
        audit_log: AuditLog | str | Path | None = None,
        max_steps: int = 100,
        max_visits_per_node: int = 25,
        response_tone: str | None = None,
    ) -> None: ...

Constructor parameters

name

Type: str
Default: "workflow"

Used in telemetry spans and audit entries. Pick something descriptive; it shows up in traces and dashboards.

memory (0.9.19+)

Type: Memory | None
Default: None

Workflow-owned memory shared across every nested Agent step that didn’t bring its own. Episodes / facts written by one agent become recall-able by the next without per-agent wiring.

from loomflow import Workflow, Agent, InMemoryMemory

mem = InMemoryMemory()
agent_a = Agent("...", model="gpt-4.1-mini")
agent_b = Agent("...", model="gpt-4.1-mini")

wf = Workflow.chain([agent_a, agent_b], memory=mem)
await wf.run("hi", user_id="alice", session_id="conv-1")
# Both agents wrote to / read from `mem`.

Resolution order. Explicit always wins:

  1. Agent(memory=my_mem) keeps using my_mem even inside a workflow that has memory= set.
  2. Otherwise Workflow(memory=mem) is used as the fallback.
  3. Otherwise the agent’s per-instance default (in-memory).

Implemented via a contextvar that the workflow installs at run start and resets in finally, so memory does not leak across workflow runs.

telemetry

Type: Telemetry | None
Default: None (no spans)

Wire any Telemetry sink to emit loom.workflow.run and per-step loom.workflow.step spans tagged with step / user_id / session_id / pattern="workflow". See Telemetry.

audit_log

Type: AuditLog | str | Path | None
Default: None

Wire FileAuditLog(...) to record step_started / step_completed / step_failed entries with user_id attribution. Same audit log the rest of the framework writes to. Agent and workflow entries interleave under the same session_id.

Convenience (0.9.14+): pass a str or Path and the framework auto-wraps as FileAuditLog, so you don’t need to import the backend just to write to a file:

wf = Workflow.chain([...], audit_log="run.log")
# equivalent to: audit_log=FileAuditLog("run.log")

Anything else (e.g. a bare list) is rejected at construction time with a TypeError listing the four valid forms (InMemoryAuditLog, FileAuditLog, path string / Path, None).
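One way to picture the construction-time check is a small normalizing function. This is a sketch under assumptions, not the framework's code: the two classes below are local stand-ins with made-up constructors, `coerce_audit_log` is a hypothetical name, and the error text is illustrative.

```python
from pathlib import Path


class InMemoryAuditLog:  # hypothetical stand-in
    def __init__(self):
        self.entries = []


class FileAuditLog:  # hypothetical stand-in
    def __init__(self, path):
        self.path = Path(path)


def coerce_audit_log(value):
    """Normalize the four accepted forms; reject everything else."""
    if value is None or isinstance(value, (InMemoryAuditLog, FileAuditLog)):
        return value
    if isinstance(value, (str, Path)):
        # A bare path is wrapped so callers need not import the backend.
        return FileAuditLog(value)
    raise TypeError(
        "audit_log must be an InMemoryAuditLog, a FileAuditLog, "
        f"a path (str or Path), or None; got {type(value).__name__}"
    )


wrapped = coerce_audit_log("run.log")
```

Failing at construction rather than at the first write means a misconfigured log surfaces before any step has run.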

max_steps

Type: int
Default: 100

Hard cap on total step executions per run / stream call. Linear chains visit each step once; cyclic flows pay this budget per iteration. Must be >= 1. Hitting the cap raises RuntimeError naming the offending node.

max_visits_per_node

Type: int
Default: 25

Per-node visit cap. Tighter than max_steps because most runaways are one node looping on itself. Must be >= 1.

RuntimeError: workflow 'refine_until_good' re-entered 'review' more than max_visits_per_node=25 times; the router controlling the loop probably never picks the termination branch
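How the two caps interact is easiest to see on a toy executor. The sketch below is an assumption about the general shape of such a guard, not the framework's loop; `run_loop` and its `next_node` callback are hypothetical, and the error messages are simplified.

```python
from collections import Counter


def run_loop(next_node, start, max_steps=100, max_visits_per_node=25):
    """Walk a toy graph; `next_node(node)` returns the next node or None."""
    visits = Counter()
    visited = []
    node = start
    while node is not None:
        # Global budget: total step executions in this run.
        if len(visited) >= max_steps:
            raise RuntimeError(f"exceeded max_steps={max_steps} at {node!r}")
        # Per-node budget: catches a single node looping on itself early.
        visits[node] += 1
        if visits[node] > max_visits_per_node:
            raise RuntimeError(
                f"re-entered {node!r} more than "
                f"max_visits_per_node={max_visits_per_node} times"
            )
        visited.append(node)
        node = next_node(node)
    return visited


linear = run_loop({"a": "b", "b": None}.get, "a")

try:
    run_loop(lambda node: node, "review")  # a node that always routes to itself
except RuntimeError as exc:
    loop_error = str(exc)
```

With the default values, a self-loop trips the per-node cap after 25 visits, well before the 100-step budget is spent.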

response_tone (0.9.32+)

Type: str | None
Default: None

Workflow-level ambient tone that propagates to every nested Agent step that didn’t bring its own response_tone=. Preset name ("casual" / "professional" / "technical" / "legal" / "finance" / "executive" / "academic") or any free-form string.

from loomflow import Workflow

# Every agent step inherits "executive" unless it overrode response_tone= itself.
wf = Workflow.chain([researcher, writer, reviewer], response_tone="executive")

Implemented via a contextvar (_ambient_response_tone_var in loomflow.core.context) the workflow installs at the start of run / stream and resets in finally. Same pattern as Workflow(memory=...). Tones do not leak across workflow runs.

Resolution order for any nested Agent.run:

  1. agent.run(..., response_tone=...) per-call.
  2. Agent(tuning=Tuning(response_tone=...)) agent default.
  3. Workflow(response_tone=...) ambient (this kwarg).
  4. None. No tone directive.

See Agent.response_tone for the full preset table and the instructions / persona / tone positioning.
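The four-level resolution order amounts to "first value that is set wins". A minimal sketch, assuming each level is represented by either a string or None; `resolve_tone` is a hypothetical helper, not part of the loomflow API.

```python
def resolve_tone(per_call=None, agent_default=None, workflow_ambient=None):
    """Return the highest-priority tone that is set, or None if none is."""
    for candidate in (per_call, agent_default, workflow_ambient):
        if candidate is not None:
            return candidate
    return None  # no tone directive


# The per-call value beats both the agent default and the workflow ambient.
chosen = resolve_tone("casual", "legal", "executive")
# With nothing more specific set, the workflow-level tone applies.
inherited = resolve_tone(workflow_ambient="executive")
```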


Builder methods

add_node

def add_node(self, name: str, fn: StepLike) -> Workflow: ...

Register a node. Returns self for chaining. Re-registering the same name raises ValueError.

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Unique node identifier within the workflow. |
| fn | StepLike | An async def, sync function, Agent, or nested Workflow. |

StepLike coercion at run time:

| Input shape | How the framework calls it |
| --- | --- |
| async def | await fn(prev_output) |
| Sync def | await anyio.to_thread.run_sync(lambda: fn(prev_output)) |
| Agent instance | await agent.run(prev_output, user_id=ctx.user_id, session_id=ctx.session_id) then .output |
| Workflow instance | await wf.run(prev_output) then .output |
| Anything else | TypeError at registration |
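The first two rows of the table can be sketched with the standard library alone. This is an illustration, not the framework's dispatcher: it swaps `anyio.to_thread.run_sync` for `asyncio.to_thread` so the example has no third-party dependency, it omits the Agent and Workflow branches, and it raises at call time where the framework is documented to raise at registration. `call_step` is a hypothetical name.

```python
import asyncio
import inspect


async def call_step(fn, prev_output):
    """Call a plain-callable step with the previous step's output."""
    if inspect.iscoroutinefunction(fn):
        return await fn(prev_output)
    if callable(fn):
        # Sync functions run in a worker thread so they don't block the loop.
        return await asyncio.to_thread(fn, prev_output)
    raise TypeError(f"unsupported step type: {type(fn).__name__}")


async def shout(text):
    return text.upper()


def exclaim(text):
    return text + "!"


async def main():
    value = "hi"
    for fn in (shout, exclaim):  # each step receives the previous output
        value = await call_step(fn, value)
    return value


result = asyncio.run(main())
```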

add_edge

def add_edge(
    self,
    source: str | _Sentinel,
    target: str | _Sentinel,
) -> Workflow: ...

Unconditional edge. Target can be a registered node name or END; source can be a registered node OR the START sentinel.

| Parameter | Type | Description |
| --- | --- | --- |
| source | str \| _Sentinel | A registered node name, or the START sentinel (alias for set_start(target)). Anything else raises ValueError. |
| target | str \| _Sentinel | A node name or END. Forward references work: the target is not validated against _nodes until run time. |

add_edge(START, "node") (0.9.18+). Equivalent to set_start("node"). Lets graphs read symmetrically with the END sentinel and matches the LangGraph mental model:

wf.add_edge(START, "first")  # alias for set_start("first")
wf.add_edge("first", "second")
wf.add_edge("second", END)

add_edge(START, END) and add_edge(END, ...) are rejected with messages that point at the right method.

add_router

def add_router(
    self,
    source: str | _Sentinel,
    fn: Callable[[Any], Any] | Callable[[Any], Awaitable[Any]],
    routes: Mapping[str, str | _Sentinel],
    *,
    default: str | _Sentinel | None = None,
) -> Workflow: ...

Conditional edge. At run time fn(input_value) is called; the returned key is str()-cast and looked up in routes.

| Parameter | Type | Description |
| --- | --- | --- |
| source | str \| _Sentinel | A registered node, or the START sentinel for an entry-point router (0.9.20+). |
| fn | Callable[[Any], Any \| Awaitable[Any]] | Returns the routing key. Can be async def (0.9.24+). Awaited if it returns a coroutine. |
| routes | Mapping[str, str \| _Sentinel] | Routing key → next-node name (or END). Targets are validated at build time when source=START so typos raise before the run starts. |
| default | str \| _Sentinel \| None | Fallback when no key matches. When None, an unmatched key raises RuntimeError at run time. |

add_edge(source, ...) and add_router(source, ...) on the same non-START source are mutually exclusive. The second call overwrites the first. set_start("node") and add_router(START, ...) are also mutually exclusive. Whichever is called last wins.
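The lookup described for add_router (call fn, await if needed, str()-cast, look up, fall back to default) can be sketched as a standalone function. `pick_next` and the local `END` object are hypothetical stand-ins, and the error message is illustrative rather than the framework's wording.

```python
import asyncio
import inspect

END = object()  # local stand-in for the END sentinel


async def pick_next(fn, value, routes, default=None):
    """Resolve the next node for `value` using a sync or async router fn."""
    key = fn(value)
    if inspect.isawaitable(key):
        key = await key  # async routers return a coroutine
    key = str(key)  # keys are compared as strings
    if key in routes:
        return routes[key]
    if default is not None:
        return default
    raise RuntimeError(f"router returned unmatched key {key!r}")


async def classify(query):
    return "tech"


async def main():
    routes = {"1": "billing", "tech": "support"}
    by_int = await pick_next(lambda q: 1, "x", routes)  # 1 -> "1"
    by_async = await pick_next(classify, "x", routes)
    fallback = await pick_next(lambda q: "other", "x", routes, default=END)
    return by_int, by_async, fallback


by_int, by_async, fallback = asyncio.run(main())
```

Because of the string cast, a router that returns the integer 1 matches the route key "1".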

add_router(START, ...) (0.9.20+) lets you classify the workflow’s input and dispatch to one of N first nodes without a synthetic passthrough “entry” step. Mirrors LangGraph’s add_conditional_edges(START, ...):

from loomflow import Workflow, START, END

wf = Workflow()
wf.add_node("step_1", step_1)
wf.add_node("step_3", step_3)
wf.add_router(
    START,
    fn=lambda q: "step_1" if "work" in q else "step_3",
    routes={"step_1": "step_1", "step_3": "step_3"},
    default=END,  # optional fallback
)
wf.add_edge("step_1", END)
wf.add_edge("step_3", END)

The to_mermaid / to_dot renders show START -->|key| node directly. No synthetic entry node in the diagram.

set_start

def set_start(self, node: str) -> Workflow: ...

Mark a registered node as the entry point. An entry point is required before run / stream; set it here, or through add_edge(START, ...) or add_router(START, ...). Raises ValueError for unregistered names.


Sugar constructors

Workflow.chain

@classmethod
def chain(
    cls,
    steps: list[StepLike],
    *,
    name: str = "chain",
    telemetry: Telemetry | None = None,
    audit_log: AuditLog | str | Path | None = None,
    memory: Memory | None = None,
    max_steps: int = 100,
    max_visits_per_node: int = 25,
    response_tone: str | None = None,
) -> Workflow: ...

Linear sequence. See Workflow.chain.

Workflow.route

@classmethod
def route(
    cls,
    classifier: StepLike,
    routes: Mapping[str, StepLike],
    *,
    default: StepLike | None = None,
    name: str = "route",
    telemetry: Telemetry | None = None,
    audit_log: AuditLog | str | Path | None = None,
    memory: Memory | None = None,
    max_steps: int = 100,
    max_visits_per_node: int = 25,
    response_tone: str | None = None,
) -> Workflow: ...

Classify-and-dispatch. See Workflow.route. Handlers receive the original input, not the classifier’s output.

Workflow.parallel

@classmethod
def parallel(
    cls,
    steps: list[StepLike],
    *,
    merge: Callable[[list[Any]], Any] | None = None,
    name: str = "parallel",
    telemetry: Telemetry | None = None,
    audit_log: AuditLog | str | Path | None = None,
    memory: Memory | None = None,
    max_steps: int = 100,
    max_visits_per_node: int = 25,
    response_tone: str | None = None,
) -> Workflow: ...

Fan-out then merge. See Workflow.parallel. Steps run concurrently via anyio.create_task_group.
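The fan-out-then-merge shape can be sketched with the standard library. Note the substitution: this example uses `asyncio.gather` rather than `anyio.create_task_group`, to stay dependency-free. `fan_out` is a hypothetical helper, and returning the raw list when `merge` is None is an assumption, since the default merge behaviour is not specified here.

```python
import asyncio


async def fan_out(steps, value, merge=None):
    """Run every step on the same input concurrently, then merge the results."""
    # gather preserves the order of `steps`, regardless of which finishes first.
    results = list(await asyncio.gather(*(step(value) for step in steps)))
    return merge(results) if merge is not None else results


async def upper(text):
    await asyncio.sleep(0.02)  # slower step
    return text.upper()


async def reverse(text):
    await asyncio.sleep(0.01)  # faster step
    return text[::-1]


merged = asyncio.run(fan_out([upper, reverse], "abc", merge=" | ".join))
unmerged = asyncio.run(fan_out([upper, reverse], "abc"))
```

The merge callable receives results in step order, so it does not need to account for completion order.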


Running

run

async def run(
    self,
    input: Any = None,
    *,
    user_id: str | None = None,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> WorkflowResult: ...

Execute the graph. Returns a WorkflowResult.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| input | Any | None | The first node’s input. |
| user_id | str \| None | None | Multi-tenant partition key. Threaded into the live RunContext; nested Agent runs inherit it. |
| session_id | str \| None | None | Session id. Auto-generated as wf_session_<ulid> when not provided. |
| metadata | dict[str, Any] \| None | None | Free-form bag merged into RunContext.metadata. |

stream

async def stream(
    self,
    input: Any = None,
    *,
    user_id: str | None = None,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> AsyncIterator[Event]: ...

Same execution as run, exposed as an async generator of Events. Yields:

| Event.kind | When |
| --- | --- |
| WORKFLOW_STARTED | At the start of the run. |
| WORKFLOW_STEP_STARTED | Before each node executes. |
| WORKFLOW_STEP_COMPLETED | After each node returns successfully. |
| WORKFLOW_STEP_FAILED | If a node raises (the run also raises after cleanup). |
| WORKFLOW_COMPLETED | At the end of the run. |

Consumers can break out of the iterator early to cancel the run.
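Early exit from an async generator is worth illustrating, because Python does not finalize the generator immediately when a consumer stops iterating. The sketch below uses a hypothetical `fake_stream` that yields (kind, node) tuples instead of real Event objects; closing the iterator explicitly is a defensive pattern, and whether the real stream() requires it is not stated here.

```python
import asyncio

cleaned_up = []


async def fake_stream(nodes):
    """Hypothetical stand-in for Workflow.stream."""
    try:
        yield ("WORKFLOW_STARTED", None)
        for node in nodes:
            yield ("WORKFLOW_STEP_STARTED", node)
            yield ("WORKFLOW_STEP_COMPLETED", node)
        yield ("WORKFLOW_COMPLETED", None)
    finally:
        cleaned_up.append(True)  # also runs when the generator is closed early


async def first_completed_step():
    events = fake_stream(["a", "b", "c"])
    try:
        async for kind, node in events:
            if kind == "WORKFLOW_STEP_COMPLETED":
                return node  # stop consuming after the first finished step
    finally:
        # Close deterministically instead of waiting for garbage collection.
        await events.aclose()


first = asyncio.run(first_completed_step())
```

Nodes "b" and "c" never start, and the generator's cleanup block has run by the time `first_completed_step` returns.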

as_tool

def as_tool(
    self,
    *,
    name: str | None = None,
    description: str | None = None,
    input_arg: str = "input",
) -> Tool: ...

Expose this workflow as a Tool an Agent can invoke.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str \| None | None (uses self.name) | Tool name shown to the model. |
| description | str \| None | None (auto-generated) | Tool description. The default is f"Run the {self.name!r} workflow." |
| input_arg | str | "input" | Names the single string parameter the tool accepts. The framework forwards that value to wf.run(...) and returns result.output. |

See Composition, Direction 2.


Visualization (0.9.16+)

to_mermaid

def to_mermaid(self) -> str: ...

Returns a Mermaid flowchart TD rendering of the graph. Pastes directly into GitHub Markdown (renders inline) or mermaid.live for PNG / SVG export.

  • Solid arrows are unconditional edges.
  • Labelled solid arrows are router branches.
  • Dotted arrows are router defaults.
  • START and END render as stadium-shaped nodes.

print(wf.to_mermaid())

```mermaid
flowchart TD
    START([START]) --> classify
    classify -->|billing| route_billing
    classify -->|tech| route_tech
    classify -.-> route_default
    route_billing --> END([END])
    route_tech --> END
    route_default --> END
```

to_dot

def to_dot(self) -> str: ...

Same picture as to_mermaid, emitted in Graphviz DOT for users who prefer the Graphviz toolchain:

with open("graph.dot", "w") as f:
    f.write(wf.to_dot())
# dot -Tpng graph.dot -o graph.png

Optional. Mermaid is the recommended path since it renders inline on GitHub / GitLab / Notion without any tool install.

_repr_markdown_

Jupyter / VS Code / JupyterLab auto-render the diagram inline when you type wf into a cell. No imports, no extra calls. The markdown representation is the Mermaid flowchart wrapped in a ```mermaid fence.

>>> wf # In a Jupyter cell, displays the rendered diagram inline.

WorkflowResult

@dataclass
class WorkflowResult:
    output: Any
    visited: list[str] = field(default_factory=list)
    per_step: dict[str, Any] = field(default_factory=dict)

| Field | Description |
| --- | --- |
| output | The final node’s return value. Type matches the last step’s return type. |
| visited | Node names in execution order, with repeats preserved. Linear flows visit each node once; cyclic flows show the full trace. Use set(result.visited) for “which nodes touched”, Counter(result.visited) for per-node counts. |
| per_step | Mapping of node name → its last value. With cycles, intermediate values for revisits are NOT preserved here (use stream() for the full per-iteration history). |
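The set / Counter idioms mentioned for visited look like this in practice. The dataclass below is a local copy of the shape shown above so the example runs without the framework, and the node names and values are made up for illustration.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkflowResult:  # local copy of the documented shape, for illustration
    output: Any
    visited: list = field(default_factory=list)
    per_step: dict = field(default_factory=dict)


# A hypothetical cyclic run: draft and review each executed twice.
result = WorkflowResult(
    output="final draft",
    visited=["draft", "review", "draft", "review", "ship"],
    per_step={"draft": "v2", "review": "ok", "ship": "final draft"},
)

touched = set(result.visited)     # which nodes ran at all
counts = Counter(result.visited)  # how many times each node ran
last_draft = result.per_step["draft"]  # only the latest value survives
```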

START / END sentinels

from loomflow import START, END

Distinct sentinel tokens compared by identity. Used as edge targets:

wf.add_edge("ship", END)  # terminate after 'ship'
wf.add_router("classify", fn, {"a": "node_a", "b": END})  # 'b' terminates

START marks the conceptual entry of a graph. It is accepted as the source argument of add_edge (0.9.18+, alias for set_start) and of add_router (0.9.20+), and is also used for documentation / introspection.

step decorator

def step(
    fn: Callable[..., Awaitable[Any]] | None = None,
    *,
    name: str | None = None,
) -> Any: ...

Adds telemetry + audit hooks to plain Python control flow. Transparent outside a live RunContext. See The @step decorator.

@step
async def classify(text: str) -> str: ...

@step(name="ml-classifier-v2")
async def classify_v2(text: str) -> str: ...

Events emitted

Each WORKFLOW_STEP_COMPLETED event has payload:

{
    "workflow": <workflow name>,
    "node": <node name>,
    "output": <node's return value>,
}

WORKFLOW_STEP_FAILED payload includes error: str(exc) instead of output.


Concurrency model

Workflow instances are safe to share across concurrent calls. Each run() / stream() constructs its own per-run state, including its own RunContext, so there is no cross-call mutation on the Workflow instance itself. Two wf.run() calls in flight against the same workflow do not interfere with each other.

The graph (nodes / edges / routers) is read-only at run time. Calling add_node / add_edge / add_router mid-run is undefined behavior and is not protected by a lock; call them from a single thread before the first run.


Source

loomflow/workflow/__init__.py

Workflow and Agent are siblings. Neither inherits from the other. Composition works because both expose a uniform internal shape (async run(input, ...) → result with .output). Either can be wrapped by the other; the framework detects the type and dispatches correctly.
