Per-member capabilities


Scope: detailed reference for capabilities that are configured per-member (or per-round). For provider wiring, see PROVIDERS.md. For the full PubSub event catalog, see lib/council_ex/events.ex.

CouncilEx members are independent participants in a council run. Beyond choosing a provider: and model:, each member can opt into structured outputs, streaming, tool calling (including parallel execution and streaming tool loops), and real-time PubSub observability.


Structured outputs

Set output_schema on a member to an Ecto embedded schema. The dispatcher (CouncilEx.Providers.Instructor) casts the LLM's JSON response into that schema and runs the schema's optional validate_changeset/2 callback. After casting, the member module's own validate/1 callback runs for any business-rule checks that Ecto constraints cannot express.

defmodule MyAnalysis do
  use Ecto.Schema

  @primary_key false
  embedded_schema do
    field :summary, :string
    field :score, :integer
  end

  def validate_changeset(cs, _ctx) do
    Ecto.Changeset.validate_number(cs, :score, greater_than_or_equal_to: 1, less_than_or_equal_to: 10)
  end
end

member :analyst do
  provider :openai
  model "gpt-4o"
  output_schema MyAnalysis
end

Response.parsed is set to a validated %MyAnalysis{} struct on success, or the call returns {:error, %CouncilEx.Error{kind: :validation, reason: %Ecto.Changeset{}}} on failure.

Anthropic structured output

On the Anthropic adapter, structured output is implemented via a synthetic _respond tool whose input_schema mirrors your Ecto schema. CouncilEx forces the model to call this tool; the adapter then reassembles the response — including partial_json SSE fragments during streaming — and feeds the final JSON through the same Ecto cast + validate pipeline.

Structured output and a user-supplied tools: list are mutually exclusive on the same member; setting both raises ArgumentError. If you need a model to both call tools and return structured output, split the work across two members or two rounds.

See examples/anthropic_structured_output_example.exs.


Streaming

Opt in per member via stream true:

defmodule StreamingCouncil do
  use CouncilEx

  member :writer do
    provider :openai
    model "gpt-4o"
    system_prompt "..."
    stream true
  end

  round :independent_analysis
end

{:ok, pid} = CouncilEx.start(StreamingCouncil, %{topic: "..."})
run_id = CouncilEx.RunServer.run_id(pid)
:ok = CouncilEx.PubSub.subscribe("council_ex:run:#{run_id}")

receive do
  {:member_token, _, _, _, %CouncilEx.StreamChunk{content: c, finish_reason: nil}} ->
    IO.write(c)

  {:member_token, _, _, _, %CouncilEx.StreamChunk{finish_reason: :stop}} ->
    IO.puts("[done]")
end

Subscribers receive :member_token PubSub events carrying a %CouncilEx.StreamChunk{} struct with three fields:

| Field | Type | Description |
| --- | --- | --- |
| content | String.t() | Token text (empty string on finish marker) |
| index | non_neg_integer() | Chunk index within the stream |
| finish_reason | nil \| :stop \| :length \| :tool_calls \| :content_filter | nil for mid-stream chunks; set on the final chunk |
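
A single receive handles only one message, so a consumer usually loops until a chunk arrives with a non-nil finish_reason. The following is a minimal sketch of such a loop, not CouncilEx code: StreamSketch and its nested Chunk struct are hypothetical stand-ins for %CouncilEx.StreamChunk{}, and the five-element message shape is assumed to match the :member_token pattern shown above.

```elixir
defmodule StreamSketch do
  # Hypothetical stand-in for %CouncilEx.StreamChunk{}; only the three
  # documented fields are modelled.
  defmodule Chunk do
    defstruct content: "", index: 0, finish_reason: nil
  end

  # Collects :member_token messages from the process mailbox until a chunk
  # carries a finish_reason. Returns {:ok, text, finish_reason}, or
  # {:error, :timeout} if no chunk arrives within timeout_ms.
  def collect(timeout_ms \\ 5_000), do: loop([], timeout_ms)

  defp loop(acc, timeout_ms) do
    receive do
      # Mid-stream chunk: keep the text and wait for more.
      {:member_token, _run_id, _round, _member, %Chunk{content: c, finish_reason: nil}} ->
        loop([c | acc], timeout_ms)

      # Final chunk: content is normally "", but it is appended anyway.
      {:member_token, _run_id, _round, _member, %Chunk{content: c, finish_reason: reason}} ->
        text = [c | acc] |> Enum.reverse() |> IO.iodata_to_binary()
        {:ok, text, reason}
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end
```

In a real subscriber the struct pattern would be %CouncilEx.StreamChunk{}, and the timeout guards against a stream that never delivers its final chunk.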

Adapter behaviour

  • OpenAI-compatible (OpenAI, OpenRouter, Ollama): standard SSE data: lines parsed by the Instructor adapter.
  • Anthropic: typed-event SSE; the adapter reassembles partial_json fragments and emits them as :chunk events. The final Response.parsed is cast and validated through your schema once the stream completes.
  • Providers that do not export stream/3 fall back to complete/2 with a one-shot log warning.

Telemetry

[:council_ex, :member, :stream_chunk] fires per chunk. The [:council_ex, :tool, :execute] event fires per tool execution even during streaming tool loops (see Stream tool-loop).

See examples/streaming_example.exs.


Tools

Members can declare LLM-callable tools that CouncilEx.Providers.Instructor invokes mid-completion via a bounded tool-call loop. A tool implements the CouncilEx.Tool behaviour — exactly four callbacks:

| Callback | Return | Description |
| --- | --- | --- |
| name/0 | String.t() | Tool name sent to the LLM |
| description/0 | String.t() | Natural-language description for the model |
| parameters_schema/0 | module() | Ecto embedded schema for argument casting |
| execute/1 | {:ok, term()} \| {:error, term()} | Called with a cast struct of args |

defmodule MyTools.Calculator do
  use Ecto.Schema

  @primary_key false
  embedded_schema do
    field :a, :float
    field :b, :float
    field :op, :string
  end

  @behaviour CouncilEx.Tool
  def name,              do: "calculator"
  def description,       do: "Perform arithmetic on two numbers."
  def parameters_schema, do: __MODULE__

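  def execute(%{op: "add", a: a, b: b}), do: {:ok, a + b}
  def execute(%{op: "mul", a: a, b: b}), do: {:ok, a * b}
  # Unknown operations return an error tuple instead of raising FunctionClauseError.
  def execute(%{op: op}), do: {:error, {:unsupported_op, op}}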
end

Declare tools on a member via the tools: opt (list of modules):

member :solver do
  provider :openai
  model "gpt-4o"
  system_prompt "Use tools for math."
  tools [MyTools.Calculator]
end

Tool-call loop

The dispatcher runs a bounded loop: build request → parse tool calls → execute → send results back → repeat. The loop is capped at max_tool_iterations (default 5), which can be set as a member opt or in the opts passed to CouncilEx.run/3. When the limit is reached, the loop returns {:error, %CouncilEx.Error{kind: :permanent, reason: :max_tool_iterations}}.

Both the OpenAI and Anthropic adapters handle tool calls in the complete/2 and stream/3 paths.
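
The shape of a bounded loop like this can be sketched in plain Elixir. This is an illustration under stated assumptions, not the dispatcher's implementation: ToolLoopSketch, the call_model function, and the message tuples are all hypothetical, and the error is a bare atom rather than a %CouncilEx.Error{} struct.

```elixir
defmodule ToolLoopSketch do
  @default_max_iterations 5

  # `call_model` is a hypothetical 1-arity function standing in for the
  # provider request. It receives the message history and returns either
  # {:final, text} or {:tool_calls, [{tool_name, args}]}.
  # `tools` maps tool names to 1-arity functions returning
  # {:ok, term} | {:error, term}.
  def run(call_model, tools, messages, max_iterations \\ @default_max_iterations) do
    loop(call_model, tools, messages, 0, max_iterations)
  end

  # Iteration budget exhausted: stop instead of issuing another request.
  defp loop(_call_model, _tools, _messages, iteration, max) when iteration >= max do
    {:error, :max_tool_iterations}
  end

  defp loop(call_model, tools, messages, iteration, max) do
    case call_model.(messages) do
      {:final, text} ->
        {:ok, text}

      {:tool_calls, calls} ->
        # Execute every requested tool and append the results to the history.
        results =
          Enum.map(calls, fn {name, args} ->
            case Map.fetch(tools, name) do
              {:ok, fun} -> {:tool_result, name, fun.(args)}
              :error -> {:tool_result, name, {:error, {:tool_not_found, name}}}
            end
          end)

        loop(call_model, tools, messages ++ results, iteration + 1, max)
    end
  end
end
```

The iteration counter is what keeps a model that requests tools on every turn from looping forever; a failed tool is fed back as a result rather than ending the loop.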

Error handling

A tool that raises is caught by safe_execute/2, which surfaces the exception as %ToolCallResult{error: {:tool_raised, exception}}. The LLM sees the failure message and can retry or change strategy — the loop is not aborted by a single tool failure under the default :collect strategy.
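
As a rough illustration of the idea, a wrapper of this kind can be written with an implicit try/rescue. The module below is a hypothetical sketch: the real safe_execute/2 may take different arguments, and plain maps stand in for %ToolCallResult{}.

```elixir
defmodule SafeExecuteSketch do
  # Runs a 1-arity tool function and converts a raised exception into an
  # error value instead of letting it crash the caller. The map keys mirror
  # the result/error fields described for ToolCallResult.
  def safe_execute(fun, args) do
    case fun.(args) do
      {:ok, value} -> %{result: value, error: nil}
      {:error, reason} -> %{result: nil, error: reason}
    end
  rescue
    exception -> %{result: nil, error: {:tool_raised, exception}}
  end
end
```

Because the exception is returned as data, the caller can serialize it into the next request and let the model decide how to proceed.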

See examples/tool_calling_example.exs.


Parallel tool execution

When a model emits multiple tool calls in a single assistant turn, CouncilEx executes them in parallel by default.

member :solver do
  provider :openai
  model "gpt-4o"
  tools [Calc, Search, FetchUrl]
  parallel_tools true                # default
  parallel_tools_strategy :collect   # :collect | :fail_fast  (default :collect)
  tool_concurrency_factor 1.0        # concurrency = round(schedulers × factor), clamped to [1, 5 × schedulers_online()]
  tool_timeout_ms 30_000             # per-tool kill-on-timeout (default 30 000 ms)
end

These opts also live on CouncilEx.Request as defaults:

| Field | Default | Notes |
| --- | --- | --- |
| parallel_tools | true | Set false for tools with order-dependent side effects |
| parallel_tools_strategy | :collect | See strategies below |
| tool_concurrency_factor | 1.0 | Clamped to [1, 5 * schedulers_online()] |
| tool_timeout_ms | 30_000 | Timed-out tasks are killed; error surfaced as {:tool_timeout, ms} |
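
The concurrency arithmetic described for tool_concurrency_factor can be worked through with a small helper. ToolConcurrencySketch is a hypothetical name, and the function only restates the formula given above; it is not taken from the CouncilEx source.

```elixir
defmodule ToolConcurrencySketch do
  # concurrency = round(schedulers * factor), clamped to [1, 5 * schedulers].
  # `schedulers` defaults to the number of online schedulers on this node.
  def max_concurrency(factor, schedulers \\ System.schedulers_online()) do
    (schedulers * factor)
    |> round()
    |> max(1)
    |> min(5 * schedulers)
  end
end
```

On an 8-scheduler machine, a factor of 1.0 gives 8 concurrent tools, 0.01 is raised to the floor of 1, and 10.0 is capped at 40.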

Strategies

  • :collect — runs all tools to completion (or timeout/error), returns one %ToolCallResult{} per call. Timeouts and exceptions are surfaced as ToolCallResult.error; the loop continues. This is the default.
  • :fail_fast — aborts the loop on the first failed or timed-out tool call, returning {:error, %CouncilEx.Error{}}. Sibling tasks already running are not cancelled; they finish (or hit tool_timeout_ms) and their results are discarded.

Use :collect for independent tools; use :fail_fast when a failure in one tool makes subsequent results meaningless.
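
The difference between the two strategies can be sketched with Task.async_stream/3 from the standard library. This is an assumption-laden illustration rather than the library's implementation: ParallelToolsSketch and its option names are hypothetical, tools are modelled as zero-arity functions that do not raise, and :fail_fast is simplified to report the first failure in call order after all tasks have settled.

```elixir
defmodule ParallelToolsSketch do
  # `calls` is a list of {name, fun}; each fun takes no arguments and
  # returns {:ok, term} | {:error, term}.
  def run(calls, opts \\ []) do
    strategy = Keyword.get(opts, :strategy, :collect)
    timeout_ms = Keyword.get(opts, :timeout_ms, 30_000)
    max_concurrency = Keyword.get(opts, :max_concurrency, System.schedulers_online())

    results =
      calls
      |> Task.async_stream(fn {_name, fun} -> fun.() end,
        max_concurrency: max_concurrency,
        timeout: timeout_ms,
        # Kill a task that exceeds the timeout instead of exiting the caller.
        on_timeout: :kill_task
      )
      # Results arrive in input order, so they can be paired with their calls.
      |> Enum.zip(calls)
      |> Enum.map(fn
        {{:ok, {:ok, value}}, {name, _}} -> %{name: name, result: value, error: nil}
        {{:ok, {:error, reason}}, {name, _}} -> %{name: name, result: nil, error: reason}
        {{:exit, :timeout}, {name, _}} -> %{name: name, result: nil, error: {:tool_timeout, timeout_ms}}
      end)

    case {strategy, Enum.find(results, & &1.error)} do
      # :fail_fast discards sibling results once any call has failed.
      {:fail_fast, %{error: reason}} -> {:error, reason}
      # :collect always returns one result per call, failures included.
      _ -> {:ok, results}
    end
  end
end
```

Letting every task run to completion before checking for failures mirrors the note that sibling tasks are not cancelled under :fail_fast.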

See bench/parallel_tools.exs for a wall-clock speedup measurement.


Stream tool-loop

CouncilEx.Providers.Instructor.stream/3 drives the full tool-call loop end-to-end across multiple HTTP round-trips while streaming. The user-supplied sink receives natural-language text chunks from all iterations as if they were one continuous response. Tool-call argument fragments are not forwarded to the sink; observe them via PubSub events (see PubSub tool-call events).

member :researcher do
  provider :openai
  model "gpt-4o"
  tools [WebSearch, FetchUrl]
  stream true
end

The stream tool-loop honours the same bounds as the non-streaming loop:

  • Bounded by max_tool_iterations (default 5).
  • Respects parallel_tools_strategy (including :fail_fast) and tool_timeout_ms.
  • A sink callback that raises aborts the current iteration with {:error, %CouncilEx.Error{reason: {:sink_raised, exception}}}.

PubSub tool-call events

Subscribe to "council_ex:run:#{run_id}" to observe tool-call lifecycle events in real time. This works for both complete/2 and stream/3 tool loops.

:ok = CouncilEx.PubSub.subscribe("council_ex:run:#{run_id}")

receive do
  {:tool_call_request, ^run_id, round_name, member_id,
   %CouncilEx.ToolCall{name: name, args_raw: args}} ->
    Logger.info("Calling #{name} with #{inspect(args)}")

  {:tool_call_result, ^run_id, round_name, member_id,
   %CouncilEx.ToolCallResult{result: r, error: nil}} ->
    Logger.info("Result: #{inspect(r)}")

  {:tool_call_result, ^run_id, round_name, member_id,
   %CouncilEx.ToolCallResult{error: err}} when err != nil ->
    Logger.warning("Tool error: #{inspect(err)}")
end

Event payloads

:tool_call_request — fires just before each tool execution.

  • 5th element: %CouncilEx.ToolCall{id, name, args_raw, args_parsed}

:tool_call_result — fires after each tool execution completes (success or failure).

  • 5th element: %CouncilEx.ToolCallResult{id, name, result, error}
    • result is set on success, nil on failure.
    • error is set on failure (e.g., {:tool_raised, exception}, {:tool_timeout, ms}, {:tool_not_found, name}), nil on success.

Exclusion

The synthetic Anthropic structured-output _respond tool is excluded from all PubSub broadcasts. Only user-declared tools produce events.

See examples/tool_call_events_example.exs for a runnable demo.