Stream handling for LLM API responses.
Basic usage
{:ok, stream} = LLM.stream("Hello", provider: :openai, model: "gpt-4")
{:ok, response} = LLM.Stream.collect(stream, on_chunk: fn c -> IO.write(c.text) end)For most use cases LLM.generate/2 is simpler — it opens the stream and
calls collect/2 for you.
Chunk types
Each call to next/1 returns a list of typed chunk structs:
| Struct | Meaning |
|---|---|
LLM.Stream.Chunk | Text delta (:text field) |
LLM.Stream.Thinking | Reasoning delta (:text and optional :signature) |
LLM.Stream.ToolCall | Tool invocation (:id, :name, :arguments, :complete) |
LLM.Stream.Stop | End of generation (:reason, :usage) |
LLM.Stream.Error | Provider-level error (:message) |
Manual iteration
{:ok, stream} = LLM.stream("Hello", provider: :anthropic, model: "claude-sonnet-4-6")
Enum.reduce_while(Stream.iterate(stream, &LLM.Stream.next/1), [], fn
{:ok, chunks, _s}, acc -> {:cont, acc ++ chunks}
{:halt, _}, acc -> {:halt, acc}
end)Tool call loop
When auto_tools: true (default), collect/2 automatically:
- Detects complete
ToolCallchunks at end of stream - Looks up and executes each tool in
context.tools - Appends the assistant turn + tool results to the conversation
- Opens a new stream and repeats (up to
max_roundstimes)
Use :on_message to observe each completed turn. It fires once per finished
LLM.Message in conversation order — every assistant turn, and every tool
result (as a role: :tool message) — including the final assistant turn:
{:ok, response} = LLM.Stream.collect(stream,
on_message: fn
%LLM.Message{role: :assistant} = msg -> IO.puts("assistant: #{msg.content}")
%LLM.Message{role: :tool} = msg -> IO.puts("tool #{msg.name}: #{msg.content}")
end
)Pair it with :on_chunk to observe both token-level deltas and turn-level
messages in a single pass. The full reconstructed conversation is also
available afterwards as response.messages.
Avoid sending messages to the calling process's mailbox from inside the
callback, as next/1's receive loop will consume and discard unknown
messages — use an Agent or ETS table to accumulate instead.
Summary
Functions
Collect all chunks into a final response, executing tool calls automatically.
Receive the next chunk from the stream.
Start a streaming request. Returns {:ok, stream} or {:error, reason}.
Types
@type chunk_type() :: LLM.Stream.Chunk.t() | LLM.Stream.ToolCall.t() | LLM.Stream.Thinking.t() | LLM.Stream.Stop.t() | LLM.Stream.Error.t()
@type t() :: %LLM.Stream{ accumulated_usage: LLM.Usage.t(), adapter: module(), adapter_state: map(), base_message_count: non_neg_integer(), buffer: String.t(), context: LLM.Context.t(), deadline: term(), done: boolean(), headers: [{String.t(), String.t()}], opts: keyword(), provider: map(), ref: Req.Response.t(), rounds: non_neg_integer(), timeout: non_neg_integer(), url: String.t() }
Functions
@spec collect( t(), keyword() ) :: {:ok, LLM.Response.t()} | {:error, term()}
Collect all chunks into a final response, executing tool calls automatically.
Options
:auto_tools— auto-execute tool calls (default:true):max_rounds— max tool call rounds (default:10):on_chunk— callback for each chunk,fn chunk -> ... end:on_message— callback fired once per completedLLM.Message(each assistant turn and each tool result, in order),fn message -> ... end
@spec next(t()) :: {:ok, [chunk_type()], t()} | {:halt, t()} | {:error, term()}
Receive the next chunk from the stream.
Returns:
{:ok, [chunk], stream}— one or more chunks received{:halt, stream}— stream ended{:error, reason}— error occurred
@spec start( LLM.Context.t(), keyword() ) :: {:ok, t()} | {:error, term()}
Start a streaming request. Returns {:ok, stream} or {:error, reason}.