ReqLLM.Response.Stream (ReqLLM v1.12.0)


Stream processing utilities for ReqLLM responses.

This module contains helper functions for working with streaming responses, particularly for joining stream chunks into complete responses.

Summary

Types

summary()

Summary of accumulated stream data.

Functions

join(stream, response)

Join a stream of chunks into a complete response.

summarize(chunks)

Summarize a stream of chunks into accumulated data.
Types

summary()

@type summary() :: %{
  text: String.t(),
  thinking: String.t(),
  tool_calls: [map()],
  finish_reason: atom() | nil,
  usage: map() | nil
}

Summary of accumulated stream data.

Contains all the extracted content from a stream of chunks, suitable for building responses or classifying stream results.
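As a rough illustration of "classifying stream results", the sketch below pattern-matches on a map with the summary() shape. The SummaryClassifier module, its classify/1 function, and the usage keys are hypothetical and not part of ReqLLM.

```elixir
defmodule SummaryClassifier do
  # Hypothetical helper (not part of ReqLLM): classifies a summary()-shaped map.
  # Tool calls take priority, then visible text, then thinking-only output.
  def classify(%{tool_calls: [_ | _]}), do: :tool_calls
  def classify(%{text: text}) when text != "", do: :text
  def classify(%{thinking: thinking}) when thinking != "", do: :thinking_only
  def classify(_summary), do: :empty
end

# A hand-built map in the summary() shape; the usage keys are illustrative only.
summary = %{
  text: "Hello, world!",
  thinking: "",
  tool_calls: [],
  finish_reason: :stop,
  usage: %{input_tokens: 12, output_tokens: 4}
}

classification = SummaryClassifier.classify(summary)
```

Matching on the non-empty list first means a response that mixes text with tool calls is still routed to tool handling, which is one possible policy among several.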

Functions

join(stream, response)

@spec join(Enumerable.t(), ReqLLM.Response.t()) ::
  {:ok, ReqLLM.Response.t()} | {:error, term()}

Join a stream of chunks into a complete response.

This function consumes the entire stream, builds the complete message from the content chunks, and returns a new response with the message populated and the stream cleared.

Implementation Notes

The joining process involves several steps:

  1. Collect all stream chunks by consuming the enumerable
  2. Filter and concatenate content chunks to build the response text
  3. Extract final usage statistics from meta chunks, merging with existing usage
  4. Build a complete assistant message with the concatenated text content
  5. Return an updated response with materialized data and stream cleared
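The five steps above can be sketched with plain maps standing in for ReqLLM's chunk and response structs. This is a minimal illustration, not the library's implementation: the JoinSketch module, the :type, :text and :metadata chunk keys, and the :message, :usage and :stream response keys are all assumptions made for the example.

```elixir
defmodule JoinSketch do
  # Stand-in for join/2 that works on plain maps. The chunk and response
  # shapes are assumptions for illustration, not ReqLLM's actual structs.
  def join(stream, response) do
    # 1. Collect all chunks by consuming the enumerable.
    chunks = Enum.to_list(stream)

    # 2. Keep only content chunks and concatenate their text.
    text =
      chunks
      |> Enum.filter(&(&1.type == :content))
      |> Enum.map_join("", & &1.text)

    # 3. Merge usage from meta chunks over any usage already on the response.
    usage =
      chunks
      |> Enum.filter(&(&1.type == :meta))
      |> Enum.reduce(response.usage || %{}, fn chunk, acc ->
        Map.merge(acc, Map.get(chunk.metadata, :usage, %{}))
      end)

    # 4. Build the assistant message from the concatenated text.
    message = %{role: :assistant, content: text}

    # 5. Return the response with materialized data and the stream cleared.
    {:ok, %{response | message: message, usage: usage, stream: nil}}
  rescue
    error -> {:error, error}
  end
end

chunks = [
  %{type: :content, text: "Hello, "},
  %{type: :content, text: "world!"},
  %{type: :meta, metadata: %{usage: %{output_tokens: 4}}}
]

response = %{message: nil, usage: %{input_tokens: 12}, stream: chunks}
{:ok, joined} = JoinSketch.join(response.stream, response)
```

In this sketch the usage from meta chunks is merged key by key, so a later meta chunk overrides earlier values for the same key while keys that were already on the response survive.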

Parameters

  • stream - The stream enumerable containing stream chunks
  • response - The original response to update with materialized data

Returns

  • {:ok, updated_response} on success
  • {:error, %ReqLLM.Error.API.Stream{}} on stream processing failure
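Callers would typically branch on the two return shapes. The sketch below uses a hypothetical JoinResultSketch.describe/1 helper and plain maps in place of a real ReqLLM.Response, so it only shows the control flow, not real library output.

```elixir
defmodule JoinResultSketch do
  # Hypothetical helper (not part of ReqLLM): branches on the result of join/2.
  # Success yields the populated message; failure yields a printable reason.
  def describe({:ok, response}), do: {:joined, response.message}
  def describe({:error, reason}), do: {:failed, inspect(reason)}
end

# Plain stand-ins for the two documented return shapes.
ok_result = JoinResultSketch.describe({:ok, %{message: "hi"}})
error_result = JoinResultSketch.describe({:error, :timeout})
```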

summarize(chunks)

@spec summarize(Enumerable.t()) :: summary()

Summarize a stream of chunks into accumulated data.

Processes all chunks and returns a map with:

  • text - Accumulated text content
  • thinking - Accumulated thinking/reasoning content
  • tool_calls - List of reconstructed tool calls with merged argument fragments
  • finish_reason - The finish reason from metadata chunks (normalized to atom)
  • usage - Token usage statistics from metadata chunks

This function is the shared core for both join/2 and ReqLLM.Stream.ToolCalls. It delegates accumulation to ReqLLM.Provider.ChunkAccumulator so the streaming chunk reducer stays a single source of truth.
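The accumulation described above can be pictured as a single reduce over the chunks. The sketch below is an assumption-laden stand-in, not ReqLLM.Provider.ChunkAccumulator: the SummarizeSketch module, the :tool_call and :tool_args chunk types, and the :fragment key are invented for illustration, and merged arguments are left as a JSON string rather than decoded into a map.

```elixir
defmodule SummarizeSketch do
  # Stand-in reducer over plain maps; chunk shapes are assumptions, not ReqLLM's.
  @empty %{text: "", thinking: "", tool_calls: [], finish_reason: nil, usage: nil}

  def summarize(chunks) do
    summary = Enum.reduce(chunks, @empty, &accumulate/2)
    # Tool calls are prepended while reducing, so restore arrival order here.
    %{summary | tool_calls: Enum.reverse(summary.tool_calls)}
  end

  defp accumulate(%{type: :content, text: text}, acc),
    do: %{acc | text: acc.text <> text}

  defp accumulate(%{type: :thinking, text: text}, acc),
    do: %{acc | thinking: acc.thinking <> text}

  # A new tool call starts with an id, a name, and empty arguments.
  defp accumulate(%{type: :tool_call, id: id, name: name}, acc),
    do: %{acc | tool_calls: [%{id: id, name: name, arguments: ""} | acc.tool_calls]}

  # Argument fragments are appended to the most recently started tool call.
  defp accumulate(%{type: :tool_args, fragment: frag}, %{tool_calls: [call | rest]} = acc),
    do: %{acc | tool_calls: [%{call | arguments: call.arguments <> frag} | rest]}

  # Meta chunks carry the finish reason and usage; later values win.
  defp accumulate(%{type: :meta, metadata: meta}, acc) do
    %{
      acc
      | finish_reason: normalize(meta[:finish_reason]) || acc.finish_reason,
        usage: meta[:usage] || acc.usage
    }
  end

  defp accumulate(_other, acc), do: acc

  # Map known strings to atoms explicitly instead of calling String.to_atom/1.
  defp normalize("stop"), do: :stop
  defp normalize("length"), do: :length
  defp normalize("tool_calls"), do: :tool_calls
  defp normalize(reason) when is_atom(reason), do: reason
  defp normalize(_other), do: :unknown
end

chunks = [
  %{type: :thinking, text: "User wants weather. "},
  %{type: :content, text: "Checking "},
  %{type: :content, text: "the forecast."},
  %{type: :tool_call, id: "call_123", name: "get_weather"},
  %{type: :tool_args, fragment: "{\"city\":"},
  %{type: :tool_args, fragment: "\"Oslo\"}"},
  %{type: :meta, metadata: %{finish_reason: "tool_calls", usage: %{output_tokens: 9}}}
]

sketch_summary = SummarizeSketch.summarize(chunks)
```

Keeping one reducer for every chunk type is what lets both a full join and a tool-call extractor share the same accumulation logic and differ only in which fields of the result they read.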

Examples

chunks = Enum.to_list(stream_response.stream)
summary = ReqLLM.Response.Stream.summarize(chunks)
summary.text        #=> "Hello, world!"
summary.tool_calls  #=> [%{id: "call_123", name: "get_weather", arguments: %{...}}]