Scope: detailed reference for capabilities that are configured per-member (or per-round). For provider wiring, see
PROVIDERS.md. For the full PubSub event catalog, seelib/council_ex/events.ex.
CouncilEx members are independent participants in a council run. Beyond
choosing a provider: and model:, each member can opt into structured
outputs, streaming, tool calling (including parallel execution and
streaming tool loops), and real-time PubSub observability.
Structured outputs
Set output_schema on a member to an Ecto embedded schema. The
dispatcher (CouncilEx.Providers.Instructor) casts the LLM's JSON
response into that schema and runs the schema's optional
validate_changeset/2 callback. After casting, the member module's own
validate/1 callback runs for any business-rule checks that Ecto
constraints cannot express.
defmodule MyAnalysis do
use Ecto.Schema
@primary_key false
embedded_schema do
field :summary, :string
field :score, :integer
end
def validate_changeset(cs, _ctx) do
Ecto.Changeset.validate_number(cs, :score, greater_than_or_equal_to: 1, less_than_or_equal_to: 10)
end
end
member :analyst do
provider :openai
model "gpt-4o"
output_schema MyAnalysis
endResponse.parsed is set to a validated %MyAnalysis{} struct on
success, or the call returns {:error, %CouncilEx.Error{kind: :validation, reason: %Ecto.Changeset{}}} on failure.
Anthropic structured output
On the Anthropic adapter, structured output is implemented via a
synthetic _respond tool whose input_schema mirrors your Ecto schema.
CouncilEx forces the model to call this tool; the adapter then
reassembles the response — including partial_json SSE fragments during
streaming — and feeds the final JSON through the same Ecto cast +
validate pipeline.
Structured-output and user-supplied tools: are mutually exclusive on
the same member. Setting both raises ArgumentError. If you need a
model to both call tools and return structured output, split the work
across two members or two rounds.
See examples/anthropic_structured_output_example.exs.
Streaming
Opt in per member via stream true:
defmodule StreamingCouncil do
use CouncilEx
member :writer do
provider :openai
model "gpt-4o"
system_prompt "..."
stream true
end
round :independent_analysis
end
{:ok, pid} = CouncilEx.start(StreamingCouncil, %{topic: "..."})
run_id = CouncilEx.RunServer.run_id(pid)
:ok = CouncilEx.PubSub.subscribe("council_ex:run:#{run_id}")
receive do
{:member_token, _, _, _, %CouncilEx.StreamChunk{content: c, finish_reason: nil}} ->
IO.write(c)
{:member_token, _, _, _, %CouncilEx.StreamChunk{finish_reason: :stop}} ->
IO.puts("[done]")
endSubscribers receive :member_token PubSub events carrying a
%CouncilEx.StreamChunk{} struct with three fields:
| Field | Type | Description |
|---|---|---|
content | String.t() | Token text (empty string on finish marker) |
index | non_neg_integer() | Chunk index within the stream |
finish_reason | nil | :stop | :length | :tool_calls | :content_filter | nil for mid-stream chunks; set on the final chunk |
Adapter behaviour
- OpenAI-compatible (OpenAI, OpenRouter, Ollama): standard SSE
data:lines parsed by the Instructor adapter. - Anthropic: typed-event SSE; the adapter reassembles
partial_jsonfragments and emits them as:chunkevents. The finalResponse.parsedis cast and validated through your schema once the stream completes. - Providers that do not export
stream/3fall back tocomplete/2with a one-shot log warning.
Telemetry
[:council_ex, :member, :stream_chunk] fires per chunk. The
[:council_ex, :tool, :execute] event fires per tool execution even
during streaming tool loops (see Stream tool-loop).
See examples/streaming_example.exs.
Tools
Members can declare LLM-callable tools that CouncilEx.Providers.Instructor
invokes mid-completion via a bounded tool-call loop. A tool implements
the CouncilEx.Tool behaviour — exactly four callbacks:
| Callback | Return | Description |
|---|---|---|
name/0 | String.t() | Tool name sent to the LLM |
description/0 | String.t() | Natural-language description for the model |
parameters_schema/0 | module() | Ecto embedded schema for argument casting |
execute/1 | {:ok, term()} | {:error, term()} | Called with a cast struct of args |
defmodule MyTools.Calculator do
use Ecto.Schema
@primary_key false
embedded_schema do
field :a, :float
field :b, :float
field :op, :string
end
@behaviour CouncilEx.Tool
def name, do: "calculator"
def description, do: "Perform arithmetic on two numbers."
def parameters_schema, do: __MODULE__
def execute(%{op: "add", a: a, b: b}), do: {:ok, a + b}
def execute(%{op: "mul", a: a, b: b}), do: {:ok, a * b}
endDeclare tools on a member via the tools: opt (list of modules):
member :solver do
provider :openai
model "gpt-4o"
system_prompt "Use tools for math."
tools [MyTools.Calculator]
endTool-call loop
The dispatcher runs a bounded loop: build request → parse tool calls →
execute → send results back → repeat. The loop is capped at
max_tool_iterations (default 5), configurable per member opt or per
CouncilEx.run/3 opts. When the limit is reached the loop returns
{:error, %CouncilEx.Error{kind: :permanent, reason: :max_tool_iterations}}.
Both the OpenAI and Anthropic adapters handle tool calls in the
complete/2 and stream/3 paths.
Error handling
A tool that raises is caught by safe_execute/2, which surfaces the
exception as %ToolCallResult{error: {:tool_raised, exception}}. The
LLM sees the failure message and can retry or change strategy — the loop
is not aborted by a single tool failure under the default :collect
strategy.
See examples/tool_calling_example.exs.
Parallel tool execution
When a model emits multiple tool calls in a single assistant turn, CouncilEx executes them in parallel by default.
member :solver do
provider :openai
model "gpt-4o"
tools [Calc, Search, FetchUrl]
parallel_tools true # default
parallel_tools_strategy :collect # :collect | :fail_fast (default :collect)
tool_concurrency_factor 1.0 # concurrency = round(schedulers × factor), clamped to [1, 5 × schedulers_online()]
tool_timeout_ms 30_000 # per-tool kill-on-timeout (default 30 000 ms)
endThese opts also live on CouncilEx.Request as defaults:
| Field | Default | Notes |
|---|---|---|
parallel_tools | true | Set false for tools with order-dependent side effects |
parallel_tools_strategy | :collect | See strategies below |
tool_concurrency_factor | 1.0 | Clamped to [1, 5 * schedulers_online()] |
tool_timeout_ms | 30_000 | Timed-out tasks are killed; error surfaced as {:tool_timeout, ms} |
Strategies
:collect— runs all tools to completion (or timeout/error), returns one%ToolCallResult{}per call. Timeouts and exceptions are surfaced asToolCallResult.error; the loop continues. This is the default.:fail_fast— aborts the loop on the first failed or timed-out tool call, returning{:error, %CouncilEx.Error{}}. Sibling tasks already running are not cancelled; they finish (or hittool_timeout_ms) and their results are discarded.
Use :collect for independent tools; use :fail_fast when a failure in
one tool makes subsequent results meaningless.
See bench/parallel_tools.exs for a
wall-clock speedup measurement.
Stream tool-loop
CouncilEx.Providers.Instructor.stream/3 drives the full tool-call loop
end-to-end across multiple HTTP round-trips while streaming. The
user-supplied sink receives natural-language text chunks from all
iterations as if they were one continuous response. Tool-call argument
fragments are not forwarded to the sink; observe them via PubSub events
(see PubSub tool-call events).
member :researcher do
provider :openai
model "gpt-4o"
tools [WebSearch, FetchUrl]
stream true
endThe stream tool-loop honours the same bounds as the non-streaming loop:
- Bounded by
max_tool_iterations(default5). - Respects
parallel_tools_strategy: :fail_fastandtool_timeout_ms. - A sink callback that raises aborts the current iteration with
{:error, %CouncilEx.Error{reason: {:sink_raised, exception}}}.
PubSub tool-call events
Subscribe to "council_ex:run:#{run_id}" to observe tool-call lifecycle
events in real time. This works for both complete/2 and stream/3
tool loops.
:ok = CouncilEx.PubSub.subscribe("council_ex:run:#{run_id}")
receive do
{:tool_call_request, ^run_id, round_name, member_id,
%CouncilEx.ToolCall{name: name, args_raw: args}} ->
Logger.info("Calling #{name} with #{inspect(args)}")
{:tool_call_result, ^run_id, round_name, member_id,
%CouncilEx.ToolCallResult{result: r, error: nil}} ->
Logger.info("Result: #{inspect(r)}")
{:tool_call_result, ^run_id, round_name, member_id,
%CouncilEx.ToolCallResult{error: err}} when err != nil ->
Logger.warning("Tool error: #{inspect(err)}")
endEvent payloads
:tool_call_request — fires just before each tool execution.
- 5th element:
%CouncilEx.ToolCall{id, name, args_raw, args_parsed}
:tool_call_result — fires after each tool execution completes
(success or failure).
- 5th element:
%CouncilEx.ToolCallResult{id, name, result, error}resultis set on success,nilon failure.erroris set on failure (e.g.,{:tool_raised, exception},{:tool_timeout, ms},{:tool_not_found, name}),nilon success.
Exclusion
The synthetic Anthropic structured-output _respond tool is excluded
from all PubSub broadcasts. Only user-declared tools produce events.
See examples/tool_call_events_example.exs
for a runnable demo.