Ragex.Agent.StreamConsumer
(Ragex v0.14.0)
View Source
Consumes a provider stream and accumulates a response compatible with generate/3.
Forwards chunks to an optional callback in real-time while building the final response map. This bridges the gap between streaming providers and the Executor's generate-based loop.
Usage
{:ok, stream} = provider.stream_generate(prompt, context, opts)
{:ok, response} = StreamConsumer.consume(stream,
on_chunk: fn chunk -> IO.write(chunk.content) end,
on_phase: fn phase -> IO.puts("Phase: #{phase}") end
)
# response is compatible with provider.generate/3 return format:
# %{content: "...", reasoning_content: "...", tool_calls: nil, usage: %{}, ...}
Summary
Functions
Consume a provider stream, forwarding chunks to callbacks.
Types
Functions
@spec consume( Enumerable.t(), keyword() ) :: {:ok, response()} | {:error, term()}
Consume a provider stream, forwarding chunks to callbacks.
Returns a response map compatible with what provider.generate/3 returns.
Options
:on_chunk-(chunk -> :ok)callback invoked for each content/thinking chunk:on_phase-(:thinking | :answering | :done -> :ok)callback on phase transitions:on_tool_progress-(map() -> :ok)callback for tool-call iteration progress
Returns
{:ok, response}- Accumulated response (content may be empty if LLM returned tool_calls){:error, reason}- Stream produced an error