Workflow.chain
from loomflow import Workflow
wf = Workflow.chain([step1, step2, step3])
The simplest workflow shape: a list of steps that run in order. Each step receives the previous step’s return value; the final step’s return value is the workflow’s output.
Signature
@classmethod
def chain(
    cls,
    steps: list[StepLike],
    *,
    name: str = "chain",
    telemetry: Telemetry | None = None,
    audit_log: AuditLog | None = None,
    max_steps: int = 100,
    max_visits_per_node: int = 25,
) -> Workflow: ...

| Parameter | Type | Default | Description |
|---|---|---|---|
| steps | list[StepLike] | required | Non-empty list. Each entry can be an async def, a sync function, an Agent, or another Workflow. The framework coerces each to an awaitable. |
| name | str | "chain" | Workflow name used in telemetry / audit. |
| telemetry | Telemetry \| None | None | Wire OTelTelemetry(...) to emit loom.workflow.step spans per step. |
| audit_log | AuditLog \| None | None | Wire FileAuditLog(...) to record step_started / step_completed entries with user_id attribution. |
| max_steps | int | 100 | Total steps executed in one run / stream call. Linear chains visit each step once. |
| max_visits_per_node | int | 25 | Per-node visit cap. Linear chains are 1; the cap matters when chains are composed inside cyclic graphs. |
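The two caps in the table can be pictured as a pair of counters checked on every step. The sketch below is only an illustration of that idea: `StepBudget` is a hypothetical helper, and the use of `RuntimeError` is an assumption, since the source does not say how Loom reports a breached cap.

```python
from collections import Counter


class StepBudget:
    """Hypothetical guard that tracks total and per-node step counts."""

    def __init__(self, max_steps: int = 100, max_visits_per_node: int = 25) -> None:
        self.max_steps = max_steps
        self.max_visits_per_node = max_visits_per_node
        self.total = 0
        self.visits: Counter = Counter()

    def record(self, node: str) -> None:
        """Count one visit to `node`; raise once either cap is exceeded."""
        self.total += 1
        self.visits[node] += 1
        if self.total > self.max_steps:
            # Assumption: the real error type is not documented here.
            raise RuntimeError(f"exceeded max_steps={self.max_steps}")
        if self.visits[node] > self.max_visits_per_node:
            raise RuntimeError(
                f"{node!r} exceeded max_visits_per_node={self.max_visits_per_node}"
            )


# A linear chain visits each node once, so the defaults are never close.
budget = StepBudget()
for node in ["normalize", "summarize", "emit"]:
    budget.record(node)
print(budget.total)  # 3
```

In a linear chain the per-node count never passes 1, which is why the per-node cap only becomes relevant once a chain is embedded in a graph with cycles.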
StepLike accepts:
- An async def callable. Awaited directly with the previous output.
- A sync function. Dispatched to a worker thread via anyio.to_thread.run_sync.
- An Agent instance. The framework calls .run(input).output and threads the live RunContext (user_id / session_id) through.
- Another Workflow instance. Calls .run(input).output and merges the inner trace.
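The coercion described above can be sketched for the two function cases. This is not Loom’s implementation: `coerce` and `run_chain` are hypothetical names, `asyncio.to_thread` from the standard library stands in for anyio.to_thread.run_sync so the example has no dependencies, and the Agent and Workflow cases are left out.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


def coerce(step: Callable[[Any], Any]) -> Callable[[Any], Awaitable[Any]]:
    """Wrap a plain callable so the caller can always `await` it."""
    if inspect.iscoroutinefunction(step):
        return step  # async def: awaited directly with the previous output

    async def run_in_thread(value: Any) -> Any:
        # Sync function: run in a worker thread so the event loop stays free.
        return await asyncio.to_thread(step, value)

    return run_in_thread


async def run_chain(steps: list, value: Any) -> Any:
    """Feed each step the previous step's return value, in order."""
    for step in steps:
        value = await coerce(step)(value)
    return value


async def shout(text: str) -> str:  # async step
    return text.upper()


def exclaim(text: str) -> str:  # sync step, dispatched to a worker thread
    return text + "!"


result = asyncio.run(run_chain([shout, exclaim], "hello"))
print(result)  # HELLO!
```

Once every step is wrapped this way, the loop that runs the chain does not need to know which kind of step it is calling.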
Example. Pure Python steps
import asyncio
from loomflow import Workflow

async def normalize(text: str) -> str:
    return text.strip().lower()

async def summarize(text: str) -> str:
    return f"Summary: {text[:60]}"

async def emit(text: str) -> dict:
    return {"summary": text, "len": len(text)}

wf = Workflow.chain([normalize, summarize, emit], name="summarize_pipeline")

async def main() -> None:
    result = await wf.run(" HELLO WORLD ", user_id="alice")
    print(result.output)   # {'summary': 'Summary: hello world', 'len': 20}
    print(result.visited)  # ['normalize', 'summarize', 'emit']

asyncio.run(main())
Example. Agents in the chain
from loomflow import Agent, Workflow
extract = Agent("Extract dates and amounts.", model="gpt-4.1-mini")
classify = Agent("Tag the document type.", model="gpt-4.1-mini")
respond = Agent("Draft a reply.", model="gpt-4.1-mini")
wf = Workflow.chain([extract, classify, respond])
# Each Agent's `.run(input).output` is the next step's input.
# user_id / session_id / metadata are forwarded automatically.
result = await wf.run(invoice_text, user_id="alice")
Mixing types
from loomflow import Agent, Workflow

async def fetch(url: str) -> str:
    # `client` is assumed to be an async HTTP client created elsewhere.
    return await client.get(url)

def normalize(text: str) -> str:  # sync, so it runs in a worker thread
    return text.strip()

extractor = Agent("Extract key facts.", model="gpt-4.1-mini")
inner_wf = Workflow.chain([extractor, lambda x: f"FACTS: {x}"])
wf = Workflow.chain([
    fetch,      # async function
    normalize,  # sync function
    extractor,  # Agent
    inner_wf,   # nested Workflow
])
The framework coerces each step uniformly; there is no special “this is an agent” mode. Passing an Agent to add_node (or, here, to chain) is the supported way to put LLM reasoning inside a workflow.
Duplicate step names
A step’s display name is derived from its __name__, then its name attribute, then the fallback step_N. When the same callable is passed more than once, each later occurrence is renamed with a numeric suffix:
wf = Workflow.chain([fn, fn, fn])
# Nodes: 'fn', 'fn_1', 'fn_2'
This keeps the graph well-formed, because node names must be unique.
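One way to picture the renaming rule is a counter keyed by base name. The helpers below, `display_name` and `unique_names`, are hypothetical and written only to reproduce the 'fn', 'fn_1', 'fn_2' pattern shown above. The lookup order and the zero-based index in step_N are assumptions, not documented behavior.

```python
from typing import Any


def display_name(step: Any, index: int) -> str:
    """Pick a base name: __name__, then a `name` attribute, then step_N."""
    name = getattr(step, "__name__", None) or getattr(step, "name", None)
    # Assumption: N is the step's zero-based position in the list.
    return name if isinstance(name, str) else f"step_{index}"


def unique_names(steps: list) -> list:
    """Append _1, _2, ... to repeated base names so node names stay unique."""
    seen: dict = {}
    names: list = []
    for index, step in enumerate(steps):
        base = display_name(step, index)
        count = seen.get(base, 0)
        names.append(base if count == 0 else f"{base}_{count}")
        seen[base] = count + 1
    return names


def fn(x: Any) -> Any:
    return x


print(unique_names([fn, fn, fn]))  # ['fn', 'fn_1', 'fn_2']
```

This sketch does not handle a suffixed name colliding with a step that is genuinely called fn_1; a production version would need to check for that.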
Inspecting the run
WorkflowResult has three fields populated by every run:
@dataclass
class WorkflowResult:
    output: Any               # the last node's return value
    visited: list[str]        # node names in execution order (with repeats)
    per_step: dict[str, Any]  # node name → its last output
For the full per-iteration trace, use wf.stream(...). See The @step decorator and the events section in Composition.
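To make the three fields concrete, here is a minimal sketch of a linear runner that fills them in as it goes. `SketchResult` and `run_linear` are hypothetical stand-ins, not Loom classes; they only mirror the field semantics described above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class SketchResult:  # stand-in for WorkflowResult, not the real class
    output: Any = None
    visited: list = field(default_factory=list)
    per_step: dict = field(default_factory=dict)


async def run_linear(
    nodes: list, value: Any
) -> SketchResult:
    """Run (name, async step) pairs in order and record what happened."""
    result = SketchResult()
    for name, step in nodes:
        value = await step(value)
        result.visited.append(name)    # execution order, repeats kept
        result.per_step[name] = value  # a later visit overwrites an earlier one
    result.output = value              # the last node's return value
    return result


async def strip(text: str) -> str:
    return text.strip()


async def count(text: str) -> int:
    return len(text)


res = asyncio.run(run_linear([("strip", strip), ("count", count)], "  hi  "))
print(res.output)    # 2
print(res.visited)   # ['strip', 'count']
print(res.per_step)  # {'strip': 'hi', 'count': 2}
```

Because per_step is keyed by node name, it keeps only the most recent output of a node that runs more than once, while visited keeps every visit.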
When to reach for chain
- Linear pipelines. Extract → transform → load.
- Pre-/post-processing around an Agent. Normalize input, run agent, post-process output.
- Trivial composition. Wrap two existing functions or Agents to inherit Loom’s observability and user_id propagation.
For branching, use Workflow.route. For
fan-out, use Workflow.parallel. For
cycles, drop to the explicit graph builder.
Lower-friction than the explicit builder. chain is a thin
wrapper over add_node + add_edge + set_start that just spells
out the graph for you. If your pipeline is genuinely linear, prefer
chain. The call site is shorter and the intent reads at a glance.
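The “thin wrapper” idea can be illustrated with a toy graph. `ToyGraph` below is a hypothetical, synchronous stand-in: it borrows the method names add_node, add_edge, and set_start from the text, but their real signatures are not shown in this section, so the ones used here are assumptions.

```python
from typing import Any, Callable, Optional


class ToyGraph:
    """Hypothetical stand-in for the explicit builder (sync only)."""

    def __init__(self) -> None:
        self.nodes: dict = {}
        self.edges: dict = {}
        self.start: Optional[str] = None

    def add_node(self, name: str, fn: Callable[[Any], Any]) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: str) -> None:
        self.edges[src] = dst

    def set_start(self, name: str) -> None:
        self.start = name

    def run(self, value: Any) -> Any:
        """Follow edges from the start node until a node has no successor."""
        name = self.start
        while name is not None:
            value = self.nodes[name](value)
            name = self.edges.get(name)
        return value


def chain(steps: list) -> ToyGraph:
    """Spell out a linear graph: one node per step, one edge per neighbor pair."""
    if not steps:
        raise ValueError("steps must be a non-empty list")
    graph = ToyGraph()
    names = [f"step_{i}" for i in range(len(steps))]
    for name, step in zip(names, steps):
        graph.add_node(name, step)
    for src, dst in zip(names, names[1:]):
        graph.add_edge(src, dst)
    graph.set_start(names[0])
    return graph


graph = chain([str.strip, str.upper, len])
print(graph.edges)           # {'step_0': 'step_1', 'step_1': 'step_2'}
print(graph.run("  hi  "))   # 2
```

The one-line chain call replaces three add_node calls, two add_edge calls, and a set_start call, which is the saving the text describes.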