livery_client (livery v0.4.1)

View Source

A composable HTTP client: the outbound twin of the server middleware.

Build a client once with new/1 (a transport adapter, a base URL, default headers, and a layer stack), then call it with get/2, post/3, request/3,4. Layers run outermost-first and each is call(Request, Next, State) -> {ok, response()} | {error, term()}, the same shape as server middleware, with errors threaded as values.

Client = livery_client:new(#{
    base_url => <<"https://api.example.com">>,
    stack    => [
        livery_client:timeout(5000),
        livery_client:retry(#{max => 3}),
        livery_client:circuit_breaker(#{name => api}),
        livery_client:concurrency(50)
    ]
}),
{ok, Resp} = livery_client:get(Client, <<"/users/42">>),
200 = livery_client:status(Resp).

The transport is a livery_client_adapter; the default, livery_client_hackney, speaks HTTP/1.1, HTTP/2, and HTTP/3.

Summary

Functions

Add an endpoint to a balance pool at runtime.

Spread requests across a pool of endpoints, with passive outlier ejection and lazy half-open recovery. Opts: name (required), endpoints (base URLs or a {Module, Arg} discovery pair), policy (p2c | round_robin), eject_after, eject_for, fail_status. With balance you pass paths; the chosen endpoint supplies the host.

Build a client. Opts: adapter (default livery_client_hackney), adapter_opts, base_url, headers (defaults applied to every request), stack (the layers).

Pull the next chunk of a {stream, Reader} response body.

Drain a {stream, Reader} response body to a single binary.

Point a request URL at a chosen endpoint. Strips any scheme+authority the URL already carries and joins the endpoint with the remaining path+query, so the balancer owns the host. Used by livery_client_balance.

Remove an endpoint from a balance pool at runtime.

Send a request. Opts: body (iodata | {full, _} | {stream, Fun}), headers, timeout, stream (true to receive a {stream, Reader} response body), meta.

Run a fully built request through the client's layer stack.

Cancel a push stream and release its connection.

Ask a flow => manual push stream for one more {chunk, _} message. Ref is the value from the {push, Ref} response body.

Types

client()

-opaque client()

endpoint()

-type endpoint() :: binary().

entry()

-type entry() :: {module(), term()} | fun((request(), next()) -> result()).

next()

-type next() :: fun((request()) -> result()).

request()

-type request() ::
          #{method := atom() | binary(),
            url := binary(),
            headers := [{binary(), binary()}],
            body := empty | {full, iodata()} | {stream, fun()},
            timeout := timeout(),
            stream := boolean(),
            stream_to := pid() | undefined,
            flow := auto | manual,
            meta := map()}.

response()

-type response() ::
          #{status := 100..599 | undefined,
            headers := [{binary(), binary()}],
            body := {full, binary()} | {stream, term()} | {push, stream_ref()}}.

result()

-type result() :: {ok, response()} | {error, term()}.

stack()

-type stack() :: [entry()].

stream_ref()

-opaque stream_ref()

Functions

add_endpoint(Name, Endpoint)

-spec add_endpoint(term(), endpoint()) -> ok.

Add an endpoint to a balance pool at runtime.

after_response(Fun)

-spec after_response(fun((response()) -> response())) -> entry().

balance(Opts)

-spec balance(map()) -> entry().

Spread requests across a pool of endpoints, with passive outlier ejection and lazy half-open recovery. Opts: name (required), endpoints (base URLs or a {Module, Arg} discovery pair), policy (p2c | round_robin), eject_after, eject_for, fail_status. With balance you pass paths; the chosen endpoint supplies the host.

before(Fun)

-spec before(fun((request()) -> request())) -> entry().

body/1

-spec body(response()) -> {full, binary()} | {stream, term()}.

circuit_breaker(Opts)

-spec circuit_breaker(map()) -> entry().

concurrency(Limit)

-spec concurrency(non_neg_integer()) -> entry().

delete(Client, Path)

-spec delete(client(), binary()) -> result().

get(Client, Path)

-spec get(client(), binary()) -> result().

header(Name, Map)

-spec header(binary(), request() | response()) -> binary() | undefined.

header/3

-spec header(binary(), request() | response(), Default) -> binary() | Default.

headers/1

-spec headers(request() | response()) -> [{binary(), binary()}].

method/1

-spec method(request()) -> atom() | binary().

new(Opts)

-spec new(map()) -> client().

Build a client. Opts: adapter (default livery_client_hackney), adapter_opts, base_url, headers (defaults applied to every request), stack (the layers).

post(Client, Path, Body)

-spec post(client(), binary(), iodata()) -> result().

put(Client, Path, Body)

-spec put(client(), binary(), iodata()) -> result().

read/2

-spec read(term(), timeout()) -> {ok, binary(), term()} | {done, term()} | {error, term()}.

Pull the next chunk of a {stream, Reader} response body.

read_body(Reader)

-spec read_body(term()) -> {ok, binary()} | {error, term()}.

Drain a {stream, Reader} response body to a single binary.

rebase(Endpoint, Url)

-spec rebase(endpoint(), binary()) -> binary().

Point a request URL at a chosen endpoint. Strips any scheme+authority the URL already carries and joins the endpoint with the remaining path+query, so the balancer owns the host. Used by livery_client_balance.

remove_endpoint(Name, Endpoint)

-spec remove_endpoint(term(), endpoint()) -> ok.

Remove an endpoint from a balance pool at runtime.

request(Client, Method, Path)

-spec request(client(), atom() | binary(), binary()) -> result().

request(Client, Method, Path, Opts)

-spec request(client(), atom() | binary(), binary(), map()) -> result().

Send a request. Opts: body (iodata | {full, _} | {stream, Fun}), headers, timeout, stream (true to receive a {stream, Reader} response body), meta.

Push streaming: set stream_to to a pid and the response is delivered to it as ordered messages, freeing the pid to selectively receive body chunks alongside its own control messages instead of dedicating a process to a pull loop. The reply is {ok, #{body := {push, Ref}}}; the recipient then receives

{livery_response, Ref, {status, Status, Headers}}
{livery_response, Ref, {chunk, Binary}}      %% zero or more
{livery_response, Ref, done}
{livery_response, Ref, {error, Reason}}       %% instead of done, on failure

flow (default auto) pushes chunks as fast as the wire allows; manual sends one chunk per stream_next/1 for backpressure. stop_stream/1 cancels the stream and drops its connection. Push streaming bypasses the layer stack; the adapter owns the connection.

retry(Opts)

-spec retry(map()) -> entry().

run(Client, Req)

-spec run(client(), request()) -> result().

Run a fully built request through the client's layer stack.

set_header/3

-spec set_header(binary(), binary(), request()) -> request().

status/1

-spec status(response()) -> 100..599.

stop_stream/1

-spec stop_stream(stream_ref()) -> ok.

Cancel a push stream and release its connection.

stream_next/1

-spec stream_next(stream_ref()) -> ok | {error, term()}.

Ask a flow => manual push stream for one more {chunk, _} message. Ref is the value from the {push, Ref} response body.

timeout(Ms)

-spec timeout(pos_integer()) -> entry().

url/1

-spec url(request()) -> binary().

wrap(Fun)

-spec wrap(fun((throw | error | exit, term(), list()) -> result())) -> entry().