ServerSentEvents Req Example

Copy Markdown View Source
Mix.install([
  {:req, "~> 0.5.17"},
  {:server_sent_events, "1.1.0"}
])

Req+Claude+SSE stream

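To run this livebook, you must have a valid ANTHROPIC_API_KEY secret enabled for this livebook. Livebook exposes secrets to the code as environment variables prefixed with LB_, which is why the module below reads LB_ANTHROPIC_API_KEY.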

defmodule Claude do
  @moduledoc """
  Minimal client that streams responses from Anthropic's Messages API using
  Req and ServerSentEvents.
  """

  @doc """
  Posts `request` to the Messages API with streaming enabled and returns a
  lazy stream of JSON-decoded server-sent events.

  `request` is a map of Messages API parameters; `:stream` is forced to `true`
  before the request is sent.

  Raises if the `LB_ANTHROPIC_API_KEY` environment variable is not set, and
  raises a `MatchError` if the response status is anything other than 200.
  Because the returned stream is lazy, `JSON.decode!/1` errors surface only
  while the stream is being consumed.
  """
  def stream_messages!(request) do
    # `into: :self` makes Req hand back an asynchronous body that is consumed
    # lazily below instead of buffering the whole response first.
    %Req.Response{status: 200, body: response_body} =
      Req.post!("https://api.anthropic.com/v1/messages",
        json: Map.put(request, :stream, true),
        into: :self,
        headers: %{
          "x-api-key" => api_key(),
          "anthropic-version" => "2023-06-01"
        }
      )

    response_body
    |> ServerSentEvents.decode_stream()
    |> Stream.map(fn event -> JSON.decode!(event.data) end)
  end

  # Uses `fetch_env!/1` rather than `get_env/1` so that a missing key fails
  # immediately with a descriptive error instead of sending a `nil` header.
  defp api_key do
    System.fetch_env!("LB_ANTHROPIC_API_KEY")
  end
end

In the above module, you'll notice that we JSON.decode!/1 every event in the stream. As a slight optimization, you may choose to decode only a subset instead. For example, the code below only cares about the content_block_delta event. If only that event is needed, you could filter on events of that type before JSON decoding, reducing the number of JSON.decode!/1 calls.

# Messages API payload; `Claude.stream_messages!/1` adds `stream: true` itself.
poem_request = %{
  model: "claude-haiku-4-5",
  max_tokens: 8192,
  system: """
  You are a poet, fluent in multiple languages and emoji usage.
  """,
  messages: [
    %{
      role: :user,
      content: "Write a poem across multiple languages and emojis!"
    }
  ]
}

# Lazy stream of decoded events; nothing is decoded until it is enumerated.
stream = Claude.stream_messages!(poem_request)

# Print each text delta as it arrives; every other event type is ignored.
Enum.each(stream, fn
  %{"type" => "content_block_delta", "delta" => %{"text" => text}} ->
    IO.write(text)

  _other_event ->
    :ok
end)

Req+OpenAI+SSE Parse

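To run this livebook, you must have a valid OPENAI_API_KEY secret enabled for this livebook. Livebook exposes secrets to the code as environment variables prefixed with LB_, which is why the module below reads LB_OPENAI_API_KEY.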

defmodule OpenAI do
  @moduledoc """
  Builds streaming requests for OpenAI's Responses API using Req and
  `ServerSentEvents.Parser`.
  """

  @doc """
  Builds (but does not send) a `Req.Request` for `POST /v1/responses` with
  streaming enabled.

  `request` is a map of Responses API parameters; `:stream` is forced to
  `true`. `callback` is a 1-arity function invoked with the text of every
  `response.output_text.delta` event; it defaults to `Function.identity/1`,
  which discards nothing and does nothing.

  When the request is run and the status is 200, the response body ends up as
  the decoded `"response"` object from the `response.completed` event. For any
  other status, the raw body chunks are concatenated so the full error payload
  is available to the caller.

  Raises if the `LB_OPENAI_API_KEY` environment variable is not set.
  """
  def responses(request, callback \\ &Function.identity/1) do
    Req.new(
      base_url: "https://api.openai.com",
      auth: {:bearer, System.fetch_env!("LB_OPENAI_API_KEY")},
      method: :post,
      url: "/v1/responses",
      json: Map.put(request, :stream, true),
      into: fn
        {:data, data}, {req, %{status: 200} = res} ->
          # The parser state lives in the request's private storage so that an
          # event split across chunk boundaries is reassembled on the next call.
          state = Req.Request.get_private(req, :sse_state, ServerSentEvents.Parser.new())
          {events, state} = ServerSentEvents.Parser.parse(state, data)
          req = Req.Request.put_private(req, :sse_state, state)

          res = Enum.reduce(events, res, &handle_event(&1, &2, callback))

          {:cont, {req, res}}

        {:data, data}, {req, res} ->
          # Non-200 response: append rather than overwrite, otherwise an error
          # body delivered in several chunks is truncated to its final chunk.
          {:cont, {req, %{res | body: res.body <> data}}}
      end
    )
  end

  # Forwards a text delta to the caller-supplied callback; the response is unchanged.
  defp handle_event(%{data: encoded, event: "response.output_text.delta"}, res, callback) do
    %{"delta" => delta} = JSON.decode!(encoded)
    callback.(delta)
    res
  end

  # Stores the completed response object as the final response body.
  defp handle_event(%{data: encoded, event: "response.completed"}, res, _callback) do
    %{"response" => response} = JSON.decode!(encoded)
    %{res | body: response}
  end

  # Every other event type is ignored.
  defp handle_event(_event, res, _callback), do: res
end

In this example we use a different Req streaming style. Instead of setting into: :self and decoding a stream of incoming messages, we equip Req with a 2-arity callback that is called on each chunk. In the callback, we use ServerSentEvents.Parser to parse incoming chunks while keeping the parser state in the request struct. For each parsed response.output_text.delta event, we pass the delta to a client-provided callback, which can, for example, send the delta to a LiveView process. Finally, once we receive the response.completed event with the complete response body, we use that body as the final result of the request. This is useful if the caller wants to access the final response body in addition to the streamed deltas.

# Build the streaming request; every output-text delta is handed to `IO.write/1`.
streaming_request =
  OpenAI.responses(
    %{
      model: "gpt-5.4-nano",
      reasoning: %{effort: "none"},
      instructions: "You are a poet, fluent in multiple languages and emoji usage.",
      input: "Write a poem across multiple languages and emojis!"
    },
    &IO.write/1
  )

# Run the request; the returned response is the value of this cell.
Req.request!(streaming_request)