LLM.Stream (LLM v0.1.2)


Stream handling for LLM API responses.

Basic usage

{:ok, stream} = LLM.stream("Hello", provider: :openai, model: "gpt-4")
{:ok, response} = LLM.Stream.collect(stream, on_chunk: fn c -> IO.write(c.text) end)

For most use cases LLM.generate/2 is simpler — it opens the stream and calls collect/2 for you.

Chunk types

Each successful call to next/1 returns {:ok, chunks, stream}, where chunks is a list of typed chunk structs:

| Struct | Meaning |
| --- | --- |
| LLM.Stream.Chunk | Text delta (:text field) |
| LLM.Stream.Thinking | Reasoning delta (:text and optional :signature) |
| LLM.Stream.ToolCall | Tool invocation (:id, :name, :arguments, :complete) |
| LLM.Stream.Stop | End of generation (:reason, :usage) |
| LLM.Stream.Error | Provider-level error (:message) |
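As a rough illustration of consuming these chunk types, the sketch below dispatches on the struct name and returns a printable string. The module name ChunkPrinter and the output format are hypothetical, and it assumes :text and :message hold strings. It matches on the :__struct__ key only so that the snippet compiles without the library; in a project that depends on it, a pattern such as %LLM.Stream.Chunk{text: text} would be the more idiomatic form.

```elixir
defmodule ChunkPrinter do
  # Hypothetical helper: turns one stream chunk into a printable string.
  # Field names follow the chunk table; everything else is an assumption.
  def describe(%{__struct__: LLM.Stream.Chunk, text: text}), do: text

  def describe(%{__struct__: LLM.Stream.Thinking, text: text}),
    do: "[thinking] #{text}"

  def describe(%{__struct__: LLM.Stream.ToolCall, name: name, complete: true} = call),
    do: "[tool #{name}] #{inspect(call.arguments)}"

  # Assumption: a ToolCall with complete: false is a partial delta, so it is skipped.
  def describe(%{__struct__: LLM.Stream.ToolCall}), do: ""

  def describe(%{__struct__: LLM.Stream.Stop, reason: reason}),
    do: "[stop: #{inspect(reason)}]"

  def describe(%{__struct__: LLM.Stream.Error, message: message}),
    do: "[error: #{message}]"
end
```

It could then be plugged into collect/2 as on_chunk: fn chunk -> IO.write(ChunkPrinter.describe(chunk)) end.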

Manual iteration

{:ok, stream} = LLM.stream("Hello", provider: :anthropic, model: "claude-sonnet-4-6")

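# Thread the updated stream through each next/1 call until it halts.
Stream.unfold(stream, fn s ->
  case LLM.Stream.next(s) do
    {:ok, chunks, s} -> {chunks, s}
    {:halt, _s} -> nil
    {:error, reason} -> raise "stream error: #{inspect(reason)}"
  end
end)
|> Enum.concat()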

Tool call loop

When auto_tools: true (default), collect/2 automatically:

  1. Detects complete ToolCall chunks at end of stream
  2. Looks up and executes each tool in context.tools
  3. Appends the assistant turn + tool results to the conversation
  4. Opens a new stream and repeats (up to max_rounds times)
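The four steps above can be pictured with a small, library-free simulation. This is only a sketch and not the library's implementation: ToolLoopSketch, the ask function (standing in for "open a stream and drain it") and the plain-map message shape are all hypothetical, and it assumes tools are stored as a map from tool name to a one-argument function.

```elixir
defmodule ToolLoopSketch do
  # Illustrative only. `ask` receives the conversation so far and returns
  # {assistant_text, tool_calls}; each tool call is a map with
  # :id, :name, :arguments and :complete keys.
  def run(messages, tools, ask, max_rounds \\ 10, round \\ 0) do
    {text, calls} = ask.(messages)
    assistant = %{role: :assistant, content: text}

    # Step 1: only complete tool calls are eligible for execution.
    complete = Enum.filter(calls, & &1.complete)

    if complete == [] or round >= max_rounds do
      # No tools requested (or round budget exhausted): this is the final turn.
      messages ++ [assistant]
    else
      # Step 2: look up each tool by name and execute it.
      results =
        Enum.map(complete, fn call ->
          output = Map.fetch!(tools, call.name).(call.arguments)
          %{role: :tool, name: call.name, content: output}
        end)

      # Steps 3 and 4: append the assistant turn and tool results, then ask again.
      run(messages ++ [assistant | results], tools, ask, max_rounds, round + 1)
    end
  end
end
```

With a fake ask function that requests one tool and then answers, the returned conversation has the roles user, assistant, tool, assistant, which mirrors the order in which turns are appended.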

Use :on_message to observe each completed turn. It fires once per finished LLM.Message in conversation order — every assistant turn, and every tool result (as a role: :tool message) — including the final assistant turn:

{:ok, response} = LLM.Stream.collect(stream,
  on_message: fn
    %LLM.Message{role: :assistant} = msg -> IO.puts("assistant: #{msg.content}")
    %LLM.Message{role: :tool} = msg -> IO.puts("tool #{msg.name}: #{msg.content}")
  end
)

Pair it with :on_chunk to observe both token-level deltas and turn-level messages in a single pass. The full reconstructed conversation is also available afterwards as response.messages.

Avoid sending messages to the calling process's mailbox from inside the callback, as next/1's receive loop will consume and discard unknown messages — use an Agent or ETS table to accumulate instead.
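A minimal sketch of the Agent approach follows. MessageLog is a hypothetical helper, not part of the library. Agent.update/2 is a synchronous call whose reply is consumed before it returns, so no stray message is left behind in the caller's mailbox for next/1 to discard.

```elixir
defmodule MessageLog do
  # Hypothetical accumulator for use with the :on_message callback.

  # Starts an Agent holding the messages seen so far, newest first.
  def start, do: Agent.start_link(fn -> [] end)

  # Builds a one-argument callback that records each message in the Agent.
  def callback(agent) do
    fn message -> Agent.update(agent, fn seen -> [message | seen] end) end
  end

  # Returns the recorded messages in arrival order.
  def messages(agent), do: agent |> Agent.get(& &1) |> Enum.reverse()
end
```

Usage would look like {:ok, log} = MessageLog.start(), then LLM.Stream.collect(stream, on_message: MessageLog.callback(log)), and finally MessageLog.messages(log) to read everything back.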

Summary

Functions

collect(stream, opts \\ [])

Collect all chunks into a final response, executing tool calls automatically.

next(stream)

Receive the next chunk from the stream.

start(context, opts)

Start a streaming request. Returns {:ok, stream} or {:error, reason}.

Types

chunk_type()

t()

@type t() :: %LLM.Stream{
  adapter: module(),
  adapter_state: map(),
  base_message_count: non_neg_integer(),
  buffer: String.t(),
  context: LLM.Context.t(),
  deadline: term(),
  done: boolean(),
  headers: [{String.t(), String.t()}],
  opts: keyword(),
  provider: map(),
  ref: Req.Response.t(),
  rounds: non_neg_integer(),
  timeout: non_neg_integer(),
  url: String.t()
}

Functions

collect(stream, opts \\ [])

@spec collect(
  t(),
  keyword()
) :: {:ok, LLM.Response.t()} | {:error, term()}

Collect all chunks into a final response, executing tool calls automatically.

Options

  • :auto_tools — auto-execute tool calls (default: true)
  • :max_rounds — max tool call rounds (default: 10)
  • :on_chunk — callback for each chunk, fn chunk -> ... end
  • :on_message — callback fired once per completed LLM.Message (each assistant turn and each tool result, in order), fn message -> ... end

next(stream)

@spec next(t()) :: {:ok, [chunk_type()], t()} | {:halt, t()} | {:error, term()}

Receive the next chunk from the stream.

Returns:

  • {:ok, [chunk], stream} — one or more chunks received
  • {:halt, stream} — stream ended
  • {:error, reason} — error occurred
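The three return shapes lend themselves to a small recursive loop that propagates errors as tagged tuples instead of raising. The sketch below is one possible shape: StreamDrain is a hypothetical helper, and the next argument defaults to LLM.Stream.next/1 but is injectable so the loop can be exercised with a fake.

```elixir
defmodule StreamDrain do
  # Hypothetical helper: gathers every chunk until the stream halts.
  # Returns {:ok, chunks, stream} on completion or {:error, reason} on failure.
  def drain(stream, next \\ &LLM.Stream.next/1, acc \\ []) do
    case next.(stream) do
      # Keep the updated stream and prepend the batch (reversed once at the end).
      {:ok, chunks, stream} -> drain(stream, next, [chunks | acc])
      {:halt, stream} -> {:ok, acc |> Enum.reverse() |> Enum.concat(), stream}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

Prepending each batch and reversing once at the end avoids repeatedly copying the accumulator, which acc ++ chunks would do on every step.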

start(context, opts)

@spec start(
  LLM.Context.t(),
  keyword()
) :: {:ok, t()} | {:error, term()}

Start a streaming request. Returns {:ok, stream} or {:error, reason}.