Workflow
from loomflow import Workflow, WorkflowResult, step, START, ENDDeveloper-controlled DAG. The peer primitive to
Agent. Construct with the explicit graph builder
(add_node / add_edge / set_start) for full control, or use one
of the sugar classmethods for common shapes.
For the conceptual page see Workflow.
Class signature
class Workflow:
name: str
def __init__(
self,
name: str = "workflow",
*,
memory: Memory | None = None,
telemetry: Telemetry | None = None,
audit_log: AuditLog | str | Path | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
response_tone: str | None = None,
) -> None: ...Constructor parameters
name
| Type | str |
| Default | "workflow" |
Used in telemetry spans and audit entries. Pick something descriptive , it shows up in traces and dashboards.
memory (0.9.19+)
| Type | Memory | None |
| Default | None |
Workflow-owned memory shared across every nested Agent step that
didn’t bring its own. Episodes / facts written by one agent become
recall-able by the next without per-agent wiring.
from loomflow import Workflow, Agent, InMemoryMemory
mem = InMemoryMemory()
agent_a = Agent("...", model="gpt-4.1-mini")
agent_b = Agent("...", model="gpt-4.1-mini")
wf = Workflow.chain([agent_a, agent_b], memory=mem)
await wf.run("hi", user_id="alice", session_id="conv-1")
# Both agents wrote to / read from `mem`.Resolution order. Explicit always wins:
Agent(memory=my_mem)keeps usingmy_memeven inside a workflow that hasmemory=set.- Otherwise
Workflow(memory=mem)is used as the fallback. - Otherwise the agent’s per-instance default (in-memory).
Implemented via a contextvar that the workflow installs at run start
and resets in finally, so memory does not leak across workflow runs.
telemetry
| Type | Telemetry | None |
| Default | None (no spans) |
Wire any Telemetry sink to emit loom.workflow.run and per-step
loom.workflow.step spans tagged with step / user_id /
session_id / pattern="workflow". See Telemetry.
audit_log
| Type | AuditLog | str | Path | None |
| Default | None |
Wire FileAuditLog(...) to record step_started / step_completed
/ step_failed entries with user_id attribution. Same audit log
the rest of the framework writes to. Agent and workflow entries
interleave under the same session_id.
Convenience (0.9.14+): pass a str or Path and the framework
auto-wraps as FileAuditLog, so you don’t need to import the backend
just to write to a file:
wf = Workflow.chain([...], audit_log="run.log")
# equivalent to: audit_log=FileAuditLog("run.log")Anything else (e.g. a bare list) is rejected at construction time
with a TypeError listing the four valid forms (InMemoryAuditLog,
FileAuditLog, path string / Path, None).
max_steps
| Type | int |
| Default | 100 |
Hard cap on total step executions per run / stream call. Linear
chains visit each step once; cyclic flows pay this budget per
iteration. Must be >= 1. Hitting the cap raises RuntimeError
naming the offending node.
max_visits_per_node
| Type | int |
| Default | 25 |
Per-node visit cap. Tighter than max_steps because most runaways
are one node looping on itself. Must be >= 1.
RuntimeError: workflow 'refine_until_good' re-entered 'review'
more than max_visits_per_node=25 times; the router controlling the
loop probably never picks the termination branchresponse_tone (0.9.32+)
| Type | str | None |
| Default | None |
Workflow-level ambient tone that propagates to every nested Agent
step that didn’t bring its own response_tone=. Preset name
("casual" / "professional" / "technical" / "legal" /
"finance" / "executive" / "academic") or any free-form string.
from loomflow import Workflow
# Every agent step inherits "executive" unless it overrode response_tone= itself.
wf = Workflow.chain([researcher, writer, reviewer], response_tone="executive")Implemented via a contextvar (_ambient_response_tone_var in
loomflow.core.context) the workflow installs at the start of
run / stream and resets in finally. Same pattern as
Workflow(memory=...). Tones do not leak across workflow runs.
Resolution order for any nested Agent.run:
agent.run(..., response_tone=...)per-call.Agent(tuning=Tuning(response_tone=...))agent default.Workflow(response_tone=...)ambient (this kwarg).None. No tone directive.
See Agent.response_tone for
the full preset table and the instructions / persona / tone
positioning.
Builder methods
add_node
def add_node(self, name: str, fn: StepLike) -> Workflow: ...Register a node. Returns self for chaining. Re-registering the
same name raises ValueError.
| Parameter | Type | Description |
|---|---|---|
name | str | Unique node identifier within the workflow. |
fn | StepLike | An async def, sync function, Agent, or nested Workflow. |
StepLike coercion at run time:
| Input shape | How the framework calls it |
|---|---|
async def | await fn(prev_output) |
Sync def | await anyio.to_thread.run_sync(lambda: fn(prev_output)) |
Agent instance | await agent.run(prev_output, user_id=ctx.user_id, session_id=ctx.session_id) then .output |
Workflow instance | await wf.run(prev_output) then .output |
| Anything else | TypeError at registration |
add_edge
def add_edge(
self,
source: str | _Sentinel,
target: str | _Sentinel,
) -> Workflow: ...Unconditional edge. Target can be a registered node name or END;
source can be a registered node OR the START sentinel.
| Parameter | Type | Description |
|---|---|---|
source | str | _Sentinel | A registered node name, or the START sentinel (alias for set_start(target)). Anything else raises ValueError. |
target | str | _Sentinel | A node name or END. Forward references work, target is not validated against _nodes until run time. |
add_edge(START, "node") (0.9.18+). Equivalent to
set_start("node"). Lets graphs read symmetrically with the END
sentinel and matches the LangGraph mental model:
wf.add_edge(START, "first") # alias for set_start("first")
wf.add_edge("first", "second")
wf.add_edge("second", END)add_edge(START, END) and add_edge(END, ...) are rejected with
messages that point at the right method.
add_router
def add_router(
self,
source: str | _Sentinel,
fn: Callable[[Any], Any] | Callable[[Any], Awaitable[Any]],
routes: Mapping[str, str | _Sentinel],
*,
default: str | _Sentinel | None = None,
) -> Workflow: ...Conditional edge. At run time fn(input_value) is called; the
returned key is str()-cast and looked up in routes.
| Parameter | Type | Description |
|---|---|---|
source | str | _Sentinel | A registered node, OR the START sentinel for an entry-point router (0.9.20+). |
fn | Callable[[Any], Any | Awaitable[Any]] | Returns the routing key. Can be async def (0.9.24+). Awaited if it returns a coroutine. |
routes | Mapping[str, str | _Sentinel] | Routing key → next-node name (or END). Targets are validated at build time when source=START so typos raise before the run starts. |
default | str | _Sentinel | None | Fallback when no key matches. When None, an unmatched key raises RuntimeError at run time. |
add_edge(source, ...) and add_router(source, ...) on the same
non-START source are mutually exclusive. The second call overwrites
the first. set_start("node") and add_router(START, ...) are also
mutually exclusive. Whichever is called last wins.
add_router(START, ...) (0.9.20+) lets you classify the
workflow’s input and dispatch to one of N first nodes without a
synthetic passthrough “entry” step. Mirrors LangGraph’s
add_conditional_edges(START, ...):
from loomflow import Workflow, START, END
wf = Workflow()
wf.add_node("step_1", step_1)
wf.add_node("step_3", step_3)
wf.add_router(
START,
fn=lambda q: "step_1" if "work" in q else "step_3",
routes={"step_1": "step_1", "step_3": "step_3"},
default=END, # optional fallback
)
wf.add_edge("step_1", END)
wf.add_edge("step_3", END)The to_mermaid / to_dot renders show START -->|key| node
directly. No synthetic entry node in the diagram.
set_start
def set_start(self, node: str) -> Workflow: ...Mark a registered node as the entry point. Required before run /
stream. Raises ValueError for unregistered names.
Sugar constructors
Workflow.chain
@classmethod
def chain(
cls,
steps: list[StepLike],
*,
name: str = "chain",
telemetry: Telemetry | None = None,
audit_log: AuditLog | str | Path | None = None,
memory: Memory | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
response_tone: str | None = None,
) -> Workflow: ...Linear sequence. See Workflow.chain.
Workflow.route
@classmethod
def route(
cls,
classifier: StepLike,
routes: Mapping[str, StepLike],
*,
default: StepLike | None = None,
name: str = "route",
telemetry: Telemetry | None = None,
audit_log: AuditLog | str | Path | None = None,
memory: Memory | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
response_tone: str | None = None,
) -> Workflow: ...Classify-and-dispatch. See Workflow.route. Handlers receive the original input, not the classifier’s output.
Workflow.parallel
@classmethod
def parallel(
cls,
steps: list[StepLike],
*,
merge: Callable[[list[Any]], Any] | None = None,
name: str = "parallel",
telemetry: Telemetry | None = None,
audit_log: AuditLog | str | Path | None = None,
memory: Memory | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
response_tone: str | None = None,
) -> Workflow: ...Fan-out then merge. See Workflow.parallel.
Steps run concurrently via anyio.create_task_group.
Running
run
async def run(
self,
input: Any = None,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> WorkflowResult: ...Execute the graph. Returns a WorkflowResult.
| Parameter | Type | Default | Description |
|---|---|---|---|
input | Any | None | The first node’s input. |
user_id | str | None | None | Multi-tenant partition key. Threaded into the live RunContext; nested Agent runs inherit it. |
session_id | str | None | None | Session id. Auto-generated as wf_session_<ulid> when not provided. |
metadata | dict[str, Any] | None | None | Free-form bag merged into RunContext.metadata. |
stream
async def stream(
self,
input: Any = None,
*,
user_id: str | None = None,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> AsyncIterator[Event]: ...Same execution as run, exposed as an async generator of Events.
Yields:
Event.kind | When |
|---|---|
WORKFLOW_STARTED | At the start of the run. |
WORKFLOW_STEP_STARTED | Before each node executes. |
WORKFLOW_STEP_COMPLETED | After each node returns successfully. |
WORKFLOW_STEP_FAILED | If a node raises (the run also raises after cleanup). |
WORKFLOW_COMPLETED | At the end of the run. |
Consumers can break out of the iterator early to cancel the run.
as_tool
def as_tool(
self,
*,
name: str | None = None,
description: str | None = None,
input_arg: str = "input",
) -> Tool: ...Expose this workflow as a Tool an Agent can invoke.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | None (uses self.name) | Tool name shown to the model. |
description | str | None | None (auto-generated) | Tool description. The default is f"Run the {self.name!r} workflow." |
input_arg | str | "input" | Names the single string parameter the tool accepts. The framework forwards that value to wf.run(...) and returns result.output. |
See Composition. Direction 2.
Visualization (0.9.16+)
to_mermaid
def to_mermaid(self) -> str: ...Returns a Mermaid flowchart TD rendering of the graph. Pastes
directly into GitHub Markdown (renders inline) or
mermaid.live for PNG / SVG export.
- Solid arrows are unconditional edges.
- Labelled solid arrows are router branches.
- Dotted arrows are router defaults.
STARTandENDrender as stadium-shaped nodes.
print(wf.to_mermaid())```mermaid
flowchart TD
START([START]) --> classify
classify -->|billing| route_billing
classify -->|tech| route_tech
classify -.-> route_default
route_billing --> END([END])
route_tech --> END
route_default --> END
```to_dot
def to_dot(self) -> str: ...Same picture as to_mermaid, emitted in Graphviz DOT for users who
prefer the Graphviz toolchain:
with open("graph.dot", "w") as f:
f.write(wf.to_dot())
# dot -Tpng graph.dot -o graph.pngOptional. Mermaid is the recommended path since it renders inline on GitHub / GitLab / Notion without any tool install.
_repr_markdown_
Jupyter / VS Code / JupyterLab auto-render the diagram inline when
you type wf into a cell. No imports, no extra calls. The markdown
representation is the Mermaid flowchart wrapped in a
```mermaid fence.
>>> wf
# In a Jupyter cell, displays the rendered diagram inline.Related types
WorkflowResult
@dataclass
class WorkflowResult:
output: Any
visited: list[str] = field(default_factory=list)
per_step: dict[str, Any] = field(default_factory=dict)| Field | Description |
|---|---|
output | The final node’s return value. Type matches the last step’s return type. |
visited | Node names in execution order, with repeats preserved. Linear flows visit each node once; cyclic flows show the full trace. Use set(result.visited) for “which nodes touched”, Counter(result.visited) for per-node counts. |
per_step | Mapping of node name → its last value. With cycles, intermediate values for revisits are NOT preserved here (use stream() for the full per-iteration history). |
START / END sentinels
from loomflow import START, ENDDistinct sentinel tokens compared by identity. Used as edge targets:
wf.add_edge("ship", END) # terminate after 'ship'
wf.add_router("classify", fn, {"a": "node_a", "b": END}) # 'b' terminatesSTART is a placeholder for the conceptual entry of a graph. Used
for documentation / introspection, not as an add_edge argument.
step decorator
def step(
fn: Callable[..., Awaitable[Any]] | None = None,
*,
name: str | None = None,
) -> Any: ...Adds telemetry + audit hooks to plain Python control flow.
Transparent outside a live RunContext. See
The @step decorator.
@step
async def classify(text: str) -> str: ...
@step(name="ml-classifier-v2")
async def classify_v2(text: str) -> str: ...Events emitted
Each WORKFLOW_STEP_COMPLETED event has payload:
{
"workflow": <workflow name>,
"node": <node name>,
"output": <node's return value>,
}WORKFLOW_STEP_FAILED payload includes error: str(exc) instead
of output.
Concurrency model
Workflow instances are safe to share across concurrent calls. Each
run() / stream() constructs its own per-run state; there’s no
cross-call mutation on the Workflow instance itself. Two wf.run()
calls in flight concurrently against the same workflow are safe ,
each gets its own RunContext.
The graph (nodes / edges / routers) is read-only at run time;
calling add_node / add_edge / add_router mid-run is undefined
(and not protected by a lock. Call them from a single thread before
the first run).
Source
Workflow and Agent are siblings. Neither inherits from the
other. Composition works because both expose a uniform internal
shape (async run(input, ...) → result with .output). Either can be
wrapped by the other; the framework detects the type and dispatches
correctly.