Running CouncilEx from Oban (or any background-job system)

This guide covers the canonical pattern for executing council runs from inside a job worker (Oban, Exq, Broadway producer, or your own queue).

The TL;DR:

Use CouncilEx.run/3 (synchronous) from inside the worker's perform/1 callback. It calls start_link/3 internally, so a worker timeout or kill propagates to the runner and no orphaned run keeps burning tokens.

Why linking matters

Oban (and most job systems) implement timeouts by killing the worker process. The runner must be linked to the worker pid so the kill propagates — otherwise the runner keeps running long after the worker is gone, burning provider tokens for a result no one will ever read.

CouncilEx.run/3 (sync) and CouncilEx.start_link/3 (async) both satisfy this. CouncilEx.start/3 does not — it spawns unlinked, so worker death leaves the runner orphaned. Don't use start/3 from inside an Oban worker unless you handle cleanup yourself.
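The difference comes down to plain OTP link semantics and can be observed without CouncilEx at all. The following is a minimal sketch, not CouncilEx code: `LinkDemo` and its stand-in "worker" and "runner" processes are hypothetical, with `spawn_link/1` playing the role of `start_link/3` and `spawn/1` the role of `start/3`.

```elixir
defmodule LinkDemo do
  # Starts a stand-in "worker" that starts a stand-in "runner" using
  # `spawn_fun` (&spawn/1 or &spawn_link/1), kills the worker, and
  # reports whether the runner is still alive afterwards.
  def runner_survives?(spawn_fun) do
    parent = self()

    worker =
      spawn(fn ->
        runner = spawn_fun.(fn -> Process.sleep(:infinity) end)
        send(parent, {:runner, runner})
        Process.sleep(:infinity)
      end)

    runner =
      receive do
        {:runner, pid} -> pid
      after
        1_000 -> raise "worker never reported its runner"
      end

    # Monitor both processes so we can observe their exits.
    worker_ref = Process.monitor(worker)
    runner_ref = Process.monitor(runner)

    # Roughly what a job system does on timeout: kill the worker.
    Process.exit(worker, :kill)

    receive do
      {:DOWN, ^worker_ref, :process, _pid, _reason} -> :ok
    end

    receive do
      # Linked runner: the exit signal propagated and it is gone.
      {:DOWN, ^runner_ref, :process, _pid, _reason} ->
        false
    after
      # Unlinked runner: still alive. Clean it up before returning.
      200 ->
        Process.demonitor(runner_ref, [:flush])
        Process.exit(runner, :kill)
        true
    end
  end
end
```

Calling `LinkDemo.runner_survives?(&spawn_link/1)` returns `false` because the kill propagates over the link, while `LinkDemo.runner_survives?(&spawn/1)` returns `true`: the orphan case this section warns about.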

Canonical Oban worker

defmodule MyApp.Workers.CouncilJob do
  use Oban.Worker,
    queue: :councils,
    # Timeout = max wall-clock for the run. Leave a buffer over your
    # member_timeout_ms × max_attempts × round_count.
    timeout: :timer.minutes(10),
    # Don't auto-retry by default; councils are expensive. Let Oban
    # surface the error and decide manually whether to re-enqueue.
    max_attempts: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"council_id" => cid, "input" => input} = args}) do
    council = MyApp.Councils.lookup!(cid)

    # Validate before spawning. `start/3` would also gate on this,
    # but doing it here lets us return a clean Oban cancel for
    # config bugs that should never be retried.
    case CouncilEx.validate(council) do
      :ok -> do_run(council, input, args)
      {:error, errs} -> {:cancel, {:invalid_council, errs}}
    end
  end

  defp do_run(council, input, args) do
    # `run/3` calls `start_link/3` internally, linking this worker to the
    # runner. Without this, killing the job leaves the runner alive.
    case CouncilEx.run(council, input,
           await_timeout: :timer.minutes(8),
           verbose: false
         ) do
      {:ok, %CouncilEx.Result{} = result} ->
        MyApp.Persistence.save_run(args, result)
        {:ok, result.run_id}

      {:error, %CouncilEx.Result{errors: errors} = result} ->
        # Run started but rounds failed. Persist what we have so the UI
        # can show partial state.
        MyApp.Persistence.save_run(args, result)

        # Decide whether to retry based on the error kinds. See the
        # "When to retry" section below.
        if Enum.any?(errors, &CouncilEx.Error.retryable?/1) do
          {:error, "transient failure: #{inspect(errors)}"}
        else
          {:cancel, {:permanent, errors}}
        end

      {:error, {:invalid_council, errs}} ->
        # Should be caught by the pre-validate above, but belt-and-suspenders.
        {:cancel, {:invalid_council, errs}}

      {:error, :timeout} ->
        # Hit our await_timeout but the run might still be working.
        # `CouncilEx.run/3` calls `start_link/3` internally, so the
        # runner dies with us. If you want to keep it alive past worker
        # exit, switch to `CouncilEx.start/3` (unlinked) and call
        # `CouncilEx.terminate_run/1` here instead.
        {:error, :timeout}
    end
  end
end
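The timeout comment in the worker can be turned into a quick budget check. This is a rough sketch that takes the formula in that comment at face value: `TimeoutBudget` is a hypothetical helper, `max_attempts` is read here as the per-member attempt count (not Oban's `max_attempts`), the example numbers are made up, and backoff delays between attempts are ignored.

```elixir
defmodule TimeoutBudget do
  # Worst-case wall-clock for one run, in milliseconds: every round
  # exhausts all attempts and each attempt runs to its timeout.
  def worst_case_ms(member_timeout_ms, max_attempts, round_count) do
    member_timeout_ms * max_attempts * round_count
  end

  # True when the worst case fits strictly inside the given limit
  # (for example the await_timeout or the Oban worker timeout).
  def fits?(limit_ms, member_timeout_ms, max_attempts, round_count) do
    worst_case_ms(member_timeout_ms, max_attempts, round_count) < limit_ms
  end
end

# Hypothetical numbers: 60 s per member call, 3 attempts, 2 rounds.
worst = TimeoutBudget.worst_case_ms(60_000, 3, 2)
IO.inspect(worst, label: "worst case (ms)")
IO.inspect(TimeoutBudget.fits?(:timer.minutes(8), 60_000, 3, 2), label: "fits await_timeout")
```

With these numbers the worst case is 360,000 ms (6 minutes), which fits under the 8-minute `await_timeout` and leaves the 10-minute Oban `timeout` as the outer safety net.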

When to retry

CouncilEx.Error.retryable?/1 already classifies errors:

| Kind | Retryable? | Source |
| --- | --- | --- |
| :transient | yes | HTTP 5xx, 429, transport errors |
| :permanent | no | Auth failures, 4xx (other than 429), config bugs |
| :validation | no | Output schema validation failures |
| :cancelled | no | cancel/1 was called |
| :timeout (legacy) | no | M1-era; use :transient for new code |

Runner-level retry already kicks in inside a single member call (per the :retry policy, default {2, 1000}). What Error.retryable?/1 tells you here is whether to re-enqueue the whole job. Typically you only want this on :transient.

Don't set Oban.Worker max_attempts: 5 and call it a day. That retries the entire council on any error, which is expensive and often non-deterministic. Decide per-error.
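As an illustration of deciding per-error, the sketch below maps the kinds from the table onto the tuples a worker would hand back to Oban. It is a stand-in, not the library's implementation: `RetryDecisionSketch` is hypothetical and works on bare kind atoms, whereas real code would call `CouncilEx.Error.retryable?/1` on the errors in the result.

```elixir
defmodule RetryDecisionSketch do
  # Per the table: only :transient is worth re-running the whole job for.
  @retryable_kinds [:transient]

  def retryable_kind?(kind), do: kind in @retryable_kinds

  # Mirrors the worker's branch: any retryable error yields an
  # {:error, _} tuple (Oban may retry), otherwise {:cancel, _}.
  def decide(kinds) when is_list(kinds) and kinds != [] do
    if Enum.any?(kinds, &retryable_kind?/1) do
      {:error, {:transient, kinds}}
    else
      {:cancel, {:permanent, kinds}}
    end
  end
end

IO.inspect(RetryDecisionSketch.decide([:transient, :validation]))
IO.inspect(RetryDecisionSketch.decide([:permanent]))
```

A stricter policy would use `Enum.all?/2` instead of `Enum.any?/2`, retrying only when every error is transient; which one fits depends on how costly a wasted re-run is for you.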

Cancelling an in-flight Oban job

Oban.cancel_job(job_id) sends a kill signal to the worker pid. Because CouncilEx.run/3 calls start_link/3 internally, the runner dies with the worker. No extra cleanup needed.

If you've persisted the run_id and want to cancel from elsewhere (e.g., a "Stop" button in your admin UI that cancels by run_id, not by Oban job id):

# Cooperative: waits for in-flight round to drain, finalizes :cancelled.
:ok = CouncilEx.cancel(run_id)

# Or non-cooperative: kill via supervisor.
:ok = CouncilEx.terminate_run(run_id)

Both work whether or not the Oban worker is still alive.

Tenant isolation

If you run councils for multiple tenants and want per-tenant bulk cancel / in-flight visibility, pair Oban with a per-tenant CouncilEx.Supervisor plus its own Registry:

defmodule MyApp.Tenants.RunSup do
  # One CouncilEx.Supervisor + one Registry per tenant, registered by name.
  def child_spec(tenant_id) do
    %{
      id: {__MODULE__, tenant_id},
      start: {Supervisor, :start_link,
              [[
                {Registry, keys: :unique, name: registry_for(tenant_id)},
                {CouncilEx.Supervisor, name: sup_for(tenant_id)}
              ], [strategy: :one_for_one]]},
      type: :supervisor
    }
  end

  def sup_for(tid),      do: :"MyApp.Tenants.RunSup.#{tid}"
  def registry_for(tid), do: :"MyApp.Tenants.RunReg.#{tid}"
end

defmodule MyApp.Workers.TenantCouncilJob do
  use Oban.Worker, queue: :councils, timeout: :timer.minutes(10), max_attempts: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"tenant_id" => tid, "council_id" => cid, "input" => input}}) do
    council = MyApp.Councils.lookup!(cid)

    # Ensure the per-tenant supervisor + registry exist — typically
    # started when the tenant logs in, but idempotent here.
    {:ok, _} = MyApp.Tenants.ensure_started(tid)

    sup      = MyApp.Tenants.RunSup.sup_for(tid)
    registry = MyApp.Tenants.RunSup.registry_for(tid)

    # `CouncilEx.Supervisor.start_link/4` is the supervised analogue of
    # `CouncilEx.start_link/3`. The runner is linked to the per-tenant
    # supervisor, not the worker, so `terminate_all/1` on that supervisor
    # cleans up every active tenant run at once. The flip side: killing
    # this worker does not stop the runner. The worker waits on the
    # result via `CouncilEx.await/2`.
    case CouncilEx.Supervisor.start_link(sup, council, input, registry: registry) do
      {:ok, pid} ->
        run_id = CouncilEx.RunServer.run_id(pid)

        case CouncilEx.await(pid, await_timeout: :timer.minutes(8)) do
          {:ok, _result} -> {:ok, run_id}
          {:error, _} = err -> err
        end

      err ->
        err
    end
  end
end
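The worker calls `MyApp.Tenants.ensure_started/1` without showing it. One way such a helper could be made idempotent is sketched below, assuming a `DynamicSupervisor` owns the per-tenant trees; `EnsureStartedSketch` and the choice of `DynamicSupervisor` are assumptions, not part of CouncilEx. The sketch relies on the started child registering a name, so that a second start returns `{:error, {:already_started, pid}}`. The `RunSup.child_spec/1` shown earlier starts an unnamed outer `Supervisor`, so it would need a `name:` option for this to apply.

```elixir
defmodule EnsureStartedSketch do
  # Starts `child_spec` under the DynamicSupervisor `dyn_sup`, treating
  # "already started" as success so repeated calls are idempotent.
  def ensure_started(dyn_sup, child_spec) do
    case DynamicSupervisor.start_child(dyn_sup, child_spec) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      {:error, _reason} = err -> err
    end
  end
end

# Demo with a named Registry standing in for the per-tenant tree.
{:ok, dyn_sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
spec = {Registry, keys: :unique, name: :ensure_started_demo_registry}

{:ok, first} = EnsureStartedSketch.ensure_started(dyn_sup, spec)
{:ok, second} = EnsureStartedSketch.ensure_started(dyn_sup, spec)
IO.inspect(first == second, label: "same pid on second call")
```

In a real application the `DynamicSupervisor` would live in the supervision tree rather than being started ad hoc as in this demo.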

When the tenant logs out:

# Stop every active run for this tenant in one shot.
:ok = CouncilEx.Supervisor.terminate_all(MyApp.Tenants.RunSup.sup_for(tenant_id))

In-flight visibility: list everything under one tenant's registry. For a cross-tenant view, make the same call per registry and combine the results:

runs =
  CouncilEx.list_active_runs(
    registry: MyApp.Tenants.RunSup.registry_for(tid)
  )

Resuming a run after a worker crash

Out of scope. Runs are in-memory; if the BEAM node restarts mid-run, the in-flight provider call dies, and nothing in the runner checkpoints state. A durable execution layer is on the roadmap.

For now, the workaround is application-level: persist the council spec + input keyed by run_id on enqueue, and on Oban retry, build a fresh runner from the same inputs. You'll re-do work that already happened. That's the cost of in-memory runs.
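The bookkeeping side of that workaround might look like the sketch below. It is only an illustration: `RunInputsSketch` is hypothetical, and a plain map stands in for whatever durable store (for example a database table) the application actually uses.

```elixir
defmodule RunInputsSketch do
  # In-memory stand-in for a durable table keyed by run_id.
  def new, do: %{}

  # Record the inputs at enqueue time. Keeps the first write, so a
  # later call for the same run_id cannot overwrite the original inputs.
  def record(store, run_id, council_spec, input) do
    Map.put_new(store, run_id, %{council_spec: council_spec, input: input})
  end

  # On retry: fetch the inputs needed to build a fresh runner.
  def fetch(store, run_id) do
    case Map.fetch(store, run_id) do
      {:ok, %{council_spec: spec, input: input}} -> {:ok, spec, input}
      :error -> {:error, :unknown_run}
    end
  end
end

store = RunInputsSketch.record(RunInputsSketch.new(), "run-1", %{members: 3}, "question")
IO.inspect(RunInputsSketch.fetch(store, "run-1"))
```

On retry, the worker would pass the fetched spec and input to `CouncilEx.run/3` exactly as on the first attempt; nothing from the failed run is reused.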

Anti-patterns

  • Don't call CouncilEx.start/3 (async, unlinked) and return immediately from perform/1. The job will be marked complete while the run is still going, breaking Oban's job-state model, and worker death will leave the runner orphaned. Use run/3 (sync) so perform/1 actually blocks on the run.
  • Don't rely on Oban's max_attempts: N for retry. The runner has per-member retry already; whole-council retry is expensive and usually wrong. Decide per-error using Error.retryable?/1.
  • Don't use the same run_id across tenants if you've enabled caller-owned :registry. Within a single registry, run_id is the unique key; across registries, the runtime has no idea who owns what.

See also