Nous.Providers.HTTP (nous v0.15.5)

View Source

Shared HTTP utilities for all LLM providers.

Two HTTP families, both pluggable:

Both backend layers resolve via the same precedence: per-call opt → env var → app config → default. See Nous.HTTP.Backend and Nous.HTTP.StreamBackend for selection details.

Usage

# Non-streaming request
{:ok, body} = HTTP.post(url, body, headers)

# Streaming request — returns a lazy stream of parsed events
{:ok, stream} = HTTP.stream(url, body, headers)
Enum.each(stream, &process_event/1)

# Per-call backend override
{:ok, stream} = HTTP.stream(url, body, headers,
  stream_backend: Nous.HTTP.StreamBackend.Hackney)

SSE Parsing

SSE events follow the Server-Sent Events spec (https://html.spec.whatwg.org/multipage/server-sent-events.html):

  • Events are separated by double newlines (\n\n)
  • Each event contains field lines like data: {...}
  • Multiple data: fields are concatenated with newlines
  • [DONE] signals stream completion (OpenAI convention)

The default SSE parser (parse_sse_buffer/1) is transport-agnostic and shared by both stream backends. Custom parsers can be plugged in via the :stream_parser opt; see Nous.Providers.HTTP.JSONArrayParser for an example.

Stream backpressure

  • Nous.HTTP.StreamBackend.Req (default): the :into callback runs in a Task and feeds the consumer process via send/2. BEAM mailboxes are unbounded, so a fast producer + slow consumer can grow the consumer's mailbox. Acceptable for typical LLM workloads where the consumer is parsing-bound (and parsing throttles naturally) or where token-generation rate is the bottleneck.
  • Nous.HTTP.StreamBackend.Hackney: strict pull-based — the consumer calls :hackney.stream_next/1 per chunk, so the producer literally cannot outrun the consumer. Pick this when downstream consumers can block per chunk (LiveView fan-out, persistence-on-every-chunk, slow IO).

Summary

Functions

Build authorization header for API key auth (Anthropic style).

Build authorization header for Bearer token auth (OpenAI style).

Parse an SSE buffer into events.

Parse a single SSE event.

Make a non-streaming POST request.

Make a streaming POST request.

Functions

api_key_header(api_key, header_name)

@spec api_key_header(String.t() | nil, String.t()) :: list()

Build authorization header for API key auth (Anthropic style).

Returns empty list for nil or empty string values.

bearer_auth_header(api_key)

@spec bearer_auth_header(String.t() | nil) :: list()

Build authorization header for Bearer token auth (OpenAI style).

Returns empty list for nil, empty string, or "not-needed" values.

parse_sse_buffer(buffer)

@spec parse_sse_buffer(String.t() | nil | any()) ::
  {list(), String.t()} | {:error, :buffer_overflow}

Parse an SSE buffer into events.

Returns {events, remaining_buffer} where events is a list of parsed JSON maps, {:stream_done, reason} tuples, or {:parse_error, reason} tuples.

Handles edge cases:

  • Empty events (ignored)
  • Whitespace-only events (ignored)
  • Malformed JSON (emits {:parse_error, reason})
  • Multiple data fields per event (concatenated per spec)
  • Comment lines (ignored)
  • Buffer overflow protection

Examples

iex> parse_sse_buffer("data: {\"text\": \"hi\"}\n\n")
{[%{"text" => "hi"}], ""}

iex> parse_sse_buffer("data: partial")
{[], "data: partial"}

iex> parse_sse_buffer("data: [DONE]\n\n")
{[{:stream_done, "stop"}], ""}

parse_sse_event(event)

@spec parse_sse_event(String.t()) ::
  map() | {:stream_done, String.t()} | {:parse_error, term()} | nil

Parse a single SSE event.

Returns parsed JSON map, {:stream_done, reason}, {:parse_error, reason}, or nil.

Handles per SSE spec:

  • data: fields (with or without space after colon)
  • Multiple data: fields concatenated with newlines
  • : prefix for comments (ignored)
  • event:, id:, retry: fields (ignored for now)
  • Empty lines within events

Examples

iex> parse_sse_event("data: {\"key\": \"value\"}")
%{"key" => "value"}

iex> parse_sse_event("data: [DONE]")
{:stream_done, "stop"}

iex> parse_sse_event(": this is a comment")
nil

iex> parse_sse_event("")
nil

post(url, body, headers, opts \\ [])

@spec post(String.t(), map(), list(), keyword()) :: {:ok, map()} | {:error, term()}

Make a non-streaming POST request.

Dispatches to the configured Nous.HTTP.Backend. Resolution order (highest precedence first):

  1. Per-call :backend opt — HTTP.post(url, body, headers, backend: Nous.HTTP.Backend.Hackney)
  2. NOUS_HTTP_BACKEND env var — req, hackney, or a fully-qualified module name (e.g. MyApp.MyHTTPBackend)
  3. Application.get_env(:nous, :http_backend, ...)
  4. Default: Nous.HTTP.Backend.Req

Returns {:ok, body} or {:error, reason}.

Options

  • :backend - Backend module (overrides env / config / default)
  • :timeout - Request timeout in ms (default: 180_000)

Error Reasons

  • %{status: integer(), body: term()} - HTTP error response
  • %Mint.TransportError{} - Network error (Req backend)
  • %JSON.DecodeError{} - JSON decode error

stream(url, body, headers, opts \\ [])

@spec stream(String.t(), map(), list(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Make a streaming POST request.

Dispatches to the configured Nous.HTTP.StreamBackend. Resolution order (highest precedence first):

  1. Per-call :stream_backend opt
  2. NOUS_HTTP_STREAM_BACKEND env var — req, hackney, or a fully-qualified module name
  3. Application.get_env(:nous, :http_stream_backend, ...)
  4. Default: Nous.HTTP.StreamBackend.Req

Returns {:ok, stream} where stream is an Enumerable.t() of parsed events. Events are maps with string keys (parsed JSON), {:stream_done, reason} tuples on completion, or {:stream_error, reason} tuples on failure.

Options

  • :stream_backend - Backend module (overrides env / config / default)
  • :timeout - Receive timeout in ms (default: 180_000)
  • :connect_timeout - TCP connect timeout in ms (default: 30_000)
  • :stream_parser - Module for parsing the stream buffer (default: SSE). Must implement parse_buffer/1 returning {events, remaining_buffer}. See Nous.Providers.HTTP.JSONArrayParser for an example.
  • :pool - (Hackney backend only) Hackney pool name (default: :default).

Error Handling

The stream emits {:stream_error, reason} on errors and then halts.