Running councils — core API reference


This document covers the lifecycle of a single council run: how to start one, wait on it, cancel or terminate it, look up a run by id, validate before spawning, group runs under a caller-owned supervisor, and tune the per-member retry policy.

For integration guides see:


Async start and await

CouncilEx.start/3 spawns a RunServer GenServer unsupervised and returns {:ok, pid} immediately. The pid is the primary handle:

{:ok, pid} = CouncilEx.start(MyCouncil, input)

# Block until the run finishes:
{:ok, result} = CouncilEx.await(pid)

To receive PubSub progress events before blocking, pass subscribe: true, then read the run_id from the pid with CouncilEx.RunServer.run_id/1. The option subscribes the calling process to the run topic before the RunServer starts. That removes the race where early events (such as :run_started) fire before a later subscribe call lands:

{:ok, pid} = CouncilEx.start(MyCouncil, input, subscribe: true)
run_id = CouncilEx.RunServer.run_id(pid)

receive do
  {:round_completed, ^run_id, round_name, _rr} ->
    IO.puts("round done: #{round_name}")
end

{:ok, result} = CouncilEx.await(pid)
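You can illustrate the ordering guarantee behind subscribe: true without CouncilEx. In this sketch, Elixir's standard Registry in :duplicate mode stands in for PubSub. RaceSketch, its topic shape {:run, run_id}, and its functions are all hypothetical. The caller registers on the topic before the worker is spawned, so the worker's first broadcast cannot be missed.

```elixir
# Hypothetical sketch: stdlib Registry (duplicate keys) stands in for PubSub.
# RaceSketch is illustrative only, not part of CouncilEx.
defmodule RaceSketch do
  @registry RaceSketch.PubSub

  def start_registry, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Mirrors the ordering subscribe: true guarantees: register the caller on
  # the run topic first, then spawn the worker that broadcasts right away.
  def start(run_id, subscribe?) do
    if subscribe? do
      {:ok, _owner} = Registry.register(@registry, {:run, run_id}, nil)
    end

    pid = spawn(fn -> broadcast(run_id, {:run_started, run_id}) end)
    {:ok, pid}
  end

  defp broadcast(run_id, message) do
    Registry.dispatch(@registry, {:run, run_id}, fn entries ->
      for {subscriber, _value} <- entries, do: send(subscriber, message)
    end)
  end
end
```

If the caller subscribed only after start/2 returned, the spawned process could already have dispatched to an empty topic.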

The synchronous CouncilEx.run/3 is a thin start_link + await wrapper. Use it when you want a single blocking call:

{:ok, result} = CouncilEx.run(MyCouncil, input)

start/3 and start_link/3 mirror GenServer.start/3 and GenServer.start_link/3:

  • start/3 — unsupervised, unlinked. Caller owns the pid. Lose the reference and the run leaks silently. Returns {:ok, pid}.
  • start_link/3 — linked to the calling process. Caller dies → run dies; run crashes → caller gets an exit signal (unless trapping). Returns {:ok, pid}. This is what CouncilEx.run/3 uses internally.
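The semantics in the list above are standard OTP behavior, and you can observe them with bare processes. In the hypothetical LinkSketch below, a process that exits with :boom stands in for a crashing run. No CouncilEx code is involved.

```elixir
# Plain OTP, no CouncilEx: the "run" is a bare process that exits with :boom.
defmodule LinkSketch do
  # Like start/3: unlinked. The caller survives and learns about the crash
  # only because it monitors (spawn_monitor/1 spawns and monitors atomically).
  def unlinked_crash do
    {pid, ref} = spawn_monitor(fn -> exit(:boom) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:survived, reason}
    after
      1_000 -> :no_down
    end
  end

  # Like start_link/3 with a caller that traps exits: the crash arrives as an
  # {:EXIT, pid, reason} message instead of killing the caller.
  def linked_crash_trapped do
    previous = Process.flag(:trap_exit, true)
    pid = spawn_link(fn -> exit(:boom) end)

    result =
      receive do
        {:EXIT, ^pid, reason} -> {:exit_signal, reason}
      after
        1_000 -> :no_exit
      end

    # Restore the caller's original trap_exit setting.
    Process.flag(:trap_exit, previous)
    result
  end
end
```

Without trap_exit, the linked variant would take the caller down with the same :boom reason.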

Communicate with the run through message passing or the public API functions below.

# Unsupervised — caller is responsible for the pid
{:ok, pid} = CouncilEx.start(MyCouncil, %{question: q})
# Monitor the run: you receive {:DOWN, ref, :process, pid, reason} when it exits.
ref = Process.monitor(pid)

# Get the run_id when needed (PubSub subscribe, persistence, recovery):
run_id = CouncilEx.RunServer.run_id(pid)

# Or supply your own run_id at start time:
{:ok, pid} = CouncilEx.start(MyCouncil, input, run_id: "my-id")

Handle forms: pid vs run_id

await/2, cancel/2, and terminate_run/2 each accept either form:

  • pid — direct call, no Registry lookup.
  • run_id (binary) — looks up the pid via the Registry. Use this when recovering a handle from persistence or a different process.

pid_for/1 looks up the runner pid for a known run_id:

{:ok, pid} = CouncilEx.pid_for(run_id)
# {:error, :unknown_run}  — no registry entry
# {:error, :runner_dead}  — stale entry, process already gone
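The two error shapes map naturally onto a Registry lookup. This is a minimal sketch assuming runs register under their run_id in a unique-key Registry. PidForSketch and its registry name are hypothetical, and CouncilEx's real lookup may differ. A Registry removes an entry asynchronously after the owning process exits, which is how a lookup can briefly return a pid that is already dead.

```elixir
# Hypothetical sketch of a pid_for/1-style lookup on Elixir's stdlib Registry.
defmodule PidForSketch do
  @registry PidForSketch.Registry

  def start_registry, do: Registry.start_link(keys: :unique, name: @registry)

  # A runner would call this from its own process (e.g. in init/1).
  def register(run_id), do: Registry.register(@registry, run_id, nil)

  def pid_for(run_id) do
    case Registry.lookup(@registry, run_id) do
      [] ->
        {:error, :unknown_run}

      [{pid, _value}] ->
        # Cleanup after exit is asynchronous, so double-check liveness.
        if Process.alive?(pid), do: {:ok, pid}, else: {:error, :runner_dead}
    end
  end
end
```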

await/2

Block the calling process until the run finishes. Returns the CouncilEx.Result:

{:ok, result}           = CouncilEx.await(pid)
{:error, result}        = CouncilEx.await(run_id)   # on failure
{:error, :unknown_run}  = CouncilEx.await(run_id)   # run never started or already reaped
{:error, :timeout}      = CouncilEx.await(pid, await_timeout: 5_000)   # still running after 5 s

The default await_timeout is :infinity. Pass a millisecond integer to bound the wait (mainly useful with run/3):

{:ok, result} = CouncilEx.run(MyCouncil, input, await_timeout: :timer.minutes(3))
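A bounded await can be built on GenServer.call/3. That function exits with {:timeout, _} when no reply arrives in time. The sketch below assumes await issues a :fetch_result call, which is consistent with the fetch_result calls mentioned in the lifecycle section. AwaitSketch and SlowRun are hypothetical. SlowRun never replies, so the timeout branch fires.

```elixir
# Hypothetical sketch: a bounded await that turns a call timeout into a tuple.
defmodule AwaitSketch do
  def await(pid, timeout \\ :infinity) do
    GenServer.call(pid, :fetch_result, timeout)
  catch
    # GenServer.call/3 exits with {:timeout, {GenServer, :call, _}} on timeout.
    :exit, {:timeout, _} -> {:error, :timeout}
  end
end

defmodule SlowRun do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  # Never reply, simulating a run that has not finished yet.
  @impl true
  def handle_call(:fetch_result, _from, state), do: {:noreply, state}
end
```

Catching only the timeout exit keeps other failures visible, such as the callee dying mid-call.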

Cancellation (cooperative)

cancel/2 sends a cast to the RunServer. The runner drains the in-flight round, finalizes the run with status: :error, and appends a %CouncilEx.Error{kind: :cancelled} as the last entry in result.errors:

:ok = CouncilEx.cancel(pid)
{:error, %CouncilEx.Result{errors: errs}} = CouncilEx.await(pid)
true = Enum.any?(errs, &(&1.kind == :cancelled))

The pid form, a direct GenServer.cast(pid, :cancel), is preferred because it skips the Registry lookup. The run_id form resolves the pid through the Registry first. Both forms return:

  • :ok — signal delivered.
  • {:error, :unknown_run} — no registry entry (run_id form only).
  • {:error, :runner_dead} — stale entry or dead pid.

Cancellation cascades: the RunServer also calls CouncilEx.cancel/1 on any tracked sub-runs before finalizing.
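The cooperative pattern (set a flag on :cancel, let the in-flight round drain, check the flag only between rounds) can be sketched as a small GenServer. CancelSketch is hypothetical and is not CouncilEx's RunServer. A "round" is simulated with a 20 ms timer so the mailbox, including a :cancel cast, is serviced while the round is in flight. Results are plain maps rather than %CouncilEx.Result{}.

```elixir
# Hypothetical sketch of cooperative cancellation between rounds.
defmodule CancelSketch do
  use GenServer

  def start(rounds), do: GenServer.start(__MODULE__, rounds)
  def cancel(pid), do: GenServer.cast(pid, :cancel)
  def await(pid), do: GenServer.call(pid, :await, :infinity)

  @impl true
  def init(rounds) do
    state = %{remaining: rounds, done: 0, cancelled?: false, result: nil, waiters: []}
    {:ok, state, {:continue, :run_next_round}}
  end

  @impl true
  def handle_continue(:run_next_round, state) do
    Process.send_after(self(), :round_done, 20)
    {:noreply, state}
  end

  # Cancellation only records intent; it does not interrupt the round.
  @impl true
  def handle_cast(:cancel, state), do: {:noreply, %{state | cancelled?: true}}

  @impl true
  def handle_call(:await, from, %{result: nil} = state),
    do: {:noreply, %{state | waiters: [from | state.waiters]}}

  def handle_call(:await, _from, state), do: {:reply, state.result, state}

  @impl true
  def handle_info(:round_done, state) do
    # The in-flight round has drained; only now is the cancel flag checked.
    state = %{state | done: state.done + 1, remaining: state.remaining - 1}

    cond do
      state.cancelled? ->
        finalize(state, {:error, %{rounds_done: state.done, errors: [%{kind: :cancelled}]}})

      state.remaining == 0 ->
        finalize(state, {:ok, %{rounds_done: state.done, errors: []}})

      true ->
        {:noreply, state, {:continue, :run_next_round}}
    end
  end

  # Reply to every blocked awaiter and keep the result for late callers.
  defp finalize(state, result) do
    Enum.each(state.waiters, &GenServer.reply(&1, result))
    {:noreply, %{state | result: result, waiters: []}}
  end
end
```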


Termination (non-cooperative)

terminate_run/2 calls Process.exit(pid, :shutdown). The process dies immediately:

:ok = CouncilEx.terminate_run(pid)

No :run_completed or :run_failed PubSub event is emitted. Any process monitoring the pid receives a :DOWN message with reason :shutdown. await/2 on the same run_id returns {:error, :unknown_run} once the Registry entry is cleaned up.

Use terminate_run/2 when the runner is wedged in a non-responsive provider call or when a tenant session shuts down and you need the process gone immediately. For user-facing cancellation (where partial results and a :run_failed event matter), use cancel/2.
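Plain OTP shows why a cast-based cancel cannot reach a wedged runner while an exit signal can. In the hypothetical WedgedSketch below, a receive that never matches simulates a hung provider call. The queued :cancel cast is therefore never processed, but Process.exit(pid, :shutdown) still kills the process because it does not trap exits.

```elixir
# Hypothetical sketch: cooperative cancel vs. Process.exit/2 on a wedged GenServer.
defmodule WedgedSketch do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast(:hang, state) do
    # Simulated hung call: this never matches, so the GenServer loop never
    # gets back to the messages queued behind it.
    receive do
      :never_sent -> :ok
    end

    {:noreply, state}
  end

  def handle_cast(:cancel, state), do: {:stop, :normal, state}

  # Returns {status after the cooperative cancel, :DOWN reason after exit}.
  def demo do
    {:ok, pid} = GenServer.start(__MODULE__, nil)
    ref = Process.monitor(pid)
    GenServer.cast(pid, :hang)
    GenServer.cast(pid, :cancel)

    after_cancel =
      receive do
        {:DOWN, ^ref, :process, ^pid, _} -> :stopped
      after
        200 -> :still_running
      end

    Process.exit(pid, :shutdown)

    down_reason =
      receive do
        {:DOWN, ^ref, :process, ^pid, reason} -> reason
      after
        1_000 -> :no_down
      end

    {after_cancel, down_reason}
  end
end
```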


Pre-run validation

CouncilEx.validate/1 runs structural checks on any council (module-form or %DynamicCouncil{}) without spawning a process or spending provider tokens. start/3 runs the same check internally; invalid councils return {:error, {:invalid_council, errs}} before any RunServer is started.

Call it explicitly to surface config errors early (builder UIs, form-submission handlers):

case CouncilEx.validate(council) do
  :ok ->
    {:ok, pid} = CouncilEx.start(council, input)

  {:error, errs} ->
    # errs :: [%{path: [...], code: atom(), message: String.t()}]
    # JSON-serializable; ready for inline form errors.
    Enum.each(errs, &IO.inspect/1)
end

Error codes for module-form councils include :missing_provider, :missing_model, :unknown_provider, :invalid_provider, and :not_a_council. %DynamicCouncil{} validation adds :empty, :duplicate_id, :unknown, :conflict, :required_when_member_unspecified, and related structural codes.
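Here is a hypothetical validator that produces the documented error-map shape. It shows why the errors serialize cleanly for inline form display: each is a flat map of path, code, and message. The council shape (a map with a :members list), the provider allow-list, and ValidateSketch itself are assumptions. Only the error keys and the code names :missing_provider, :unknown_provider, and :missing_model come from this page.

```elixir
# Hypothetical validator; the council shape and @known_providers are assumptions.
defmodule ValidateSketch do
  @known_providers [:open_router]

  def validate(%{members: members}) do
    errs =
      members
      |> Enum.with_index()
      |> Enum.flat_map(fn {member, i} -> member_errors(member, [:members, i]) end)

    if errs == [], do: :ok, else: {:error, errs}
  end

  defp member_errors(member, path) do
    provider = member[:provider]

    [
      if(is_nil(provider),
        do: err(path ++ [:provider], :missing_provider, "provider is required")
      ),
      if(provider && provider not in @known_providers,
        do: err(path ++ [:provider], :unknown_provider, "unknown provider #{inspect(provider)}")
      ),
      if(is_nil(member[:model]),
        do: err(path ++ [:model], :missing_model, "model is required")
      )
    ]
    # if/2 without else yields nil for checks that passed.
    |> Enum.reject(&is_nil/1)
  end

  defp err(path, code, message), do: %{path: path, code: code, message: message}
end
```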


Run lifecycle summary

start/3                    RunServer.init/1
                           broadcast :run_started
                           handle_continue {:run_next_round, 0}

each round:                broadcast :round_started
                           broadcast :member_started (per member)
                           Task.Supervisor.async_nolink (round task)
                           broadcast :member_completed (per member)
                           broadcast :round_completed
                           handle_continue {:run_next_round, idx+1}

no more rounds:            finalize :completed
                           broadcast :run_completed
                           reply to awaiters
                           Process.send_after self(), :reap, 60_000

any member errors:         finalize :failed  (or continue per failure_mode)
                           broadcast :run_failed

cancel/2:                  finalize :cancelled
                           broadcast :run_failed (with %Error{kind: :cancelled})

terminate_run/2:           Process.exit(:shutdown)  no terminal broadcast

After a run finalizes, the RunServer stays alive for ~60 seconds to serve late fetch_result calls from await/2 before auto-reaping.
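The reap window follows a common GenServer pattern: schedule a :reap message with Process.send_after/3, keep answering calls until it arrives, then stop with :normal. ReapSketch is hypothetical. Its reap delay is a parameter here so the sketch runs quickly, where the lifecycle above uses 60_000 ms.

```elixir
# Hypothetical sketch of a post-finalize reap window.
defmodule ReapSketch do
  use GenServer

  def start(result, reap_ms), do: GenServer.start(__MODULE__, {result, reap_ms})
  def fetch_result(pid), do: GenServer.call(pid, :fetch_result)

  @impl true
  def init({result, reap_ms}) do
    # Pretend the run has just finalized: hold the result, schedule shutdown.
    Process.send_after(self(), :reap, reap_ms)
    {:ok, result}
  end

  # Late awaiters are still served until :reap arrives.
  @impl true
  def handle_call(:fetch_result, _from, result), do: {:reply, result, result}

  @impl true
  def handle_info(:reap, result), do: {:stop, :normal, result}
end
```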


Optional run grouping with CouncilEx.Supervisor

By default start/3 is unsupervised — the caller owns the pid. For group management (bulk-terminate, tenant isolation, in-flight visibility), place an instance of CouncilEx.Supervisor in your supervision tree:

# In your Application children list:
{CouncilEx.Supervisor, name: MyApp.RunSup}

Then start runs against it:

{:ok, pid} =
  CouncilEx.Supervisor.start_link(MyApp.RunSup, MyCouncil, input,
    subscribe: true
  )

run_id = CouncilEx.RunServer.run_id(pid)

# Bulk teardown — terminates every run under the supervisor:
:ok = CouncilEx.Supervisor.terminate_all(MyApp.RunSup)

Children are :temporary: the supervisor never restarts a crashed run, because re-running a partially completed council from scratch would waste tokens and produce unpredictable state.
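The grouping behavior (temporary children plus a bulk teardown) can be approximated with Elixir's DynamicSupervisor. This page does not document the internals of CouncilEx.Supervisor, so GroupSketch is a hypothetical stand-in. Each "run" is a Task started with restart: :temporary.

```elixir
# Hypothetical stand-in for a run-grouping supervisor, built on DynamicSupervisor.
defmodule GroupSketch do
  def start_link(name), do: DynamicSupervisor.start_link(strategy: :one_for_one, name: name)

  # restart: :temporary means a crashed run is never restarted.
  def start_run(sup, fun) do
    spec = %{id: :run, start: {Task, :start_link, [fun]}, restart: :temporary}
    DynamicSupervisor.start_child(sup, spec)
  end

  # Bulk teardown: terminate every child currently under the supervisor.
  def terminate_all(sup) do
    for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(sup), is_pid(pid) do
      DynamicSupervisor.terminate_child(sup, pid)
    end

    :ok
  end
end
```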

cancel/2 and terminate_run/2 work the same regardless of where a run is hosted — they resolve by pid or run_id, not by supervisor.

For per-tenant isolation (each tenant gets their own supervisor and registry), see the Tenant isolation section in docs/RUNNING_WITH_OBAN.md.


Retry policy

Retry is applied per member call, not per run. The policy resolves in this priority order (highest first):

  1. Per-member retry: opt declared on the member line.
  2. Runtime retry: opt passed to run/3 or start/3.
  3. App-wide config: config :council_ex, default_retry: {max, base_ms}.
  4. Built-in default: {2, 1000} (2 attempts, 1 second base backoff).

# Per-member:
member :analyst, MyAnalyst,
  provider: :open_router,
  model: "openai/gpt-4o",
  retry: {3, 500}

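# Runtime override (applies to every member that does not declare its own retry:).
# {1, 0} means a single attempt, i.e. no retries:
CouncilEx.run(MyCouncil, input, retry: {1, 0})

# App-wide (in your config files, e.g. config/config.exs):
config :council_ex, default_retry: {2, 1000}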
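The four-level priority order reduces to a chain of fallbacks. The sketch below mirrors the documented option names (the retry: member opt, the retry: runtime opt, and :council_ex's :default_retry), but RetryPolicySketch.resolve/2 is hypothetical and not CouncilEx's resolver.

```elixir
# Hypothetical sketch of the documented retry-policy priority order.
defmodule RetryPolicySketch do
  @builtin_default {2, 1000}

  # Highest priority first: member opt, runtime opt, app config, built-in.
  def resolve(member_opts, run_opts) do
    Keyword.get(member_opts, :retry) ||
      Keyword.get(run_opts, :retry) ||
      Application.get_env(:council_ex, :default_retry) ||
      @builtin_default
  end
end
```

Because the member-level opt wins, a runtime retry: {1, 0} does not disable retries for a member that declares its own policy.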

The policy format is {max_attempts, base_backoff_ms}, where max_attempts >= 1 counts every attempt including the first, and base_backoff_ms >= 0. Backoff is exponential with random jitter: delay = base * 2^(attempt-1) + rand(base/2).
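The policy constraints and the backoff formula translate directly into code. BackoffSketch is hypothetical, and it assumes rand(base/2) means a uniform integer from 0 to div(base, 2) inclusive. CouncilEx's exact rounding may differ.

```elixir
# Hypothetical sketch of the documented policy checks and backoff formula.
defmodule BackoffSketch do
  def valid_policy?({max, base})
      when is_integer(max) and max >= 1 and is_integer(base) and base >= 0,
      do: true

  def valid_policy?(_), do: false

  # attempt is 1-based: base * 2^(attempt-1) + jitter in 0..div(base, 2).
  def delay_ms(attempt, base) when attempt >= 1 and base >= 0 do
    jitter_max = div(base, 2)
    # :rand.uniform(n) returns 1..n, so subtract 1 to get 0..jitter_max.
    jitter = :rand.uniform(jitter_max + 1) - 1
    base * Integer.pow(2, attempt - 1) + jitter
  end
end
```

With the {2, 1000} default, attempt 1 yields 1000 to 1500 ms under this reading. A base of 0 always yields 0 ms.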

Only :transient errors retry. CouncilEx.Error.retryable?/1 is the canonical predicate:

CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :transient})  # true
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :permanent})  # false
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :validation}) # false
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :cancelled})  # false

Kind                Retryable?   Examples
:transient          yes          HTTP 5xx, 429 rate limit, transport errors
:permanent          no           Auth failures, 4xx (other than 429), config bugs
:validation         no           Output schema validation failures
:cancelled          no           cancel/2 was called
:timeout (legacy)   no           M1-era; use :transient for new code
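A per-call retry loop that consults the predicate looks roughly like this. RetrySketch.Error is a local stand-in for %CouncilEx.Error{}, and with_retry/3 is hypothetical. It retries only :transient errors, stops after max_attempts total attempts, and sleeps according to the backoff formula above.

```elixir
# Hypothetical sketch: per-call retry that honors the retryable? predicate.
defmodule RetrySketch do
  defmodule Error do
    defstruct [:kind, :message]
  end

  def retryable?(%Error{kind: :transient}), do: true
  def retryable?(%Error{}), do: false

  # fun returns {:ok, value} or {:error, %Error{}}; attempt is 1-based.
  def with_retry(fun, {max_attempts, base_ms}, attempt \\ 1) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, err} = error ->
        if retryable?(err) and attempt < max_attempts do
          jitter = :rand.uniform(div(base_ms, 2) + 1) - 1
          Process.sleep(base_ms * Integer.pow(2, attempt - 1) + jitter)
          with_retry(fun, {max_attempts, base_ms}, attempt + 1)
        else
          error
        end
    end
  end
end
```

A :permanent error returns after a single call, however many attempts the policy allows.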

Whole-run retry (e.g. re-enqueueing an Oban job) is a separate concern. When deciding whether to retry at the job level, inspect result.errors and consult CouncilEx.Error.retryable?/1 per error — see docs/RUNNING_WITH_OBAN.md for the recommended pattern.
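As an illustration only, one possible job-level policy re-enqueues a failed run when every error in result.errors is retryable. That policy is not necessarily the one docs/RUNNING_WITH_OBAN.md recommends. JobRetrySketch is hypothetical, errors are plain maps with a :kind key, and retryability is simplified to kind == :transient per the table above.

```elixir
# Hypothetical job-level decision; not necessarily the recommended Oban pattern.
defmodule JobRetrySketch do
  def decide({:ok, _result}), do: :done

  # Retry only if there is at least one error and all of them are transient;
  # a single permanent, validation, or cancelled error means a re-run would
  # fail the same way or override a deliberate cancel.
  def decide({:error, %{errors: errors}}) do
    if errors != [] and Enum.all?(errors, &(&1.kind == :transient)),
      do: :retry,
      else: :discard
  end
end
```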


See also