
Workflow.chain

    from loomflow import Workflow

    wf = Workflow.chain([step1, step2, step3])

The simplest workflow shape: a list of steps that run in order. Each step receives the previous step’s return value; the final step’s return is the workflow’s output.
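
The data flow described above amounts to a left fold over the steps. The following is a minimal sketch in plain Python, assuming every step is an async function. `run_chain` is a hypothetical helper written for illustration, not part of loomflow and not its implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncStep = Callable[[Any], Awaitable[Any]]


async def run_chain(steps: list[AsyncStep], value: Any) -> Any:
    """Feed `value` through each step in order (illustrative only)."""
    for step in steps:
        # The previous step's return value becomes the next step's input.
        value = await step(value)
    # The last step's return value is the chain's output.
    return value


async def double(x: int) -> int:
    return x * 2


async def add_one(x: int) -> int:
    return x + 1


chain_output = asyncio.run(run_chain([double, add_one], 5))
print(chain_output)  # 11
```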


Signature

    @classmethod
    def chain(
        cls,
        steps: list[StepLike],
        *,
        name: str = "chain",
        telemetry: Telemetry | None = None,
        audit_log: AuditLog | None = None,
        max_steps: int = 100,
        max_visits_per_node: int = 25,
    ) -> Workflow: ...

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| steps | list[StepLike] | required | Non-empty list. Each entry can be an async def, a sync function, an Agent, or another Workflow. The framework coerces each to an awaitable. |
| name | str | "chain" | Workflow name used in telemetry and audit entries. |
| telemetry | Telemetry \| None | None | Wire OTelTelemetry(...) to emit one loom.workflow.step span per step. |
| audit_log | AuditLog \| None | None | Wire FileAuditLog(...) to record step_started / step_completed entries with user_id attribution. |
| max_steps | int | 100 | Cap on the total number of steps executed in one run / stream call. A linear chain visits each step once. |
| max_visits_per_node | int | 25 | Per-node visit cap. A linear chain visits each node once; the cap matters when chains are composed inside cyclic graphs. |
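
To make the two caps concrete, here is a rough sketch of how a total-step limit and a per-node limit could be checked against a visit history. This is an assumption-laden illustration: `LimitExceeded` and `check_limits` are hypothetical names, and how loomflow actually reports a tripped cap is not stated on this page.

```python
from collections import Counter


class LimitExceeded(RuntimeError):
    """Hypothetical error type; the real exception may differ."""


def check_limits(
    visited: list[str],
    max_steps: int = 100,
    max_visits_per_node: int = 25,
) -> None:
    """Raise if the visit history exceeds either cap (illustrative only)."""
    if len(visited) > max_steps:
        raise LimitExceeded(f"{len(visited)} steps exceeds max_steps={max_steps}")
    for node, count in Counter(visited).items():
        if count > max_visits_per_node:
            raise LimitExceeded(
                f"{node!r} visited {count} times, cap is {max_visits_per_node}"
            )


# A linear chain: three steps, each visited once, well under both caps.
check_limits(["normalize", "summarize", "emit"])

# A cycle that revisits one node 26 times trips the per-node cap
# even though the total (26) is still below max_steps.
try:
    check_limits(["retry"] * 26)
    cap_tripped = False
except LimitExceeded:
    cap_tripped = True

print(cap_tripped)  # True
```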

StepLike accepts:

  • An async def callable. Awaited directly with the previous output.
  • A sync function. Dispatched to a worker thread via anyio.to_thread.run_sync.
  • An Agent instance. The framework calls .run(input).output and threads the live RunContext (user_id / session_id) through.
  • Another Workflow instance. Calls .run(input).output and merges the inner trace.
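
The coercion rules above can be pictured as a small dispatch function. The sketch below is not loomflow's code: `coerce`, `FakeAgent`, and `FakeResult` are hypothetical stand-ins, `.run` is assumed to be awaitable, and the standard library's `asyncio.to_thread` is swapped in for `anyio.to_thread.run_sync` so the example has no third-party dependencies.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


def coerce(step: Any) -> Callable[[Any], Awaitable[Any]]:
    """Wrap a step so it can always be awaited with one argument."""
    if inspect.iscoroutinefunction(step):
        return step  # async def: awaited directly

    if callable(getattr(step, "run", None)):
        # Agent- or Workflow-like object: await .run(...) and unwrap .output
        async def call_runner(value: Any) -> Any:
            result = await step.run(value)
            return result.output

        return call_runner

    # Plain sync function: run it in a worker thread so the event loop stays free
    async def call_in_thread(value: Any) -> Any:
        return await asyncio.to_thread(step, value)

    return call_in_thread


class FakeResult:
    """Stand-in for a run result exposing `.output`."""

    def __init__(self, output: Any) -> None:
        self.output = output


class FakeAgent:
    """Stand-in for an Agent; makes no model call."""

    async def run(self, value: Any) -> FakeResult:
        return FakeResult(f"agent saw: {value}")


async def shout(text: str) -> str:
    return text.upper()


def trim(text: str) -> str:
    return text.strip()


async def demo() -> Any:
    value: Any = "  hi  "
    for step in (trim, shout, FakeAgent()):
        value = await coerce(step)(value)
    return value


coerced_output = asyncio.run(demo())
print(coerced_output)  # agent saw: HI
```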

Example. Pure Python steps

    import asyncio

    from loomflow import Workflow

    async def normalize(text: str) -> str:
        return text.strip().lower()

    async def summarize(text: str) -> str:
        return f"Summary: {text[:60]}"

    async def emit(text: str) -> dict:
        return {"summary": text, "len": len(text)}

    wf = Workflow.chain([normalize, summarize, emit], name="summarize_pipeline")

    async def main() -> None:
        # `await` needs a running event loop, so the call lives inside main()
        result = await wf.run(" HELLO WORLD ", user_id="alice")
        print(result.output)   # {'summary': 'Summary: hello world', 'len': 20}
        print(result.visited)  # ['normalize', 'summarize', 'emit']

    asyncio.run(main())

Example. Agents in the chain

    from loomflow import Agent, Workflow

    extract = Agent("Extract dates and amounts.", model="gpt-4.1-mini")
    classify = Agent("Tag the document type.", model="gpt-4.1-mini")
    respond = Agent("Draft a reply.", model="gpt-4.1-mini")

    wf = Workflow.chain([extract, classify, respond])

    async def handle(invoice_text: str):
        # Each Agent's `.run(input).output` is the next step's input.
        # user_id / session_id / metadata are forwarded automatically.
        result = await wf.run(invoice_text, user_id="alice")
        return result

Mixing types

    from loomflow import Agent, Workflow

    async def fetch(url: str) -> str:
        # `client` is an async HTTP client created elsewhere
        return await client.get(url)

    def normalize(text: str) -> str:  # sync: dispatched to a worker thread
        return text.strip()

    extractor = Agent("Extract key facts.", model="gpt-4.1-mini")
    inner_wf = Workflow.chain([extractor, lambda x: f"FACTS: {x}"])

    wf = Workflow.chain([
        fetch,      # async function
        normalize,  # sync function
        extractor,  # Agent
        inner_wf,   # nested Workflow
    ])

The framework coerces each step uniformly. There’s no special “this is an agent” mode. Passing an Agent to add_node (or here to chain) is the supported way to put LLM reasoning inside a workflow.


Duplicate step names

A step’s display name is derived from its __name__, its name attribute, or the fallback step_N. When the same callable is passed more than once, each later occurrence is renamed with a numeric suffix:

    wf = Workflow.chain([fn, fn, fn])
    # Nodes: 'fn', 'fn_1', 'fn_2'

This keeps the graph well-formed, because node names must be unique.
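
One way to picture the renaming is the sketch below. It is only an illustration: `display_name` and `unique_names` are hypothetical helpers, and the lookup order (`__name__` first, then `name`) and the index used for `step_N` are assumptions, not confirmed behaviour.

```python
from typing import Any


def display_name(step: Any, index: int) -> str:
    """Pick a name from __name__, then a name attribute, then step_N."""
    name = getattr(step, "__name__", None) or getattr(step, "name", None)
    return name if isinstance(name, str) else f"step_{index}"


def unique_names(steps: list[Any]) -> list[str]:
    """Suffix repeated names so every node name is distinct."""
    seen: dict[str, int] = {}
    names: list[str] = []
    for index, step in enumerate(steps):
        base = display_name(step, index)
        count = seen.get(base, 0)
        # First occurrence keeps the bare name; later ones get _1, _2, ...
        names.append(base if count == 0 else f"{base}_{count}")
        seen[base] = count + 1
    return names


def fn(x: Any) -> Any:
    return x


node_names = unique_names([fn, fn, fn])
print(node_names)  # ['fn', 'fn_1', 'fn_2']
```

This sketch does not guard against a suffixed name colliding with a different callable that is already called, say, `fn_1`; a real implementation would need to handle that case.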


Inspecting the run

WorkflowResult has three fields populated by every run:

    @dataclass
    class WorkflowResult:
        output: Any               # the last node's return value
        visited: list[str]        # node names in execution order (with repeats)
        per_step: dict[str, Any]  # node name → its last output

For the full per-iteration trace use wf.stream(...). See The @step decorator and the events section in Composition.
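
To see why per_step alone is not a full trace, consider a node that runs twice. The sketch below uses a stand-in dataclass, `ResultSketch`, that mirrors the three documented fields, plus a hypothetical `record` helper; neither is loomflow code.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ResultSketch:
    """Stand-in with the same three fields as WorkflowResult."""

    output: Any = None
    visited: list[str] = field(default_factory=list)
    per_step: dict[str, Any] = field(default_factory=dict)


def record(result: ResultSketch, node: str, value: Any) -> None:
    """Register one node execution (illustrative only)."""
    result.visited.append(node)     # repeats are kept, in order
    result.per_step[node] = value   # a repeat overwrites the earlier output
    result.output = value           # the latest value is the running output


result = ResultSketch()
for node, value in [("draft", "v1"), ("review", "needs work"), ("draft", "v2")]:
    record(result, node, value)

print(result.visited)   # ['draft', 'review', 'draft']
print(result.per_step)  # {'draft': 'v2', 'review': 'needs work'}
print(result.output)    # v2
```

The first output of `draft` ("v1") is no longer visible in per_step, which is the gap a per-iteration stream fills.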


When to reach for chain

  • Linear pipelines. Extract → transform → load.
  • Pre-/post-processing around an Agent. Normalize input, run agent, post-process output.
  • Trivial composition. Wrap two existing functions or Agents to inherit Loom’s observability + user_id propagation.

For branching, use Workflow.route. For fan-out, use Workflow.parallel. For cycles, drop to the explicit graph builder.

Lower-friction than the explicit builder. chain is a thin wrapper over add_node + add_edge + set_start that just spells out the graph for you. If your pipeline is genuinely linear, prefer chain. The call site is shorter and the intent reads at a glance.
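
The "thin wrapper" idea can be sketched with a toy graph class. Everything here is an assumption made for illustration: `ToyGraph` is not loomflow's builder, the method signatures for `add_node`, `add_edge`, and `set_start` are guesses, and steps are plain sync functions to keep the example short.

```python
from typing import Any, Callable, Optional


class ToyGraph:
    """Stand-in graph builder; signatures are assumptions, not loomflow's API."""

    def __init__(self) -> None:
        self.nodes: dict[str, Callable[[Any], Any]] = {}
        self.edges: dict[str, str] = {}
        self.start: Optional[str] = None

    def add_node(self, name: str, fn: Callable[[Any], Any]) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: str) -> None:
        self.edges[src] = dst

    def set_start(self, name: str) -> None:
        self.start = name

    @classmethod
    def chain(cls, steps: list[Callable[[Any], Any]]) -> "ToyGraph":
        """Spell out the linear graph: one node per step, one edge per pair."""
        graph = cls()
        names = [step.__name__ for step in steps]
        for name, step in zip(names, steps):
            graph.add_node(name, step)
        for src, dst in zip(names, names[1:]):
            graph.add_edge(src, dst)
        graph.set_start(names[0])
        return graph

    def run(self, value: Any) -> Any:
        node = self.start
        while node is not None:
            value = self.nodes[node](value)
            node = self.edges.get(node)  # None after the last node
        return value


def strip(text: str) -> str:
    return text.strip()


def lower(text: str) -> str:
    return text.lower()


toy = ToyGraph.chain([strip, lower])
print(toy.start)            # strip
print(toy.edges)            # {'strip': 'lower'}
print(toy.run("  HeLLo "))  # hello
```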
