Mix.install([
{:req, "~> 0.5.17"},
{:server_sent_events, "1.1.0"}
])

Req+Claude+SSE stream
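To run this livebook, you must add a valid Anthropic API key as a Livebook secret named ANTHROPIC_API_KEY and enable it for this livebook. Livebook exposes the secret to the code as the LB_ANTHROPIC_API_KEY environment variable, which is the name the module below reads.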
defmodule Claude do
@doc """
Posts `request` to the Anthropic Messages endpoint with `stream: true` added
and returns a lazy stream of JSON-decoded server-sent events.

`request` is a map; it is sent as the JSON body after `:stream` is set to
`true`.

The request is made with `into: :self`, so the response body is consumed
incrementally by whoever enumerates the returned stream (presumably this must
be the process that made the request; confirm against the Req docs).

Raises `MatchError` when the response status is not 200. `JSON.decode!/1`
raises while the stream is being enumerated if an event's data is not valid
JSON.
"""
def stream_messages!(request) do
  payload = Map.put(request, :stream, true)

  headers = %{
    "x-api-key" => api_key(),
    "anthropic-version" => "2023-06-01"
  }

  response =
    Req.post!("https://api.anthropic.com/v1/messages",
      json: payload,
      into: :self,
      headers: headers
    )

  # Assert on the status up front: only a 200 response carries the event stream.
  %Req.Response{status: 200, body: async_body} = response

  async_body
  |> ServerSentEvents.decode_stream()
  |> Stream.map(&JSON.decode!(&1.data))
end
# Reads the Anthropic API key from the LB_ANTHROPIC_API_KEY environment variable.
#
# Uses System.fetch_env!/1 so a missing key raises immediately with the
# variable's name, instead of returning nil and sending a request with a nil
# "x-api-key" header that only fails later with an unrelated error.
defp api_key do
  System.fetch_env!("LB_ANTHROPIC_API_KEY")
end
end

In the above module, you'll notice that we call JSON.decode!/1 on every event in the stream. As a slight optimization, you may choose to decode only a subset instead. For example, the code below only cares about the content_block_delta event. If only that event is desired, you could filter on events of that type before JSON decoding, reducing the number of JSON.decode!/1 calls.
# Start a streaming Messages request. `stream` is the lazy enumerable of decoded
# events returned by Claude.stream_messages!/1; nothing is printed until it is
# enumerated.
stream =
Claude.stream_messages!(%{
model: "claude-haiku-4-5",
max_tokens: 8192,
# Heredoc: the system prompt is sent with a trailing newline.
system: """
You are a poet, fluent in multiple languages and emoji usage.
""",
# A single user turn carrying the prompt.
messages: [
%{
role: :user,
content: "Write a poem across multiple languages and emojis!"
}
]
})
# Write each text fragment to stdout as it arrives. Events of any other shape
# (including content_block_delta events without a "text" delta) are ignored.
Enum.each(stream, fn
  %{"type" => "content_block_delta", "delta" => %{"text" => text}} -> IO.write(text)
  _other_event -> :ok
end)

Req+OpenAI+SSE Parse
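To run this livebook, you must add a valid OpenAI API key as a Livebook secret named OPENAI_API_KEY and enable it for this livebook. Livebook exposes the secret to the code as the LB_OPENAI_API_KEY environment variable, which is the name the module below reads.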
defmodule OpenAI do
@doc """
Builds (but does not send) a streaming request to the OpenAI Responses API.

`request` is a map; it is sent as the JSON body after `:stream` is set to
`true`. `callback` is a 1-arity function invoked with the `"delta"` value of
every `response.output_text.delta` event; it defaults to
`Function.identity/1`, i.e. deltas are ignored.

Returns the `Req.Request` produced by `Req.new/1`; the caller executes it,
e.g. with `Req.request!/1`.

While a 200 response streams in, chunks are fed through
`ServerSentEvents.Parser`, whose state is kept in the request's private
storage under `:sse_state`. When a `response.completed` event arrives, the
response body is replaced with the decoded `"response"` object from that
event. For any other status the raw body chunks are concatenated, so the
complete error payload is available in the response body.

Raises if the `LB_OPENAI_API_KEY` environment variable is not set.
"""
def responses(request, callback \\ &Function.identity/1) do
  Req.new(
    base_url: "https://api.openai.com",
    auth: {:bearer, System.fetch_env!("LB_OPENAI_API_KEY")},
    method: :post,
    url: "/v1/responses",
    json: Map.put(request, :stream, true),
    into: fn
      {:data, data}, {req, %{status: 200} = res} ->
        # A chunk can end in the middle of an event, so the parser state is
        # carried from one invocation to the next in the request struct.
        state = Req.Request.get_private(req, :sse_state, ServerSentEvents.Parser.new())
        {events, state} = ServerSentEvents.Parser.parse(state, data)
        req = Req.Request.put_private(req, :sse_state, state)

        res =
          Enum.reduce(events, res, fn
            %{data: encoded, event: "response.output_text.delta"}, res ->
              %{"delta" => delta} = JSON.decode!(encoded)
              callback.(delta)
              res

            %{data: encoded, event: "response.completed"}, res ->
              # The completed event carries the full response object; expose
              # it as the final body.
              %{"response" => response} = JSON.decode!(encoded)
              %{res | body: response}

            _, res ->
              res
          end)

        {:cont, {req, res}}

      {:data, data}, {req, res} ->
        # Non-200 response: an error payload may span several chunks, so
        # append each chunk instead of overwriting the body with the last one.
        {:cont, {req, %{res | body: res.body <> data}}}
    end
  )
end
end

In this example we use a different Req streaming style. Instead of setting into: :self and decoding a stream of incoming messages, we equip Req with a 2-arity callback that is called on each chunk. In the callback, we use ServerSentEvents.Parser to parse incoming chunks while maintaining the parser state in the request struct. For each parsed response.output_text.delta event, we pass its delta to a client-provided callback, which can, for example, send the delta to a LiveView process. Finally, once we receive the response.completed event with a complete response body, we use this body as the final result of the request. This is useful if the caller wants to access the final response body in addition to the streamed deltas.
# Build the streaming request, writing each text delta to stdout as it arrives,
# then execute it. The call returns the Req response for the request.
Req.request!(
  OpenAI.responses(
    %{
      model: "gpt-5.4-nano",
      reasoning: %{effort: "none"},
      instructions: "You are a poet, fluent in multiple languages and emoji usage.",
      input: "Write a poem across multiple languages and emojis!"
    },
    &IO.write/1
  )
)