ReqLLM.Provider.ChunkAccumulator (ReqLLM v1.14.0)


Shared streaming-chunk reducer used by both ReqLLM.StreamServer (the hot path, one chunk at a time) and ReqLLM.Provider.Defaults.ResponseBuilder (batch, full chunk list at end-of-stream).

Maintains running iodata buffers for text/thinking, a running tool-call list, and per-index argument-fragment buffers. Reasoning details, logprobs, usage, and the finish reason are also collected from :meta chunks.
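The text/thinking buffering can be illustrated with a minimal sketch. It assumes simplified chunks shaped as `{:content, text}` and `{:thinking, text}` tuples instead of real `%ReqLLM.StreamChunk{}` structs. The module `IodataBufferSketch` and those tuple shapes are hypothetical and are not the module's actual internals.

```elixir
defmodule IodataBufferSketch do
  # Hypothetical stand-in for the accumulator's text/thinking buffers.
  # Chunks are modeled as {:content, binary} or {:thinking, binary} tuples.
  defstruct text: [], thinking: []

  def new, do: %__MODULE__{}

  # Wrapping the old buffer in a new two-element list is O(1):
  # nothing is copied, the buffer just grows into a nested iolist.
  def push(%__MODULE__{} = acc, {:content, text}), do: %{acc | text: [acc.text, text]}
  def push(%__MODULE__{} = acc, {:thinking, text}), do: %{acc | thinking: [acc.thinking, text]}
  # Any other chunk kind leaves the text buffers untouched.
  def push(%__MODULE__{} = acc, _other), do: acc

  # Flattening to a binary happens once, when the stream is finished.
  def finalize_text(%__MODULE__{text: text}), do: IO.iodata_to_binary(text)
  def finalize_thinking(%__MODULE__{thinking: thinking}), do: IO.iodata_to_binary(thinking)
end

acc =
  [{:thinking, "Let me "}, {:thinking, "think."}, {:content, "Hello, "}, {:content, "world"}]
  |> Enum.reduce(IodataBufferSketch.new(), fn chunk, acc_so_far ->
    IodataBufferSketch.push(acc_so_far, chunk)
  end)

IodataBufferSketch.finalize_text(acc)
# => "Hello, world"
```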

Finalizers

Two different finalizers exist because StreamServer and ResponseBuilder consume the accumulator differently:

  • finalize_tool_calls_for_response/1: preserves the historical ResponseBuilder contract. Returns raw maps with :id, :name, :arguments, and optionally :builtin?. Arguments are the decoded JSON when argument fragments were observed and decode successfully; otherwise they are the raw args from the tool_call chunk. The result feeds ResponseBuilder.normalize_tool_calls/1.

  • finalize_message/1: preserves the historical StreamServer contract. Returns nil when the accumulator holds no text and no tool calls; otherwise returns an assistant %ReqLLM.Message{} ready to attach to OTel content-capture metadata. Text content becomes a single :text ContentPart, and tool calls become %ReqLLM.ToolCall{} structs with the builtin flag preserved.

Reasoning text is intentionally not surfaced through finalize_message/1 — OTel content capture redacts it anyway and the canonical response message is built separately by ResponseBuilder with full reasoning details.

Performance notes

The accumulator is on the streaming hot path. To keep push/2 O(1) per chunk we prepend list entries (tool calls, reasoning details, logprobs) and reverse them at finalize time. Text and thinking buffers are iodata — also O(1) per chunk. Argument fragments are iodata buffers keyed by tool-call index, joined only at finalize time. A stream with N chunks costs O(N) total work, not O(N²).
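The prepend-then-reverse pattern and the per-index fragment buffers can be sketched as follows. The sketch assumes reasoning details are arbitrary terms and argument fragments arrive as an index plus a binary. `ListBufferSketch` and its function names are hypothetical and do not describe the module's real internals.

```elixir
defmodule ListBufferSketch do
  # Hypothetical model of the list-valued and index-keyed fields.
  defstruct reasoning_details: [], arg_fragments: %{}

  def new, do: %__MODULE__{}

  # Prepending to a list is O(1); appending with ++ would copy the list
  # on every chunk and make a stream of N chunks cost O(N^2).
  def push_detail(%__MODULE__{} = acc, detail),
    do: %{acc | reasoning_details: [detail | acc.reasoning_details]}

  # Argument fragments are grouped by tool-call index. Each buffer is
  # iodata, so adding a fragment is O(1) as well.
  def push_fragment(%__MODULE__{} = acc, index, fragment) do
    fragments = Map.update(acc.arg_fragments, index, fragment, fn buf -> [buf, fragment] end)
    %{acc | arg_fragments: fragments}
  end

  # A single reverse at finalize time restores arrival order.
  def finalize_reasoning_details(%__MODULE__{reasoning_details: details}),
    do: Enum.reverse(details)

  # Fragments are joined into one binary per index only when needed.
  def finalize_fragments(%__MODULE__{arg_fragments: fragments}),
    do: Map.new(fragments, fn {index, iodata} -> {index, IO.iodata_to_binary(iodata)} end)
end

acc =
  ListBufferSketch.new()
  |> ListBufferSketch.push_detail(:first)
  |> ListBufferSketch.push_detail(:second)
  |> ListBufferSketch.push_fragment(0, ~s({"city":))
  |> ListBufferSketch.push_fragment(0, ~s("Paris"}))

ListBufferSketch.finalize_reasoning_details(acc)
# => [:first, :second]
```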

Types

t()

@type t() :: %ReqLLM.Provider.ChunkAccumulator{
  arg_fragments: %{optional(non_neg_integer()) => iodata()},
  finish_reason: atom() | String.t() | nil,
  logprobs: [term()],
  reasoning_details: [term()],
  text_content: iodata(),
  thinking_content: iodata(),
  tool_calls: [tool_call_record()],
  usage: map() | nil
}

tool_call_record()

@type tool_call_record() :: %{
  :id => String.t(),
  :name => String.t(),
  :arguments => term(),
  :index => non_neg_integer(),
  optional(:builtin?) => true,
  optional(:expects_arg_fragments) => true,
  optional(:metadata) => map()
}

Functions

finalize_finish_reason(chunk_accumulator)

@spec finalize_finish_reason(t()) :: atom() | String.t() | nil

Returns the most recently observed finish_reason from meta chunks, or nil if no meta chunk surfaced one. The value is returned raw (atom or string); callers are responsible for normalizing it.
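The "most recently observed" rule can be illustrated with a short sketch. It models meta chunks as plain maps that may or may not carry a `:finish_reason` key, and it assumes a nil value does not overwrite an earlier reason. `FinishReasonSketch` is hypothetical.

```elixir
defmodule FinishReasonSketch do
  # Folds meta maps left to right. Any meta carrying a non-nil
  # finish_reason overwrites the previous value, so the last one wins.
  # Metas without the key (assumption: or with a nil value) leave it alone.
  def track(metas) when is_list(metas) do
    Enum.reduce(metas, nil, fn
      %{finish_reason: reason}, _current when not is_nil(reason) -> reason
      _meta, current -> current
    end)
  end
end

# The value stays raw: a string and an atom are both passed through as-is.
FinishReasonSketch.track([%{usage: %{input_tokens: 3}}, %{finish_reason: "length"}, %{finish_reason: :stop}])
# => :stop
```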

finalize_logprobs(chunk_accumulator)

@spec finalize_logprobs(t()) :: [term()]

Returns logprob tokens in arrival order.

finalize_message(acc)

@spec finalize_message(t()) :: ReqLLM.Message.t() | nil

Returns a partial assistant %ReqLLM.Message{} for OTel content capture, or nil when the accumulator has no text and no tool calls. Reasoning is intentionally nil — OTel content capture redacts it.
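The nil-versus-message decision can be sketched with plain maps standing in for `%ReqLLM.Message{}` and `%ReqLLM.ToolCall{}`. The module `MessageSketch`, the `:role`/`:content`/`:tool_calls`/`:reasoning` keys, and the `{:text, binary}` content-part shape are all assumptions made for illustration; they are not the real struct fields.

```elixir
defmodule MessageSketch do
  # text: the finalized text binary; tool_calls: already-finalized tool-call maps.
  # Returns nil when there is nothing to capture.
  def finalize_message("", []), do: nil

  def finalize_message(text, tool_calls) when is_binary(text) and is_list(tool_calls) do
    # Text becomes at most one text content part.
    content = if text == "", do: [], else: [{:text, text}]
    # Reasoning is deliberately left nil, mirroring the redaction rule.
    %{role: :assistant, content: content, tool_calls: tool_calls, reasoning: nil}
  end
end

MessageSketch.finalize_message("", [])
# => nil
```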

finalize_reasoning_details(chunk_accumulator)

@spec finalize_reasoning_details(t()) :: [term()]

Returns reasoning details in arrival order.

finalize_text(chunk_accumulator)

@spec finalize_text(t()) :: String.t()

Returns the concatenated text content as a binary.

finalize_thinking(chunk_accumulator)

@spec finalize_thinking(t()) :: String.t()

Returns the concatenated thinking content as a binary.

finalize_tool_calls_for_response(chunk_accumulator)

@spec finalize_tool_calls_for_response(t()) :: [map()]

Returns tool calls in the format ResponseBuilder.normalize_tool_calls/1 expects: maps with :id, :name, :arguments, and optionally a :builtin? flag. If argument fragments were observed and decode successfully, arguments are the decoded JSON; otherwise they fall back to the raw arguments captured from the tool_call chunk.
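The decode-or-fall-back rule for a single tool call can be sketched as follows. The sketch assumes the record is a map shaped like `tool_call_record()` and that the fragment buffer for its index is passed in (or nil when no fragments arrived). The decoder is injected so the example does not depend on a particular JSON library. `ToolArgsSketch` is hypothetical. The default `JSON.decode/1` ships with Elixir 1.18 and later, and `Jason.decode/1` is a common alternative.

```elixir
defmodule ToolArgsSketch do
  # record: map with :id, :name, :arguments (raw args from the tool_call chunk),
  #         and optionally :builtin?.
  # fragments: iodata buffer for this tool call's index, or nil if none arrived.
  # decode: 1-arity function returning {:ok, term} or {:error, reason}.
  def finalize(record, fragments, decode \\ &JSON.decode/1)

  # No fragments observed: keep the raw arguments.
  def finalize(record, nil, _decode), do: shape(record, record.arguments)

  def finalize(record, fragments, decode) do
    case decode.(IO.iodata_to_binary(fragments)) do
      {:ok, decoded} -> shape(record, decoded)
      # Malformed or truncated JSON: fall back to the raw arguments.
      {:error, _reason} -> shape(record, record.arguments)
    end
  end

  # Keep only :id, :name, :arguments, plus :builtin? when it is set.
  defp shape(record, arguments) do
    base = %{id: record.id, name: record.name, arguments: arguments}
    if Map.get(record, :builtin?), do: Map.put(base, :builtin?, true), else: base
  end
end

# A stub decoder keeps the example independent of any JSON library.
stub = fn
  "{\"city\":\"Paris\"}" -> {:ok, %{"city" => "Paris"}}
  _other -> {:error, :invalid}
end

record = %{id: "call_1", name: "get_weather", arguments: %{}, index: 0}

ToolArgsSketch.finalize(record, [~s({"city":), ~s("Paris"})], stub)
# => %{id: "call_1", name: "get_weather", arguments: %{"city" => "Paris"}}
```

Passing a truncated buffer such as `~s({"city":)` makes the stub return an error, so the raw `%{}` arguments are kept instead.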

finalize_usage(chunk_accumulator)

@spec finalize_usage(t()) :: map() | nil

Returns the merged usage map (or nil if no meta chunk surfaced usage).
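The documentation does not spell out the merge policy. The sketch below assumes later usage maps overwrite earlier values key by key (`Map.merge/2` semantics), and it models a meta chunk without usage as nil. `UsageSketch` is hypothetical, and the real policy may differ.

```elixir
defmodule UsageSketch do
  # usages: one entry per meta chunk; nil means that chunk carried no usage.
  # Assumption: later values win per key, as with Map.merge/2.
  def merge_usage(usages) when is_list(usages) do
    Enum.reduce(usages, nil, fn
      nil, acc -> acc
      usage, nil -> usage
      usage, acc -> Map.merge(acc, usage)
    end)
  end
end

UsageSketch.merge_usage([%{input_tokens: 12}, %{output_tokens: 40}])
# => %{input_tokens: 12, output_tokens: 40}
```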

new()

@spec new() :: t()

Returns an empty accumulator.

push(acc, chunk)

@spec push(t(), ReqLLM.StreamChunk.t()) :: t()

Folds a single chunk into the accumulator. Hot path — O(1) per chunk.

reduce(acc, chunks)

@spec reduce(t(), [ReqLLM.StreamChunk.t()]) :: t()

Folds a list of chunks through push/2. Convenience wrapper for the batch path (ResponseBuilder).
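Because reduce/2 only folds push/2, the batch path and the hot path should yield the same accumulator for the same chunks. The sketch below demonstrates this with a toy push/2. `ReduceSketch` and its list-based accumulator are hypothetical; only the `push(acc, chunk)` argument order is taken from the spec above.

```elixir
defmodule ReduceSketch do
  # Toy push/2 with the documented argument order: accumulator first, chunk second.
  # The "accumulator" just prepends chunks, for illustration only.
  def push(acc, chunk), do: [chunk | acc]

  # Enum.reduce/3 calls its function with (element, acc), so the
  # arguments are flipped to match push(acc, chunk).
  def reduce(acc, chunks) when is_list(chunks),
    do: Enum.reduce(chunks, acc, fn chunk, acc_so_far -> push(acc_so_far, chunk) end)
end

chunks = [:a, :b, :c]

# Batch path: one call over the whole list at end-of-stream.
batch = ReduceSketch.reduce([], chunks)

# Hot path: push chunks one at a time as they arrive.
streamed = [] |> ReduceSketch.push(:a) |> ReduceSketch.push(:b) |> ReduceSketch.push(:c)

batch == streamed
# => true
```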