This guide covers the canonical pattern for executing council runs from inside a job worker (Oban, Exq, Broadway producer, or your own queue).
The TL;DR: use `CouncilEx.run/3` (synchronous) from inside the worker's `perform/1` callback. It calls `start_link/3` internally, so a worker timeout or kill propagates to the runner and no orphaned run keeps burning tokens.
## Why linking matters
Oban (and most job systems) implement timeouts by killing the worker process. The runner must be linked to the worker pid so the kill propagates — otherwise the runner keeps running long after the worker is gone, burning provider tokens for a result no one will ever read.
`CouncilEx.run/3` (sync) and `CouncilEx.start_link/3` (async) both
satisfy this. `CouncilEx.start/3` does not: it spawns unlinked, so
worker death leaves the runner orphaned. Don't use `start/3` from
inside an Oban worker unless you handle cleanup yourself.
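The propagation rule is plain OTP, independent of CouncilEx. Below is a minimal sketch that uses only `spawn/1`, `spawn_link/1`, and `Process.exit/2`. A stand-in "worker" starts a stand-in "runner", either linked or unlinked. The worker is then killed the way an Oban timeout would kill it. `LinkDemo` is a hypothetical name, and the runner just idles instead of calling a provider.

```elixir
defmodule LinkDemo do
  # Spawns a stand-in "worker" that starts a stand-in "runner" (linked or
  # not), kills the worker as an Oban timeout would, and reports whether the
  # runner is still alive afterwards.
  def runner_survives?(link?) do
    parent = self()

    worker =
      spawn(fn ->
        # The "runner" just idles; a real one would be calling providers.
        runner_fun = fn -> Process.sleep(:infinity) end
        runner = if link?, do: spawn_link(runner_fun), else: spawn(runner_fun)
        send(parent, {:runner, runner})
        Process.sleep(:infinity)
      end)

    runner =
      receive do
        {:runner, pid} -> pid
      after
        1_000 -> raise "worker never reported its runner"
      end

    # Watch the runner, then kill the worker with an untrappable :kill.
    ref = Process.monitor(runner)
    Process.exit(worker, :kill)

    receive do
      {:DOWN, ^ref, :process, ^runner, _reason} ->
        # The exit signal travelled over the link and took the runner down.
        false
    after
      200 ->
        # Still running with no one to report to: an orphan. Clean it up.
        Process.demonitor(ref, [:flush])
        Process.exit(runner, :kill)
        true
    end
  end
end

IO.inspect(LinkDemo.runner_survives?(true), label: "linked runner survives")
IO.inspect(LinkDemo.runner_survives?(false), label: "unlinked runner survives")
```

This prints `linked runner survives: false` followed by `unlinked runner survives: true`. The unlinked case is exactly the orphan that `start/3` produces inside a worker.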
## Canonical Oban worker

```elixir
defmodule MyApp.Workers.CouncilJob do
  use Oban.Worker,
    queue: :councils,
    # Don't auto-retry by default; councils are expensive. Let Oban
    # surface the error and decide manually whether to re-enqueue.
    max_attempts: 1

  # Timeout = max wall-clock for the run. Leave a buffer over your
  # member_timeout_ms × max_attempts × round_count. Oban reads job
  # timeouts from this callback; `timeout:` is not a `use Oban.Worker` option.
  @impl Oban.Worker
  def timeout(_job), do: :timer.minutes(10)

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"council_id" => cid, "input" => input} = args}) do
    council = MyApp.Councils.lookup!(cid)

    # Validate before spawning. `run/3` would also gate on this, but
    # doing it here lets us return a clean Oban cancel for config bugs
    # that should never be retried.
    case CouncilEx.validate(council) do
      :ok -> do_run(council, input, args)
      {:error, errs} -> {:cancel, {:invalid_council, errs}}
    end
  end

  defp do_run(council, input, args) do
    # `run/3` calls `start_link/3` internally, linking this worker to the
    # runner. Without this, killing the job leaves the runner alive.
    # `await_timeout` (8 min) is shorter than the Oban timeout (10 min), so
    # `run/3` returns {:error, :timeout} before Oban kills the worker.
    case CouncilEx.run(council, input,
           await_timeout: :timer.minutes(8),
           verbose: false
         ) do
      {:ok, %CouncilEx.Result{} = result} ->
        MyApp.Persistence.save_run(args, result)
        {:ok, result.run_id}

      {:error, %CouncilEx.Result{errors: errors} = result} ->
        # Run started but rounds failed. Persist what we have so the UI
        # can show partial state.
        MyApp.Persistence.save_run(args, result)

        # Decide whether to retry based on the error kinds. See the
        # "When to retry" section below.
        if Enum.any?(errors, &CouncilEx.Error.retryable?/1) do
          {:error, "transient failure: #{inspect(errors)}"}
        else
          {:cancel, {:permanent, errors}}
        end

      {:error, {:invalid_council, errs}} ->
        # Should be caught by the pre-validate above, but belt-and-suspenders.
        {:cancel, {:invalid_council, errs}}

      {:error, :timeout} ->
        # Hit our await_timeout but the run might still be working.
        # `CouncilEx.run/3` calls `start_link/3` internally, so the
        # runner dies with us. If you use `CouncilEx.start/3` (unlinked)
        # instead, the runner outlives the worker, so call
        # `CouncilEx.terminate_run/1` here yourself.
        {:error, :timeout}
    end
  end
end
```

## When to retry
`CouncilEx.Error.retryable?/1` already classifies errors:

| Kind | Retryable? | Source |
|---|---|---|
| `:transient` | yes | HTTP 5xx, 429, transport errors |
| `:permanent` | no | Auth failures, 4xx (other than 429), config bugs |
| `:validation` | no | Output schema validation failures |
| `:cancelled` | no | `cancel/1` was called |
| `:timeout` (legacy) | no | M1-era; use `:transient` for new code |
Runner-level retry already kicks in inside a single member call (per
the `:retry` policy, default `{2, 1000}`). What `Error.retryable?/1`
tells you here is whether to re-enqueue the whole job. Typically you
only want that for `:transient`.
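The per-error decision the worker makes can be pulled into a pure function so it is easy to unit-test. The sketch below mirrors the table above. `RetryDecision`, the `%{kind: ...}` error maps, and the local `retryable?/1` are hypothetical stand-ins that let the example run without CouncilEx. In real code, call `CouncilEx.Error.retryable?/1` rather than re-encoding the table.

```elixir
defmodule RetryDecision do
  # Stand-in for CouncilEx.Error.retryable?/1, encoding the table:
  # only :transient errors justify re-enqueueing the whole job.
  def retryable?(%{kind: :transient}), do: true
  def retryable?(%{kind: _other}), do: false

  # Maps a run's error list to an Oban perform/1 return value.
  def decide(errors) when is_list(errors) do
    if Enum.any?(errors, &retryable?/1) do
      {:error, "transient failure: #{inspect(errors)}"}
    else
      {:cancel, {:permanent, errors}}
    end
  end
end

IO.inspect(RetryDecision.decide([%{kind: :validation}, %{kind: :transient}]))
IO.inspect(RetryDecision.decide([%{kind: :permanent}]))
```

As in the canonical worker, a single transient error is enough to return `{:error, _}` and let Oban retry. Anything else cancels the job.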
Don't set `use Oban.Worker, max_attempts: 5` and call it a day. That retries the entire council on any error, which is expensive and often non-deterministic. Decide per-error.
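Even with a single Oban attempt, runner-level retries happen inside the job, so the worker timeout has to cover them. The sketch below does the back-of-envelope sizing from the canonical worker's timeout comment (`member_timeout_ms × max_attempts × round_count`, plus a buffer). It reads `max_attempts` as attempts per member call. Like that formula, it treats each round as costing one member call's worst case, which assumes members within a round run concurrently. `TimeoutBudget`, the 20% buffer, and the sample numbers are all hypothetical.

```elixir
defmodule TimeoutBudget do
  # Worst case per the rule of thumb: each round costs one member call
  # that uses all of its attempts, each running to member_timeout_ms.
  # Retry backoff sleeps are not counted, which is one reason to pad.
  def worst_case_ms(member_timeout_ms, max_attempts, round_count) do
    member_timeout_ms * max_attempts * round_count
  end

  # Worst case plus a percentage buffer (20% by default, an arbitrary choice).
  def padded_ms(member_timeout_ms, max_attempts, round_count, buffer_pct \\ 20) do
    worst = worst_case_ms(member_timeout_ms, max_attempts, round_count)
    worst + div(worst * buffer_pct, 100)
  end
end

# Hypothetical settings: 30 s per member call, 3 attempts, 5 rounds.
IO.puts(TimeoutBudget.worst_case_ms(30_000, 3, 5))
IO.puts(TimeoutBudget.padded_ms(30_000, 3, 5))
```

With these numbers the worst case is 450,000 ms (7.5 minutes) and the padded budget is 540,000 ms (9 minutes). That fits inside the 10-minute worker timeout used in the canonical worker.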
## Cancelling an in-flight Oban job
`Oban.cancel_job(job_id)` sends a kill signal to the worker pid.
Because `CouncilEx.run/3` calls `start_link/3` internally, the runner
dies with the worker. No extra cleanup needed.
If you've persisted the `run_id` and want to cancel from elsewhere
(e.g., a "Stop" button in your admin UI that cancels by `run_id`,
not by Oban job id):
```elixir
# Cooperative: waits for the in-flight round to drain, finalizes :cancelled.
:ok = CouncilEx.cancel(run_id)

# Or non-cooperative: kill via the supervisor.
:ok = CouncilEx.terminate_run(run_id)
```

Both work regardless of whether the Oban worker is still alive.
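The difference between the two calls is ordinary OTP semantics. Below is a plain-process sketch, not CouncilEx's implementation. A hypothetical runner checks its mailbox for `:cancel` only between rounds. A cooperative cancel therefore lets the current round finish and reports `:cancelled`. `Process.exit(pid, :kill)` stops the runner immediately with no final status. `RoundRunner`, its message shapes, and the round timing are assumptions.

```elixir
defmodule RoundRunner do
  # Spawns a runner that works through `rounds` rounds of `round_ms` each
  # and sends {:finished, status, rounds_done} to `owner` when it stops.
  def start(owner, rounds, round_ms) do
    spawn(fn -> loop(owner, 1, rounds, round_ms) end)
  end

  defp loop(owner, round, rounds, _round_ms) when round > rounds do
    send(owner, {:finished, :completed, rounds})
  end

  defp loop(owner, round, rounds, round_ms) do
    # Simulated in-flight round. A :cancel sent now just waits in the mailbox.
    Process.sleep(round_ms)

    # Cooperative check point, reached only between rounds.
    receive do
      :cancel -> send(owner, {:finished, :cancelled, round})
    after
      0 -> loop(owner, round + 1, rounds, round_ms)
    end
  end
end

# Cooperative: the in-flight round drains, then the run finalizes as :cancelled.
coop = RoundRunner.start(self(), 5, 100)
send(coop, :cancel)

receive do
  {:finished, status, rounds_done} -> IO.inspect({status, rounds_done}, label: "cooperative")
after
  2_000 -> raise "runner never finished"
end

# Non-cooperative: killed mid-round, so no final status message is ever sent.
hard = RoundRunner.start(self(), 5, 100)
ref = Process.monitor(hard)
Process.exit(hard, :kill)

receive do
  {:DOWN, ^ref, :process, ^hard, reason} -> IO.inspect(reason, label: "killed")
after
  2_000 -> raise "runner was not killed"
end
```

This prints `cooperative: {:cancelled, 1}` and then `killed: :killed`. The cooperative path costs up to one more round of provider calls, but in exchange the run ends with a recorded final state.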
## Tenant isolation
If you run councils for multiple tenants and want per-tenant bulk
cancel and in-flight visibility, pair Oban with a per-tenant
`CouncilEx.Supervisor` plus its own `Registry`:
```elixir
defmodule MyApp.Tenants.RunSup do
  # One CouncilEx.Supervisor + one Registry per tenant, registered by name.
  def child_spec(tenant_id) do
    %{
      id: {__MODULE__, tenant_id},
      start:
        {Supervisor, :start_link,
         [
           [
             {Registry, keys: :unique, name: registry_for(tenant_id)},
             {CouncilEx.Supervisor, name: sup_for(tenant_id)}
           ],
           [strategy: :one_for_one]
         ]},
      type: :supervisor
    }
  end

  def sup_for(tid), do: :"MyApp.Tenants.RunSup.#{tid}"
  def registry_for(tid), do: :"MyApp.Tenants.RunReg.#{tid}"
end

defmodule MyApp.Workers.TenantCouncilJob do
  use Oban.Worker, queue: :councils, max_attempts: 1

  @impl Oban.Worker
  def timeout(_job), do: :timer.minutes(10)

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"tenant_id" => tid, "council_id" => cid, "input" => input}}) do
    council = MyApp.Councils.lookup!(cid)

    # Ensure the per-tenant supervisor + registry exist. They are typically
    # started when the tenant logs in, but the call is idempotent here.
    {:ok, _} = MyApp.Tenants.ensure_started(tid)

    sup = MyApp.Tenants.RunSup.sup_for(tid)
    registry = MyApp.Tenants.RunSup.registry_for(tid)

    # `CouncilEx.Supervisor.start_link/4` is the supervised analogue of
    # `CouncilEx.start_link/3`. The runner is linked to the per-tenant
    # supervisor, not the worker, so terminating that supervisor's runs
    # cleans up every active tenant run at once. The worker waits on the
    # result via `CouncilEx.await/2`.
    with {:ok, pid} <- CouncilEx.Supervisor.start_link(sup, council, input, registry: registry) do
      run_id = CouncilEx.RunServer.run_id(pid)

      case CouncilEx.await(pid, await_timeout: :timer.minutes(8)) do
        {:ok, _result} -> {:ok, run_id}
        {:error, _} = err -> err
      end
    end
  end
end
```

When the tenant logs out:
```elixir
# Stop every active run for this tenant in one shot.
:ok = CouncilEx.Supervisor.terminate_all(MyApp.Tenants.RunSup.sup_for(tenant_id))
```

Per-tenant visibility: list every active run under one tenant's registry:
```elixir
runs =
  CouncilEx.list_active_runs(
    registry: MyApp.Tenants.RunSup.registry_for(tenant_id)
  )
```

## Resuming a run after a worker crash
Out of scope. Runs are in-memory; if the BEAM node restarts mid-run, the in-flight provider call dies, and nothing in the runner checkpoints state. A durable execution layer is on the roadmap.
For now, the workaround is application-level: on enqueue, persist the council spec
and input, keyed by `run_id`. On Oban retry, build a fresh runner from the same
inputs. You'll redo work that already happened. That's the cost of in-memory runs.
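Below is a minimal sketch of the bookkeeping half of that workaround. It uses an ETS table as a stand-in for your real persistence layer. `RunLedger`, its functions, and the shape of the stored spec are hypothetical. On retry, the worker would fetch the record and start a brand-new run with `CouncilEx.run/3`, exactly as a first attempt does. Nothing from the earlier attempt is reused.

```elixir
defmodule RunLedger do
  # Stand-in persistence: an ETS table keyed by run_id. In production this
  # would be a durable store, since ETS dies with the node.
  @table :run_ledger

  def init do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # At enqueue time: record everything needed to rebuild the run.
  def record(run_id, council_spec, input) do
    true = :ets.insert(@table, {run_id, council_spec, input})
    :ok
  end

  # On retry: fetch the original inputs so a fresh runner can be built.
  def fetch(run_id) do
    case :ets.lookup(@table, run_id) do
      [{^run_id, spec, input}] -> {:ok, spec, input}
      [] -> {:error, :not_found}
    end
  end
end

:ok = RunLedger.init()
:ok = RunLedger.record("run-123", %{council_id: "c1"}, "Summarize the Q3 report")
IO.inspect(RunLedger.fetch("run-123"))
```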
## Anti-patterns
- Don't call `CouncilEx.start/3` (async, unlinked) and return immediately from `perform/1`. The job will be marked complete while the run is still going, breaking Oban's job-state model, and worker death will leave the runner orphaned. Use `run/3` (sync) so `perform/1` actually blocks on the run.
- Don't rely on Oban's `max_attempts: N` for retry. The runner has per-member retry already; whole-council retry is expensive and usually wrong. Decide per-error using `Error.retryable?/1`.
- Don't use the same `run_id` across tenants if you've enabled a caller-owned `:registry`. Within a single registry, `run_id` is the unique key; across registries, the runtime has no idea who owns what.
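The last point follows from how Elixir's stdlib `Registry` behaves with `keys: :unique`: uniqueness is enforced within one registry only. The sketch below is standalone and assumes registries like the per-tenant ones in `MyApp.Tenants.RunSup`, with `Agent`s as stand-in runners. `RegistryDemo` and the `TenantA.Reg`/`TenantB.Reg` names are hypothetical. It shows the registry semantics, not CouncilEx's own registration code.

```elixir
defmodule RegistryDemo do
  # Names a process inside a given registry, as a caller-owned :registry would.
  defp via(registry, run_id), do: {:via, Registry, {registry, run_id}}

  def run(reg_a, reg_b) do
    # One unique-key registry per tenant.
    {:ok, _} = Registry.start_link(keys: :unique, name: reg_a)
    {:ok, _} = Registry.start_link(keys: :unique, name: reg_b)

    # The same run_id registers fine in both registries...
    {:ok, a} = Agent.start(fn -> :tenant_a_run end, name: via(reg_a, "run-1"))
    {:ok, b} = Agent.start(fn -> :tenant_b_run end, name: via(reg_b, "run-1"))

    # ...but a duplicate inside one registry is rejected.
    dup = Agent.start(fn -> :duplicate end, name: via(reg_a, "run-1"))

    %{
      a: a,
      b: b,
      duplicate: dup,
      # A bare run_id is ambiguous: the answer depends on which registry you ask.
      lookup_a: Registry.lookup(reg_a, "run-1"),
      lookup_b: Registry.lookup(reg_b, "run-1"),
      # Per-tenant visibility: every {run_id, pid} pair in one registry.
      listed_a: Registry.select(reg_a, [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
    }
  end
end

result = RegistryDemo.run(TenantA.Reg, TenantB.Reg)
IO.inspect(result.duplicate, label: "duplicate in tenant A")
IO.inspect(result.listed_a, label: "tenant A runs")
```

Both tenants end up with a live `"run-1"`. Only code that knows which registry to ask can tell them apart, which is why a bare `run_id` shared across tenants is unsafe.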
## See also
- `docs/RUNNING_IN_PHOENIX.md` §10: caller-owned supervision via `CouncilEx.Supervisor`.
- `README.md`: Running councils.
- `CouncilEx.Error`: error classification contract; `retryable?/1` is the canonical predicate.