Nous.Providers.HTTP (nous v0.15.3)


Shared HTTP utilities for all LLM providers.

Two HTTP families, deliberately split by use case:

  • Non-streaming requests (one-shot model calls, web fetching, search APIs) go through a pluggable Nous.HTTP.Backend. The default is Nous.HTTP.Backend.Req (Req on top of Finch). Nous.HTTP.Backend.Hackney also ships with the library; select it with the per-call :backend option, the NOUS_HTTP_BACKEND env var, or config :nous, :http_backend, .... See docs/benchmarks/http_backend.md for the trade-offs.
  • Streaming requests (SSE / chunked LLM responses) go through :hackney in :async, :once pull-based mode. Hackney's :async, :once is true pull-based streaming - the consumer calls :hackney.stream_next/1 to ask for one more chunk, the producer reads it off the socket and delivers it as a single message. The consumer paces the producer, so a slow consumer (LiveView assigns + diff + push, slow IO, etc.) can never grow its mailbox unboundedly.

This split fixes M-12 (streaming consumer backpressure) and H-12 (stream lifecycle EXIT handling) from the comprehensive review by eliminating the spawn-and-mailbox plumbing entirely - the Stream.resource consumer is the only process involved.
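To make the pacing idea concrete, here is a minimal sketch of pull-based delivery built on Stream.resource/3. It does not use hackney: the PullSketch module and its producer process are hypothetical stand-ins for the connection, and the {:next, pid} / {:chunk, pid, data} messages are invented for illustration. The point is only that the producer sends one chunk per request, so the consumer's mailbox holds at most one pending chunk.

```elixir
defmodule PullSketch do
  # Hypothetical stand-in for a pull-based source: it hands out exactly one
  # chunk per :next request and never sends anything unprompted.
  def start_producer(chunks) do
    spawn(fn -> producer_loop(chunks) end)
  end

  defp producer_loop([]) do
    receive do
      {:next, from} -> send(from, {:chunk, self(), :done})
    end
  end

  defp producer_loop([chunk | rest]) do
    receive do
      {:next, from} ->
        send(from, {:chunk, self(), chunk})
        producer_loop(rest)
    end
  end

  # Lazy stream: each element is requested only when the consumer is ready.
  def stream(chunks) do
    Stream.resource(
      fn -> start_producer(chunks) end,
      fn producer ->
        # Ask for one more chunk, then wait for exactly that chunk.
        send(producer, {:next, self()})

        receive do
          {:chunk, ^producer, :done} -> {:halt, producer}
          {:chunk, ^producer, chunk} -> {[chunk], producer}
        after
          5_000 -> {:halt, producer}
        end
      end,
      # Cleanup also covers early termination, e.g. Enum.take/2.
      fn producer -> Process.exit(producer, :kill) end
    )
  end
end
```

A slow consumer simply requests the next chunk later; nothing accumulates in between. Stopping early, for example with Enum.take(PullSketch.stream(chunks), 1), runs the cleanup function and ends the producer.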

Usage

alias Nous.Providers.HTTP

# Non-streaming request (Req + Finch by default)
{:ok, response} = HTTP.post(url, body, headers)

# Streaming request (hackney pull-based, returns a lazy stream)
{:ok, stream} = HTTP.stream(url, body, headers)
Enum.each(stream, &process_event/1)

SSE Parsing

SSE events follow the Server-Sent Events spec (https://html.spec.whatwg.org/multipage/server-sent-events.html):

  • Events are separated by double newlines (\n\n)
  • Each event contains field lines like data: {...}
  • Multiple data: fields are concatenated with newlines
  • [DONE] signals stream completion (OpenAI convention)
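The rules above can be sketched for a single event as follows. This is an illustration, not the module's implementation: SSEEventSketch is a hypothetical name, JSON decoding is left out so the block stays dependency-free, and the :done return value is a placeholder for the {:stream_done, reason} tuple the real parser emits.

```elixir
defmodule SSEEventSketch do
  # Parses one SSE event (the text between blank lines) into its data payload.
  # Returns the joined data string, :done for the [DONE] sentinel, or nil
  # when the event carries no data.
  def parse_event(event) do
    data_lines =
      event
      |> String.split("\n")
      |> Enum.flat_map(&data_line/1)

    case Enum.join(data_lines, "\n") do
      "" -> nil
      "[DONE]" -> :done
      data -> data
    end
  end

  # A single space after the colon is optional and is not part of the value.
  defp data_line("data: " <> value), do: [value]
  defp data_line("data:" <> value), do: [value]
  # Comments (lines starting with ":") and other fields are skipped.
  defp data_line(_other), do: []
end
```

The sketch only splits on "\n"; a fuller version would also accept the "\r\n" and "\r" line endings the spec allows.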

Hackney pool

Hackney's :default pool starts automatically when the :hackney application boots. Defaults: 50 max connections per pool, 2s idle keepalive timeout. For most LLM workloads (long-lived streams of seconds-to-minutes) the defaults are appropriate. Apps that need a Nous-isolated pool can pass pool: :my_pool per request and start the pool with :hackney_pool.start_pool/2 (or include :hackney_pool.child_spec/2 in their supervision tree).

TLS verification

Streaming requests pass verify: :verify_peer with system CAs from :public_key.cacerts_get/0 explicitly. Do not silently regress this - hackney would otherwise default to :verify_none and accept MITM'd connections.
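As a rough sketch of what such options look like, assuming they are handed to hackney through its :ssl_options request option (the exact option list Nous builds is not shown in this page, and the variable names are illustrative):

```elixir
# Assumed shape of the TLS options; not copied from the Nous source.
ssl_options = [
  # Reject certificates that do not chain to a trusted CA.
  verify: :verify_peer,
  # System trust store; :public_key.cacerts_get/0 requires OTP 25 or later.
  cacerts: :public_key.cacerts_get()
]

# hackney accepts TLS settings under the :ssl_options request option.
hackney_opts = [ssl_options: ssl_options]
```

A production option list would typically carry more than these two keys (hostname checking, for instance); the sketch only covers the two settings named above.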

Summary

Functions

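api_key_header(api_key, header_name)
  Build authorization header for API key auth (Anthropic style).

bearer_auth_header(api_key)
  Build authorization header for Bearer token auth (OpenAI style).

parse_sse_buffer(buffer)
  Parse an SSE buffer into events.

parse_sse_event(event)
  Parse a single SSE event.

post(url, body, headers, opts \\ [])
  Make a non-streaming POST request.

stream(url, body, headers, opts \\ [])
  Make a streaming POST request with SSE parsing.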

Functions

api_key_header(api_key, header_name)

@spec api_key_header(String.t() | nil, String.t()) :: list()

Build authorization header for API key auth (Anthropic style).

Returns empty list for nil or empty string values.

bearer_auth_header(api_key)

@spec bearer_auth_header(String.t() | nil) :: list()

Build authorization header for Bearer token auth (OpenAI style).

Returns empty list for nil, empty string, or "not-needed" values.
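The documented behaviour of both header helpers can be approximated like this. AuthHeaderSketch is a hypothetical module, and two details are assumptions: that headers are {name, value} tuples (the specs only promise a list) and that the Bearer header is named "authorization".

```elixir
defmodule AuthHeaderSketch do
  # Assumption: headers are {name, value} tuples; the docs only promise a list.
  def api_key_header(nil, _header_name), do: []
  def api_key_header("", _header_name), do: []
  def api_key_header(api_key, header_name), do: [{header_name, api_key}]

  # "not-needed" is treated like a missing key, as documented.
  def bearer_auth_header(key) when key in [nil, "", "not-needed"], do: []
  def bearer_auth_header(key), do: [{"authorization", "Bearer " <> key}]
end
```

Because a missing credential yields an empty list, the result can be appended to other headers with ++ without a conditional.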

parse_sse_buffer(buffer)

@spec parse_sse_buffer(String.t() | nil | any()) ::
  {list(), String.t()} | {:error, :buffer_overflow}

Parse an SSE buffer into events.

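Returns {events, remaining_buffer}, where events is a list whose elements are parsed JSON maps, {:stream_done, reason} tuples, or {:parse_error, reason} tuples. As the spec shows, it returns {:error, :buffer_overflow} instead when buffer overflow protection triggers.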

Handles edge cases:

  • Empty events (ignored)
  • Whitespace-only events (ignored)
  • Malformed JSON (emits {:parse_error, reason})
  • Multiple data fields per event (concatenated per spec)
  • Comment lines (ignored)
  • Buffer overflow protection

Examples

iex> parse_sse_buffer("data: {\"text\": \"hi\"}\n\n")
{[%{"text" => "hi"}], ""}

iex> parse_sse_buffer("data: partial")
{[], "data: partial"}

iex> parse_sse_buffer("data: [DONE]\n\n")
{[{:stream_done, "stop"}], ""}
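The second example shows why the remaining buffer matters: network chunks rarely align with event boundaries, so the unterminated tail must be prepended to the next chunk. A minimal sketch of that carry-over, using a hypothetical BufferSketch module that returns raw event strings rather than calling the real parser:

```elixir
defmodule BufferSketch do
  # Splits a buffer into complete events plus an unterminated tail.
  def split_events(buffer) do
    parts = String.split(buffer, "\n\n")
    # Everything before the last separator is complete; the last part is not.
    {complete, [rest]} = Enum.split(parts, -1)
    {Enum.reject(complete, &(String.trim(&1) == "")), rest}
  end

  # Feeds chunks through split_events/1, carrying the tail forward.
  def feed(chunks) do
    Enum.reduce(chunks, {[], ""}, fn chunk, {events, buffer} ->
      {new_events, rest} = split_events(buffer <> chunk)
      {events ++ new_events, rest}
    end)
  end
end
```

An event split across two chunks is emitted once, after the chunk that completes it arrives; whatever is left in the buffer at the end was never terminated.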

parse_sse_event(event)

@spec parse_sse_event(String.t()) ::
  map() | {:stream_done, String.t()} | {:parse_error, term()} | nil

Parse a single SSE event.

Returns parsed JSON map, {:stream_done, reason}, {:parse_error, reason}, or nil.

Handles per SSE spec:

  • data: fields (with or without space after colon)
  • Multiple data: fields concatenated with newlines
  • : prefix for comments (ignored)
  • event:, id:, retry: fields (ignored for now)
  • Empty lines within events

Examples

iex> parse_sse_event("data: {\"key\": \"value\"}")
%{"key" => "value"}

iex> parse_sse_event("data: [DONE]")
{:stream_done, "stop"}

iex> parse_sse_event(": this is a comment")
nil

iex> parse_sse_event("")
nil

post(url, body, headers, opts \\ [])

@spec post(String.t(), map(), list(), keyword()) :: {:ok, map()} | {:error, term()}

Make a non-streaming POST request.

Dispatches to the configured Nous.HTTP.Backend. Resolution order (highest precedence first):

  1. Per-call :backend opt — HTTP.post(url, body, headers, backend: Nous.HTTP.Backend.Hackney)
  2. NOUS_HTTP_BACKEND env var — req, hackney, or a fully-qualified module name (e.g. MyApp.MyHTTPBackend)
  3. Application.get_env(:nous, :http_backend, ...)
  4. Default: Nous.HTTP.Backend.Req
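The precedence chain above might be implemented along these lines. This is a sketch, not the library's code: BackendResolutionSketch is a hypothetical module, treating an empty env var as unset is an assumption, and the env value is injectable purely so the lookup can be exercised without changing the real environment.

```elixir
defmodule BackendResolutionSketch do
  @default Nous.HTTP.Backend.Req

  # `env` defaults to the real variable but can be passed in explicitly.
  def resolve(opts \\ [], env \\ System.get_env("NOUS_HTTP_BACKEND")) do
    Keyword.get(opts, :backend) ||
      from_env(env) ||
      Application.get_env(:nous, :http_backend) ||
      @default
  end

  defp from_env(nil), do: nil
  # Assumption: an empty value counts as unset.
  defp from_env(""), do: nil
  defp from_env("req"), do: Nous.HTTP.Backend.Req
  defp from_env("hackney"), do: Nous.HTTP.Backend.Hackney
  # Anything else is read as a fully-qualified module name.
  defp from_env(name), do: Module.concat([name])
end
```

Each step falls through only when the previous one yields nil, which is what gives the per-call option the final say.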

Returns {:ok, body} or {:error, reason}.

Options

  • :backend - Backend module (overrides env / config / default)
  • :timeout - Request timeout in ms (default: 60_000)

Error Reasons

  • %{status: integer(), body: term()} - HTTP error response
  • %Mint.TransportError{} - Network error (Req backend)
  • %JSON.DecodeError{} - JSON decode error
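A caller can branch on these reasons with ordinary pattern matching. The sketch below is illustrative: PostErrorSketch and its return tags are hypothetical, and treating 5xx responses as retryable is an example policy, not something the library prescribes.

```elixir
defmodule PostErrorSketch do
  # Struct patterns are written as %{__struct__: Mod} so the sketch compiles
  # without Mint or a JSON library loaded.
  def describe({:ok, _body}), do: :ok

  def describe({:error, %{__struct__: Mint.TransportError} = error}),
    do: {:network, Map.get(error, :reason)}

  def describe({:error, %{__struct__: JSON.DecodeError}}), do: :bad_json

  # Example policy only: server-side failures are flagged as retryable.
  def describe({:error, %{status: status}}) when status in 500..599,
    do: {:retryable, status}

  def describe({:error, %{status: status}}), do: {:http_error, status}
  def describe({:error, other}), do: {:unknown, other}
end
```

The struct clauses come first so that a struct which happens to have a :status field is not mistaken for an HTTP error map.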

stream(url, body, headers, opts \\ [])

@spec stream(String.t(), map(), list(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Make a streaming POST request with SSE parsing.

Returns {:ok, stream} where stream is an Enumerable of parsed events. Events are maps with string keys (parsed JSON) or {:stream_done, reason} tuples; on failure the stream emits {:stream_error, reason} and halts (see Error Handling).

Options

  • :timeout - Receive timeout in ms (default: 60_000) — passed to hackney as :recv_timeout.
  • :connect_timeout - TCP connect timeout in ms (default: 30_000).
  • :pool - Hackney pool name (default: :default). Configure the default pool via config :nous, :hackney_pool, max_connections: ..., timeout: ... or start a dedicated pool with :hackney_pool.start_pool/2.
  • :stream_parser - Module for parsing the stream buffer (default: SSE parsing). Must implement parse_buffer/1 returning {events, remaining_buffer}. See Nous.Providers.HTTP.JSONArrayParser for an example.
  • :finch_name - Ignored. Kept for source compatibility with callers from before the 0.15.0 hackney rewrite. Will be removed in a future release.
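To illustrate the :stream_parser contract, here is a minimal custom parser for newline-delimited payloads. MyApp.LineParser is a hypothetical module; it returns raw lines as events, whereas a real parser would presumably decode each line before returning it.

```elixir
defmodule MyApp.LineParser do
  # Hypothetical parser for newline-delimited payloads.
  # Complete lines become events; an unterminated tail is returned so the
  # caller can prepend it to the next chunk.
  def parse_buffer(buffer) do
    {lines, [rest]} = buffer |> String.split("\n") |> Enum.split(-1)
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end
```

Such a module would be passed as stream_parser: MyApp.LineParser in the options to stream/4.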

Error Handling

The stream will emit {:stream_error, reason} on errors and then halt.
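A consumer therefore needs a clause for the error tuple as well as for normal events. The following sketch folds over any enumerable of events; StreamConsumerSketch is a hypothetical module and the "text" key is an invented payload field, since the actual event shape depends on the provider.

```elixir
defmodule StreamConsumerSketch do
  # Collects text from events until the stream finishes or fails.
  # The "text" key is a hypothetical payload field.
  def collect(events) do
    Enum.reduce_while(events, {:ok, []}, fn
      {:stream_error, reason}, _acc -> {:halt, {:error, reason}}
      {:stream_done, _reason}, acc -> {:halt, acc}
      %{"text" => text}, {:ok, parts} -> {:cont, {:ok, [text | parts]}}
      # Events without a "text" key are skipped.
      _other, acc -> {:cont, acc}
    end)
    |> finish()
  end

  defp finish({:ok, parts}), do: {:ok, parts |> Enum.reverse() |> Enum.join()}
  defp finish(error), do: error
end
```

Because the function accepts any enumerable, it can be tried against a plain list of events before being pointed at a real stream.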