CouncilEx (CouncilEx v0.1.0)

Copy Markdown View Source

Multi-model LLM council workflows for Elixir.

Usage

defmodule MyCouncil do
  use CouncilEx

  member :optimist, MyApp.Members.Optimist, provider: :open_router, model: "openai/gpt-4o-mini"
  member :skeptic,  MyApp.Members.Skeptic,  provider: :open_router, model: "anthropic/claude-3.5-sonnet"

  round :independent_analysis

  chair MyApp.Members.Synthesizer, id: :chair, provider: :open_router, model: "openai/gpt-4o"
end

{:ok, result} = CouncilEx.run(MyCouncil, %{question: "go or wait?"})

Summary

Types

Stable curated summary of an active (or recently completed) run, as returned by list_active_runs/0.

Functions

Convenience wrapper that routes input through the configured default AutoCouncil and runs it synchronously.

Block waiting for the run to finish; returns the Result.

Cancel a running council cooperatively (the runner will finalize as :cancelled after the in-flight round drains).

Declare the chair (final synthesizer).

Declare the default profile for members in this council.

List currently active (or recently completed but not yet reaped) runs as curated summaries. Slow or dead runs are skipped without blocking.

Declare a member with an inline block.

Declare a member in module form.

Look up the runner pid for run_id.

Declare a round by name (built-in) or by module.

Declare an adaptive router. Accepts a module implementing CouncilEx.Router or a 2-arity anonymous fn fn input, ctx -> [member_id] end.

Run a council synchronously.

Start an async run; returns {:ok, pid} immediately. Get the run id from the pid with CouncilEx.RunServer.run_id/1.

Like start/3, but links the calling process to the run server (GenServer.start_link/3 semantics).

Subscribe the calling process to run events.

Unsubscribe the calling process from run events.

Validate a council without running it. Returns :ok or {:error, [%{path, code, message}]}.

Types

run_summary()

@type run_summary() :: %{
  run_id: binary(),
  council: module(),
  status: :pending | :running | :completed | :failed | :cancelled,
  current_round: %{name: atom(), idx: non_neg_integer()} | nil,
  rounds_completed: non_neg_integer(),
  started_at: DateTime.t()
}

Stable curated summary of an active (or recently completed) run, as returned by list_active_runs/0.

Functions

auto(input, overrides \\ [])

@spec auto(
  term(),
  keyword()
) :: {:ok, CouncilEx.Result.t()} | {:error, CouncilEx.Result.t() | term()}

Convenience wrapper that routes input through the configured default AutoCouncil and runs it synchronously.

Default config lives under :council_ex, :auto:

config :council_ex, :auto,
  strategy:    :rules,
  catalog:     {:registry, :council},
  on_no_match: :error

Per-call overrides are merged on top:

CouncilEx.auto(%{question: "..."}, strategy: :cascade, options: [chain: [:rules]])

The keyword list may mix AutoCouncil struct opts with run opts in the same call. Struct opts (:strategy, :catalog, :on_no_match, :name, :options, :provider_check) build the router; everything else is forwarded to run/3 — including :verbose, :verbose_io, :await_timeout:

CouncilEx.auto(%{question: "audit my SEO"}, verbose: true)

See CouncilEx.AutoCouncil for the full options list. Inspect result.metadata.auto to see which council was picked.

await(pid_or_run_id, opts \\ [])

@spec await(
  pid() | String.t(),
  keyword()
) ::
  {:ok, CouncilEx.Result.t()}
  | {:error, CouncilEx.Result.t()}
  | {:error, :unknown_run | :timeout}

Block waiting for the run to finish; returns the Result.

Accepts a runner pid (preferred — direct, no registry lookup needed for the PubSub topic resolution) or a run_id binary (looks up the pid via Registry; pass :registry for caller-owned registries).

cancel(pid_or_run_id, opts \\ [])

@spec cancel(
  pid() | String.t(),
  keyword()
) :: :ok | {:error, :unknown_run | :runner_dead}

Cancel a running council cooperatively (the runner will finalize as :cancelled after the in-flight round drains).

Accepts a runner pid (preferred — direct GenServer.cast) or a run_id binary (looks up the pid via Registry).

Returns:

  • :ok — cancel signal delivered.
  • {:error, :unknown_run} — no entry in the registry (run_id form only).
  • {:error, :runner_dead} — the registry holds a stale entry whose pid is gone, OR the supplied pid is dead. Caller can fall through to terminate_run/2 or treat the run as already over.

This is a cooperative signal — for an immediate kill (e.g., the run is wedged inside a non-responsive provider call), use terminate_run/2.

chair(module, opts)

(macro)

Declare the chair (final synthesizer).

default_profile(profile_module)

(macro)

Declare the default profile for members in this council.

list_active_runs(opts \\ [])

@spec list_active_runs(keyword()) :: [run_summary()]

List currently active (or recently completed but not yet reaped) runs as curated summaries. Slow or dead runs are skipped without blocking.

Options:

  • :registry — registry to scan; defaults to the bundled CouncilEx.Runner.Registry. Pass your own when using a caller-owned Registry (e.g. for tenant isolation, where each tenant has its own registry).

member(id, module)

(macro)

Declare a member with an inline block.

member(id, module, opts)

(macro)

Declare a member in module form.

pid_for(run_id, opts \\ [])

@spec pid_for(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, :unknown_run | :runner_dead}

