This document covers the lifecycle of a single council run: how to start one, wait on it, cancel or terminate it, look up a run by id, validate before spawning, group runs under a caller-owned supervisor, and tune the per-member retry policy.
For integration guides, see:
- docs/RUNNING_IN_PHOENIX.md: LiveView, channels, controllers, PubSub adapter, supervisor patterns.
- docs/RUNNING_WITH_OBAN.md: background jobs, linking semantics, whole-job retry strategy.
Async start and await
CouncilEx.start/3 spawns a RunServer GenServer, unsupervised, and returns {:ok, pid} immediately. The pid is the primary handle:

```elixir
{:ok, pid} = CouncilEx.start(MyCouncil, input)

# Block until the run finishes ({:error, result} on failure):
{:ok, result} = CouncilEx.await(pid)
```

To subscribe to PubSub progress events before blocking, pass subscribe: true and read the run_id from the pid. This subscribes the calling process to the run's topic before the RunServer starts, which eliminates the race in which early events fire before a later subscribe call lands:
```elixir
{:ok, pid} = CouncilEx.start(MyCouncil, input, subscribe: true)
run_id = CouncilEx.RunServer.run_id(pid)

# Blocks until the first :round_completed event for this run arrives.
receive do
  {:round_completed, ^run_id, round_name, _round_result} ->
    IO.puts("round done: #{round_name}")
end

{:ok, result} = CouncilEx.await(pid)
```

The synchronous CouncilEx.run/3 is a thin start_link + await wrapper. Use it when you want a single blocking call:

```elixir
{:ok, result} = CouncilEx.run(MyCouncil, input)
```

start/3 vs start_link/3
start/3 and start_link/3 mirror GenServer.start/3 and GenServer.start_link/3:
- start/3: unsupervised and unlinked. The caller owns the pid; lose the reference and the run leaks silently. Returns {:ok, pid}.
- start_link/3: linked to the calling process. If the caller dies, the run dies; if the run crashes, the caller receives an exit signal (unless it traps exits). Returns {:ok, pid}. This is what CouncilEx.run/3 uses internally.
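The difference between the two start functions is ordinary OTP link semantics. The sketch below demonstrates it with stdlib Agent processes standing in for a RunServer (an illustrative assumption; it makes no CouncilEx calls, and LinkDemo is a hypothetical name): a linked process's death reaches the caller as an exit signal, while an unlinked one is only observable through a monitor.

```elixir
defmodule LinkDemo do
  # start_link-style: the caller is linked. We trap exits temporarily so the
  # child's death arrives as an {:EXIT, ...} message instead of killing us.
  def linked_death do
    previous = Process.flag(:trap_exit, true)
    {:ok, pid} = Agent.start_link(fn -> :running end)
    Process.exit(pid, :kill)

    result =
      receive do
        {:EXIT, ^pid, reason} -> {:exit_signal, reason}
      after
        1_000 -> :no_signal
      end

    Process.flag(:trap_exit, previous)
    result
  end

  # start-style: no link, so no exit signal reaches the caller; a monitor
  # is the only way to learn that the process died.
  def unlinked_death do
    {:ok, pid} = Agent.start(fn -> :running end)
    ref = Process.monitor(pid)
    Process.exit(pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :no_down
    end
  end
end
```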
The pid is the primary handle. Communicate with the run via message passing or the public API functions below.
```elixir
# Unsupervised: the caller is responsible for the pid
{:ok, pid} = CouncilEx.start(MyCouncil, %{question: q})
ref = Process.monitor(pid)

# Get the run_id when needed (PubSub subscribe, persistence, recovery):
run_id = CouncilEx.RunServer.run_id(pid)

# Or supply your own run_id at start time:
{:ok, pid} = CouncilEx.start(MyCouncil, input, run_id: "my-id")
```

Handle forms: pid vs run_id
await/2, cancel/2, and terminate_run/2 each accept either form:
- pid — direct call, no Registry lookup.
- run_id (binary) — looks up the pid via the Registry. Use this when recovering a handle from persistence or a different process.
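Resolving a run_id to a pid is a Registry lookup. The sketch below shows one way such a lookup can work with Elixir's built-in Registry, assuming (this document does not say how CouncilEx registers runs) that each run registers itself under its run_id through a :via tuple. RunLookup, its functions and the registry name are hypothetical; the return values mirror the three outcomes pid_for/1 documents.

```elixir
defmodule RunLookup do
  # Hypothetical stand-in for a run_id -> pid lookup; not CouncilEx's code.
  @registry :toy_run_registry

  def start_registry, do: Registry.start_link(keys: :unique, name: @registry)

  # Starts a stand-in "run" process registered under run_id via a :via tuple.
  def start_run(run_id) do
    Agent.start(fn -> :running end, name: {:via, Registry, {@registry, run_id}})
  end

  def pid_for(run_id) do
    case Registry.lookup(@registry, run_id) do
      [] ->
        {:error, :unknown_run}

      [{pid, _value}] ->
        # Registry removes an entry when its owner dies, but asynchronously,
        # so a process that just died can briefly leave a stale entry.
        if Process.alive?(pid), do: {:ok, pid}, else: {:error, :runner_dead}
    end
  end
end
```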
pid_for/1 looks up the runner pid for a known run_id:

```elixir
{:ok, pid} = CouncilEx.pid_for(run_id)
# Other possible returns:
#   {:error, :unknown_run}  no registry entry
#   {:error, :runner_dead}  stale entry; the process is already gone
```

await/2
Block the calling process until the run finishes. Returns {:ok, result} on success and {:error, result} on failure, where result is a CouncilEx.Result:

```elixir
{:ok, result} = CouncilEx.await(pid)  # or CouncilEx.await(run_id)

# Other possible returns:
#   {:error, %CouncilEx.Result{}}  the run failed (or was cancelled)
#   {:error, :unknown_run}         run_id never started or already reaped
#   {:error, :timeout}             await_timeout elapsed, e.g. with
#                                  CouncilEx.await(pid, await_timeout: 5_000)
```

The default await_timeout is :infinity. Pass an integer number of milliseconds to bound the wait (mainly useful with run/3):

```elixir
{:ok, result} = CouncilEx.run(MyCouncil, input, await_timeout: :timer.minutes(3))
```

Cancellation (cooperative)
cancel/2 sends a cast to the RunServer. The runner drains the in-flight round, finalizes the run with status: :error, and appends a %CouncilEx.Error{kind: :cancelled} as the last entry in result.errors:

```elixir
:ok = CouncilEx.cancel(pid)

{:error, %CouncilEx.Result{errors: errs}} = CouncilEx.await(pid)
%CouncilEx.Error{kind: :cancelled} = List.last(errs)
```

The pid form (a GenServer.cast(pid, :cancel)) is preferred because it skips the Registry lookup; the run_id form resolves the pid through the Registry first. Both forms return:
- :ok: the cancel signal was delivered.
- {:error, :unknown_run}: no registry entry (run_id form only).
- {:error, :runner_dead}: stale registry entry or dead pid.
Cancellation cascades: the RunServer also calls CouncilEx.cancel/1
on any tracked sub-runs before finalizing.
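Cooperative cancellation works because each round runs outside the server process, which leaves the server free to handle the cancel cast while a round is in flight. Below is a toy GenServer illustrating that idea; ToyRun and its functions are hypothetical, and this is a simplified model rather than RunServer's source. It runs each round in a Task, records a cancel request, and acts on it only once the in-flight round's result arrives.

```elixir
defmodule ToyRun do
  use GenServer

  # rounds: a list of zero-arity functions, each standing in for one round.
  def start(rounds), do: GenServer.start(__MODULE__, rounds)
  def cancel(pid), do: GenServer.cast(pid, :cancel)
  def result(pid), do: GenServer.call(pid, :result)

  @impl true
  def init(rounds) do
    {:ok, %{rounds: rounds, done: [], cancelled?: false, status: :running},
     {:continue, :next_round}}
  end

  @impl true
  def handle_continue(:next_round, %{rounds: []} = state),
    do: {:noreply, %{state | status: :completed}}

  def handle_continue(:next_round, %{rounds: [round | rest]} = state) do
    # The round runs in a separate task, so this server stays free to
    # handle a :cancel cast while the round is in flight.
    Task.async(round)
    {:noreply, %{state | rounds: rest}}
  end

  @impl true
  def handle_cast(:cancel, state), do: {:noreply, %{state | cancelled?: true}}

  @impl true
  def handle_call(:result, _from, state),
    do: {:reply, {state.status, Enum.reverse(state.done)}, state}

  @impl true
  def handle_info({ref, round_result}, state) when is_reference(ref) do
    # The in-flight round finished: keep its result (the "drain"), then
    # either finalize as cancelled or continue with the next round.
    Process.demonitor(ref, [:flush])
    state = %{state | done: [round_result | state.done]}

    if state.cancelled? do
      {:noreply, %{state | status: :cancelled}}
    else
      {:noreply, state, {:continue, :next_round}}
    end
  end
end
```

In this model the cancelled run keeps the result of the round that was already running and never starts the remaining rounds.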
Termination (non-cooperative)
terminate_run/2 calls Process.exit(pid, :shutdown), so the process dies immediately:

```elixir
:ok = CouncilEx.terminate_run(pid)
```

No :run_completed or :run_failed PubSub event is emitted. Processes monitoring the pid receive a :DOWN message. await/2 on the same run_id returns {:error, :unknown_run} once the Registry entry is cleaned up.
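What a monitoring process observes follows directly from OTP exit signals. The sketch below uses a stdlib Agent as a stand-in for the RunServer (an illustrative assumption; ShutdownDemo is hypothetical and makes no CouncilEx calls), sends it the same Process.exit(pid, :shutdown) signal, and captures the :DOWN message.

```elixir
defmodule ShutdownDemo do
  # Stand-in for a run: an unlinked Agent that does not trap exits.
  def run do
    {:ok, pid} = Agent.start(fn -> :running end)
    ref = Process.monitor(pid)

    # The signal terminate_run/2 is documented to send.
    true = Process.exit(pid, :shutdown)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason, Process.alive?(pid)}
    after
      1_000 -> :timeout
    end
  end
end
```

A process that traps exits would instead receive the signal as an {:EXIT, sender, :shutdown} message and keep running, so immediate death relies on the target not trapping exits.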
Use terminate_run/2 when the runner is wedged in a non-responsive
provider call or when a tenant session shuts down and you need the
process gone immediately. For user-facing cancellation (where partial
results and a :run_failed event matter), use cancel/2.
Pre-run validation
CouncilEx.validate/1 runs structural checks on any council —
module-form or %DynamicCouncil{} — without spawning a process or
charging provider tokens. start/3 runs the same check internally;
invalid councils return {:error, {:invalid_council, errs}} before
any RunServer is started.
Call it explicitly to surface configuration errors early, for example in builder UIs or form-submission handlers:

```elixir
case CouncilEx.validate(council) do
  :ok ->
    CouncilEx.start(council, input)

  {:error, errs} ->
    # errs :: [%{path: [...], code: atom(), message: String.t()}]
    # JSON-serializable, so they can be rendered as inline form errors.
    Enum.each(errs, &IO.inspect/1)
end
```

Error codes for module-form councils include :missing_provider,
:missing_model, :unknown_provider, :invalid_provider, and
:not_a_council. %DynamicCouncil{} validation adds :empty,
:duplicate_id, :unknown, :conflict,
:required_when_member_unspecified, and related structural codes.
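The error shape validate/1 returns, a list of %{path, code, message} maps, is straightforward to produce from plain checks. The following is a hypothetical sketch, not CouncilEx's validator: ToyValidate, its input format (a keyword list of member id to options) and the path layout are all assumptions. It checks for two of the documented codes, :missing_provider and :missing_model.

```elixir
defmodule ToyValidate do
  # members: keyword list of member_id => member options (hypothetical format).
  def validate(members) do
    errs =
      for {id, opts} <- members,
          {key, code} <- [provider: :missing_provider, model: :missing_model],
          Keyword.get(opts, key) == nil do
        %{path: [:members, id, key], code: code, message: "#{id}: #{key} is required"}
      end

    if errs == [], do: :ok, else: {:error, errs}
  end
end
```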
Run lifecycle summary
```
start/3           → RunServer.init/1
                  → broadcast :run_started
                  → handle_continue {:run_next_round, 0}

each round:       → broadcast :round_started
                  → broadcast :member_started   (per member)
                  → Task.Supervisor.async_nolink (round task)
                  → broadcast :member_completed (per member)
                  → broadcast :round_completed
                  → handle_continue {:run_next_round, idx+1}

no more rounds:   → finalize :completed
                  → broadcast :run_completed
                  → reply to awaiters
                  → Process.send_after self(), :reap, 60_000

any member errors:→ finalize :failed (or continue per failure_mode)
                  → broadcast :run_failed

cancel/2:         → finalize :cancelled
                  → broadcast :run_failed (with %Error{kind: :cancelled})

terminate_run/2:  → Process.exit(:shutdown), no terminal broadcast
```

After a run finalizes, the RunServer stays alive for about 60 seconds to serve late fetch_result calls from await/2, then reaps itself.
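The "reply to awaiters" and :reap steps can be modeled with a toy GenServer. ToyAwait and its functions are hypothetical, and this is a simplified model rather than RunServer's source: callers that await before the run finishes are parked and answered with GenServer.reply/2 on finalize, callers that arrive afterwards get the stored result until the reap timer stops the server, and a call timeout is turned into {:error, :timeout} in the spirit of await_timeout.

```elixir
defmodule ToyAwait do
  use GenServer

  # finish_after / reap_after are in milliseconds (the lifecycle above reaps after 60_000).
  def start(result, finish_after, reap_after),
    do: GenServer.start(__MODULE__, {result, finish_after, reap_after})

  # Converts call failures into error tuples similar to await/2's.
  def await(pid, timeout \\ :infinity) do
    GenServer.call(pid, :await, timeout)
  catch
    :exit, {:timeout, _} -> {:error, :timeout}
    :exit, {:noproc, _} -> {:error, :unknown_run}
  end

  @impl true
  def init({result, finish_after, reap_after}) do
    Process.send_after(self(), :finish, finish_after)
    {:ok, %{result: result, done?: false, awaiters: [], reap_after: reap_after}}
  end

  @impl true
  def handle_call(:await, _from, %{done?: true} = state),
    do: {:reply, {:ok, state.result}, state}

  # Not finished yet: park the caller; it is answered on :finish.
  def handle_call(:await, from, state),
    do: {:noreply, %{state | awaiters: [from | state.awaiters]}}

  @impl true
  def handle_info(:finish, state) do
    # Finalize: reply to every parked awaiter, then schedule the reap.
    Enum.each(state.awaiters, &GenServer.reply(&1, {:ok, state.result}))
    Process.send_after(self(), :reap, state.reap_after)
    {:noreply, %{state | done?: true, awaiters: []}}
  end

  def handle_info(:reap, state), do: {:stop, :normal, state}
end
```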
Optional run grouping with CouncilEx.Supervisor
By default start/3 is unsupervised: the caller owns the pid. For group management (bulk termination, tenant isolation, visibility into in-flight runs), place an instance of CouncilEx.Supervisor in your supervision tree:

```elixir
# In your Application children list:
{CouncilEx.Supervisor, name: MyApp.RunSup}
```

Then start runs against it:

```elixir
{:ok, pid} =
  CouncilEx.Supervisor.start_link(MyApp.RunSup, MyCouncil, input,
    subscribe: true
  )

run_id = CouncilEx.RunServer.run_id(pid)

# Bulk teardown: terminates every run under the supervisor
:ok = CouncilEx.Supervisor.terminate_all(MyApp.RunSup)
```

Children are :temporary, so the supervisor never restarts a crashed run: re-running a partially completed council from scratch wastes tokens and produces unpredictable state.
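A group of :temporary children with a bulk-terminate helper maps naturally onto Elixir's DynamicSupervisor. The sketch below is an assumption about how such grouping can be built, not CouncilEx.Supervisor's implementation; ToyRunSup and the Agent stand-in children are hypothetical.

```elixir
defmodule ToyRunSup do
  # A DynamicSupervisor hosting stand-in "runs".
  def start_link(name), do: DynamicSupervisor.start_link(strategy: :one_for_one, name: name)

  # restart: :temporary means a crashed child is never restarted.
  def start_run(sup, state) do
    spec = Supervisor.child_spec({Agent, fn -> state end}, restart: :temporary)
    DynamicSupervisor.start_child(sup, spec)
  end

  # Bulk teardown: terminate every child currently under the supervisor.
  def terminate_all(sup) do
    for {_, pid, _, _} <- DynamicSupervisor.which_children(sup), is_pid(pid) do
      DynamicSupervisor.terminate_child(sup, pid)
    end

    :ok
  end
end
```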
cancel/2 and terminate_run/2 work the same regardless of where a
run is hosted — they resolve by pid or run_id, not by supervisor.
For per-tenant isolation (each tenant gets their own supervisor and
registry), see the Tenant isolation section in
docs/RUNNING_WITH_OBAN.md.
Retry policy
Retry is applied per member call, not per run. The policy resolves in this priority order (highest first):
- Per-member: the retry: option declared on the member line.
- Runtime: the retry: option passed to run/3 or start/3.
- App-wide config: config :council_ex, default_retry: {max, base_ms}.
- Built-in default: {2, 1000} (2 attempts, 1 second base backoff).
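The resolution order above amounts to "the first configured value wins". A minimal sketch, assuming the three sources are passed in explicitly (RetryPolicy and resolve/3 are hypothetical names; in an application the app-wide value would typically come from Application.get_env(:council_ex, :default_retry)). It also enforces the format constraints on the resolved policy.

```elixir
defmodule RetryPolicy do
  @builtin_default {2, 1000}

  # Highest priority first: per-member, runtime, app-wide, then the built-in default.
  def resolve(member_opt, runtime_opt, app_env) do
    [member_opt, runtime_opt, app_env]
    |> Enum.find(@builtin_default, &(&1 != nil))
    |> validate!()
  end

  # {max_attempts, base_backoff_ms} with max_attempts >= 1 and base_backoff_ms >= 0.
  defp validate!({max, base} = policy)
       when is_integer(max) and max >= 1 and is_integer(base) and base >= 0,
       do: policy

  defp validate!(other), do: raise(ArgumentError, "invalid retry policy: #{inspect(other)}")
end
```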
```elixir
# Per-member:
member :analyst, MyAnalyst,
  provider: :open_router,
  model: "openai/gpt-4o",
  retry: {3, 500}

# Runtime override (applies to every member without its own retry: option);
# {1, 0} means a single attempt, i.e. no retries:
CouncilEx.run(MyCouncil, input, retry: {1, 0})

# App-wide:
config :council_ex, default_retry: {2, 1000}
```

The policy format is {max_attempts, base_backoff_ms}, where max_attempts >= 1 and base_backoff_ms >= 0. Backoff is exponential with random jitter: delay = base * 2^(attempt - 1) + rand(base / 2).
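The backoff formula, written out as a sketch. It assumes rand(base/2) means a uniformly random integer in 0..div(base, 2); the document does not pin down the range or the rounding, and Backoff is a hypothetical module name.

```elixir
defmodule Backoff do
  # delay_ms = base * 2^(attempt - 1) + rand(base / 2), for attempt >= 1.
  def delay(attempt, base) when attempt >= 1 and base >= 0 do
    base * Integer.pow(2, attempt - 1) + jitter(base)
  end

  # Uniform integer in 0..div(base, 2); :rand.uniform(n) returns 1..n.
  defp jitter(base), do: :rand.uniform(div(base, 2) + 1) - 1
end
```

Under this reading, and assuming attempt counts the attempt that just failed, the built-in {2, 1000} policy allows one retry, delayed by 1000 to 1500 ms.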
Only :transient errors retry. CouncilEx.Error.retryable?/1 is the canonical predicate:

```elixir
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :transient})   # true
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :permanent})   # false
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :validation})  # false
CouncilEx.Error.retryable?(%CouncilEx.Error{kind: :cancelled})   # false
```

| Kind | Retryable? | Examples |
|---|---|---|
| :transient | yes | HTTP 5xx, 429 rate limit, transport errors |
| :permanent | no | Auth failures, 4xx (other than 429), config bugs |
| :validation | no | Output schema validation failures |
| :cancelled | no | cancel/2 was called |
| :timeout (legacy) | no | M1-era; use :transient for new code |
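The HTTP rows of the kinds table can be read as a small classifier. This is a hypothetical helper (ToyClassify is not part of CouncilEx, and this document does not show how its providers classify responses); transport errors carry no status and are left out. The local retryable?/1 mirrors the documented rule that only :transient retries.

```elixir
defmodule ToyClassify do
  # Maps an HTTP error status to an error kind per the kinds table.
  # Only error statuses (4xx/5xx) are handled.
  def kind_for_status(429), do: :transient
  def kind_for_status(status) when status in 500..599, do: :transient
  def kind_for_status(status) when status in 400..499, do: :permanent

  # Mirrors CouncilEx.Error.retryable?/1: only :transient retries.
  def retryable?(:transient), do: true
  def retryable?(_kind), do: false
end
```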
Whole-run retry (e.g. re-enqueueing an Oban job) is a separate
concern. When deciding whether to retry at the job level, inspect
result.errors and consult CouncilEx.Error.retryable?/1 per error —
see docs/RUNNING_WITH_OBAN.md for the
recommended pattern.
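One possible job-level rule, sketched under assumptions: re-enqueue only when the run recorded errors and every one of them is retryable. This is not the pattern from docs/RUNNING_WITH_OBAN.md, which is not reproduced here. JobRetry and decide/1 are hypothetical, and errors are modeled as plain maps with a :kind key instead of %CouncilEx.Error{} structs so the sketch runs on its own.

```elixir
defmodule JobRetry do
  # Local mirror of the documented rule: only :transient is retryable.
  defp retryable?(%{kind: kind}), do: kind == :transient

  # :ok when there were no errors; :retry when every error is retryable;
  # :discard as soon as any error is non-retryable.
  def decide([]), do: :ok

  def decide(errors) do
    if Enum.all?(errors, &retryable?/1), do: :retry, else: :discard
  end
end
```

Requiring all errors to be retryable is deliberately conservative: a single :permanent, :validation or :cancelled error suggests that re-running the whole council would fail again or override a deliberate cancel.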
See also
- CouncilEx: start/3, start_link/3, run/3, await/2, cancel/2, terminate_run/2, validate/1
- CouncilEx.RunServer: run_id/1, pid_for/2, state/2
- CouncilEx.Supervisor: group lifecycle
- CouncilEx.Error: retryable?/1, error kinds
- docs/RUNNING_IN_PHOENIX.md: LiveView, channels, PubSub, anti-patterns
- docs/RUNNING_WITH_OBAN.md: background jobs, linking, whole-job retry