Streaming is the primitive in ALLM. The non-streaming entry points
(generate/3, step/3, chat/3) are reducers over a stream — every
provider adapter implements both shapes, but the streaming path is the
canonical one. This guide covers when to stream, what events to expect,
and how to control the firehose.
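To make "reducer over a stream" concrete, here is a minimal sketch that folds a hand-built list of events into the assistant text. The event shapes follow the event table later in this guide. The MiniReducer module is hypothetical and not part of ALLM, and the real non-streaming variants build a full %Response{} rather than a bare string.

```elixir
defmodule MiniReducer do
  # Hypothetical helper, not part of ALLM: reduces a stream (or list) of
  # events into the concatenated assistant text.
  def collect_text(events) do
    Enum.reduce(events, "", fn
      # Append each text chunk to the accumulator.
      {:text_delta, %{delta: chunk}}, acc -> acc <> chunk
      # Every other event tag leaves the accumulator unchanged.
      _other, acc -> acc
    end)
  end
end

# Placeholder payloads; only the :text_delta ones matter here.
events = [
  {:request_started, %{request: nil, engine_summary: %{}}},
  {:text_delta, %{delta: "Hello", cumulative: "Hello"}},
  {:text_delta, %{delta: ", ", cumulative: "Hello, "}},
  {:text_delta, %{delta: "world", cumulative: "Hello, world"}},
  {:message_completed, %{response: nil}}
]

text = MiniReducer.collect_text(events)
```

Because Enum.reduce/3 accepts any Enumerable, the same function would work unchanged on a lazy stream of events with these shapes.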
When to stream
Reach for stream_generate/3 or stream/3 when you want incremental
output — typing-effect UIs, progressive tool-call dispatch, latency
hiding for long completions. The non-streaming variants give you a
single %Response{} (or %ChatResult{}) at the end; the streaming
variants give you a lazy Enumerable of ALLM.Event.t/0 values you fold
over yourself.
The simplest stream
```elixir
iex> engine = ALLM.Engine.new(
...>   adapter: ALLM.Providers.Fake,
...>   adapter_opts: [
...>     stream_script: [[
...>       {:text_delta, "Hello"},
...>       {:text_delta, ", "},
...>       {:text_delta, "world"},
...>       {:finish, :stop}
...>     ]]
...>   ]
...> )
iex> req = ALLM.request([ALLM.user("hi")])
iex> {:ok, stream} = ALLM.stream_generate(engine, req)
iex> Enum.any?(Enum.to_list(stream), &match?({:message_completed, _}, &1))
true
```

stream_generate/3 returns {:ok, stream}. The stream is lazy; it
doesn't dispatch to the provider until you start consuming. Common
consumption patterns:
```elixir
# Print every text delta as it arrives.
{:ok, stream} = ALLM.stream_generate(engine, req)

stream
|> Stream.each(fn
  {:text_delta, %{delta: chunk}} -> IO.write(chunk)
  _ -> :ok
end)
|> Stream.run()
```

```elixir
# Collect the full response synchronously (equivalent to generate/3).
{:ok, stream} = ALLM.stream_generate(engine, req)
{:ok, response} = ALLM.StreamCollector.collect(stream)
```

The event union
Every event is a tagged tuple: a leading tag atom followed by a payload map. The set of tags is closed:
| Tag | When it fires | Payload keys |
|---|---|---|
| {:request_started, _} | Before the first byte is sent to the provider | :request, :engine_summary |
| {:text_delta, _} | Each chunk of assistant text | :delta, :cumulative |
| {:tool_call_delta, _} | Each chunk of a tool-call argument blob | :index, :delta, :cumulative_args |
| {:tool_call, _} | A complete tool call has been assembled | :tool_call |
| {:tool_result, _} | A tool executed and returned a result | :tool_call_id, :result, :status |
| {:message_completed, _} | The assistant message finished | :response |
| {:step_completed, _} | One round-trip completed (the chat/3 / stream/3 loop emits one per iteration) | :step_result, :mode |
| {:halted, _} | The chat loop halted (manual mode, ask-user, etc.) | :reason, :metadata |
| {:raw_chunk, _} | Provider-native chunk passthrough (only when :include_raw_chunks is on) | :chunk |
| {:error, _} | Mid-stream error from the provider | :error |
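Payload maps are open: adding a new key to a payload is a non-breaking change, so don't rely on a payload containing exactly the keys listed above. Match on the leading tag, and pattern-match only the payload keys you actually use.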
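A minimal sketch of that advice: each clause matches on the tag and destructures only the keys it needs. Elixir map patterns match partially, so a payload carrying extra keys still matches. EventRouter and the some_new_key field are hypothetical, used only to illustrate a future payload addition.

```elixir
defmodule EventRouter do
  # Hypothetical consumer, not part of ALLM. Each clause keys off the
  # leading tag and pulls out only the payload keys it uses.
  def describe({:text_delta, %{delta: delta}}), do: {:text, delta}
  def describe({:tool_call, %{tool_call: call}}), do: {:tool, call}
  def describe({:message_completed, _payload}), do: :done
  # Catch-all: ignore tags this consumer does not care about.
  def describe({_tag, _payload}), do: :ignored
end

# A payload with a key the handler has never seen still matches.
future_payload = %{delta: "hi", cumulative: "hi", some_new_key: 123}
result = EventRouter.describe({:text_delta, future_payload})
```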
Filter opts
Most consumers don't need every event. stream_generate/3 and
stream/3 accept filter options:
- :emit_text_deltas (default true): set to false to drop :text_delta events.
- :emit_tool_deltas (default true): set to false to drop :tool_call_delta events; you still receive the assembled :tool_call event.
- :include_raw_chunks (default false): set to true to receive :raw_chunk events carrying provider-native chunks (useful for passthrough proxies).
- :on_event: a 1-arity function called on every event before the consumer sees it. Useful for telemetry instrumentation that doesn't need to mutate the stream.
```elixir
{:ok, stream} =
  ALLM.stream_generate(engine, req,
    emit_tool_deltas: false,
    on_event: fn event ->
      :telemetry.execute([:my_app, :llm, :event], %{}, %{event: event})
    end
  )
```

stream/3 — the multi-turn streaming loop
ALLM.stream/3 is chat/3 plus streaming. It runs the auto-loop —
calling tools as they're requested, feeding results back in, looping
until the model stops asking — and emits events the entire way. You'll
see a :step_completed event per loop iteration, and a final
:message_completed when the loop exits.
```elixir
iex> engine = ALLM.Engine.new(
...>   adapter: ALLM.Providers.Fake,
...>   adapter_opts: [
...>     stream_script: [[
...>       {:text_delta, "done"},
...>       {:finish, :stop}
...>     ]]
...>   ]
...> )
iex> {:ok, stream} = ALLM.stream(engine, [ALLM.user("hi")])
iex> events = Enum.to_list(stream)
iex> Enum.any?(events, &match?({:message_completed, _}, &1))
true
```

Cancellation and cleanup
The stream returned by stream_generate/3 or stream/3 is built on
Stream.resource/3. When the consumer stops early (Enum.take/2,
returning {:halt, acc} from Enum.reduce_while/3, or an exception raised
while enumerating), the resource's after_fun runs and tears down the
underlying HTTP connection. You don't need to call an explicit cancel function.
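Because the mechanism is plain Stream.resource/3, it can be demonstrated without a provider. In the sketch below, CleanupDemo is a hypothetical stand-in, not part of ALLM: its start_fun and after_fun send messages where a real adapter would open and close an HTTP connection. It shows that building the stream does nothing until enumeration starts, and that halting early with Enum.take/2 runs after_fun.

```elixir
defmodule CleanupDemo do
  # Hypothetical stand-in for an adapter stream, not part of ALLM.
  def fake_stream(chunks, notify) do
    Stream.resource(
      # start_fun: runs only when enumeration begins.
      fn ->
        send(notify, :opened)
        chunks
      end,
      # next_fun: emit one text delta per call; halt when exhausted.
      fn
        [] -> {:halt, []}
        [chunk | rest] -> {[{:text_delta, %{delta: chunk}}], rest}
      end,
      # after_fun: runs on normal completion and on early halt.
      fn _acc -> send(notify, :closed) end
    )
  end
end

# Checks for (and consumes) one message without blocking.
check = fn msg ->
  receive do
    ^msg -> true
  after
    0 -> false
  end
end

stream = CleanupDemo.fake_stream(["a", "b", "c", "d"], self())
# Building the stream has not run start_fun yet.
opened_before? = check.(:opened)

# Taking two events halts enumeration before the list is exhausted.
taken = Enum.take(stream, 2)
opened_after? = check.(:opened)
closed? = check.(:closed)
```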
If you want to halt the stream after a fixed number of text deltas:
```elixir
{:ok, stream} = ALLM.stream_generate(engine, req)

stream
|> Stream.filter(&match?({:text_delta, _}, &1))
|> Enum.take(10)
```

Taking 10 elements halts the stream, and the underlying HTTP connection is closed automatically.
Mid-stream errors
How a mid-stream {:error, struct} event surfaces depends on how you
consume it. The non-streaming variants return
{:ok, %Response{finish_reason: :error, metadata: %{error: struct}}};
the call-site tuple stays {:ok, _}. Streaming consumers see the
{:error, _} event in the stream itself, and it is the last event: the
stream ends immediately after it.
```elixir
{:ok, stream} = ALLM.stream_generate(engine, req)

Enum.each(stream, fn
  {:error, %ALLM.Error.AdapterError{reason: :rate_limited}} ->
    IO.puts("backing off")

  {:text_delta, %{delta: chunk}} ->
    IO.write(chunk)

  _ ->
    :ok
end)
```

If you're using StreamCollector.collect/1, the collector folds the
mid-stream error into the response struct's :metadata.error field and
returns {:ok, response} with finish_reason: :error. Pre-flight
errors (validation failures, missing adapter) surface as {:error, _}
at the stream_generate/3 call site, before the stream is built.
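The collector behaviour described above can be sketched as a fold with Enum.reduce_while/3. MiniCollector is hypothetical and is not ALLM.StreamCollector: it returns a plain map loosely shaped like a response, defaults finish_reason to :stop as an assumption, and uses the atom :rate_limited as a placeholder where the real event carries an error struct.

```elixir
defmodule MiniCollector do
  # Hypothetical sketch, not ALLM.StreamCollector.
  def collect(events) do
    # Assumption: finish_reason defaults to :stop when no error is seen.
    initial = %{text: "", finish_reason: :stop, metadata: %{}}

    response =
      Enum.reduce_while(events, initial, fn
        {:text_delta, %{delta: chunk}}, acc ->
          {:cont, %{acc | text: acc.text <> chunk}}

        {:error, error}, acc ->
          # Record the error under metadata.error and stop folding.
          {:halt, %{acc | finish_reason: :error, metadata: %{error: error}}}

        _other, acc ->
          {:cont, acc}
      end)

    # A mid-stream error still yields {:ok, _} at the call site.
    {:ok, response}
  end
end

events = [
  {:text_delta, %{delta: "partial ", cumulative: "partial "}},
  # Placeholder payload standing in for an error struct.
  {:error, :rate_limited}
]

{:ok, response} = MiniCollector.collect(events)
```

The text received before the error is kept, so a caller can still show a partial answer while inspecting metadata.error.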
Where to next
- tools.md: streaming + tool calls.
- errors_and_retries.md: retry policy for transient errors.
- examples/02_streaming_text.exs: runnable smoke test against any provider.