Nous.Workflow.Engine.ParallelExecutor (nous v0.15.4)

Parallel fan-out/fan-in execution for workflow nodes.

Handles two parallelism patterns:

  • Static parallel (:parallel nodes) — runs named branches concurrently; each branch is a subgraph starting from a known node ID.
  • Dynamic parallel (:parallel_map nodes) — maps over a runtime-computed list, spawning one task per item.

Uses Task.Supervisor.async_stream_nolink/4 on Nous.TaskSupervisor, matching the pattern from Nous.Plugins.SubAgent.
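
As a rough illustration of the pattern named above, the sketch below fans a list out over Task.Supervisor.async_stream_nolink/4 and collects the results. It is not the module's actual implementation: it starts a throwaway supervisor instead of using Nous.TaskSupervisor, the doubling function stands in for real branch work, and the option values simply mirror the defaults documented on this page.

```elixir
# Throwaway supervisor for this sketch only; the real module is documented
# as using Nous.TaskSupervisor.
{:ok, sup} = Task.Supervisor.start_link()

items = [1, 2, 3]

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    items,
    # Placeholder work; a real caller would run a branch or handler here.
    fn item -> item * 2 end,
    max_concurrency: 5,
    timeout: 120_000,
    # Kill a task that exceeds the timeout instead of exiting the caller.
    on_timeout: :kill_task
  )
  |> Enum.map(fn
    # Successful tasks are emitted as {:ok, value}, in input order by default.
    {:ok, value} -> {:ok, value}
    # Crashed or timed-out tasks are emitted as {:exit, reason}, because the
    # tasks are not linked to the caller.
    {:exit, reason} -> {:error, reason}
  end)
```

Because the tasks are not linked, a failing task shows up as an `{:exit, reason}` element in the stream rather than crashing the calling process, which is what makes per-task error policies possible.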

Summary

Functions

execute_parallel(node, state, graph_nodes)

Execute a :parallel node — static fan-out to named branches.

execute_parallel_map(node, state)

Execute a :parallel_map node — dynamic fan-out over runtime data.

Functions

execute_parallel(node, state, graph_nodes)

@spec execute_parallel(Nous.Workflow.Node.t(), Nous.Workflow.State.t(), map()) ::
  {:ok, term(), Nous.Workflow.State.t()} | {:error, term()}

Execute a :parallel node — static fan-out to named branches.

Each branch ID in config.branches is executed as an independent node. Results are merged using the strategy configured under :merge.

Config Keys

  • :branches — list of node IDs to run in parallel (required)
  • :merge — merge strategy: :deep_merge, :list_collect, or a function (default: :list_collect)
  • :max_concurrency — max concurrent branches (default: 5)
  • :timeout — per-branch timeout in ms (default: 120000)
  • :on_branch_error — :continue_others or :fail_fast (default: :continue_others)
  • :result_key — key for :list_collect results (default: :parallel_results)
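
The keys above could be assembled into a config map along the following lines. This is a sketch: the branch IDs are hypothetical placeholders (the page does not say whether node IDs are atoms or strings), and every optional key is set to its documented default, so only :branches would actually be required.

```elixir
# Hypothetical config for a :parallel node. Branch IDs are placeholders.
parallel_config = %{
  # Required: node IDs to run concurrently.
  branches: [:fetch_docs, :fetch_issues, :fetch_metrics],
  # Optional keys below are shown with their documented defaults.
  merge: :list_collect,
  max_concurrency: 5,
  timeout: 120_000,
  on_branch_error: :continue_others,
  result_key: :parallel_results
}
```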

execute_parallel_map(node, state)

@spec execute_parallel_map(Nous.Workflow.Node.t(), Nous.Workflow.State.t()) ::
  {:ok, term(), Nous.Workflow.State.t()} | {:error, term()}

Execute a :parallel_map node — dynamic fan-out over runtime data.

The :items function extracts a list from the current state. Each item is then processed in parallel by the :handler function.

Config Keys

  • :items — function (state -> list) producing items at runtime (required)
  • :handler — function (item, state -> result) processing each item (required)
  • :max_concurrency — max concurrent tasks (default: 5)
  • :timeout — per-item timeout in ms (default: 120000)
  • :on_error — :collect or :fail_fast (default: :collect)
  • :result_key — key to store results under in state.data (default: :map_results)
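
A config map for a :parallel_map node might look like the sketch below. The :urls key and the upcasing handler are invented for illustration, and the code assumes state.data is a plain map, which this page implies but does not state. Optional keys are set to their documented defaults.

```elixir
# Hypothetical config for a :parallel_map node.
parallel_map_config = %{
  # Required: (state -> list). Assumes state.data is a map; :urls is a
  # made-up key for this example.
  items: fn state -> Map.get(state.data, :urls, []) end,
  # Required: (item, state -> result). Placeholder work on each item.
  handler: fn url, _state -> String.upcase(url) end,
  # Optional keys below are shown with their documented defaults.
  max_concurrency: 5,
  timeout: 120_000,
  on_error: :collect,
  result_key: :map_results
}
```

Keeping :items as a function of the state, rather than a fixed list, is what lets the fan-out width be decided at runtime by earlier workflow steps.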