Running CouncilEx in a Phoenix (or any long-running) app


This guide covers how to embed CouncilEx into a long-running OTP app, typically a Phoenix web server, though the same patterns apply to any Application-style release (Broadway pipeline, GenStage system, GRPC server, etc.).

The TL;DR:

Add :council_ex to your deps. Use the async API (start/3 / start_link/3 + PubSub events) from your request-handling code. No setup required.

CouncilEx does not bundle a run supervisor. Runners are spawned with GenServer.start/3 (unsupervised, unlinked) or GenServer.start_link/3 (linked to the caller). Need group lifecycle (tenant teardown, in-flight visibility, bulk cancel)? Place CouncilEx.Supervisor, an opt-in DynamicSupervisor, in your tree.

Runs are in-memory: they don't survive a node restart. If you need cross-restart durability, layer Oban over CouncilEx.run/3 and the opt-in persistence layer (see docs/PERSISTENCE.md and docs/RUNNING_WITH_OBAN.md).

The rest of this doc unpacks the why and the how, with copy-pasteable examples.


1. The bundled supervision tree (minimal)

When :council_ex is in your mix.exs deps, the OTP application starts on boot via CouncilEx.Application and brings up:

CouncilEx top-level supervisor (rest_for_one)
 :pg group :council_ex                  # default PubSub backend
 Registry CouncilEx.Runner.Registry     # run_id → RunServer pid
 Task.Supervisor CouncilEx.Runner.TaskSupervisor

That is the whole bundled tree. There is no DynamicSupervisor for runs in core: CouncilEx.start/3 has GenServer.start/3 semantics and CouncilEx.start_link/3 has GenServer.start_link/3 semantics. The caller owns the pid.

Every call to CouncilEx.start/3 spawns a CouncilEx.RunServer GenServer registered by run_id in CouncilEx.Runner.Registry. The synchronous CouncilEx.run/3 is a thin start_link + await wrapper on top of the same machinery.
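To make the run_id → pid mapping concrete, here is a standalone sketch of the general OTP pattern: a GenServer registered under a string key through a `:via` Registry tuple. It does not use CouncilEx. `DemoRegistry` and `DemoRun` are hypothetical stand-ins, and CouncilEx's internal registration code may differ.

```elixir
# Standalone model of "a GenServer registered by run_id in a Registry".
# DemoRegistry and DemoRun are hypothetical stand-ins, not CouncilEx modules.
defmodule DemoRun do
  use GenServer

  # A :via tuple lets the Registry resolve run_id -> pid for GenServer calls.
  def via(run_id), do: {:via, Registry, {DemoRegistry, run_id}}

  # Unlinked and unsupervised, mirroring GenServer.start/3 semantics.
  def start(run_id), do: GenServer.start(__MODULE__, run_id, name: via(run_id))

  @impl true
  def init(run_id), do: {:ok, %{run_id: run_id}}

  @impl true
  def handle_call(:run_id, _from, state), do: {:reply, state.run_id, state}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: DemoRegistry)
{:ok, pid} = DemoRun.start("run-123")

# Lookup by run_id returns the registered pid (the value is nil for :via registrations).
[{^pid, nil}] = Registry.lookup(DemoRegistry, "run-123")

# Address the process by run_id alone, without holding the pid.
"run-123" = GenServer.call(DemoRun.via("run-123"), :run_id)
```

Because the registry key is unique, starting a second process under the same run_id is rejected with `{:error, {:already_started, pid}}`.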

You may add CouncilEx.Supervisor (the opt-in DynamicSupervisor described in section 10) to your own tree.

You should not add CouncilEx.Runner.Registry or CouncilEx.Runner.TaskSupervisor to your child list — they're already started by CouncilEx.Application.


2. Two execution shapes, picked by call site

| Call site | Shape | API |
|---|---|---|
| Background job, CLI, batch, short controller | Sync, blocking | CouncilEx.run/3 |
| LiveView, Channel, SSE handler, long controller | Async, event-driven | start/3 + subscribe |

The async shape is the one most Phoenix apps want, because LLM rounds take seconds-to-minutes and you don't want to hold an HTTP connection or block a LiveView process for that long.

Runs live in-memory inside a RunServer GenServer; if the BEAM node restarts mid-run, the run is lost. For durability across restarts, wrap the run in an Oban job and use the opt-in Recorder.Ecto persistence — see docs/RUNNING_WITH_OBAN.md and docs/PERSISTENCE.md.

PubSub is optional

The async shape does not require subscribing. A common pattern is an Oban worker that calls CouncilEx.start_link/3 (linked, so the run dies with the worker instead of being orphaned) and then blocks on await/2: no PubSub plumbing, no handle_info clauses, no subscribe: true. PubSub is the right tool when a UI process needs incremental progress. For headless background work, await/2 is sufficient.

# Oban worker, no PubSub needed. start_link/3 ties the run's lifetime to the job.
def perform(%Oban.Job{args: %{"question" => q, "run_id" => run_id}}) do
  {:ok, pid} = CouncilEx.start_link(MyApp.Councils.Advisor, %{question: q}, run_id: run_id)
  {:ok, %CouncilEx.Result{final: final}} = CouncilEx.await(pid, await_timeout: :infinity)
  MyApp.persist_answer(run_id, final)
  :ok
end

For app-owned per-member persistence (cost / tokens / latency rows written without Recorder.Ecto), attach a telemetry handler to [:council_ex, :member, :stop] — see the Telemetry-driven persistence section in docs/PERSISTENCE.md. Telemetry is the cleaner seam than PubSub for that use case: per-member granularity, no subscribe lifecycle, decoupled from any UI process.


3. LiveView: the canonical pattern

Use start/3 with subscribe: true. The LV process receives the CouncilEx.Events messages in its mailbox and updates assigns incrementally. No GenServer of your own. No polling.

The pid is the handle. Stash it in assigns alongside the run_id; pass either to cancel/2 and await/2.

defmodule MyAppWeb.CouncilLive do
  use MyAppWeb, :live_view

  alias CouncilEx.{MemberResult, Result, RoundResult, StreamChunk}

  @impl true
  def mount(_params, _session, socket) do
    {:ok,
     socket
     |> assign(
       run_pid: nil,
       run_id: nil,
       status: :idle,
       rounds: [],
       tokens: %{},
       final: nil,
       error: nil
     )}
  end

  @impl true
  def handle_event("ask", %{"question" => q}, socket) do
    # subscribe: true subscribes the LV process to the run's PubSub topic
    # BEFORE the RunServer starts, so we can't miss :run_started.
    {:ok, pid} =
      CouncilEx.start(MyApp.Councils.Advisor, %{question: q}, subscribe: true)

    run_id = CouncilEx.RunServer.run_id(pid)

    {:noreply,
     assign(socket, run_pid: pid, run_id: run_id, status: :running, error: nil)}
  end

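  def handle_event("cancel", _params, %{assigns: %{run_pid: pid}} = socket)
      when is_pid(pid) do
    # cancel returns {:error, :runner_dead} if the run already finished and was reaped.
    _ = CouncilEx.cancel(pid)
    {:noreply, socket}
  end

  def handle_event("cancel", _params, socket), do: {:noreply, socket}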

  # --- run lifecycle events ---

  @impl true
  def handle_info({:run_started, run_id, _council, _input}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :running)}
  end

  def handle_info({:round_started, run_id, round_name, idx}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: {:round, round_name, idx})}
  end

  def handle_info({:member_started, run_id, round, member_id}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: {:member_started, round, member_id})}
  end

  # Streaming token chunks: only fire when a member has `stream: true`.
  def handle_info(
        {:member_token, run_id, _round, member_id, %StreamChunk{content: chunk}},
        %{assigns: %{run_id: run_id, tokens: tokens}} = socket
      ) do
    {:noreply, assign(socket, tokens: Map.update(tokens, member_id, chunk, &(&1 <> chunk)))}
  end

  def handle_info({:member_completed, run_id, _round, _id, %MemberResult{}}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, socket}
  end

  def handle_info({:round_completed, run_id, _name, %RoundResult{} = rr}, %{assigns: %{run_id: run_id, rounds: rs}} = socket) do
    {:noreply, assign(socket, rounds: rs ++ [rr])}
  end

  def handle_info({:run_completed, run_id, %Result{} = result}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :completed, final: result.final)}
  end

  def handle_info({:run_failed, run_id, _errors, %Result{} = result}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :failed, error: result.errors)}
  end

  # Ignore late messages from a previous run (run_id mismatch) and anything else.
  def handle_info(_msg, socket), do: {:noreply, socket}
end

Why subscribe: true matters

If you do:

{:ok, pid} = CouncilEx.start(council, input)         # RunServer starts here
run_id = CouncilEx.RunServer.run_id(pid)
CouncilEx.subscribe(run_id)                          # subscribe AFTER

…there is a documented race: the RunServer's init/1 and immediate handle_continue may have already broadcast :run_started and :round_started before your subscribe lands. You'll miss them.

subscribe: true does the subscribe before RunServer.start/1 is called inside CouncilEx.start/3, so the topic is live before the RunServer exists. Use it everywhere you care about the full event timeline.
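The race is easy to reproduce without CouncilEx. The standalone sketch below uses OTP's `:pg` directly. `DemoRunner` is a hypothetical stand-in that broadcasts right after `init/1` via `handle_continue`, as described above. CouncilEx's real topic names and broadcast code are not shown in this guide. The first half joins the group before starting the runner and receives the event. The second half starts the runner first and misses it.

```elixir
# Models the subscribe/start race with OTP's :pg. DemoRunner is a hypothetical
# stand-in that broadcasts {:run_started, topic} right after init.
defmodule DemoRunner do
  use GenServer

  def start(topic), do: GenServer.start(__MODULE__, topic)

  @impl true
  def init(topic), do: {:ok, topic, {:continue, :broadcast_started}}

  # handle_continue runs before the server processes any message.
  @impl true
  def handle_continue(:broadcast_started, topic) do
    for member <- :pg.get_members(:demo_scope, topic), do: send(member, {:run_started, topic})
    {:noreply, topic}
  end

  @impl true
  def handle_call(:ping, _from, topic), do: {:reply, :pong, topic}
end

{:ok, _scope} = :pg.start_link(:demo_scope)

# Safe order: join the group (subscribe) first, then start the runner.
:ok = :pg.join(:demo_scope, "run-a", self())
{:ok, _runner_a} = DemoRunner.start("run-a")

received_started =
  receive do
    {:run_started, "run-a"} -> true
  after
    1_000 -> false
  end

# Unsafe order: start first. Once :ping replies, handle_continue has already
# broadcast to an empty group, so the late subscriber never sees :run_started.
{:ok, runner_b} = DemoRunner.start("run-b")
:pong = GenServer.call(runner_b, :ping)
:ok = :pg.join(:demo_scope, "run-b", self())

missed_started =
  receive do
    {:run_started, "run-b"} -> false
  after
    100 -> true
  end
```

The `:ping` call only makes the unsafe case deterministic for the demo. In real code the broadcast and the late subscribe simply race.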


4. Phoenix.Channel pattern

Same idea, slightly different plumbing. Phoenix.Channel processes also have a mailbox, so subscribe + handle_info works identically:

defmodule MyAppWeb.CouncilChannel do
  use MyAppWeb, :channel

  def join("council:lobby", _params, socket), do: {:ok, socket}

  def handle_in("ask", %{"q" => q}, socket) do
    {:ok, pid} =
      CouncilEx.start(MyApp.Councils.Advisor, %{question: q}, subscribe: true)

    run_id = CouncilEx.RunServer.run_id(pid)

    {:reply, {:ok, %{run_id: run_id}},
     socket |> assign(:run_pid, pid) |> assign(:run_id, run_id)}
  end

  def handle_info({:round_completed, run_id, name, _rr}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "round_completed", %{name: name})
    {:noreply, socket}
  end

  def handle_info({:run_completed, run_id, %CouncilEx.Result{final: final}}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "run_completed", %{final: final && final.content})
    {:noreply, socket}
  end

  def handle_info({:run_failed, run_id, _errs, _result}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "run_failed", %{})
    {:noreply, socket}
  end

  def handle_info(_, socket), do: {:noreply, socket}
end

5. Phoenix.Controller / plain HTTP

Two viable shapes, depending on expected duration:

(a) Short councils (<10s, P95 well under your HTTP timeout): sync, return JSON:

def create(conn, %{"question" => q}) do
  case CouncilEx.run(MyApp.Councils.Quick, %{question: q}, await_timeout: 30_000) do
    {:ok, %CouncilEx.Result{final: final}} -> json(conn, %{answer: final && final.content})
    {:error, %CouncilEx.Result{errors: errs}} -> conn |> put_status(500) |> json(%{errors: inspect(errs)})
    # Defensive catch-all for any other {:error, _} shape.
    {:error, reason} -> conn |> put_status(500) |> json(%{error: inspect(reason)})
  end
end

(b) Long councils: return run_id and expose a GET /runs/:id for polling, or upgrade to SSE / LiveView / channel for push:

def create(conn, %{"question" => q}) do
  {:ok, pid} = CouncilEx.start(MyApp.Councils.Slow, %{question: q})
  run_id = CouncilEx.RunServer.run_id(pid)
  json(conn, %{run_id: run_id, status: "running"})
end

def show(conn, %{"id" => run_id}) do
  case CouncilEx.RunServer.state(run_id) do
    {:ok, summary} -> json(conn, summary)
    {:error, :not_found} -> conn |> put_status(404) |> json(%{error: "unknown run"})
  end
end

RunServer.state/2 returns a stable curated summary (run_id, council, status, current_round, rounds_completed, started_at) and is safe to call at high frequency. After a run finalizes, the RunServer stays alive for ~60s for late fetches before auto-reaping.

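For SSE, start the run with subscribe: true inside the controller, switch the response to chunked mode with Plug.Conn.send_chunked/2, and write each event with Plug.Conn.chunk/2 from a loop driven by receive. The event shapes are the same as in the LiveView example.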
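The receive loop for SSE can be factored into a small reducer and tested without Plug. `EventLoop` is a hypothetical helper, not part of CouncilEx. It assumes every event is a tuple with the run_id in the second position, as in the LiveView example. It stops at `:run_completed` or `:run_failed`.

```elixir
# Hypothetical helper (not part of CouncilEx): folds the event tuples for one
# run_id into an accumulator until :run_completed or :run_failed arrives.
# Messages for other runs are left in the mailbox.
defmodule EventLoop do
  @terminal [:run_completed, :run_failed]

  def reduce(run_id, acc, fun, idle_timeout \\ 60_000) do
    receive do
      event when is_tuple(event) and tuple_size(event) >= 2 and elem(event, 1) == run_id ->
        acc = fun.(event, acc)

        if elem(event, 0) in @terminal do
          {:done, acc}
        else
          reduce(run_id, acc, fun, idle_timeout)
        end
    after
      # Give up if the run goes quiet for too long.
      idle_timeout -> {:timeout, acc}
    end
  end
end

# Simulated events already sitting in the mailbox:
send(self(), {:round_completed, "r1", "draft", %{}})
send(self(), {:run_started, "other-run", nil, %{}})
send(self(), {:run_completed, "r1", %{}})

{:done, seen} = EventLoop.reduce("r1", [], fn event, acc -> [elem(event, 0) | acc] end, 100)
# seen == [:run_completed, :round_completed]
```

In a controller you would first run `conn |> put_resp_content_type("text/event-stream") |> send_chunked(200)`. Then pass that conn as the accumulator and have `fun` call `Plug.Conn.chunk/2`. It returns `{:ok, conn}` or `{:error, reason}`, for example when the client has disconnected.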


6. Sharing your Phoenix.PubSub

By default CouncilEx uses Erlang's :pg for event broadcast. This is fine and zero-config. If you'd rather route everything through your existing Phoenix.PubSub server (for cross-node delivery via Redis / PG2 adapter, unified topic plumbing, easier dashboard wiring), configure the adapter:

# application.ex children: same as any Phoenix app
{Phoenix.PubSub, name: MyApp.PubSub}
# config/runtime.exs (or config.exs)
config :council_ex, pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub}

phoenix_pubsub is declared with optional: true in CouncilEx's deps, so it is only pulled in if your app already depends on it. The runnable demo lives at examples/phoenix_pubsub_example.exs.

For multi-node deployments, pair the PubSub adapter with mode: :multi_node so Registry, Reliability, and Recorder also share state across replicas. See docs/PERSISTENCE.md for the full mode breakdown:

config :council_ex,
  mode: :multi_node,
  repo: MyApp.Repo,
  pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub}

7. Cancellation, timeouts, failure modes

These primitives are the same across every shape above; the call site just changes who's holding the pid (or run_id):

# Cancel mid-run. Cascades to active sub-runs.
:ok = CouncilEx.cancel(pid)               # direct, no Registry lookup
:ok = CouncilEx.cancel(run_id)            # Registry-resolved

# Force-terminate (Process.exit(:shutdown), no :run_failed event):
:ok = CouncilEx.terminate_run(pid)
:ok = CouncilEx.terminate_run(run_id)

# Per-member timeout (default 30_000ms or :default_member_timeout_ms config):
CouncilEx.start(council, input, member_timeout_ms: 60_000)

# Fail-fast vs continue (default :continue or :default_failure_mode config):
CouncilEx.start(council, input, failure_mode: :fail_fast)

# Bound your wait (sync only; default :infinity):
CouncilEx.run(council, input, await_timeout: 30_000)

For LiveView, the natural cancel hook is a "Stop" button that calls CouncilEx.cancel(pid) from handle_event/3. The RunServer kills its current round task, finalizes with status: :error and a %CouncilEx.Error{kind: :cancelled} in errors, broadcasts :run_failed, and reaps after 60s.

When the LV process itself dies, the RunServer does not automatically cancel (unless you used start_link/3): it keeps running and finalizes normally. The events go to a topic with no subscribers.

Two ways to get LV-death-cancels-run semantics:

  1. Use CouncilEx.start_link/3 — linked. LV death kills the run.
  2. Use CouncilEx.start/3 and add Process.flag(:trap_exit, true) + a terminate/2 that calls cancel/1. Be cautious: navigation away can also trigger terminate, which may not match user intent.
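Option 1 relies on ordinary OTP link semantics. Consider a GenServer started with `start_link` that is not trapping exits. It dies when its linked caller exits with any reason other than `:normal`. A GenServer started with `start` is unaffected. The standalone sketch below shows both cases with plain GenServers. `DemoWorker` and the spawned "caller" are hypothetical stand-ins for a RunServer and a LiveView process.

```elixir
# Models why start_link gives "caller death cancels the run" and start does not.
# DemoWorker and the spawned caller are hypothetical stand-ins.
defmodule DemoWorker do
  use GenServer

  @impl true
  def init(:ok), do: {:ok, :running}
end

parent = self()

# The "caller" starts one unlinked and one linked worker, reports both pids,
# then exits abnormally, like a crashing LiveView process.
spawn(fn ->
  {:ok, unlinked} = GenServer.start(DemoWorker, :ok)
  {:ok, linked} = GenServer.start_link(DemoWorker, :ok)
  send(parent, {:pids, unlinked, linked})
  exit(:caller_gone)
end)

{unlinked, linked} =
  receive do
    {:pids, u, l} -> {u, l}
  end

# The linked worker receives the caller's exit signal and, not trapping exits, dies.
# (If it is already gone, the monitor fires immediately with :noproc.)
ref = Process.monitor(linked)

receive do
  {:DOWN, ^ref, :process, ^linked, _reason} -> :ok
end

# The unlinked worker keeps running and would finalize normally.
true = Process.alive?(unlinked)
```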

8. Verbose tracer in dev / staging

The verbose: true (timeline) and verbose: :debug (timeline + truncated response bodies) opts attach a CouncilEx.Verbose tracer to the run's PubSub topic and print to stdio. Pure event consumer, zero cost when off. Handy for:

  • Debugging a flaky council in staging without changing code paths.
  • Demoing in dev with iex -S mix phx.server.

# Anywhere: IEx, controller, mix task:
{:ok, pid} = CouncilEx.start(MyApp.Councils.Advisor, input, verbose: :debug)

Don't ship verbose: to production HTTP requests: it pipes to :stdio, which means your structured Phoenix logs get interleaved with raw LLM output. Either omit it, route to a file via verbose_io:, or build a custom tracer that emits structured Logger events (the CouncilEx.Verbose source is ~230 lines and a fine starting template).


9. Anti-patterns

  • Don't wrap CouncilEx.run/3 in your own GenServer for "lifecycle management". RunServer already is that GenServer; address it by pid or by run_id via the registry.
  • Don't call sync CouncilEx.run/3 from mount/3 or handle_event/3. You'll block the LV process; events back up; patches stop flowing; users hate you.
  • Don't call sync CouncilEx.run/3 from a Phoenix controller for multi-minute councils: the HTTP connection will time out long before the council finishes.
  • Don't add CouncilEx.Runner.Registry or CouncilEx.Runner.TaskSupervisor to your supervision tree — they're already started by CouncilEx.Application and will fail with {:already_started, _}.
  • Don't spawn Task.async/1 (or Task.Supervisor.async/2) just to fire-and-forget a CouncilEx.run/3. You lose the pid, the run_id, the cancel handle, and the event stream. Use start/3.
  • Don't rely on the in-memory RunServer surviving a node restart. If the run is paid-for and must complete, wrap it in an Oban job (see docs/RUNNING_WITH_OBAN.md) and pair with Recorder.Ecto for durable history (see docs/PERSISTENCE.md).
  • Don't use CouncilEx.start/3 from inside an Oban worker: it spawns unlinked, so worker death leaves the runner orphaned. Use CouncilEx.run/3 (which calls start_link/3 internally), or CouncilEx.start_link/3 + await/2 when you need the pid.

10. Caller-owned supervision

CouncilEx.start/3 has GenServer.start/3 semantics: unsupervised, unlinked. That's the right answer for most apps. Sometimes you want group lifecycle:

  • Tenant lifecycles: when a tenant logs out, sweep all of their in-flight runs as a single supervisor shutdown.
  • Test isolation: each test gets its own DynamicSupervisor, no cross-test leakage.
  • Graceful host shutdown: your app's release-stop hook waits on your tree, which contains the runs, so deployment doesn't drop them.
  • In-flight visibility: DynamicSupervisor.which_children/1 enumerates active runs for a tenant.

The supported pattern is CouncilEx.Supervisor — a thin DynamicSupervisor wrapper:

defmodule MyApp.Application do
  use Application

  def start(_, _) do
    children = [
      MyApp.Repo,
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyAppWeb.Endpoint,
      # Your run-grouping supervisor:
      {CouncilEx.Supervisor, name: MyApp.RunSup}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

# Then, anywhere in your app, start runs under that supervisor:
{:ok, pid} =
  CouncilEx.Supervisor.start_link(MyApp.RunSup, council, input,
    subscribe: true
  )

run_id = CouncilEx.RunServer.run_id(pid)
ref = Process.monitor(pid)

# Cancel + terminate work the same regardless of where the run is hosted:
:ok = CouncilEx.cancel(pid)
:ok = CouncilEx.terminate_run(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# Bulk teardown — kills every run under MyApp.RunSup:
:ok = CouncilEx.Supervisor.terminate_all(MyApp.RunSup)

Children are :temporary, so a crashed run is not restarted. Re-running a partially completed council from scratch wastes tokens and leaves inconsistent state.
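For intuition about what a "thin DynamicSupervisor wrapper" gives you, here is a standalone model built only on OTP. `DemoRunSup`, `DemoGroupedRun` and `terminate_all/0` are hypothetical names. This is not CouncilEx.Supervisor's implementation. It shows `:temporary` children, in-flight visibility through `which_children/1`, and bulk teardown.

```elixir
# Standalone model of a run-grouping DynamicSupervisor. DemoRunSup,
# DemoGroupedRun and terminate_all/0 are hypothetical, not CouncilEx code.
defmodule DemoGroupedRun do
  # restart: :temporary means the supervisor never restarts a run that exits.
  use GenServer, restart: :temporary

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

defmodule DemoRunSup do
  def start_link do
    DynamicSupervisor.start_link(strategy: :one_for_one, name: __MODULE__)
  end

  def start_run(arg), do: DynamicSupervisor.start_child(__MODULE__, {DemoGroupedRun, arg})

  # Bulk teardown: terminate every current child (for example, one tenant's runs).
  def terminate_all do
    for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(__MODULE__),
        is_pid(pid) do
      # {:error, :not_found} only means the child already exited on its own.
      DynamicSupervisor.terminate_child(__MODULE__, pid)
    end

    :ok
  end
end

{:ok, _sup} = DemoRunSup.start_link()
{:ok, run_a} = DemoRunSup.start_run(:a)
{:ok, run_b} = DemoRunSup.start_run(:b)

# In-flight visibility: which_children/1 lists the active runs.
2 = length(DynamicSupervisor.which_children(DemoRunSup))

# terminate_child/2 waits for each child to exit before returning.
:ok = DemoRunSup.terminate_all()
```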

Embedding RunServer directly

If you want full control (composing run servers with other workers inside one supervisor, custom strategies, etc.), CouncilEx.RunServer is a public GenServer module with the standard child_spec/1:

{CouncilEx.RunServer,
 run_id: my_run_id,
 council: MyCouncil,
 spec: MyCouncil.__council__(),
 input: %{question: "..."}}

You won't typically need this; CouncilEx.Supervisor.start_link/4 covers 99% of cases.

cancel/2 vs terminate_run/2

| | cancel/2 | terminate_run/2 |
|---|---|---|
| Protocol | Cooperative GenServer.cast | Process.exit(:shutdown) |
| Drains in-flight round | Yes, finalizes as :cancelled | No, runner exits immediately |
| Emits :run_failed PubSub event | Yes (with %Error{kind: :cancelled}) | No |
| Emits :DOWN to monitors | Eventually (after finalize) | Yes, immediately |
| Result available via await/2 | {:error, %Result{status: :error, errors: [..]}} | {:error, :unknown_run} |
| Cascades to sub-runs | Yes | No (sub-runs may finish on their own) |
| Stale-pid handling | Returns {:error, :runner_dead} | Returns :ok (best-effort) |
| Use when | User clicked "Stop"; you want a partial result | Wedged provider call; tenant cleanup; host shutdown |

Most LiveView "Stop" buttons should keep using cancel/2. Reach for terminate_run/2 when you genuinely need the process gone now.
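The two protocols in the table can be modelled with a toy GenServer. `DemoCancellable` is a hypothetical stand-in that sends events to an owner pid instead of PubSub. Its `cancel/1` and `terminate_run/1` only mimic the mechanisms named in the table: a cast versus `Process.exit(pid, :shutdown)`. They are not CouncilEx's functions.

```elixir
# Toy model of cooperative cancel vs forced termination. DemoCancellable is a
# hypothetical stand-in that reports to an owner pid instead of PubSub.
defmodule DemoCancellable do
  use GenServer

  def start(owner), do: GenServer.start(__MODULE__, owner)

  # Cooperative: a cast the server handles in turn, so it can finalize first.
  def cancel(pid), do: GenServer.cast(pid, :cancel)

  # Forceful: an exit signal. A server not trapping exits dies without running callbacks.
  def terminate_run(pid), do: Process.exit(pid, :shutdown)

  @impl true
  def init(owner), do: {:ok, owner}

  @impl true
  def handle_cast(:cancel, owner) do
    send(owner, {:run_failed, self(), :cancelled})
    {:stop, :normal, owner}
  end
end

# cancel: the owner sees a :run_failed event, then :DOWN.
{:ok, a} = DemoCancellable.start(self())
ref_a = Process.monitor(a)
:ok = DemoCancellable.cancel(a)

receive do
  {:run_failed, ^a, :cancelled} -> :ok
end

receive do
  {:DOWN, ^ref_a, :process, ^a, :normal} -> :ok
end

# terminate_run: :DOWN with :shutdown, and no :run_failed event.
{:ok, b} = DemoCancellable.start(self())
ref_b = Process.monitor(b)
true = DemoCancellable.terminate_run(b)

receive do
  {:DOWN, ^ref_b, :process, ^b, :shutdown} -> :ok
end

b_event =
  receive do
    {:run_failed, ^b, _} -> :got_event
  after
    50 -> :no_event
  end
```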

Pre-run validation

Independent of supervision: CouncilEx.validate/1 runs the same checks start/3 performs internally, but as a pure function. Use it in form-submission handlers to surface config errors before you charge tokens or spawn processes:

case CouncilEx.validate(council) do
  :ok ->
    {:ok, pid} = CouncilEx.start(council, input)
    run_id = CouncilEx.RunServer.run_id(pid)
    {:noreply, assign(socket, run_pid: pid, run_id: run_id)}

  {:error, errs} ->
    {:noreply, assign(socket, validation_errors: errs)}
end

errs is a list of maps with :path, :code and :message keys. These maps are JSON-serializable and ready for inline form errors in a builder UI.
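To show those errors inline, a form usually wants them grouped per field. `FormErrors.by_path/1` below is a hypothetical helper. It assumes `:path` is a list of atoms, strings or indexes, which this guide does not specify. The sample errors are made-up illustrations, not real CouncilEx validation codes.

```elixir
# Hypothetical helper: groups validate/1-style errors by path for form display.
# Assumes each error is a map with :path, :code and :message, and that :path is
# usually a list; adjust to the real shape in your CouncilEx version.
defmodule FormErrors do
  def by_path(errs) when is_list(errs) do
    Enum.group_by(errs, &path_key(&1.path), & &1.message)
  end

  # Join list paths like [:members, 0, :model] into "members.0.model".
  defp path_key(path) when is_list(path), do: Enum.map_join(path, ".", &to_string/1)
  defp path_key(path), do: to_string(path)
end

# Made-up sample errors, for illustration only.
errs = [
  %{path: [:members, 0, :model], code: :missing, message: "model is required"},
  %{path: [:rounds], code: :empty, message: "at least one round is required"}
]

FormErrors.by_path(errs)
# => %{"members.0.model" => ["model is required"], "rounds" => ["at least one round is required"]}
```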

Control opts catalog

| Opt | Type | Use when |
|---|---|---|
| :registry | atom() (Registry name) | Tenant isolation where two tenants might collide on run_id. Caller must start the registry. |
| :run_id | String.t() | External system has already assigned an identity to the run (Oban job retry, tenant-namespaced id). |
| :recorder | {module, args} | Persist run lifecycle to DB / external sink; see docs/PERSISTENCE.md. |
| :relay_topics | String.t() \| [String.t()] | App-wide event aggregation: every event also broadcast to your own topic. |
| :subscribe | boolean() (default false) | LiveView / handler that wants the event stream. Subscribes BEFORE the runner starts. |
| :verbose | true \| :debug \| false | Dev/staging tracer to stdio. Don't ship in prod. |
| :verbose_io | IO device | Route the tracer somewhere other than stdio. |
| :await_timeout | :infinity \| pos_integer() | Sync-only (run/3 / await/2); bound the wait. |
| :retry | {max_attempts, base_backoff_ms} | Per-member retry policy override. Default {2, 1000}. |
| :failure_mode | :continue \| :fail_fast | Whether the runner stops on first member failure. |
| :member_timeout_ms | pos_integer() | Per-member call timeout. Default 30s. |

All lookup APIs (pid_for, cancel, terminate_run, await, list_active_runs) accept :registry so they route to the same registry the run was started under.
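The reason :registry isolates tenants is that a `:unique` Registry rejects a duplicate key only within itself. The standalone sketch below uses hypothetical registry names `TenantA.Runs` and `TenantB.Runs`, with an `Agent` standing in for a run. It shows the same run_id coexisting across two registries. It also shows that lookups must name the registry the run was started under.

```elixir
# Standalone sketch: a :unique key can be registered once per Registry.
# TenantA.Runs / TenantB.Runs are hypothetical names; Agent stands in for a run.
{:ok, _} = Registry.start_link(keys: :unique, name: TenantA.Runs)
{:ok, _} = Registry.start_link(keys: :unique, name: TenantB.Runs)

start_run = fn registry, run_id ->
  Agent.start(fn -> run_id end, name: {:via, Registry, {registry, run_id}})
end

# Same run_id, different registries: both registrations succeed.
{:ok, a} = start_run.(TenantA.Runs, "run-1")
{:ok, b} = start_run.(TenantB.Runs, "run-1")

# Same run_id, same registry: the second start is rejected.
{:error, {:already_started, ^a}} = start_run.(TenantA.Runs, "run-1")

# Lookups must name the registry the run was started under.
[{^a, nil}] = Registry.lookup(TenantA.Runs, "run-1")
[{^b, nil}] = Registry.lookup(TenantB.Runs, "run-1")
```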


11. Quick reference

| Need | Use |
|---|---|
| Validate config (no spawn) | CouncilEx.validate(council) |
| Run + block | CouncilEx.run(council, input, await_timeout: ms) |
| Run + push events | CouncilEx.start(council, input, subscribe: true) |
| Run linked to caller | CouncilEx.start_link(council, input) |
| Place under caller-owned sup | CouncilEx.Supervisor.start_link(MyApp.RunSup, council, input) |
| Get run_id from pid | CouncilEx.RunServer.run_id(pid) |
| Look up pid from run_id | CouncilEx.pid_for(run_id) |
| Subscribe later (best-effort) | CouncilEx.subscribe(run_id) |
| Block on pid | CouncilEx.await(pid, await_timeout: :infinity) |
| Block on run id | CouncilEx.await(run_id, await_timeout: :infinity) |
| Cancel cooperatively | CouncilEx.cancel(pid) or CouncilEx.cancel(run_id) |
| Terminate (Process.exit) | CouncilEx.terminate_run(pid) or CouncilEx.terminate_run(run_id) |
| Live state | CouncilEx.RunServer.state(run_id) |
| List in-flight | CouncilEx.list_active_runs/0 |
| Phoenix.PubSub adapter | config :council_ex, pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub} |

See also