Look up the runner pid for run_id.

Returns:

  • {:ok, pid()} when the runner is alive.
  • {:error, :unknown_run} when no registry entry exists.
  • {:error, :runner_dead} when a stale entry points at a dead pid.

Useful for Process.monitor/1 after the fact, or for asserting a run is still alive before issuing a long blocking call.

round(name_or_module, opts \\ [])

(macro)

Declare a round by name (built-in) or by module.

router(spec)

(macro)

Declare an adaptive router. Accepts a module implementing CouncilEx.Router or a 2-arity anonymous fn fn input, ctx -> [member_id] end.

run(council, input, opts \\ [])

Run a council synchronously.

Accepts either a council module (compile-time DSL) or a %CouncilEx.DynamicCouncil{} (data-only). For dynamic councils, Result.council carries "dynamic:" <> council.id.

start(council, input, opts \\ [])

@spec start(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
  {:ok, pid()} | {:error, term()}

Start an async run; returns {:ok, pid} immediately. Get the run id from the pid with CouncilEx.RunServer.run_id/1.

Options

  • :verbosetrue | :debug | false (default false). When set, attaches a CouncilEx.Verbose tracer subscribed to the run's topic that prints a timeline to stdout. true = timeline + duration + tokens. :debug = also dumps response bodies (truncated). The tracer auto-detaches on terminal event.

  • :verbose_io — IO device for the tracer (default :stdio).
  • :subscribetrue | false (default false). When true, subscribes the calling process to the run's PubSub topic BEFORE the RunServer is started, guaranteeing no events are missed. Without this, callers must call CouncilEx.PubSub.subscribe/1 themselves AFTER start/3 returns, which races against the RunServer's continue callback under load. Recommended for any caller that needs the full event timeline.

  • :registry — name of a Registry to register the run under. Defaults to the bundled CouncilEx.Runner.Registry. Override only when you need tenant-level isolation (so two tenants can't collide on run_id). The caller MUST start the registry first: {Registry, keys: :unique, name: MyApp.RunReg}. Lookup APIs (await/2, cancel/2, terminate_run/2, pid_for/2, list_active_runs/1) take the same :registry opt and must be passed the same name. Most apps should leave this alone — run_id is already a strong unique key.
  • :relay_topics — string topic or list of topics. Every PubSub event broadcast for this run is also published to each of these topics. Use for cross-run aggregation (one app-wide topic that receives every event from every council run, no aggregator process needed).
  • :run_id — explicit run id (binary). When omitted, the runtime generates one. Supply this when an external system has already assigned an identity to the run (e.g. an Oban job that wants the same id across retries, or a tenant that needs a domain-specific naming scheme). Caller is responsible for uniqueness.
  • :recorder{module, args} tuple where module implements CouncilEx.Recorder. The runtime spawns a recorder process before the RunServer starts (so it subscribes to the run topic in time for :run_started), then dispatches lifecycle events to module.handle_event/2. The recorder exits after module.handle_finalize/2 is called with the terminal outcome. A recorder crash does not kill the run.

start_link(council, input, opts \\ [])

@spec start_link(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
  {:ok, pid()} | {:error, term()}

Like start/3, but links the calling process to the run server (GenServer.start_link/3 semantics).

Caller-die kills the run; runner-die kills the caller (unless trapping exits). Use when the caller owns the run's lifecycle — Oban workers, tests, or any context where a leaked background run would burn tokens.

Returns {:ok, pid}. Use CouncilEx.RunServer.run_id/1 if you need the auto-generated run_id (e.g. for PubSub subscribe across processes, or to persist as a durable handle).

subscribe(run_id)

@spec subscribe(String.t()) :: :ok

Subscribe the calling process to run events.

terminate_run(pid_or_run_id, opts \\ [])

@spec terminate_run(
  pid() | String.t(),
  keyword()
) :: :ok | {:error, :unknown_run}

Forcefully terminate a run via Process.exit/2.

Looks up the runner pid by run_id and sends :shutdown. Returns :ok on success or {:error, :unknown_run} if the run id has no live registry entry.

Unlike cancel/1, this is non-cooperative: the run process is killed immediately and no :run_completed / :run_failed event is emitted. Use when:

  • the runner is wedged and cancel/1 won't be drained, or
  • the host application is shutting down a tenant / session and needs runs gone synchronously.

Subscribers should still receive a :DOWN message if they monitored the runner pid (returned from start/3 / start_link/3).

unsubscribe(run_id)

@spec unsubscribe(String.t()) :: :ok

Unsubscribe the calling process from run events.

validate(c)

@spec validate(module() | CouncilEx.DynamicCouncil.t()) :: :ok | {:error, [map()]}

Validate a council without running it. Returns :ok or {:error, [%{path, code, message}]}.

Accepts a module-form council (atom) or a %CouncilEx.DynamicCouncil{}. Dynamic councils delegate to CouncilEx.DynamicCouncil.validate/1. Module-form councils walk the compiled spec and verify that every member's resolved opts include :provider and :model, and that any referenced provider is registered under config :council_ex, :providers.

start/3 runs this validator first and returns {:error, {:invalid_council, errs}} on failure — call this manually when you want to surface structured errors before dispatch (e.g., from a builder UI, before charging tokens or spawning processes).

Sub-council shim members (provider: :__sub_council__) are skipped; their inner councils are validated recursively when the sub-council ref is itself a %DynamicCouncil{}.