Skip to Content
DocsWorkflowWorkflow.parallel

Workflow.parallel

from loomflow import Workflow wf = Workflow.parallel([fn_a, fn_b, fn_c], merge=combine)

Fan-out to N steps with the same input, then merge the results. Steps run concurrently via anyio.create_task_group.


Signature

@classmethod def parallel( cls, steps: list[StepLike], *, merge: Callable[[list[Any]], Any] | None = None, name: str = "parallel", telemetry: Telemetry | None = None, audit_log: AuditLog | None = None, max_steps: int = 100, max_visits_per_node: int = 25, ) -> Workflow: ...
ParameterTypeDefaultDescription
stepslist[StepLike]requiredNon-empty list. Each step receives the same input (the workflow’s input value). Steps run concurrently.
mergeCallable[[list[Any]], Any] | NoneNoneCombines results into the final output. Receives a list of results in input order. When None, the final output IS the list of results.
namestr"parallel"Workflow name.
telemetry / audit_log / max_steps / max_visits_per_node.,Same as Workflow.chain.

Each step can be an async def, a sync function, an Agent, or a nested Workflow. Same coercion rules as Workflow.chain.


Example. Fan-out / merge

from loomflow import Agent, Workflow researcher = Agent("Find sources.", model="gpt-4.1-mini", tools=[search]) critic = Agent("Identify weak claims.", model="gpt-4.1-mini") budgeter = Agent("Estimate effort.", model="gpt-4.1-mini") def combine(results: list[str]) -> dict: return { "research": results[0], "criticism": results[1], "budget": results[2], } wf = Workflow.parallel( [researcher, critic, budgeter], merge=combine, name="brainstorm_pass", ) result = await wf.run("Should we adopt agent harnesses?", user_id="alice") print(result.output) # {'research': '...', 'criticism': '...', 'budget': '...'}

The three Agents see the same prompt and run concurrently. The merge function builds a structured result.


Example. No merge (default returns the list)

wf = Workflow.parallel([fn_a, fn_b, fn_c]) result = await wf.run(input) print(result.output) # [<fn_a result>, <fn_b result>, <fn_c result>]

When merge=None, the workflow’s output is just the list of results in input order. Useful when downstream code (or a wrapping chain step) handles the aggregation.


Concurrency semantics

  • Steps run inside one anyio.create_task_group. If any step raises, the task group cancels the others and the workflow propagates the exception.
  • Result order matches input order, regardless of completion order. results[i] is the i-th step’s return value.
  • A slow step blocks the merge. Total wall-clock is max(step_durations), not sum(step_durations).

For “best-of-3” style patterns where you want the fastest result and don’t need the others, use anyio.fail_after(...) or implement a custom async def step that races with anyio.create_task_group.


Sub-Agents inside a parallel run

When the steps are Agent instances, each agent runs as a separate sub-run with the same user_id / session_id. The audit log records each agent’s run_started / run_completed independently. Same user_id, same session_id, but distinct sub-run_ids nested under the workflow’s session.

wf = Workflow.parallel([agent_a, agent_b, agent_c]) result = await wf.run("...", user_id="alice", session_id="bb_42") # Audit log will have: # workflow_started | user_id=alice session_id=bb_42 # step_started node=fan_out # run_started | user_id=alice session_id=bb_42 (agent_a's sub-run) # run_started | user_id=alice session_id=bb_42 (agent_b's sub-run) # run_started | user_id=alice session_id=bb_42 (agent_c's sub-run) # ... (per-agent tool calls) # run_completed × 3 # step_completed node=fan_out # workflow_completed

When to reach for parallel

  • Brainstorm passes. Get N perspectives on the same prompt then synthesize.
  • Multi-source retrieval. Fan out to N search backends, merge hits.
  • Validation cross-check. Run a result through N validators (regex / schema / LLM) and aggregate.

For “debate” / “actor-critic” / “tree-of-thoughts” style multi-pass exploration, use the architectures (MultiAgentDebate / ActorCritic / TreeOfThoughts). They ship the iteration logic. Workflow.parallel is the right primitive when you want a one-shot fan-out + merge.

Steps that share state break the contract. Workflow.parallel assumes each step is independent given the input. If two steps mutate a shared mutable, results are nondeterministic. For shared state, use Workflow.chain (sequential) or serialize the access yourself.

Last updated on