ReqServerSentEvents (ReqServerSentEvents v0.2.0)


Req plugin for Server-Sent Events (SSE).

Attach to any %Req.Request{} via attach/1. The plugin intercepts Req's three streaming hooks and transparently decodes raw SSE byte chunks into %ReqServerSentEvents.Frame{} structs.
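
To make the decoding step concrete, here is a minimal sketch of SSE framing in plain Elixir. It is not the plugin's implementation: the `SseSketch` module is hypothetical, it returns plain maps rather than %ReqServerSentEvents.Frame{} structs, and it assumes "\n\n" delimiters and handles only the `event` and `data` fields.

```elixir
defmodule SseSketch do
  # Hypothetical helper for illustration; not part of ReqServerSentEvents.

  # Feed one raw chunk into the pending buffer.
  # Returns {decoded_frames, leftover_buffer}.
  def feed(buffer, chunk) do
    # Everything after the last "\n\n" is an incomplete frame and stays buffered.
    {rest, raw_frames} =
      (buffer <> chunk)
      |> String.split("\n\n")
      |> List.pop_at(-1)

    frames =
      raw_frames
      |> Enum.reject(&(&1 == ""))
      |> Enum.map(&parse/1)

    {frames, rest}
  end

  # Parse "field: value" lines; multiple data lines are joined with "\n".
  defp parse(raw) do
    raw
    |> String.split("\n")
    |> Enum.reduce(%{event: "message", data: []}, fn line, acc ->
      case String.split(line, ":", parts: 2) do
        ["data", value] -> %{acc | data: [strip_space(value) | acc.data]}
        ["event", value] -> %{acc | event: strip_space(value)}
        # Comments (lines starting with ":") and unknown fields are ignored.
        _other -> acc
      end
    end)
    |> Map.update!(:data, fn lines -> lines |> Enum.reverse() |> Enum.join("\n") end)
  end

  # A single leading space after the colon is not part of the value.
  defp strip_space(" " <> rest), do: rest
  defp strip_space(value), do: value
end
```

A frame split across two chunks is only emitted once its delimiter arrives: `SseSketch.feed("", "data: par")` returns no frames and keeps "data: par" as the leftover, and feeding "tial\n\n" with that leftover yields one frame whose data is "partial".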

Usage

# into: fun — frames delivered as {:sse_event, %Frame{}} arguments
Req.new(
  url: url,
  into: fn {:sse_event, frame}, {req, resp} ->
    IO.inspect(frame)
    {:cont, {req, resp}}
  end
)
|> ReqServerSentEvents.attach()
|> Req.get!()

# into: :self — frames sent as messages to the calling process
task = Task.async(fn ->
  Req.new(url: url, into: :self)
  |> ReqServerSentEvents.attach()
  |> Req.get!()
end)
resp = Task.await(task)
sse_ref = ReqServerSentEvents.ref(resp)
receive do
  {^sse_ref, {:sse_event, frame}} -> IO.inspect(frame)
  {^sse_ref, :sse_done}           -> :done
end

# into: collectable — frames collected into any Collectable
{:ok, resp} =
  Req.new(url: url, into: [])
  |> ReqServerSentEvents.attach()
  |> Req.get()
frames = resp.body  # [%ReqServerSentEvents.Frame{}, ...]

Telemetry

The plugin emits the following :telemetry events:

  • [:req_server_sent_events, :stream, :start] — emitted on the first decoded chunk of an into: fun or into: :self request. Measurements: %{system_time, monotonic_time}. Metadata: %{request: req}.

  • [:req_server_sent_events, :frame, :decoded] — emitted once per decoded frame in any mode. Measurements: %{bytes: byte_size(raw)}. Metadata: %{frame: frame}.

  • [:req_server_sent_events, :stream, :stop] — emitted in a response step when an into: fun or into: :self stream ends. Only fires if :start fired (i.e. at least one chunk was processed). Measurements: %{monotonic_time, duration}. Metadata: %{request: req, response: resp}.

:start and :stop are not emitted for into: collectable requests — the wrapper has no access to the response. For stream-level timing in that mode, use Req's own [:req, :request, :start | :stop] events.
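
As an illustration of consuming these events, the sketch below attaches one handler to all three with `:telemetry.attach_many/4`. The `SseTelemetryLogger` module, its handler id, and the log messages are hypothetical; only the event names and measurement keys come from the list above. It also assumes `duration` is reported in native time units, which is the usual :telemetry convention but is not stated here.

```elixir
defmodule SseTelemetryLogger do
  # Hypothetical handler module for illustration.
  require Logger

  @events [
    [:req_server_sent_events, :stream, :start],
    [:req_server_sent_events, :frame, :decoded],
    [:req_server_sent_events, :stream, :stop]
  ]

  # Call once at application start; requires the :telemetry dependency.
  def attach do
    :telemetry.attach_many("sse-telemetry-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  # Standard :telemetry handler signature.
  def handle_event(event, measurements, _metadata, _config) do
    Logger.info(describe(event, measurements))
  end

  # Turn an event and its measurements into a log line.
  def describe([:req_server_sent_events, :stream, :start], _measurements) do
    "SSE stream started"
  end

  def describe([:req_server_sent_events, :frame, :decoded], %{bytes: bytes}) do
    "SSE frame decoded (#{bytes} bytes)"
  end

  def describe([:req_server_sent_events, :stream, :stop], %{duration: duration}) do
    # Assumption: duration is in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "SSE stream stopped after #{ms} ms"
  end
end
```

Passing a captured remote function (`&__MODULE__.handle_event/4`) rather than an anonymous function follows the :telemetry recommendation for handlers.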

Summary

Functions

attach(req, opts \\ []): Attach the SSE decoder to a %Req.Request{}.

ref(req): Return the SSE ref for an into: :self request.

Functions

attach(req, opts \\ [])

@spec attach(
  Req.Request.t(),
  keyword()
) :: Req.Request.t()

Attach the SSE decoder to a %Req.Request{}.

Rewrites req.into to decode raw SSE byte chunks into %ReqServerSentEvents.Frame{} structs before they reach the caller. When into: :self is used, also registers a response step that sends a {ref, :sse_done} sentinel once the stream closes.

Options

  • :max_frame_size — maximum number of bytes allowed to accumulate in the pending-frame buffer between delimiters. If the buffer grows past this limit without seeing a "\n\n", a ReqServerSentEvents.FrameTooLargeError is raised. Defaults to nil (unbounded).
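
The limit described for :max_frame_size can be pictured with the following sketch. It is an assumption about the semantics, not the plugin's code: `BoundedBufferSketch` and its `TooLargeError` are hypothetical stand-ins, and the check is applied to the bytes left over after complete frames have been split off.

```elixir
defmodule BoundedBufferSketch do
  # Hypothetical stand-in for ReqServerSentEvents.FrameTooLargeError.
  defmodule TooLargeError do
    defexception [:message]
  end

  # Append a chunk to the pending buffer.
  # Returns {complete_raw_frames, new_pending}; raises if the undelimited
  # remainder exceeds max_frame_size. A nil limit means unbounded.
  def append(pending, chunk, max_frame_size) do
    {rest, raw_frames} =
      (pending <> chunk)
      |> String.split("\n\n")
      |> List.pop_at(-1)

    if is_integer(max_frame_size) and byte_size(rest) > max_frame_size do
      raise TooLargeError,
        message: "pending frame is #{byte_size(rest)} bytes, limit is #{max_frame_size}"
    end

    {raw_frames, rest}
  end
end
```

In this sketch a frame that arrives complete in a single chunk is never rejected, because only the undelimited remainder is measured. Whether the plugin behaves the same way for that case is not specified above.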

ref(req)

@spec ref(Req.Request.t() | Req.Response.t()) :: reference() | nil

Return the SSE ref for an into: :self request.

Accepts either the final %Req.Request{} or the %Req.Response{}. Req's high-level functions (Req.get!/2 etc.) return only the response, while lower-level entry points such as Req.Request.run_request/1 return {request, response}.