Nous.StreamNormalizer.ToolCallAccumulator (nous v0.16.3)

View Source

Reassembles partial tool-call fragments emitted by Nous.StreamNormalizer into the final list shape that Nous.Messages.from_provider_response/2 produces for the non-streaming path.

Used by the stream: true branch of Nous.AgentRunner.run/3 to convert a sequence of {:tool_call_delta, fragment} events into the tool_calls field of an assembled %Nous.Message{}.

Polymorphic across the three provider chunk shapes that Nous.StreamNormalizer emits:

OpenAI-compatible

Fragments arrive as a list of partial calls, each with an "index" plus potentially split "function"."arguments" JSON:

[%{"index" => 0, "id" => "call_a", "function" => %{"name" => "search", "arguments" => "{\"q"}}]
[%{"index" => 0, "function" => %{"arguments" => "uery\":\"hi\"}"}}]

Anthropic

Fragments are tagged with _phase and _index (see Nous.StreamNormalizer.Anthropic):

%{"id" => "tu_a", "name" => "search", "_index" => 0, "_phase" => :start}
%{"_index" => 0, "_phase" => :partial, "partial_json" => "{\"q"}
%{"_index" => 0, "_phase" => :partial, "partial_json" => "uery\":\"hi\"}"}
%{"_index" => 0, "_phase" => :stop}

Gemini

Fragments arrive already-complete (Gemini does not split tool-call arguments across chunks):

%{"name" => "search", "arguments" => %{"query" => "hi"}}

API

acc = ToolCallAccumulator.new()
acc = ToolCallAccumulator.feed(acc, fragment)
tool_calls = ToolCallAccumulator.finalize(acc)
# => [%{"id" => "call_a", "name" => "search", "arguments" => %{"query" => "hi"}}]

Summary

Functions

Feed a single {:tool_call_delta, fragment} payload into the accumulator.

Finalize the accumulator into a list of tool calls in the unified shape

Build an empty accumulator.

Types

partial_call()

@type partial_call() :: %{
  id: String.t() | nil,
  name: String.t() | nil,
  args_io: iodata()
}

t()

@type t() :: %{
  openai: %{required(integer()) => partial_call()},
  anthropic: %{required(integer()) => partial_call()},
  gemini: [%{required(String.t()) => term()}]
}

Functions

feed(acc, fragment)

@spec feed(t(), term()) :: t()

Feed a single {:tool_call_delta, fragment} payload into the accumulator.

The fragment shape is detected automatically — see the module doc for the three supported shapes. Unrecognized fragments are silently dropped (the caller has already filtered {:unknown, _} events upstream).

finalize(map)

@spec finalize(t()) :: [%{required(String.t()) => term()}]

Finalize the accumulator into a list of tool calls in the unified shape:

[%{"id" => id_or_nil, "name" => name, "arguments" => decoded_map}, ...]

OpenAI and Anthropic argument buffers are JSON-decoded via Nous.Messages.OpenAI.decode_arguments/1. On malformed JSON the tool call is tagged with "_invalid_arguments" => raw so the agent runner can emit a clean tool-error result instead of invoking the tool with bogus args. Gemini calls already carry decoded arguments.

Order: OpenAI calls sorted by index, then Anthropic calls sorted by _index, then Gemini calls in arrival order. In practice only one of the three is non-empty per response.

new()

@spec new() :: t()

Build an empty accumulator.