Normandy.Components.StreamProcessor
(normandy v0.6.2)
View Source
Utilities for processing streaming LLM responses.
Provides functions to accumulate streaming events into complete messages, handle text deltas, and build final responses from event streams.
Summary
Functions
Accumulates text deltas from a stream of events.
Builds a complete message from all stream events.
Invokes a callback for each event type.
Functions
@spec accumulate_text(Enumerable.t()) :: Enumerable.t()
Accumulates text deltas from a stream of events.
Returns a stream that emits accumulated text chunks.
Example
stream
|> parse_stream_events()
|> accumulate_text()
|> Enum.each(&IO.write/1)
@spec build_final_message([Normandy.Components.StreamEvent.t() | map()]) :: map()
Builds a complete message from all stream events.
Accumulates all events and constructs a final response structure similar to non-streaming responses.
Example
events = Enum.to_list(stream)
final_message = StreamProcessor.build_final_message(events)
@spec process_with_callback(Enumerable.t(), function()) :: {:ok, map()} | {:error, term()}
Invokes a callback for each event type.
Callbacks receive (event_type, data) and should return :ok or {:error, reason}.
Example
callback = fn
:text_delta, text -> IO.write(text)
:tool_use, tool -> IO.puts("Tool: #{tool.name}")
_, _ -> :ok
end
StreamProcessor.process_with_callback(stream, callback)