Nous.Providers.HTTP (nous v0.15.4)
Shared HTTP utilities for all LLM providers.
Two HTTP families, both pluggable:
- Non-streaming requests (one-shot model calls, web fetching, search APIs) go through a Nous.HTTP.Backend. Default is Nous.HTTP.Backend.Req; Nous.HTTP.Backend.Hackney is also shipped.
- Streaming requests (SSE / chunked LLM responses) go through a Nous.HTTP.StreamBackend. Default is Nous.HTTP.StreamBackend.Req (Req's :into callback driven by Finch); Nous.HTTP.StreamBackend.Hackney provides strict pull-based backpressure via :hackney's {:async, :once} mode for callers whose downstream consumers can block per chunk.
Both backend layers resolve via the same precedence: per-call opt → env
var → app config → default. See Nous.HTTP.Backend and
Nous.HTTP.StreamBackend for selection details.
Usage
# Assumes the module is aliased so the short name HTTP resolves
alias Nous.Providers.HTTP
# Non-streaming request
{:ok, body} = HTTP.post(url, body, headers)
# Streaming request — returns a lazy stream of parsed events
{:ok, stream} = HTTP.stream(url, body, headers)
Enum.each(stream, &process_event/1)
# Per-call backend override
{:ok, stream} = HTTP.stream(url, body, headers,
  stream_backend: Nous.HTTP.StreamBackend.Hackney)
SSE Parsing
SSE events follow the Server-Sent Events spec (https://html.spec.whatwg.org/multipage/server-sent-events.html):
- Events are separated by double newlines (\n\n)
- Each event contains field lines like data: {...}
- Multiple data: fields are concatenated with newlines
- [DONE] signals stream completion (OpenAI convention)
The default SSE parser (parse_sse_buffer/1) is transport-agnostic and
shared by both stream backends. Custom parsers can be plugged in via
the :stream_parser opt; see Nous.Providers.HTTP.JSONArrayParser
for an example.
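As an illustration of the parser contract (parse_buffer/1 returning {events, remaining_buffer}, as stated under the :stream_parser option), here is a minimal sketch of a newline-delimited parser. The module name LineParserSketch is hypothetical and not part of Nous. Events are left as raw strings to keep the sketch dependency-free; a real parser would presumably decode each line into a map.

```elixir
defmodule LineParserSketch do
  # Hypothetical parser module, not part of Nous.
  # Splits the buffer on newlines; the last segment may be an incomplete
  # line, so it is handed back as the remaining buffer.
  def parse_buffer(buffer) when is_binary(buffer) do
    parts = String.split(buffer, "\n")
    {complete, [rest]} = Enum.split(parts, -1)

    # Drop blank lines; keep each complete line as one raw event.
    events = Enum.reject(complete, &(String.trim(&1) == ""))
    {events, rest}
  end
end
```

Under these assumptions it would be passed as stream_parser: LineParserSketch.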
Stream backpressure
- Nous.HTTP.StreamBackend.Req (default): the :into callback runs in a Task and feeds the consumer process via send/2. BEAM mailboxes are unbounded, so a fast producer paired with a slow consumer can grow the consumer's mailbox. This is acceptable for typical LLM workloads where the consumer is parsing-bound (and parsing throttles naturally) or where token-generation rate is the bottleneck.
- Nous.HTTP.StreamBackend.Hackney: strict pull-based. The consumer calls :hackney.stream_next/1 per chunk, so the producer cannot outrun the consumer. Pick this when downstream consumers can block per chunk (LiveView fan-out, persistence-on-every-chunk, slow IO).
Functions
Build authorization header for API key auth (Anthropic style).
Returns empty list for nil or empty string values.
Build authorization header for Bearer token auth (OpenAI style).
Returns empty list for nil, empty string, or "not-needed" values.
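The nil and empty-string handling described for the two header builders can be sketched as follows. The module and function names (AuthHeaderSketch, api_key_header/2, bearer_header/1), the default header name "x-api-key", and the list-of-tuples return shape are assumptions made for illustration; the actual function names and signatures are not shown in this page.

```elixir
defmodule AuthHeaderSketch do
  # Hypothetical names; not the library's actual functions.

  # API-key style: the key is sent verbatim under a named header.
  def api_key_header(key, header_name \\ "x-api-key")
  def api_key_header(nil, _header_name), do: []
  def api_key_header("", _header_name), do: []
  def api_key_header(key, header_name) when is_binary(key), do: [{header_name, key}]

  # Bearer style: nil, "" and the "not-needed" placeholder yield no header.
  def bearer_header(token) when token in [nil, "", "not-needed"], do: []
  def bearer_header(token) when is_binary(token), do: [{"authorization", "Bearer " <> token}]
end
```

Returning an empty list rather than nil lets callers concatenate the result onto other headers without a special case.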
@spec parse_sse_buffer(String.t() | nil | any()) :: {list(), String.t()} | {:error, :buffer_overflow}
Parse an SSE buffer into events.
Returns {events, remaining_buffer} where events is a list of parsed
JSON maps, {:stream_done, reason} tuples, or {:parse_error, reason} tuples.
Handles edge cases:
- Empty events (ignored)
- Whitespace-only events (ignored)
- Malformed JSON (emits {:parse_error, reason})
- Multiple data fields per event (concatenated per spec)
- Comment lines (ignored)
- Buffer overflow protection (returns {:error, :buffer_overflow}, per the @spec)
Examples
iex> parse_sse_buffer("data: {\"text\": \"hi\"}\n\n")
{[%{"text" => "hi"}], ""}
iex> parse_sse_buffer("data: partial")
{[], "data: partial"}
iex> parse_sse_buffer("data: [DONE]\n\n")
{[{:stream_done, "stop"}], ""}
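The buffering behaviour in these examples (complete events are emitted, a trailing partial event is returned as the remaining buffer) can be sketched without any dependencies. This is not the library's implementation: SSEFramingSketch and its functions are hypothetical, payloads are returned as raw strings rather than decoded JSON, and [DONE] handling, CRLF line endings, and overflow protection are left out.

```elixir
defmodule SSEFramingSketch do
  # Hypothetical module illustrating SSE framing only.

  # Returns {payloads, remaining_buffer}. Events end at a blank line, so
  # the last segment after splitting on "\n\n" is always the unfinished tail.
  def split_buffer(buffer) when is_binary(buffer) do
    parts = String.split(buffer, "\n\n")
    {complete, [rest]} = Enum.split(parts, -1)

    payloads =
      complete
      |> Enum.map(&data_payload/1)
      |> Enum.reject(&is_nil/1)

    {payloads, rest}
  end

  # Joins every data: line of one event with newlines. Comment lines and
  # other fields are skipped; events without data lines yield nil.
  def data_payload(event) do
    lines =
      event
      |> String.split("\n")
      |> Enum.flat_map(fn
        "data: " <> value -> [value]
        "data:" <> value -> [value]
        _other -> []
      end)

    case lines do
      [] -> nil
      _ -> Enum.join(lines, "\n")
    end
  end
end
```

A caller would prepend the returned remainder to the next chunk before splitting again.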
@spec parse_sse_event(String.t()) :: map() | {:stream_done, String.t()} | {:parse_error, term()} | nil
Parse a single SSE event.
Returns parsed JSON map, {:stream_done, reason}, {:parse_error, reason}, or nil.
Handles per SSE spec:
- data: fields (with or without space after colon)
- Multiple data: fields concatenated with newlines
- : prefix for comments (ignored)
- event:, id:, retry: fields (ignored for now)
- Empty lines within events
Examples
iex> parse_sse_event("data: {\"key\": \"value\"}")
%{"key" => "value"}
iex> parse_sse_event("data: [DONE]")
{:stream_done, "stop"}
iex> parse_sse_event(": this is a comment")
nil
iex> parse_sse_event("")
nil
Make a non-streaming POST request.
Dispatches to the configured Nous.HTTP.Backend. Resolution order
(highest precedence first):
- Per-call :backend opt: HTTP.post(url, body, headers, backend: Nous.HTTP.Backend.Hackney)
- NOUS_HTTP_BACKEND env var: req, hackney, or a fully-qualified module name (e.g. MyApp.MyHTTPBackend)
- Application.get_env(:nous, :http_backend, ...)
- Default: Nous.HTTP.Backend.Req
Returns {:ok, body} or {:error, reason}.
Options
- :backend - Backend module (overrides env / config / default)
- :timeout - Request timeout in ms (default: 60_000)
Error Reasons
- %{status: integer(), body: term()} - HTTP error response
- %Mint.TransportError{} - Network error (Req backend)
- %JSON.DecodeError{} - JSON decode error
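The backend resolution order for non-streaming requests amounts to a chain of fallbacks, sketched below. BackendResolutionSketch and resolve/3 are hypothetical; the environment and application values are passed as arguments (defaulting to the real lookups) only to make the sketch easy to exercise. How Nous treats unrecognised env values is not documented here, so turning them into a module name with Module.concat/1 is an assumption.

```elixir
defmodule BackendResolutionSketch do
  # Hypothetical module, not the library's resolver.
  @default Nous.HTTP.Backend.Req

  # Precedence: per-call opt, then env var, then app config, then default.
  def resolve(
        opts \\ [],
        env_value \\ System.get_env("NOUS_HTTP_BACKEND"),
        app_value \\ Application.get_env(:nous, :http_backend)
      ) do
    opts[:backend] || from_env(env_value) || app_value || @default
  end

  # Short names map to the shipped backends; anything else is treated
  # as a fully-qualified module name (assumption).
  defp from_env(nil), do: nil
  defp from_env(""), do: nil
  defp from_env("req"), do: Nous.HTTP.Backend.Req
  defp from_env("hackney"), do: Nous.HTTP.Backend.Hackney
  defp from_env(name) when is_binary(name), do: Module.concat([name])
end
```

The same chain would apply to streaming requests with the :stream_backend opt, NOUS_HTTP_STREAM_BACKEND, and :http_stream_backend substituted.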
Make a streaming POST request.
Dispatches to the configured Nous.HTTP.StreamBackend. Resolution
order (highest precedence first):
- Per-call :stream_backend opt
- NOUS_HTTP_STREAM_BACKEND env var: req, hackney, or a fully-qualified module name
- Application.get_env(:nous, :http_stream_backend, ...)
- Default: Nous.HTTP.StreamBackend.Req
Returns {:ok, stream} where stream is an Enumerable.t() of parsed
events. Events are maps with string keys (parsed JSON),
{:stream_done, reason} tuples on completion, or
{:stream_error, reason} tuples on failure.
Options
- :stream_backend - Backend module (overrides env / config / default)
- :timeout - Receive timeout in ms (default: 60_000)
- :connect_timeout - TCP connect timeout in ms (default: 30_000)
- :stream_parser - Module for parsing the stream buffer (default: SSE). Must implement parse_buffer/1 returning {events, remaining_buffer}. See Nous.Providers.HTTP.JSONArrayParser for an example.
- :pool - (Hackney backend only) Hackney pool name (default: :default).
Error Handling
The stream emits {:stream_error, reason} on errors and then halts.
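Since the stream mixes data events with {:stream_done, reason} and {:stream_error, reason} tuples, a consumer typically pattern-matches on each element. The sketch below shows one way to do that; StreamConsumerSketch.collect/1 is a hypothetical helper, and it accepts any enumerable so it can be tried with a plain list instead of a live stream.

```elixir
defmodule StreamConsumerSketch do
  # Hypothetical helper, not part of Nous.
  # Collects parsed events until completion, or stops at the first error.
  def collect(stream) do
    result =
      Enum.reduce_while(stream, {:ok, []}, fn
        {:stream_done, _reason}, acc -> {:halt, acc}
        {:stream_error, reason}, _acc -> {:halt, {:error, reason}}
        %{} = event, {:ok, events} -> {:cont, {:ok, [event | events]}}
        # Anything else (e.g. a parse error tuple) is skipped in this sketch.
        _other, acc -> {:cont, acc}
      end)

    case result do
      {:ok, events} -> {:ok, Enum.reverse(events)}
      {:error, _reason} = error -> error
    end
  end
end
```

With a real stream this would be called as StreamConsumerSketch.collect(stream) on the value returned by HTTP.stream; collecting everything into a list is only suitable when the response is small.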