Ragex.Agent.StreamConsumer (Ragex v0.12.0)

View Source

Consumes a provider stream and accumulates a response compatible with generate/3.

Forwards chunks to an optional callback in real-time while building the final response map. This bridges the gap between streaming providers and the Executor's generate-based loop.

Usage

{:ok, stream} = provider.stream_generate(prompt, context, opts)

{:ok, response} = StreamConsumer.consume(stream,
  on_chunk: fn chunk -> IO.write(chunk.content) end,
  on_phase: fn phase -> IO.puts("Phase: #{phase}") end
)

# response is compatible with provider.generate/3 return format:
# %{content: "...", reasoning_content: "...", tool_calls: nil, usage: %{}, ...}

Summary

Functions

Consume a provider stream, forwarding chunks to callbacks.

Types

chunk()

@type chunk() :: %{
  content: String.t(),
  thinking: String.t() | nil,
  done: boolean(),
  metadata: map()
}

response()

@type response() :: %{
  content: String.t(),
  reasoning_content: String.t() | nil,
  tool_calls: nil,
  model: String.t() | nil,
  usage: map(),
  metadata: map()
}

Functions

consume(stream, opts \\ [])

@spec consume(
  Enumerable.t(),
  keyword()
) :: {:ok, response()} | {:error, term()}

Consume a provider stream, forwarding chunks to callbacks.

Returns a response map compatible with what provider.generate/3 returns.

Options

  • :on_chunk - (chunk -> :ok) callback invoked for each content/thinking chunk
  • :on_phase - (:thinking | :answering | :done -> :ok) callback on phase transitions

  • :on_tool_progress - (map() -> :ok) callback for tool-call iteration progress

Returns

  • {:ok, response} - Accumulated response (content may be empty if LLM returned tool_calls)
  • {:error, reason} - Stream produced an error