Async process orchestrator and telemetry for Elixir.

Arrea is an OTP-based library that provides parallel process execution, worker management, circuit breaker protection, command validation, and built-in telemetry for monitoring your Elixir applications.

Quick Start

Add Arrea to your mix.exs:

def deps do
  [
    {:arrea, "~> 0.1.0"}
  ]
end

Arrea starts its supervision tree automatically when added as a dependency. No manual setup required.

Execute a single command

# Shell command
{:ok, result} = Arrea.execute("echo hello")

# Or a function
{:ok, result} = Arrea.execute(fn -> :work end)

Run commands in parallel

{:ok, result} = Arrea.run(
  [
    fn -> Process.sleep(100); 1 end,
    fn -> Process.sleep(100); 2 end,
    fn -> Process.sleep(100); 3 end
  ],
  workers: 2
)
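
To make the idea of a bounded worker pool concrete, the same three functions can be run with the standard library alone. This is only an analogy built on Task.async_stream/3, not Arrea's implementation, and the module name BoundedParallel is made up for the example.

```elixir
defmodule BoundedParallel do
  # Runs zero-arity functions with at most `workers` of them executing at the
  # same time and returns their results in input order.
  def run(funs, workers) when is_list(funs) and is_integer(workers) and workers > 0 do
    funs
    |> Task.async_stream(fn fun -> fun.() end,
      max_concurrency: workers,
      ordered: true,
      timeout: 5_000
    )
    |> Enum.map(fn {:ok, value} -> value end)
  end
end

results =
  BoundedParallel.run(
    [
      fn -> Process.sleep(100); 1 end,
      fn -> Process.sleep(100); 2 end,
      fn -> Process.sleep(100); 3 end
    ],
    2
  )
```

With a limit of 2, the third function only starts once one of the first two has finished, so the batch takes roughly two rounds of sleeping rather than one.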

Subscribe to events

:ok = Arrea.subscribe()

receive do
  {:leader_event, %{type: :finished, worker_id: id}} ->
    IO.puts("Worker #{id} finished")
  {:leader_event, event} ->
    IO.inspect(event, label: "Event")
end

:ok = Arrea.unsubscribe()
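
The receive above handles a single message. For a batch, a subscriber usually needs a loop that keeps reading until the expected number of :finished events has arrived. The sketch below assumes only the {:leader_event, event} message shape described in this README. The module name EventDrain is hypothetical, and the events are simulated with send/2 so the example runs without Arrea.

```elixir
defmodule EventDrain do
  # Reads {:leader_event, event} messages from the caller's mailbox until
  # `expected` events of type :finished have been seen, or until no message
  # arrives for `timeout_ms` milliseconds.
  def collect(expected, timeout_ms \\ 1_000), do: loop(expected, timeout_ms, [])

  defp loop(0, _timeout_ms, acc), do: {:ok, Enum.reverse(acc)}

  defp loop(remaining, timeout_ms, acc) do
    receive do
      {:leader_event, %{type: :finished} = event} ->
        loop(remaining - 1, timeout_ms, [event | acc])

      {:leader_event, event} ->
        loop(remaining, timeout_ms, [event | acc])
    after
      timeout_ms -> {:timeout, Enum.reverse(acc)}
    end
  end
end

# Simulated events standing in for what the Leader would broadcast.
send(self(), {:leader_event, %{type: :worker_started, worker_id: 1}})
send(self(), {:leader_event, %{type: :finished, worker_id: 1}})

outcome = EventDrain.collect(1, 100)
```

In real use the call would sit between Arrea.subscribe() and Arrea.unsubscribe(), with `expected` set to the number of submitted commands.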

Get stats

{:ok, stats} = Arrea.stats()
# => %{
#      total_workers: 10,
#      active_workers: 3,
#      completed_tasks: 42,
#      failed_tasks: 2
#    }

Features

  • Parallel execution — Run commands and functions concurrently with configurable worker pools via Arrea.run/2
  • Synchronous execution — Execute single commands with Arrea.execute/2, including shell integration with real timeout cancellation
  • Circuit breaker — Protect external calls with automatic open/close/half-open state transitions to prevent cascading failures
  • Command validation — Built-in validation rules blocking dangerous commands (rm -rf, sudo, mkfs, fork bombs, injection patterns)
  • Telemetry — Rich event system with worker lifecycle, task progress, system metrics, and circuit breaker state tracking
  • Error policies — Configurable error handling: retry, stop, continue, or custom handlers with retry counts and delays
  • Worker monitoring — Subscribe to real-time events: worker start, completion, failure, and progress updates
  • Batch execution — Submit command batches with worker limits and per-worker timeouts
  • ASDF/mise integration — Runtime version management via asdf or mise with support for --asdf-<runtime> CLI flags and mise exec wrapping
  • Custom shell — Configurable shell per-command (--shell), via config (Arrea.Config.set(:shell, ...)), or auto-detected from $SHELL with automatic config file sourcing
  • Structured results — Arrea.Result and Arrea.Error structs for consistent return types

CLI

Arrea includes a command-line interface built with Alaja:

# Build the escript
mix escript.build

# Run locally
./arrea run --command "echo hello"

# Install to ~/bin
mix install

arrea run

Execute shell commands in parallel with progress tracking.

# Single command
arrea run --command "echo hello"

# Multiple commands (parallel)
arrea run --command "echo a" --command "echo b"

# With worker limit
arrea run --command "sleep 1" --command "sleep 2" --parallel 2

# Custom timeout (ms)
arrea run --command "sleep 10" --timeout 5000

# Quiet mode (suppress progress)
arrea run --command "echo done" --quiet

# Custom shell (single quotes stop the calling shell from expanding $0)
arrea run --command 'echo $0' --shell zsh

# With ASDF version
arrea run --command "mix test" --asdf-elixir 1.18.0

# With mise version
arrea run --command "node -v" --mise-node 20.0.0

arrea config

Manage Arrea engine configuration at runtime.

# Show all config
arrea config --show

# Get a value
arrea config get max_workers

# Set a value
arrea config set max_workers 200
arrea config set default_policy stop
arrea config set asdf_enabled true
arrea config set log_level debug

arrea action

Execute Arrea commands from JSON input (stdin, file, or inline).

# From stdin
echo '{"command":"run","args":["--command","echo hello"]}' | arrea action

# From file
arrea action --file ./pipeline.json

# Inline JSON
arrea action --data '{"command":"run","args":["--command","echo hi","--quiet"]}'

# Batch actions
arrea action --data '{
  "actions": [
    {"command": "run", "args": ["--command", "echo first"], "order": 0},
    {"command": "run", "args": ["--command", "echo second", "--quiet"], "order": 1}
  ]
}'

Architecture


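+------------------------------------------------------+
|                    Arrea (Facade)                    |
+------------------------------------------------------+
                           |
                           v
+------------------------------------------------------+
|              Arrea.Leader (GenServer)                |
|     Coordinates execution, manages workers,          |
|     broadcasts {:leader_event, event} to subscribers |
+------------------------------------------------------+
                           |
                           v
+------------------------------------------------------+
|       Arrea.WorkerSupervisor (DynamicSupervisor)     |
|              Spawns ephemeral workers                |
+------------------------------------------------------+
                           |
                           v
+------------------------------------------------------+
|            Arrea.Worker (GenServer)                  |
|     Executes individual tasks, handles policies,     |
|     reports progress via Leader                      |
+------------------------------------------------------+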


  Arrea.Monitor (GenServer)    Tracks worker lifecycle, provides stats
  Arrea.CircuitBreaker         Fault tolerance for external dependencies

All processes run under Arrea.Supervisor with the :rest_for_one strategy, using two Registries (Arrea.Registry for workers, Arrea.CircuitBreaker.Registry for circuit breakers). With :rest_for_one, when a child crashes, only that child and the children started after it are restarted. Children started earlier keep running, which limits disruption to active batches.
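
The effect of :rest_for_one can be observed with a small standalone supervisor. The sketch below uses three plain Agents with made-up ids (:first, :second, :third). It does not reproduce Arrea's actual children or their start order, which this README does not list.

```elixir
defmodule RestForOneDemo do
  # Hypothetical child ids, listed in start order.
  @ids [:first, :second, :third]

  def run do
    children =
      for id <- @ids do
        %{id: id, start: {Agent, :start_link, [fn -> id end]}}
      end

    {:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)
    before = pids(sup)

    # Kill the middle child, then wait until the supervisor has replaced it.
    Process.exit(before.second, :kill)
    after_restart = wait_for_restart(sup, before.second, 100)

    report =
      Map.new(@ids, fn id ->
        {id, if(before[id] == after_restart[id], do: :kept, else: :restarted)}
      end)

    Supervisor.stop(sup)
    report
  end

  # Maps each child id to its current pid.
  defp pids(sup) do
    sup
    |> Supervisor.which_children()
    |> Map.new(fn {id, pid, _type, _modules} -> {id, pid} end)
  end

  defp wait_for_restart(_sup, _old_pid, 0), do: raise("child was not restarted in time")

  defp wait_for_restart(sup, old_pid, attempts) do
    current = pids(sup)

    if is_pid(current.second) and current.second != old_pid do
      current
    else
      Process.sleep(10)
      wait_for_restart(sup, old_pid, attempts - 1)
    end
  end
end

report = RestForOneDemo.run()
```

The report marks :first as kept and the other two as restarted: the crashed child and everything started after it get new pids, while the earlier child is untouched.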

API

Arrea.execute/2

Execute a single command (binary shell command or zero-arity function).

@spec execute(binary() | (-> term()), keyword()) ::
        {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}

Options:

  • :timeout — Timeout in ms (default: 30_000). The timeout is enforced: execution is cancelled once it is exceeded.
  • :retry — Whether to retry on failure
  • :shell — Shell to use. Takes the highest priority, overriding config and $SHELL
  • :shell_config — Path to shell config file to source (auto-detected by default)
  • :asdf_<runtime> — Pin runtime version via asdf/mise (e.g. asdf_elixir: "1.18.0")
  • :mise_<runtime> — Use mise exec wrapping (e.g. mise_node: "20.0.0")
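
As an illustration of what an enforced timeout means for a function command, the sketch below runs a function in a separate process and kills it when the deadline passes. It relies only on Task.yield/2 and Task.shutdown/2 from the standard library. The module name TimeoutDemo is hypothetical, and this README does not say whether Arrea uses this mechanism internally.

```elixir
defmodule TimeoutDemo do
  # Runs `fun` in a linked task. If no result arrives within `timeout_ms`,
  # the task process is killed so the work does not continue in the background.
  def run_with_timeout(fun, timeout_ms) when is_function(fun, 0) do
    task = Task.async(fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> {:ok, value}
      # Only reachable when the caller traps exits; otherwise a crashing
      # task takes the linked caller down with it.
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end

fast = TimeoutDemo.run_with_timeout(fn -> :done end, 1_000)
slow = TimeoutDemo.run_with_timeout(fn -> Process.sleep(5_000); :late end, 50)
```

Cancelling a shell command additionally requires terminating the operating-system process, which this sketch does not cover.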

Arrea.run/2

Execute multiple commands in parallel.

@spec run([binary() | (-> term())], keyword()) ::
        {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}

Options:

  • :workers — Max parallel workers (default: max_workers())
  • :timeout — Total timeout in ms

Arrea.subscribe/0 / Arrea.unsubscribe/0

Subscribe (or unsubscribe) the calling process to Leader events.

Messages received are {:leader_event, event} where event is a map with at least a :type key:

Type              Extra keys
:worker_started   worker_id
:progress         worker_id, percent, task_index, total
:finished         worker_id
:error            worker_id, reason
:result           worker_id, data

@spec subscribe() :: :ok
@spec unsubscribe() :: :ok

Arrea.stats/0

Get current engine statistics (provided by Arrea.Monitor).

@spec stats() :: {:ok, map()} | {:error, :monitor_unavailable}

Arrea.max_workers/0

Get the configured max workers.

@spec max_workers() :: non_neg_integer()

Configuration

Priority (lowest to highest)

For use as a library:

  1. @default in Arrea.Config — compile-time baseline
  2. config :arrea, :engine, [...] in the consuming project's config.exs — overrides defaults
  3. Arrea.Config.set/2 at runtime — overrides static config for the current session
  4. Opts passed directly to functions — highest priority, per-call

For use as a CLI binary:

  1. @default baseline
  2. Application env (from config.exs if applicable)
  3. arrea config set KEY VALUE — session-level, persists while the binary process is running
  4. CLI args — highest priority, per-invocation only
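
The layering described above amounts to merging keyword lists from lowest to highest priority. The following sketch shows that idea with Keyword.merge/2. The module name LayeredConfig and its trimmed-down baseline are assumptions for illustration, not Arrea.Config's actual code.

```elixir
defmodule LayeredConfig do
  # Hypothetical baseline holding a few of the documented defaults.
  @default [max_workers: 100, default_policy: :retry, log_level: :info]

  # Later layers override earlier ones, key by key.
  def effective(app_env, runtime, call_opts) do
    @default
    |> Keyword.merge(app_env)
    |> Keyword.merge(runtime)
    |> Keyword.merge(call_opts)
  end
end

config =
  LayeredConfig.effective(
    # Layer 2: static config from config.exs
    [max_workers: 200, log_level: :debug],
    # Layer 3: values set at runtime
    [max_workers: 50],
    # Layer 4: options passed to a single call
    [log_level: :warning]
  )

max_workers = Keyword.get(config, :max_workers)
log_level = Keyword.get(config, :log_level)
default_policy = Keyword.get(config, :default_policy)
```

Here max_workers resolves to the runtime value, log_level to the per-call option, and default_policy falls through to the baseline because no higher layer sets it.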

config.exs example

The :engine value can be given as either a keyword list or a map:

config :arrea, :engine,
  max_workers: 100,
  max_commands_per_batch: 500,
  default_policy: :retry,
  max_retries: 3,
  retry_delay: 1_000,
  restart_limit: 3,
  circuit_breaker_threshold: 5,
  circuit_breaker_timeout: 60_000,
  validation_rules: [:no_rm_rf, :no_sudo, :no_dd, :no_mkfs, :no_fork_bomb],
  telemetry_enabled: true,
  log_level: :info

Key                        Type     Default    Description
max_workers                integer  100        Maximum parallel workers
max_commands_per_batch     integer  500        Max commands per batch
default_policy             atom     :retry     Default error policy for workers
max_retries                integer  3          Max retry attempts
retry_delay                integer  1_000      Delay between retries (ms)
restart_limit              integer  3          Worker restart limit
circuit_breaker_threshold  integer  5          Failures before circuit opens
circuit_breaker_timeout    integer  60_000     Time before half-open attempt (ms)
validation_rules           list     see below  Blocked command patterns
asdf_enabled               boolean  true       Enable ASDF version management
telemetry_enabled          boolean  true       Enable telemetry
log_level                  atom     :info      Log verbosity
shell                      string   nil        Default shell (e.g. "/bin/zsh")

Validation rules (default):

  • :no_rm_rf — blocks rm -rf
  • :no_sudo — blocks sudo
  • :no_dd — blocks dd
  • :no_mkfs — blocks mkfs
  • :no_fork_bomb — blocks fork bombs

Runtime config

Arrea.Config.get(:max_workers)      # => 100
Arrea.Config.set(:max_workers, 50)  # persists for the current VM session
Arrea.Config.all()                  # => full effective config map

Telemetry Events

Arrea emits the following :telemetry events:

Worker events

Event                          Measurements  Metadata
[:arrea, :worker, :started]    -             worker_id
[:arrea, :worker, :completed]  duration      worker_id
[:arrea, :worker, :error]      -             worker_id, reason
[:arrea, :worker, :message]    -             worker_id

Task events

Event                        Measurements  Metadata
[:arrea, :task, :started]    -             -
[:arrea, :task, :completed]  duration      -
[:arrea, :task, :error]      -             worker_id, reason

Engine events

Event                                Measurements  Metadata
[:arrea, :engine, :execute, :start]  -             command
[:arrea, :engine, :execute, :stop]   duration      command, success
[:arrea, :engine, :execute, :error]  duration      command, reason
[:arrea, :engine, :run, :start]      -             count, workers
[:arrea, :engine, :run, :stop]       -             batch_id
Circuit breaker events

Event                              Measurements  Metadata
[:arrea, :circuit_breaker, :open]    -           breaker_id
[:arrea, :circuit_breaker, :closed]  -           breaker_id
[:arrea, :circuit_breaker, :trip]    -           breaker_id, failure_count
Communication events

Event                                       Measurements  Metadata
[:arrea, :communication, :message_sent]      -             -
[:arrea, :communication, :message_received]  -             -
[:arrea, :communication, :error]             -             -
[:arrea, :communication, :retry]             -             -

UI events (CLI / alaja components)

Event                         Measurements  Metadata
[:arrea, :ui, :render]        -             -
[:arrea, :ui, :keypress]      -             -
[:arrea, :ui, :focus_change]  -             -

Validation / Execution / System events

Event                             Measurements  Metadata
[:arrea, :validation, :passed]    -             -
[:arrea, :validation, :failed]    -             -
[:arrea, :execution, :started]    -             -
[:arrea, :execution, :completed]  -             -
[:arrea, :execution, :failed]     -             -
[:arrea, :system, :started]       -             -
[:arrea, :system, :stopped]       -             -

Attaching a custom handler

:telemetry.attach(
  "my-handler",
  [:arrea, :worker, :completed],
  fn _event, measurements, metadata, _config ->
    IO.puts("Worker #{metadata.worker_id} finished in #{measurements.duration}ms")
  end,
  nil
)
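
The :telemetry documentation recommends passing a captured module function rather than an anonymous function as the handler, since local functions carry a performance penalty and trigger a warning. A possible module-based variant of the handler above is sketched here. The module name MyApp.ArreaHandler and the handler id are hypothetical. The event name and keys are the ones listed under Worker events, and the "ms" unit simply follows the example above.

```elixir
defmodule MyApp.ArreaHandler do
  @event [:arrea, :worker, :completed]

  # Call once, for example from the application's start/2 callback.
  # Requires the :telemetry dependency to be available.
  def attach do
    :telemetry.attach(
      "my-app-arrea-worker-completed",
      @event,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Invoked by :telemetry for every matching event.
  def handle_event(@event, measurements, metadata, _config) do
    IO.puts(format(measurements, metadata))
  end

  # Kept separate from the IO call so the formatting can be tested on its own.
  def format(%{duration: duration}, %{worker_id: worker_id}) do
    "Worker #{worker_id} finished in #{duration}ms"
  end
end

line = MyApp.ArreaHandler.format(%{duration: 12}, %{worker_id: 7})
```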

Built-in metrics and debug

# Setup built-in ETS metrics (worker/task/circuit breaker counters)
Arrea.Telemetry.setup()

# Get current metrics snapshot
Arrea.Telemetry.get_current()

# Attach debug handler for development
Arrea.Telemetry.attach()

# Measure a function with telemetry
Arrea.Telemetry.measure(fn -> do_work() end, metadata: %{tag: "batch-1"})

Policies

Arrea provides configurable error policies for workers:

# Default policy (retry 3 times with 1s delay)
policy = Arrea.Policies.default()

# Strict policy (stop on first error)
policy = Arrea.Policies.strict()

# Tolerant policy (retry up to 10 times with 2s delay)
policy = Arrea.Policies.tolerant(max_retries: 10, retry_delay: 2000)

# Custom handler
policy = Arrea.Policies.custom(fn _error, retry_count, _context ->
  if retry_count < 5, do: :retry, else: :stop
end)

Workers without an explicit policy fall back to Arrea.Config.get(:default_policy), which defaults to :retry.

Policy maps support the following fields:

Field        Type                                            Default  Description
on_error     :retry | :stop | :continue | function           :retry   Action on task error
on_warning   :log | :notify | :continue | :promote_to_error  :log     Action on warning
on_timeout   :retry | :stop | :continue                      :retry   Action on timeout
max_retries  integer                                         3        Maximum retry attempts
retry_delay  integer                                         1000     Delay between retries (ms)
timeout      integer                                         30000    Per-task timeout (ms)
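
One way to read the fields above is as input to a decision function that a worker consults after each failure. The sketch below is such a reading, not Arrea.Worker's logic. The module name PolicySketch is hypothetical, and stopping once max_retries is exhausted is an assumption, since this README does not say what happens at that point. The custom function takes the same three arguments as the handler passed to Arrea.Policies.custom/1.

```elixir
defmodule PolicySketch do
  # Defaults copied from the field table above.
  @defaults %{on_error: :retry, on_timeout: :retry, max_retries: 3, retry_delay: 1000}

  # Returns {:retry, delay_ms}, :stop, :continue, or whatever a custom
  # handler function returns.
  def next_action(policy, kind, retry_count)
      when is_map(policy) and kind in [:error, :timeout] and is_integer(retry_count) do
    %{max_retries: max_retries, retry_delay: delay} = merged = Map.merge(@defaults, policy)
    handler = if kind == :error, do: merged.on_error, else: merged.on_timeout

    case handler do
      :retry when retry_count < max_retries -> {:retry, delay}
      # Assumption: exhausted retries are treated as :stop.
      :retry -> :stop
      fun when is_function(fun, 3) -> fun.(kind, retry_count, %{})
      other -> other
    end
  end
end

first_failure = PolicySketch.next_action(%{}, :error, 0)
exhausted = PolicySketch.next_action(%{}, :error, 3)
lenient_timeout = PolicySketch.next_action(%{on_timeout: :continue}, :timeout, 0)

custom =
  PolicySketch.next_action(
    %{on_error: fn _error, count, _context -> if count < 5, do: :retry, else: :stop end},
    :error,
    5
  )
```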

Command Validation

Arrea validates all shell commands before execution, blocking dangerous patterns:

iex> Arrea.Validation.Validator.validate_command("echo hello")
{:ok, "echo hello"}

iex> Arrea.Validation.Validator.validate_command("rm -rf /")
{:error, {:dangerous_command, "rm -rf"}}

iex> Arrea.Validation.Validator.validate_command("$(whoami)")
{:error, :possible_injection}
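
For readers curious how pattern-based blocking of this kind can work, here is a minimal sketch that returns the same result shapes as the examples above. The regular expressions and the module name ValidatorSketch are illustrative assumptions. Arrea's real rules are not shown in this README and are likely more thorough.

```elixir
defmodule ValidatorSketch do
  # Illustrative patterns only, each paired with the label reported on a match.
  defp dangerous_patterns do
    [
      {~r/\brm\s+-(rf|fr)\b/, "rm -rf"},
      {~r/\bsudo\b/, "sudo"},
      {~r/\bmkfs\b/, "mkfs"}
    ]
  end

  def validate_command(command) when is_binary(command) do
    cond do
      hit = Enum.find(dangerous_patterns(), fn {regex, _label} -> Regex.match?(regex, command) end) ->
        {_regex, label} = hit
        {:error, {:dangerous_command, label}}

      # Command substitution via $( ) or backticks.
      Regex.match?(~r/\$\(|`/, command) ->
        {:error, :possible_injection}

      true ->
        {:ok, command}
    end
  end
end

safe = ValidatorSketch.validate_command("echo hello")
blocked = ValidatorSketch.validate_command("rm -rf /")
injected = ValidatorSketch.validate_command("$(whoami)")
```

A deny-list like this is easy to bypass with quoting or aliases, so it is better treated as a guard against accidents than as a security boundary.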

Inter-worker Messaging

Workers can send messages to each other:

# Structured message
Arrea.Worker.send_message(:worker_1, %{type: :ping})

# Route a message to another worker
Arrea.Worker.send_message(:worker_1, {:send_to_worker, :worker_2, %{type: :data, value: 42}})

Dependencies

  • alaja — Internal UI/CLI utility library (powers the Arrea CLI)
  • jason — JSON encoding/decoding
  • telemetry — Event emission and handling
  • telemetry_metrics — Metric definitions
  • telemetry_poller — Periodic metric collection

Installation

Add arrea to your mix.exs dependencies:

def deps do
  [
    {:arrea, "~> 0.1.0"}
  ]
end

Then run:

mix deps.get

License

MIT License. See the source repository for details.