Claudio.Messages.Stream (Claudio v0.5.0)

View Source

Utilities for parsing and consuming Server-Sent Events (SSE) from streaming Messages API responses.

Streaming usage telemetry is emitted via [:claudio, :messages, :stream, :usage] when parse_events/1 reaches the terminal message_stop event and final usage is available from message_delta frames.

Event Types

The Messages API streaming responses include the following event types:

  • message_start - Initial message with empty content
  • content_block_start - Beginning of a content block
  • content_block_delta - Incremental content updates (text, JSON, thinking)
  • content_block_stop - End of a content block
  • message_delta - Top-level message changes (usage updates)
  • message_stop - Stream completion
  • ping - Keep-alive events
  • error - Error events

Example

response = Claudio.Messages.create_message(client, request)

response
|> Claudio.Messages.Stream.parse_events()
|> Stream.filter(&match?({:ok, %{event: "content_block_delta"}}, &1))
|> Enum.each(fn {:ok, event} ->
  IO.puts(event.data["delta"]["text"])
end)

Summary

Functions

Accumulates text deltas from streaming events into complete text chunks.

Accumulates all events and returns the final complete message.

Filters stream to only specific event types.

Parses Server-Sent Events from a streaming response body.

Types

event()

@type event() :: %{event: String.t(), data: map() | nil}

parsed_event()

@type parsed_event() :: {:ok, event()} | {:error, term()}

Functions

accumulate_text(event_stream)

@spec accumulate_text(Enumerable.t()) :: Enumerable.t()

Accumulates text deltas from streaming events into complete text chunks.

Example

response
|> Stream.parse_events()
|> Stream.accumulate_text()
|> Enum.each(&IO.puts/1)

build_final_message(event_stream)

@spec build_final_message(Enumerable.t()) :: {:ok, map()} | {:error, term()}

Accumulates all events and returns the final complete message.

Example

{:ok, final_message} =
  response
  |> Stream.parse_events()
  |> Stream.build_final_message()

filter_events(event_stream, event_types)

@spec filter_events(Enumerable.t(), [String.t()]) :: Enumerable.t()

Filters stream to only specific event types.

Example

response
|> Stream.parse_events()
|> Stream.filter_events(["content_block_delta", "message_stop"])

parse_events(stream)

@spec parse_events(Enumerable.t()) :: Enumerable.t()

Parses Server-Sent Events from a streaming response body.

Returns a Stream of {:ok, event} or {:error, reason} tuples.

Example

response
|> Stream.parse_events()
|> Enum.to_list()