Nous.Workflow.Engine.ParallelExecutor (nous v0.16.2)
View SourceParallel fan-out/fan-in execution for workflow nodes.
Handles two parallelism patterns:
- Static parallel (
:parallelnodes) — runs named branches concurrently, each branch is a subgraph starting from a known node ID. - Dynamic parallel (
:parallel_mapnodes) — maps over a runtime-computed list, spawning one task per item.
Uses Task.Supervisor.async_stream_nolink/4 on Nous.TaskSupervisor,
matching the pattern from Nous.Plugins.SubAgent.
Summary
Functions
Execute a :parallel node — static fan-out to named branches.
Execute a :parallel_map node — dynamic fan-out over runtime data.
Functions
@spec execute_parallel(Nous.Workflow.Node.t(), Nous.Workflow.State.t(), map()) :: {:ok, term(), Nous.Workflow.State.t()} | {:error, term()}
Execute a :parallel node — static fan-out to named branches.
Each branch ID in config.branches is executed as an independent node.
Results are merged using the configured strategy.
Config Keys
:branches— list of node IDs to run in parallel (required):merge— merge strategy::deep_merge,:list_collect, or function (default::list_collect):max_concurrency— max concurrent branches (default: 5):timeout— per-branch timeout in ms (default: 120000):on_branch_error—:continue_othersor:fail_fast(default::continue_others):result_key— key for:list_collectresults (default::parallel_results)
@spec execute_parallel_map(Nous.Workflow.Node.t(), Nous.Workflow.State.t()) :: {:ok, term(), Nous.Workflow.State.t()} | {:error, term()}
Execute a :parallel_map node — dynamic fan-out over runtime data.
The items function extracts a list from the current state. Each item
is processed by the handler function in parallel.
Config Keys
:items— function(state -> list)producing items at runtime (required):handler— function(item, state -> result)processing each item (required):max_concurrency— max concurrent tasks (default: 5):timeout— per-item timeout in ms (default: 120000):on_error—:collector:fail_fast(default::collect):result_key— key to store results under instate.data(default::map_results)