This guide covers how to embed CouncilEx into a long-running OTP app,
typically a Phoenix web server, though the same patterns apply to any
Application-style release (Broadway pipeline, GenStage system, GRPC
server, etc.).
The TL;DR:
- Add :council_ex to your deps. Use the async API (start/3 / start_link/3 + PubSub events) from your request-handling code. No setup required.
- CouncilEx does not bundle a run supervisor. Runners are spawned with GenServer.start/3 (unsupervised, unlinked) or GenServer.start_link/3 (linked to caller). Need group lifecycle (tenant teardown, in-flight visibility, bulk-cancel)? Place CouncilEx.Supervisor in your tree: it is an opt-in DynamicSupervisor.
- Runs are in-memory: they don't survive a node restart. If you need cross-restart durability, layer Oban over CouncilEx.run/3 and the opt-in persistence layer (see docs/PERSISTENCE.md and docs/RUNNING_WITH_OBAN.md).
The rest of this doc unpacks the why and the how, with copy-pasteable examples.
1. The bundled supervision tree (minimal)
When :council_ex is in your mix.exs deps, the OTP application starts
on boot via CouncilEx.Application and brings up:
CouncilEx top-level supervisor (rest_for_one)
├── :pg group :council_ex # default PubSub backend
├── Registry CouncilEx.Runner.Registry # run_id → RunServer pid
└── Task.Supervisor CouncilEx.Runner.TaskSupervisor

That is the whole bundled tree. There is no DynamicSupervisor for
runs in core — CouncilEx.start/3 is GenServer.start/3 semantics and
CouncilEx.start_link/3 is GenServer.start_link/3 semantics. The
caller owns the pid.
Every call to CouncilEx.start/3 spawns a CouncilEx.RunServer
GenServer registered by run_id in CouncilEx.Runner.Registry. The
synchronous CouncilEx.run/3 is a thin start_link + await wrapper on
top of the same machinery.
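
To make the start/3 versus start_link/3 distinction concrete, here is a minimal sketch that uses only plain GenServer calls. LinkDemo and LinkDemo.Worker are hypothetical stand-ins, not CouncilEx modules. The sketch assumes only what the text above states: CouncilEx.start/3 behaves like GenServer.start and CouncilEx.start_link/3 behaves like GenServer.start_link.

```elixir
defmodule LinkDemo.Worker do
  # Hypothetical stand-in for a run process; not a CouncilEx module.
  use GenServer

  @impl true
  def init(:ok), do: {:ok, %{}}
end

defmodule LinkDemo do
  # Spawns a short-lived "caller" that starts a worker with GenServer.start
  # or GenServer.start_link and then crashes. Returns true if the worker
  # outlives the caller.
  def worker_survives_caller?(start_fun) when start_fun in [:start, :start_link] do
    test_pid = self()

    {caller, caller_ref} =
      spawn_monitor(fn ->
        {:ok, worker} = apply(GenServer, start_fun, [LinkDemo.Worker, :ok])
        send(test_pid, {:worker, worker})

        # Wait until the observer has monitored the worker, then crash.
        receive do
          :crash -> exit(:caller_crashed)
        end
      end)

    worker =
      receive do
        {:worker, pid} -> pid
      end

    worker_ref = Process.monitor(worker)
    send(caller, :crash)

    receive do
      {:DOWN, ^caller_ref, :process, ^caller, _reason} -> :ok
    end

    receive do
      # A linked worker is taken down by the caller's exit signal.
      {:DOWN, ^worker_ref, :process, ^worker, _reason} -> false
    after
      200 ->
        # An unlinked worker is still running; clean it up.
        Process.demonitor(worker_ref, [:flush])
        Process.exit(worker, :kill)
        true
    end
  end
end
```

Calling LinkDemo.worker_survives_caller?(:start) shows the unlinked case, where the worker outlives its caller; passing :start_link shows the linked case. That difference is what "the caller owns the pid" means in practice: with start/3, nothing cleans up the run if the caller dies.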
You may add to your own tree:
- CouncilEx.Supervisor: opt-in DynamicSupervisor wrapper for grouping runs (see §10).
- A Phoenix.PubSub adapter if you want to share PubSub with the rest of your app (see §6).
You should not add CouncilEx.Runner.Registry or
CouncilEx.Runner.TaskSupervisor to your child list — they're already
started by CouncilEx.Application.
2. Two execution shapes, picked by call site
| Call site | Shape | API |
|---|---|---|
| Background job, CLI, batch, short controller | Sync, blocking | CouncilEx.run/3 |
| LiveView, Channel, SSE handler, long controller | Async, event-driven | start/3 + subscribe |
The async shape is the one most Phoenix apps want, because LLM rounds take seconds-to-minutes and you don't want to hold an HTTP connection or block a LiveView process for that long.
Runs live in-memory inside a RunServer GenServer; if the BEAM node restarts mid-run, the run is lost. For durability across restarts, wrap the run in an Oban job and use the opt-in Recorder.Ecto persistence (see docs/RUNNING_WITH_OBAN.md and docs/PERSISTENCE.md).
PubSub is optional
The async shape does not require subscribing. A common pattern is
an Oban worker that starts a linked run with CouncilEx.start_link/3
and then blocks on await/2: no PubSub plumbing, no handle_info
clauses, no subscribe: true. Inside a worker, use start_link/3 rather
than start/3 so that worker death takes the run down with it (see §9).
PubSub is the right tool when a UI process needs incremental progress;
for headless background work, await/2 is sufficient.

# Oban worker: no PubSub needed.
def perform(%Oban.Job{args: %{"question" => q, "run_id" => run_id}}) do
  {:ok, pid} = CouncilEx.start_link(MyApp.Councils.Advisor, %{question: q}, run_id: run_id)
  {:ok, %CouncilEx.Result{final: final}} = CouncilEx.await(pid, await_timeout: :infinity)
  MyApp.persist_answer(run_id, final)
  :ok
end

For app-owned per-member persistence (cost / tokens / latency rows
written without Recorder.Ecto), attach a telemetry handler to
[:council_ex, :member, :stop]; see the Telemetry-driven
persistence section in docs/PERSISTENCE.md. Telemetry is a
cleaner seam than PubSub for that use case: per-member granularity,
no subscribe lifecycle, decoupled from any UI process.
3. LiveView: the canonical pattern
Use start/3 with subscribe: true. The LV process receives the
CouncilEx.Events messages on its mailbox and updates assigns
incrementally. No GenServer of your own. No polling.
The pid is the handle. Stash it in assigns alongside the run_id; pass
either to cancel/2 and await/2.
defmodule MyAppWeb.CouncilLive do
  use MyAppWeb, :live_view

  alias CouncilEx.{MemberResult, Result, RoundResult, StreamChunk}

  @impl true
  def mount(_params, _session, socket) do
    {:ok,
     socket
     |> assign(
       run_pid: nil,
       run_id: nil,
       status: :idle,
       rounds: [],
       tokens: %{},
       final: nil,
       error: nil
     )}
  end

  @impl true
  def handle_event("ask", %{"question" => q}, socket) do
    # subscribe: true subscribes the LV process to the run's PubSub topic
    # BEFORE the RunServer starts, so we can't miss :run_started.
    {:ok, pid} =
      CouncilEx.start(MyApp.Councils.Advisor, %{question: q}, subscribe: true)

    run_id = CouncilEx.RunServer.run_id(pid)

    {:noreply,
     assign(socket, run_pid: pid, run_id: run_id, status: :running, error: nil)}
  end

  def handle_event("cancel", _params, %{assigns: %{run_pid: pid}} = socket)
      when is_pid(pid) do
    :ok = CouncilEx.cancel(pid)
    {:noreply, socket}
  end

  # --- run lifecycle events ---

  @impl true
  def handle_info({:run_started, run_id, _council, _input}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :running)}
  end

  def handle_info({:round_started, run_id, round_name, idx}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: {:round, round_name, idx})}
  end

  def handle_info({:member_started, run_id, round, member_id}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: {:member_started, round, member_id})}
  end

  # Streaming token chunks: only fire when a member has `stream: true`.
  def handle_info(
        {:member_token, run_id, _round, member_id, %StreamChunk{content: chunk}},
        %{assigns: %{run_id: run_id, tokens: tokens}} = socket
      ) do
    {:noreply, assign(socket, tokens: Map.update(tokens, member_id, chunk, &(&1 <> chunk)))}
  end

  def handle_info({:member_completed, run_id, _round, _id, %MemberResult{}}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, socket}
  end

  def handle_info({:round_completed, run_id, _name, %RoundResult{} = rr}, %{assigns: %{run_id: run_id, rounds: rs}} = socket) do
    {:noreply, assign(socket, rounds: rs ++ [rr])}
  end

  def handle_info({:run_completed, run_id, %Result{} = result}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :completed, final: result.final)}
  end

  def handle_info({:run_failed, run_id, _errors, %Result{} = result}, %{assigns: %{run_id: run_id}} = socket) do
    {:noreply, assign(socket, status: :failed, error: result.errors)}
  end

  # Ignore everything else, including late messages from a previous run
  # (run_id mismatch). Event tuples vary in size, so match any message.
  def handle_info(_message, socket), do: {:noreply, socket}
end

Why subscribe: true matters
If you do:
{:ok, pid} = CouncilEx.start(council, input) # RunServer starts here
run_id = CouncilEx.RunServer.run_id(pid)
CouncilEx.subscribe(run_id) # subscribe AFTER

…there is a documented race: the RunServer's init/1 and immediate
handle_continue may have already broadcast :run_started and
:round_started before your subscribe lands. You'll miss them.
subscribe: true does the subscribe before RunServer.start/1 is
called inside CouncilEx.start/3, so the topic is live before the
RunServer exists. Use it everywhere you care about the full event
timeline.
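
The race can be reproduced without CouncilEx. The sketch below is an illustration only: RaceDemo and RaceDemo.Runner are hypothetical modules, and a duplicate-key Registry stands in for the PubSub backend. The stand-in runner broadcasts from init/1, so the event has already been sent by the time GenServer.start/2 returns.

```elixir
defmodule RaceDemo do
  # Hypothetical PubSub stand-in built on a duplicate-key Registry.
  @registry RaceDemo.PubSub

  def start_pubsub, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the calling process as a listener on the run's topic.
  def subscribe(run_id), do: Registry.register(@registry, run_id, nil)

  # Sends `message` to every process currently registered under `run_id`.
  def broadcast(run_id, message) do
    Registry.dispatch(@registry, run_id, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end

  # True if the calling process received :run_started for `run_id`.
  def saw_run_started?(run_id) do
    receive do
      {:run_started, ^run_id} -> true
    after
      100 -> false
    end
  end
end

defmodule RaceDemo.Runner do
  # Hypothetical stand-in for a run process that broadcasts while booting.
  use GenServer

  def start(run_id), do: GenServer.start(__MODULE__, run_id)

  @impl true
  def init(run_id) do
    RaceDemo.broadcast(run_id, {:run_started, run_id})
    {:ok, run_id}
  end
end

{:ok, _registry} = RaceDemo.start_pubsub()

# Subscribe AFTER start: the broadcast found no listeners, so it is lost.
{:ok, _runner} = RaceDemo.Runner.start("late")
{:ok, _owner} = RaceDemo.subscribe("late")
false = RaceDemo.saw_run_started?("late")

# Subscribe BEFORE start: the listener is registered when the event fires.
{:ok, _owner} = RaceDemo.subscribe("early")
{:ok, _runner} = RaceDemo.Runner.start("early")
true = RaceDemo.saw_run_started?("early")
```

The ordering in the second half is the same one the text attributes to subscribe: true: register the listener first, then start the process that emits.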
4. Phoenix.Channel pattern
Same idea, slightly different plumbing. Phoenix.Channel processes
also have a mailbox, so subscribe + handle_info works identically:
defmodule MyAppWeb.CouncilChannel do
  use MyAppWeb, :channel

  def join("council:lobby", _params, socket), do: {:ok, socket}

  def handle_in("ask", %{"q" => q}, socket) do
    {:ok, pid} =
      CouncilEx.start(MyApp.Councils.Advisor, %{question: q}, subscribe: true)

    run_id = CouncilEx.RunServer.run_id(pid)

    {:reply, {:ok, %{run_id: run_id}},
     socket |> assign(:run_pid, pid) |> assign(:run_id, run_id)}
  end

  def handle_info({:round_completed, run_id, name, _rr}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "round_completed", %{name: name})
    {:noreply, socket}
  end

  def handle_info({:run_completed, run_id, %CouncilEx.Result{final: final}}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "run_completed", %{final: final && final.content})
    {:noreply, socket}
  end

  def handle_info({:run_failed, run_id, _errs, _result}, %{assigns: %{run_id: run_id}} = socket) do
    push(socket, "run_failed", %{})
    {:noreply, socket}
  end

  def handle_info(_, socket), do: {:noreply, socket}
end

5. Phoenix.Controller / plain HTTP
Two viable shapes, depending on expected duration:
(a) Short councils (<10s, P95 well under your HTTP timeout): sync, return JSON:
def create(conn, %{"question" => q}) do
  case CouncilEx.run(MyApp.Councils.Quick, %{question: q}, await_timeout: 30_000) do
    {:ok, %CouncilEx.Result{final: final}} -> json(conn, %{answer: final.content})
    {:error, %CouncilEx.Result{errors: errs}} -> conn |> put_status(500) |> json(%{errors: inspect(errs)})
  end
end

(b) Long councils: return run_id, expose a GET /runs/:id for
poll, or upgrade to SSE / LiveView / channel for push:
def create(conn, %{"question" => q}) do
  {:ok, pid} = CouncilEx.start(MyApp.Councils.Slow, %{question: q})
  run_id = CouncilEx.RunServer.run_id(pid)
  json(conn, %{run_id: run_id, status: "running"})
end

def show(conn, %{"id" => run_id}) do
  case CouncilEx.RunServer.state(run_id) do
    {:ok, summary} -> json(conn, summary)
    {:error, :not_found} -> conn |> put_status(404) |> json(%{error: "unknown run"})
  end
end

RunServer.state/2 returns a stable curated summary
(run_id, council, status, current_round, rounds_completed,
started_at) and is safe to call at high frequency. After a run
finalizes, the RunServer stays alive for ~60s for late fetches before
auto-reaping.
For SSE, subscribe inside the controller using Plug.Conn.chunk/2 in
a loop driven by receive. Same event shapes as the LiveView example.
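
A minimal sketch of such a receive loop follows, written against the event shapes used in the Channel example above. SseSketch is a hypothetical module, and emit is a hypothetical one-argument function standing in for whatever writes a chunk to the client. In a real controller that would wrap Plug.Conn.chunk/2 and thread the returned conn through the loop, which this sketch leaves out to stay dependency-free.

```elixir
defmodule SseSketch do
  # Formats one Server-Sent Events frame: an event name and a single data
  # line, terminated by a blank line. `data` must not contain newlines.
  def frame(event, data) when is_binary(event) and is_binary(data) do
    "event: #{event}\ndata: #{data}\n\n"
  end

  # Receive loop for one run. Returns :completed, :failed, or :timeout.
  # `emit` is a hypothetical stand-in for a Plug.Conn.chunk/2 wrapper.
  def stream(run_id, emit, timeout \\ 60_000) when is_function(emit, 1) do
    receive do
      {:round_completed, ^run_id, name, _round_result} ->
        emit.(frame("round_completed", to_string(name)))
        stream(run_id, emit, timeout)

      {:run_completed, ^run_id, _result} ->
        emit.(frame("run_completed", "ok"))
        :completed

      {:run_failed, ^run_id, _errors, _result} ->
        emit.(frame("run_failed", "error"))
        :failed

      _other ->
        # Drop events this sketch does not forward so the mailbox stays small.
        stream(run_id, emit, timeout)
    after
      timeout ->
        emit.(frame("timeout", "no events"))
        :timeout
    end
  end
end
```

The loop ends on the first terminal event, so the controller can close the response as soon as the run completes or fails. The payloads here are placeholders; a real handler would encode the result as JSON on the data line.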
6. Sharing your Phoenix.PubSub
By default CouncilEx uses Erlang's :pg for event broadcast. This is
fine and zero-config. If you'd rather route everything through your
existing Phoenix.PubSub server (for cross-node delivery via Redis /
PG2 adapter, unified topic plumbing, easier dashboard wiring),
configure the adapter:
# application.ex children: same as any Phoenix app
{Phoenix.PubSub, name: MyApp.PubSub}

# config/runtime.exs (or config.exs)
config :council_ex, pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub}

phoenix_pubsub is optional: true in CouncilEx's deps: only pulled
in if your app already depends on it. The runnable demo lives at
examples/phoenix_pubsub_example.exs.
For multi-node deployments, pair the PubSub adapter with
mode: :multi_node so Registry, Reliability, and Recorder also share
state across replicas. See docs/PERSISTENCE.md for
the full mode breakdown:
config :council_ex,
  mode: :multi_node,
  repo: MyApp.Repo,
  pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub}

7. Cancellation, timeouts, failure modes
These primitives are the same across every shape above; the call site just changes who's holding the pid (or run_id):
# Cancel mid-run. Cascades to active sub-runs.
:ok = CouncilEx.cancel(pid) # direct, no Registry lookup
:ok = CouncilEx.cancel(run_id) # Registry-resolved
# Force-terminate (Process.exit(:shutdown), no :run_failed event):
:ok = CouncilEx.terminate_run(pid)
:ok = CouncilEx.terminate_run(run_id)
# Per-member timeout (default 30_000ms or :default_member_timeout_ms config):
CouncilEx.start(council, input, member_timeout_ms: 60_000)
# Fail-fast vs continue (default :continue or :default_failure_mode config):
CouncilEx.start(council, input, failure_mode: :fail_fast)
# Bound your wait (sync only; default :infinity):
CouncilEx.run(council, input, await_timeout: 30_000)

For LiveView, the natural cancel hook is a "Stop" button that calls
CouncilEx.cancel(pid) from handle_event/3. The RunServer kills its
current round task, finalizes with status: :error and a
%CouncilEx.Error{kind: :cancelled} in errors, broadcasts
:run_failed, and reaps after 60s.
When the LV process itself dies, the RunServer does not
automatically cancel (unless you used start_link/3): it keeps running
and finalizes normally. The events go to a topic with no subscribers.
Two ways to get LV-death-cancels-run semantics:
- Use CouncilEx.start_link/3 (linked): LV death kills the run.
- Use CouncilEx.start/3 and add Process.flag(:trap_exit, true) + a terminate/2 that calls cancel/1. Be cautious: navigation away can also trigger terminate, which may not match user intent.
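
The second option can be sketched with a plain GenServer standing in for the LiveView process. OwnerSketch is hypothetical, and cancel_fun is a hypothetical zero-arity callback. In an app it might wrap CouncilEx.cancel(pid), and the same two pieces (the flag plus terminate/2) would live in the LiveView's mount/3 and terminate/2.

```elixir
defmodule OwnerSketch do
  # Hypothetical stand-in for a UI process that owns a run.
  use GenServer

  def start_link(cancel_fun) when is_function(cancel_fun, 0) do
    GenServer.start_link(__MODULE__, cancel_fun)
  end

  @impl true
  def init(cancel_fun) do
    # Without this flag, an exit signal from the parent kills the process
    # outright and terminate/2 never runs.
    Process.flag(:trap_exit, true)
    {:ok, %{cancel: cancel_fun}}
  end

  @impl true
  def terminate(_reason, %{cancel: cancel_fun}) do
    cancel_fun.()
    :ok
  end

  # Simulates the owner's parent exiting with :shutdown and reports whether
  # the cancel callback ran.
  def demo do
    test_pid = self()

    parent =
      spawn(fn ->
        {:ok, _owner} = start_link(fn -> send(test_pid, :cancel_called) end)
        send(test_pid, :ready)

        receive do
          :stop -> exit(:shutdown)
        end
      end)

    receive do
      :ready -> :ok
    end

    send(parent, :stop)

    receive do
      :cancel_called -> true
    after
      500 -> false
    end
  end
end
```

Because terminate/2 runs for every shutdown reason, the callback cannot tell a closed tab from ordinary navigation. That is the caution raised in the list above.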
8. Verbose tracer in dev / staging
The verbose: true (timeline) and verbose: :debug (timeline + truncated
response bodies) opts attach a CouncilEx.Verbose tracer to the run's
PubSub topic and print to stdio. Pure event consumer, zero cost when
off. Handy for:
- Debugging a flaky council in staging without changing code paths.
- Demoing in dev with iex -S mix phx.server.

# Anywhere: IEx, controller, mix task:
{:ok, pid} = CouncilEx.start(MyApp.Councils.Advisor, input, verbose: :debug)

Don't ship verbose: to production HTTP requests: it pipes to :stdio,
which means your structured Phoenix logs get interleaved with raw
LLM output. Either omit it, route to a file via verbose_io:, or
build a custom tracer that emits structured Logger events (the
CouncilEx.Verbose source is ~230
lines and a fine starting template).
9. Anti-patterns
- Don't wrap CouncilEx.run/3 in your own GenServer for "lifecycle management". RunServer already is that GenServer; address it by pid or by run_id via the registry.
- Don't call sync CouncilEx.run/3 from mount/3 or handle_event/3. You'll block the LV process; events back up; patches stop flowing; users hate you.
- Don't call sync CouncilEx.run/3 from a Phoenix controller for multi-minute councils: the HTTP connection will time out long before the council finishes.
- Don't add CouncilEx.Runner.Registry or CouncilEx.Runner.TaskSupervisor to your supervision tree: they're already started by CouncilEx.Application and will fail with {:already_started, _}.
- Don't spawn Task.async/1 (or Task.Supervisor.async/2) just to fire-and-forget a CouncilEx.run/3. You lose the pid, the run_id, the cancel handle, and the event stream. Use start/3.
- Don't rely on the in-memory RunServer surviving a node restart. If the run is paid-for and must complete, wrap it in an Oban job (see docs/RUNNING_WITH_OBAN.md) and pair with Recorder.Ecto for durable history (see docs/PERSISTENCE.md).
- Don't use CouncilEx.start/3 from inside an Oban worker: it spawns unlinked, so worker death leaves the runner orphaned. Use CouncilEx.run/3 (which calls start_link/3 internally).
10. Caller-owned supervision
CouncilEx.start/3 is GenServer.start/3 semantics — unsupervised,
unlinked. That's the right answer for most apps. Sometimes you want
group lifecycle:
- Tenant lifecycles: when a tenant logs out, sweep all of their in-flight runs as a single supervisor shutdown.
- Test isolation: each test gets its own DynamicSupervisor, no cross-test leakage.
- Graceful host shutdown: your app's release-stop hook waits on your tree, which contains the runs, so deployment doesn't drop them.
- In-flight visibility: DynamicSupervisor.which_children/1 enumerates active runs for a tenant.
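
The mechanics behind in-flight visibility and bulk teardown can be sketched with a plain DynamicSupervisor. GroupSketch and its function names are hypothetical, and an Agent stands in for a run process. This is not how CouncilEx.Supervisor is implemented, only an illustration of the OTP calls the list above refers to.

```elixir
defmodule GroupSketch do
  # One supervisor per group (for example, per tenant or per test).
  def start_group, do: DynamicSupervisor.start_link(strategy: :one_for_one)

  # Starts a placeholder worker as a :temporary child, so it is never
  # restarted after a crash.
  def start_run(sup, state) do
    spec = %{
      id: :run,
      start: {Agent, :start_link, [fn -> state end]},
      restart: :temporary
    }

    DynamicSupervisor.start_child(sup, spec)
  end

  # In-flight visibility: pids of every live child in the group.
  def active_runs(sup) do
    for {_id, pid, _type, _modules} <- DynamicSupervisor.which_children(sup),
        is_pid(pid),
        do: pid
  end

  # Bulk teardown: terminate each child in the group.
  def terminate_all(sup) do
    sup
    |> active_runs()
    |> Enum.each(&DynamicSupervisor.terminate_child(sup, &1))
  end
end
```

A typical flow is to create one group with start_group/0, add work with start_run/2, list it with active_runs/1, and sweep it with terminate_all/1. Because DynamicSupervisor.terminate_child/2 is synchronous, the group is empty by the time terminate_all/1 returns.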
The supported pattern is CouncilEx.Supervisor — a thin
DynamicSupervisor wrapper:
defmodule MyApp.Application do
  use Application

  def start(_, _) do
    children = [
      MyApp.Repo,
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyAppWeb.Endpoint,
      # Your run-grouping supervisor:
      {CouncilEx.Supervisor, name: MyApp.RunSup}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

{:ok, pid} =
  CouncilEx.Supervisor.start_link(MyApp.RunSup, council, input,
    subscribe: true
  )

run_id = CouncilEx.RunServer.run_id(pid)
ref = Process.monitor(pid)

# Cancel + terminate work the same regardless of where the run is hosted:
:ok = CouncilEx.cancel(pid)
:ok = CouncilEx.terminate_run(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# Bulk teardown: kills every run under MyApp.RunSup:
:ok = CouncilEx.Supervisor.terminate_all(MyApp.RunSup)

Children are :temporary: no restart on crash. Re-running a
partially-completed council from scratch wastes tokens and produces
weird state.
Embedding RunServer directly
If you want full control (composing run servers with other workers
inside one supervisor, custom strategies, etc.) CouncilEx.RunServer
is a public GenServer module with the standard child_spec/1:
{CouncilEx.RunServer,
 run_id: my_run_id,
 council: MyCouncil,
 spec: MyCouncil.__council__(),
 input: %{question: "..."}}

You won't typically need this; CouncilEx.Supervisor.start_link/4
covers 99% of cases.
cancel/2 vs terminate_run/2
| | cancel/2 | terminate_run/2 |
|---|---|---|
| Protocol | Cooperative GenServer.cast | Process.exit(:shutdown) |
| Drains in-flight round | Yes, finalizes as :cancelled | No, runner exits immediately |
| Emits :run_failed PubSub event | Yes (with %Error{kind: :cancelled}) | No |
| Emits :DOWN to monitors | Eventually (after finalize) | Yes, immediately |
| Result available via await/2 | {:error, %Result{status: :error, errors: [..]}} | {:error, :unknown_run} |
| Cascades to sub-runs | Yes | No (sub-runs may finish on their own) |
| Stale-pid handling | Returns {:error, :runner_dead} | Returns :ok (best-effort) |
| Use when | User clicked "Stop"; you want a partial result | Wedged provider call; tenant cleanup; host shutdown |
Most LiveView "Stop" buttons should keep using cancel/2. Reach for
terminate_run/2 when you genuinely need the process gone now.
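
The protocol difference in the table can be seen in a small stand-alone sketch. StopSketch.Runner is a hypothetical process, not the real RunServer. It also simplifies one thing: it stops right after finalizing, whereas the text above says the real runner lingers for about 60s before reaping.

```elixir
defmodule StopSketch.Runner do
  # Hypothetical run process. A cooperative cancel is a cast the process
  # handles itself; a forced stop is an exit signal it never gets to handle.
  use GenServer

  def start(notify_pid), do: GenServer.start(__MODULE__, notify_pid)

  # Cooperative: asks the process to finalize and stop.
  def cancel(pid), do: GenServer.cast(pid, :cancel)

  # Forced: the process does not trap exits, so it dies immediately.
  def terminate_run(pid), do: Process.exit(pid, :shutdown)

  @impl true
  def init(notify_pid), do: {:ok, %{notify: notify_pid}}

  @impl true
  def handle_cast(:cancel, %{notify: notify_pid} = state) do
    # Finalize first (here: announce the cancellation), then stop normally.
    send(notify_pid, {:run_failed, self(), :cancelled})
    {:stop, :normal, state}
  end
end
```

A process monitoring the runner sees the difference directly. After cancel/1 it receives the failure message and then a :DOWN with reason :normal. After terminate_run/1 it receives only a :DOWN with reason :shutdown and no failure message.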
Pre-run validation
Independent of supervision: CouncilEx.validate/1 runs the same checks
start/3 performs internally, but as a pure function. Use it in
form-submission handlers to surface config errors before you charge
tokens or spawn processes:
case CouncilEx.validate(council) do
  :ok ->
    {:ok, pid} = CouncilEx.start(council, input)
    run_id = CouncilEx.RunServer.run_id(pid)
    {:noreply, assign(socket, run_pid: pid, run_id: run_id)}

  {:error, errs} ->
    {:noreply, assign(socket, validation_errors: errs)}
end

errs are [%{path, code, message}] maps: JSON-serializable, ready
for inline form errors in a builder UI.
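
One possible way to turn those maps into per-field messages is sketched below. ValidationSketch is hypothetical. The sketch assumes that path is a list of atoms, strings, or integers, which the text above does not specify, and the error codes and messages in the usage note are invented for illustration.

```elixir
defmodule ValidationSketch do
  # Groups error messages under a dotted path string so a form can render
  # them next to the matching field.
  # Assumption: each error's :path is a list of atoms, strings, or integers.
  def by_field(errors) when is_list(errors) do
    Enum.group_by(
      errors,
      fn %{path: path} -> Enum.join(path, ".") end,
      fn %{message: message} -> message end
    )
  end
end
```

For example, two errors whose path is [:members, 0, :model] would both be collected under the key "members.0.model", in their original order.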
Control opts catalog
| Opt | Type | Use when |
|---|---|---|
| :registry | atom() (Registry name) | Tenant isolation where two tenants might collide on run_id. Caller must start the registry. |
| :run_id | String.t() | External system has already assigned an identity to the run (Oban job retry, tenant-namespaced id). |
| :recorder | {module, args} | Persist run lifecycle to DB / external sink (see docs/PERSISTENCE.md). |
| :relay_topics | String.t() \| [String.t()] | App-wide event aggregation: every event also broadcast to your own topic. |
| :subscribe | boolean() (default false) | LiveView / handler that wants the event stream. Subscribes BEFORE the runner starts. |
| :verbose | true \| :debug \| false | Dev/staging tracer to stdio. Don't ship in prod. |
| :verbose_io | IO device | Route the tracer somewhere other than stdio. |
| :await_timeout | :infinity \| pos_integer() | Sync-only (run/3 / await/2); bound the wait. |
| :retry | {max_attempts, base_backoff_ms} | Per-member retry policy override. Default {2, 1000}. |
| :failure_mode | :continue \| :fail_fast | Whether the runner stops on first member failure. |
| :member_timeout_ms | pos_integer() | Per-member call timeout. Default 30s. |
All lookup APIs (pid_for, cancel, terminate_run, await,
list_active_runs) accept :registry so they route to the same
registry the run was started under.
11. Quick reference
| Need | Use |
|---|---|
| Validate config (no spawn) | CouncilEx.validate(council) |
| Run + block | CouncilEx.run(council, input, await_timeout: ms) |
| Run + push events | CouncilEx.start(council, input, subscribe: true) |
| Run linked to caller | CouncilEx.start_link(council, input) |
| Place under caller-owned sup | CouncilEx.Supervisor.start_link(MyApp.RunSup, council, input) |
| Get run_id from pid | CouncilEx.RunServer.run_id(pid) |
| Look up pid from run_id | CouncilEx.pid_for(run_id) |
| Subscribe later (best-effort) | CouncilEx.subscribe(run_id) |
| Block on pid | CouncilEx.await(pid, await_timeout: :infinity) |
| Block on run id | CouncilEx.await(run_id, await_timeout: :infinity) |
| Cancel cooperatively | CouncilEx.cancel(pid) or CouncilEx.cancel(run_id) |
| Terminate (Process.exit) | CouncilEx.terminate_run(pid) or CouncilEx.terminate_run(run_id) |
| Live state | CouncilEx.RunServer.state(run_id) |
| List in-flight | CouncilEx.list_active_runs/0 |
| Phoenix.PubSub adapter | config :council_ex, pubsub: {CouncilEx.PubSub.Phoenix, name: MyApp.PubSub} |
See also
- CouncilEx.Events: frozen event surface
- docs/PERSISTENCE.md: opt-in Ecto / Redis backends
- docs/RUNNING_WITH_OBAN.md: durable retries
- examples/phoenix_pubsub_example.exs: runnable PubSub adapter demo
- README §Running councils: lifecycle API reference