macula_stream (macula v4.2.6)

View Source

Macula streaming RPC — single-stream state machine.

Owns one streaming RPC's state. Each call_stream, open_stream, or server-side handler invocation gets its own macula_stream gen_server. The state machine itself is carrier-agnostic; the peer shape ({local, _} or {remote_via_link, _, _}) decides how chunks reach the wire.

Two carriers route through forward_to_peer/2:

  • {local, Pid} — in-process pairing for unit tests and macula_stream_local dispatch.
  • {remote_via_link, Link, Sid} — V2 wire format via macula_station_link (CBOR macula_frame:stream_* frames over a peering connection).

Renamed from macula_stream_v1 in 3.17.0; the V1 mesh_client carrier ({remote, _, _}) was retired alongside the rest of the V1 surface in the same release. The module now spans the LOCAL carrier and the V2 station_link carrier only.

Summary

Functions

Abort the stream with a STREAM_ERROR frame. Both sides close; any pending recv/await_reply waiters receive {error, {Code, Message}}.

Attach a V2 macula_station_link peer to this stream. The station_link carries deliveries as V2 macula_frame:stream_* frames over its peering connection (one per pool seed); inbound STREAM_* frames are decoded by the link and forwarded into this stream via the deliver_chunk / end / error / reply casts below.

Wait for the terminal reply (client-stream / bidi).

Close both sides. Idempotent.

Half-close the write side. Recv side stays open.

Deliver a chunk frame from the peer.

Deliver a STREAM_END frame from the peer.

Deliver a STREAM_ERROR frame from the peer.

Deliver a STREAM_REPLY frame from the peer.

Inspect stream state (debugging).

Pair two stream processes as peers (Phase 1 local dispatch).

Receive the next chunk (blocks indefinitely).

Send a binary chunk on the stream.

Server-side: emit a terminal error as the reply value.

Server-side: emit the terminal reply.

Start a stream gen_server.

Types

chunk/0

-type chunk() :: binary() | {raw, binary()} | {term, term()}.

encoding/0

-type encoding() :: raw | msgpack.

mode/0

-type mode() :: server_stream | client_stream | bidi.

peer/0

-type peer() :: undefined | {local, pid()} | {remote_via_link, pid(), stream_id()}.

result/0

-type result() :: {ok, term()} | {error, term()}.

role/0

-type role() :: client | server.

stream_id/0

-type stream_id() :: binary().

Functions

abort(Pid, Code, Message)

-spec abort(pid(), binary(), binary()) -> ok.

Abort the stream with a STREAM_ERROR frame. Both sides close; any pending recv/await_reply waiters receive {error, {Code, Message}}.

attach_to_link(StreamPid, LinkPid, StreamId)

-spec attach_to_link(pid(), pid(), stream_id()) -> ok.

Attach a V2 macula_station_link peer to this stream. The station_link carries deliveries as V2 macula_frame:stream_* frames over its peering connection (one per pool seed); inbound STREAM_* frames are decoded by the link and forwarded into this stream via the deliver_chunk / end / error / reply casts below.

await_reply(Pid)

-spec await_reply(pid()) -> result().

Wait for the terminal reply (client-stream / bidi).

await_reply(Pid, Timeout)

-spec await_reply(pid(), timeout()) -> result() | {error, timeout}.

close(Pid)

-spec close(pid()) -> ok.

Close both sides. Idempotent.

close_send(Pid)

-spec close_send(pid()) -> ok.

Half-close the write side. Recv side stays open.

deliver_chunk(Pid, Encoding, Body)

-spec deliver_chunk(pid(), encoding(), term()) -> ok.

Deliver a chunk frame from the peer.

deliver_end(Pid, Role)

-spec deliver_end(pid(), send | both) -> ok.

Deliver a STREAM_END frame from the peer.

deliver_error(Pid, Code, Message)

-spec deliver_error(pid(), binary(), binary()) -> ok.

Deliver a STREAM_ERROR frame from the peer.

deliver_reply(Pid, Result)

-spec deliver_reply(pid(), result()) -> ok.

Deliver a STREAM_REPLY frame from the peer.

handle_call(Msg, From, State)

handle_cast(Msg, State)

handle_info(Msg, State)

info(Pid)

-spec info(pid()) -> map().

Inspect stream state (debugging).

init(Opts)

pair(A, B)

-spec pair(pid(), pid()) -> ok.

Pair two stream processes as peers (Phase 1 local dispatch).

recv(Pid)

-spec recv(pid()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

Receive the next chunk (blocks indefinitely).

recv(Pid, Timeout)

-spec recv(pid(), timeout()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

send(Pid, Bin)

-spec send(pid(), binary()) -> ok | {error, term()}.

Send a binary chunk on the stream.

send(Pid, Body, _)

-spec send(pid(), binary() | term(), encoding()) -> ok | {error, term()}.

set_error(Pid, Reason)

-spec set_error(pid(), term()) -> ok.

Server-side: emit a terminal error as the reply value.

set_reply(Pid, Result)

-spec set_reply(pid(), term()) -> ok.

Server-side: emit the terminal reply.

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start a stream gen_server.

Required opts: id, role, mode, owner.

terminate(Reason, State)