Arrea.Parallel (Arrea v1.0.0)


Parallel execution module for commands and functions.

Provides a high-level API for concurrent execution with worker management, coordination via Leader, and monitoring.

Shell options (:shell, :shell_config) and asdf/mise options (:asdf_<language>, :mise_<language>) are inherited automatically from Arrea.Command.resolve_shell/1 and Arrea.Command.build_full_command/2, so parallel executions can use the same custom shell and runtime versions as single commands.

Summary

Functions

execute(cmd, opts \\ [])

Executes a single command synchronously.

monitor_state()

Gets the current state of the Monitor.

run(commands, opts \\ [])

Executes multiple commands in parallel via Leader.

run_stream(commands, opts \\ [])

Executes multiple commands in parallel returning a stream of individual results as they complete, including the duration of each.

run_sync(commands, opts \\ [])

Executes multiple commands and waits for all results synchronously.

run_tasks(commands, opts \\ [])

Executes multiple commands in parallel returning a list of {idx, cmd, task} where each task is a Task that can be monitored with Task.yield_many/2.

subscribe_monitor()

Subscribes the current process to Monitor updates.

Functions

execute(cmd, opts \\ [])

@spec execute(
  binary(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Executes a single command synchronously.

Options

  • :timeout — Timeout in milliseconds (default: 30_000)
  • :shell — Shell to use (default: resolved automatically)
  • :shell_config — Path to shell config file
  • :asdf_elixir, :asdf_erlang, etc. — Versions via ASDF (environment variable)
  • :mise_node, :mise_elixir, etc. — Versions via mise exec

Examples

iex> Arrea.Parallel.execute("echo hello")
{:ok, %{stdout: "hello\n", stderr: "", exit_code: 0}}
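
The return shape above lends itself to pattern matching. The following is a minimal sketch, not part of Arrea: ExecuteResult.summarize/1 is a hypothetical helper, the sample tuples stand in for real calls to Arrea.Parallel.execute/2, and it assumes that a command exiting with a non-zero status is still reported as {:ok, map} with the :exit_code and :stderr keys shown in the example. The :timeout reason is likewise only illustrative.

```elixir
defmodule ExecuteResult do
  # Hypothetical helper (not part of Arrea): turns a value in the documented
  # return shape of Arrea.Parallel.execute/2 into a short summary tuple.

  # Command ran and exited with status 0.
  def summarize({:ok, %{exit_code: 0, stdout: out}}), do: {:success, String.trim(out)}

  # Assumption: a non-zero exit is still delivered as {:ok, map}.
  def summarize({:ok, %{exit_code: code, stderr: err}}), do: {:failed, code, String.trim(err)}

  # Command could not be completed; the reason term is passed through.
  def summarize({:error, reason}), do: {:error, reason}
end

# Sample values in the documented shape. In real use they would come from
# a call such as Arrea.Parallel.execute("echo hello", timeout: 5_000).
ok = {:ok, %{stdout: "hello\n", stderr: "", exit_code: 0}}
bad = {:ok, %{stdout: "", stderr: "not found\n", exit_code: 127}}

IO.inspect(ExecuteResult.summarize(ok))
IO.inspect(ExecuteResult.summarize(bad))
IO.inspect(ExecuteResult.summarize({:error, :timeout}))
```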

monitor_state()

@spec monitor_state() :: map()

Gets the current state of the Monitor.

run(commands, opts \\ [])

@spec run(
  [binary() | function()],
  keyword()
) :: {:ok, binary()} | {:ok, binary(), map()} | {:error, term()}

Executes multiple commands in parallel via Leader.

Options

  • :workers — Number of parallel workers (default: 4)
  • :timeout — Timeout in milliseconds (default: 30_000)

Examples

iex> Arrea.Parallel.run(["echo a", "echo b", "echo c"], workers: 2)
{:ok, batch_id}

run_stream(commands, opts \\ [])

@spec run_stream(
  [binary() | function()],
  keyword()
) :: Enumerable.t()

Executes multiple commands in parallel returning a stream of individual results as they complete, including the duration of each.

Each stream element is {index, {:ok, result} | {:error, reason}}.

Options

  • :workers — Number of parallel workers (default: 4)
  • :timeout — Timeout per command in milliseconds (default: 30_000)
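
Because elements are emitted as commands complete, a consumer usually has to separate successes from failures and, if needed, restore input order itself. The sketch below shows one way to do that with the standard library only. The stream is a stand-in built from a list, not a real call to Arrea.Parallel.run_stream/2, and the zero-based indexes, the map contents, and the :timeout reason are assumptions made for illustration.

```elixir
# Stand-in for the stream returned by Arrea.Parallel.run_stream/2.
# Elements follow the documented shape {index, {:ok, result} | {:error, reason}}.
# Assumptions: indexes start at 0, and the maps and :timeout reason are made up.
stream =
  Stream.map(
    [
      {1, {:ok, %{stdout: "b\n", exit_code: 0}}},
      {0, {:ok, %{stdout: "a\n", exit_code: 0}}},
      {2, {:error, :timeout}}
    ],
    & &1
  )

# Split elements into successes and failures as they arrive.
{oks, errors} =
  Enum.reduce(stream, {[], []}, fn
    {idx, {:ok, result}}, {oks, errors} -> {[{idx, result} | oks], errors}
    {idx, {:error, reason}}, {oks, errors} -> {oks, [{idx, reason} | errors]}
  end)

# Elements arrive in completion order, so sort by index to restore input order.
oks = Enum.sort_by(oks, fn {idx, _result} -> idx end)

IO.inspect(Enum.map(oks, fn {idx, result} -> {idx, result.stdout} end))
IO.inspect(errors)
```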

run_sync(commands, opts \\ [])

@spec run_sync(
  [binary() | function() | tuple()],
  keyword()
) :: [map()]

Executes multiple commands and waits for all results synchronously.

Uses Task.async_stream/3 with max_concurrency to get a true sliding window: a new task starts as soon as any running task finishes, without waiting for the whole chunk.
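
The sliding-window behaviour can be observed with Task.async_stream/3 alone. This sketch uses only the Elixir standard library and does not call Arrea. It assumes that :workers corresponds to max_concurrency, and the sleep durations are arbitrary stand-ins for real commands.

```elixir
# Simulated job durations in milliseconds: one slow job, then four fast ones.
durations = [600, 100, 100, 100, 100]

{elapsed_us, results} =
  :timer.tc(fn ->
    durations
    |> Task.async_stream(
      fn ms ->
        Process.sleep(ms)
        ms
      end,
      # Two concurrent slots, standing in for workers: 2 (assumption).
      max_concurrency: 2,
      timeout: 5_000
    )
    |> Enum.to_list()
  end)

# Results come back in input order because :ordered defaults to true.
IO.inspect(results)

# With a sliding window the four fast jobs share one slot (about 400 ms in total)
# while the slow job holds the other, so the wall time is close to 600 ms.
# Fixed chunks of two would need roughly 600 + 100 + 100 = 800 ms.
IO.puts("elapsed: #{div(elapsed_us, 1000)} ms")
```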

Supports per-task timeouts and tags via tuple inputs:

  • {command, timeout_ms} — per-task timeout
  • {tag, command} — tag the task with an atom such as :vector (the tag appears in the result)
  • {tag, command, timeout_ms} — tag + per-task timeout

By default (ordered: true), results are returned in the same order as the input commands. When a tag is provided, the result is wrapped as {:tagged, tag, result} so callers can identify tasks without relying on position.

Options

  • :workers — Number of parallel workers (default: 4)
  • :timeout — Default timeout per command in ms (default: 30_000)
  • :ordered — Return results in input order (default: true)

Examples

iex> Arrea.Parallel.run_sync([fn -> 1 end, fn -> 2 end])
[{:ok, %{result: 1, exit_code: 0}}, {:ok, %{result: 2, exit_code: 0}}]

iex> Arrea.Parallel.run_sync([{:vector, fn -> 1 end}, {:bm25, fn -> 2 end}])
[{:tagged, :vector, {:ok, %{result: 1, exit_code: 0}}},
 {:tagged, :bm25, {:ok, %{result: 2, exit_code: 0}}}]
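
Tagged results are convenient to look up by tag rather than by position. The sketch below is not part of Arrea: TaggedResults.by_tag/1 is a hypothetical helper, and the input is sample data copied from the example above rather than the output of a live run_sync/2 call.

```elixir
defmodule TaggedResults do
  # Hypothetical helper (not part of Arrea): collects {:tagged, tag, result}
  # entries into a map keyed by tag. Untagged entries are skipped.
  def by_tag(results) do
    for {:tagged, tag, result} <- results, into: %{}, do: {tag, result}
  end
end

# Sample data in the shape shown in the example above. In real use it would come
# from Arrea.Parallel.run_sync([{:vector, fn -> 1 end}, {:bm25, fn -> 2 end}]).
results = [
  {:tagged, :vector, {:ok, %{result: 1, exit_code: 0}}},
  {:tagged, :bm25, {:ok, %{result: 2, exit_code: 0}}}
]

by_tag = TaggedResults.by_tag(results)

# Look up one task by its tag and unpack the wrapped result.
{:ok, %{result: vector_value}} = by_tag[:vector]
IO.inspect(vector_value)
```

Keying by tag keeps the calling code independent of input order, which matters if ordered: false is used.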

run_tasks(commands, opts \\ [])

@spec run_tasks(
  [binary()],
  keyword()
) :: [{non_neg_integer(), String.t(), Task.t()}]

Executes multiple commands in parallel returning a list of {idx, cmd, task} where each task is a Task that can be monitored with Task.yield_many/2.

Each task manages its own timeout internally.

Options

  • :workers — Number of parallel workers (default: 4)
  • :timeout — Timeout per command in milliseconds (default: 30_000)
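
A list of {idx, cmd, task} entries can be collected with Task.yield_many/2. The sketch below uses only the standard library: the entries are built with Task.async/1 as stand-ins for the list that run_tasks/2 returns, and the command strings, the 200 ms wait, and the decision to kill unfinished tasks are all illustrative assumptions rather than Arrea behaviour.

```elixir
# Stand-in for the list returned by Arrea.Parallel.run_tasks/2, which is
# documented as [{idx, cmd, task}]. Plain Task.async/1 calls replace real commands.
entries = [
  {0, "echo fast", Task.async(fn -> :fast_done end)},
  {1, "sleep forever", Task.async(fn -> Process.sleep(:infinity) end)}
]

tasks = Enum.map(entries, fn {_idx, _cmd, task} -> task end)

# Task.yield_many/2 returns {task, result} pairs in the same order as `tasks`.
# Each result is {:ok, value}, {:exit, reason}, or nil if the task is still running.
outcomes =
  tasks
  |> Task.yield_many(200)
  |> Enum.zip(entries)
  |> Enum.map(fn {{task, result}, {idx, cmd, _task}} ->
    # Shut down tasks that did not reply in time so they do not linger.
    {idx, cmd, result || Task.shutdown(task, :brutal_kill)}
  end)

IO.inspect(outcomes)
```

Task.yield_many/2 must be called from the process that started the tasks, so the list should be consumed in the same process that called run_tasks/2.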

subscribe_monitor()

@spec subscribe_monitor() :: :ok

Subscribes the current process to Monitor updates.