Workflow.parallel
from loomflow import Workflow
wf = Workflow.parallel([fn_a, fn_b, fn_c], merge=combine)Fan-out to N steps with the same input, then merge the results.
Steps run concurrently via anyio.create_task_group.
Signature
@classmethod
def parallel(
cls,
steps: list[StepLike],
*,
merge: Callable[[list[Any]], Any] | None = None,
name: str = "parallel",
telemetry: Telemetry | None = None,
audit_log: AuditLog | None = None,
max_steps: int = 100,
max_visits_per_node: int = 25,
) -> Workflow: ...| Parameter | Type | Default | Description |
|---|---|---|---|
steps | list[StepLike] | required | Non-empty list. Each step receives the same input (the workflow’s input value). Steps run concurrently. |
merge | Callable[[list[Any]], Any] | None | None | Combines results into the final output. Receives a list of results in input order. When None, the final output IS the list of results. |
name | str | "parallel" | Workflow name. |
telemetry / audit_log / max_steps / max_visits_per_node | . | , | Same as Workflow.chain. |
Each step can be an async def, a sync function, an Agent, or a
nested Workflow. Same coercion rules as Workflow.chain.
Example. Fan-out / merge
from loomflow import Agent, Workflow
researcher = Agent("Find sources.", model="gpt-4.1-mini", tools=[search])
critic = Agent("Identify weak claims.", model="gpt-4.1-mini")
budgeter = Agent("Estimate effort.", model="gpt-4.1-mini")
def combine(results: list[str]) -> dict:
return {
"research": results[0],
"criticism": results[1],
"budget": results[2],
}
wf = Workflow.parallel(
[researcher, critic, budgeter],
merge=combine,
name="brainstorm_pass",
)
result = await wf.run("Should we adopt agent harnesses?", user_id="alice")
print(result.output)
# {'research': '...', 'criticism': '...', 'budget': '...'}The three Agents see the same prompt and run concurrently. The merge function builds a structured result.
Example. No merge (default returns the list)
wf = Workflow.parallel([fn_a, fn_b, fn_c])
result = await wf.run(input)
print(result.output) # [<fn_a result>, <fn_b result>, <fn_c result>]When merge=None, the workflow’s output is just the list of results
in input order. Useful when downstream code (or a wrapping chain
step) handles the aggregation.
Concurrency semantics
- Steps run inside one
anyio.create_task_group. If any step raises, the task group cancels the others and the workflow propagates the exception. - Result order matches input order, regardless of completion order.
results[i]is the i-th step’s return value. - A slow step blocks the merge. Total wall-clock is
max(step_durations), notsum(step_durations).
For “best-of-3” style patterns where you want the fastest result and
don’t need the others, use anyio.fail_after(...) or implement a
custom async def step that races with anyio.create_task_group.
Sub-Agents inside a parallel run
When the steps are Agent instances, each agent runs as a separate
sub-run with the same user_id / session_id. The audit log records
each agent’s run_started / run_completed independently. Same
user_id, same session_id, but distinct sub-run_ids nested under
the workflow’s session.
wf = Workflow.parallel([agent_a, agent_b, agent_c])
result = await wf.run("...", user_id="alice", session_id="bb_42")
# Audit log will have:
# workflow_started | user_id=alice session_id=bb_42
# step_started node=fan_out
# run_started | user_id=alice session_id=bb_42 (agent_a's sub-run)
# run_started | user_id=alice session_id=bb_42 (agent_b's sub-run)
# run_started | user_id=alice session_id=bb_42 (agent_c's sub-run)
# ... (per-agent tool calls)
# run_completed × 3
# step_completed node=fan_out
# workflow_completedWhen to reach for parallel
- Brainstorm passes. Get N perspectives on the same prompt then synthesize.
- Multi-source retrieval. Fan out to N search backends, merge hits.
- Validation cross-check. Run a result through N validators (regex / schema / LLM) and aggregate.
For “debate” / “actor-critic” / “tree-of-thoughts” style multi-pass
exploration, use the architectures
(MultiAgentDebate /
ActorCritic /
TreeOfThoughts). They
ship the iteration logic. Workflow.parallel is the right primitive
when you want a one-shot fan-out + merge.
Steps that share state break the contract. Workflow.parallel
assumes each step is independent given the input. If two steps
mutate a shared mutable, results are nondeterministic. For shared
state, use Workflow.chain (sequential) or
serialize the access yourself.