Req plugin for Server-Sent Events (SSE).
Attach to any %Req.Request{} via attach/1. The plugin intercepts
Req's three streaming hooks and transparently decodes raw SSE byte chunks
into %ReqServerSentEvents.Frame{} structs.
Usage
# into: fun — frames delivered as {:sse_event, %Frame{}} arguments
url
|> Req.new(into: fn {:sse_event, frame}, {req, resp} ->
IO.inspect(frame)
{:cont, {req, resp}}
end)
|> ReqServerSentEvents.attach()
|> Req.get!()
# into: :self — frames sent as messages to the calling process
task = Task.async(fn ->
url
|> Req.new(into: :self)
|> ReqServerSentEvents.attach()
|> Req.get!()
end)
resp = Task.await(task)
sse_ref = ReqServerSentEvents.ref(resp)
receive do
{^sse_ref, {:sse_event, frame}} -> IO.inspect(frame)
{^sse_ref, :sse_done} -> :done
end
# into: collectable — frames collected into any Collectable
{:ok, resp} =
url
|> Req.new(into: [])
|> ReqServerSentEvents.attach()
|> Req.get()
frames = resp.body # [%ReqServerSentEvents.Frame{}, ...]Telemetry
The plugin emits the following :telemetry events:
[:req_server_sent_events, :stream, :start]— emitted on the first decoded chunk of aninto: funorinto: :selfrequest. Measurements:%{system_time, monotonic_time}. Metadata:%{request: req}.[:req_server_sent_events, :frame, :decoded]— emitted once per decoded frame in any mode. Measurements:%{bytes: byte_size(raw)}. Metadata:%{frame: frame}.[:req_server_sent_events, :stream, :stop]— emitted in a response step when aninto: funorinto: :selfstream ends. Only fires if:startfired (i.e. at least one chunk was processed). Measurements:%{monotonic_time, duration}. Metadata:%{request: req, response: resp}.
:start and :stop are not emitted for into: collectable requests —
the wrapper has no access to the response. For stream-level timing in that
mode, use Req's own [:req, :request, :start | :stop] events.
Summary
Functions
@spec attach( Req.Request.t(), keyword() ) :: Req.Request.t()
Attach the SSE decoder to a %Req.Request{}.
Rewrites req.into to decode raw SSE byte chunks into
%ReqServerSentEvents.Frame{} structs before they reach the caller.
When into: :self is used, also registers a response step that sends
a {ref, :sse_done} sentinel once the stream closes.
Options
:max_frame_size— maximum number of bytes allowed to accumulate in the pending-frame buffer between delimiters. If the buffer grows past this limit without seeing a"\n\n", aReqServerSentEvents.FrameTooLargeErroris raised. Defaults tonil(unbounded).
@spec ref(Req.Request.t() | Req.Response.t()) :: reference() | nil
Return the SSE ref for a into: :self request.
Accepts either the final %Req.Request{} or %Req.Response{} — Req's
high-level functions (Req.get!/2 etc.) return only the response, while
Req.request/2 returns {request, response}.