Normandy.Components.StreamProcessor (normandy v0.6.1)

Utilities for processing streaming LLM responses.

Provides functions to accumulate streaming events into complete messages, handle text deltas, and build final responses from event streams.

Summary

Functions

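accumulate_text(event_stream)

Accumulates text deltas from a stream of events.

build_final_message(events)

Builds a complete message from all stream events.

process_with_callback(event_stream, callback)

Invokes a callback for each event type.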

Functions

accumulate_text(event_stream)

@spec accumulate_text(Enumerable.t()) :: Enumerable.t()

Accumulates text deltas from a stream of events.

Returns a stream that emits accumulated text chunks.

Example

stream
|> parse_stream_events()
|> accumulate_text()
|> Enum.each(&IO.write/1)
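The idea of pulling text deltas out of an event stream lazily can be sketched without the library. This is a minimal illustration, not Normandy's implementation: the module name TextDeltaSketch and the event shape (%{type: :text_delta, text: ...}) are assumptions made for the example, and the real StreamEvent fields may differ. It shows two possible readings of "accumulated text chunks": emitting each fragment as it arrives, or emitting the running text after each fragment.

```elixir
defmodule TextDeltaSketch do
  # Hypothetical event shape, assumed for illustration only:
  #   %{type: :text_delta, text: "..."} carries a text fragment;
  #   every other event is skipped.

  # Lazily emits each text fragment in arrival order.
  def text_chunks(events) do
    Stream.flat_map(events, fn
      %{type: :text_delta, text: text} when is_binary(text) -> [text]
      _other -> []
    end)
  end

  # Lazily emits the text accumulated so far after each fragment.
  def running_text(events) do
    events
    |> text_chunks()
    |> Stream.scan("", fn chunk, acc -> acc <> chunk end)
  end
end

events = [
  %{type: :message_start},
  %{type: :text_delta, text: "Hel"},
  %{type: :text_delta, text: "lo"},
  %{type: :message_stop}
]

chunks = events |> TextDeltaSketch.text_chunks() |> Enum.to_list()
running = events |> TextDeltaSketch.running_text() |> Enum.to_list()
```

Both functions return lazy streams, so nothing is consumed until a caller enumerates the result, for example with Enum.each(&IO.write/1).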

build_final_message(events)

@spec build_final_message([Normandy.Components.StreamEvent.t() | map()]) :: map()

Builds a complete message from all stream events.

Accumulates all events and constructs a final response structure similar to non-streaming responses.

Example

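alias Normandy.Components.StreamProcessor

# Consume the whole stream first, then build the message from the collected events.
events = Enum.to_list(stream)
final_message = StreamProcessor.build_final_message(events)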
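Folding a list of events into one response map can be sketched as a single reduce. The following is an illustration under stated assumptions, not the library's code: FinalMessageSketch, the event shapes, and the keys of the returned map (:role, :content, :stop_reason) are all hypothetical, since the page only says the result is a map similar to a non-streaming response.

```elixir
defmodule FinalMessageSketch do
  # Hypothetical event shapes, assumed for illustration only:
  #   %{type: :message_start, role: role}
  #   %{type: :text_delta, text: text}
  #   %{type: :message_stop, stop_reason: reason}

  def build(events) do
    initial = %{role: nil, chunks: [], stop_reason: nil}

    acc =
      Enum.reduce(events, initial, fn
        %{type: :message_start, role: role}, acc -> %{acc | role: role}
        %{type: :text_delta, text: text}, acc -> %{acc | chunks: [text | acc.chunks]}
        %{type: :message_stop, stop_reason: reason}, acc -> %{acc | stop_reason: reason}
        # Unknown events leave the accumulator unchanged.
        _other, acc -> acc
      end)

    # Fragments were prepended, so reverse once before joining.
    text = acc.chunks |> Enum.reverse() |> IO.iodata_to_binary()
    %{role: acc.role, content: text, stop_reason: acc.stop_reason}
  end
end

message =
  FinalMessageSketch.build([
    %{type: :message_start, role: "assistant"},
    %{type: :text_delta, text: "Hel"},
    %{type: :text_delta, text: "lo"},
    %{type: :message_stop, stop_reason: "end_turn"}
  ])
```

Prepending fragments and reversing once at the end avoids repeated binary concatenation while the list is being folded.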

process_with_callback(event_stream, callback)

@spec process_with_callback(Enumerable.t(), function()) ::
  {:ok, map()} | {:error, term()}

Invokes a callback for each event type.

The callback receives (event_type, data) and should return :ok or {:error, reason}.

Example

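alias Normandy.Components.StreamProcessor

# Each clause handles one event type and returns :ok.
callback = fn
  :text_delta, text -> IO.write(text)
  :tool_use, tool -> IO.puts("Tool: #{tool.name}")
  # Ignore every other event type.
  _, _ -> :ok
end

StreamProcessor.process_with_callback(stream, callback)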
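The callback contract described above (return :ok to continue, {:error, reason} to fail) can be sketched with Enum.reduce_while/3. This is an assumption-laden illustration rather than the library's behavior: CallbackSketch and the %{type: ..., data: ...} event shape are hypothetical, the sketch returns {:ok, count} where the real function returns {:ok, map()}, and whether Normandy stops at the first error is not stated on this page.

```elixir
defmodule CallbackSketch do
  # Hypothetical event shape, assumed for illustration only:
  #   %{type: atom, data: term}

  # Calls callback.(type, data) for each event and stops at the first error.
  # Returns {:ok, number_of_events_handled} or {:error, reason}.
  def each_event(events, callback) when is_function(callback, 2) do
    Enum.reduce_while(events, {:ok, 0}, fn %{type: type, data: data}, {:ok, count} ->
      case callback.(type, data) do
        :ok -> {:cont, {:ok, count + 1}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end

callback = fn
  :text_delta, text when is_binary(text) -> :ok
  :tool_use, _tool -> {:error, :tools_not_supported}
  _type, _data -> :ok
end

ok_result =
  CallbackSketch.each_event(
    [%{type: :text_delta, data: "Hi"}, %{type: :message_stop, data: nil}],
    callback
  )

error_result =
  CallbackSketch.each_event(
    [%{type: :text_delta, data: "Hi"}, %{type: :tool_use, data: %{name: "search"}}],
    callback
  )
```

Halting on the first {:error, reason} is one reasonable design; an alternative would be to collect errors and keep consuming the stream.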