macula_station_client (macula v3.10.2)

Station-client — outbound RPC + pubsub over macula_peering.

A macula_station_client is a gen_server that owns one macula_peering connection to a single station endpoint. It drives the CONNECT/HELLO handshake as the client side, then exposes two surfaces over the same peering pipe:

  • Request/response: call/4 sends a CALL frame and matches inbound RESULT/ERROR frames against pending callers using the 16-byte CALL id. Convenience wrappers cover _dht.put_record, _dht.find_record, and _dht.find_records_by_type.
  • Streaming subscribe: subscribe/3 sends a SUBSCRIBE frame and registers a delivery pid. Inbound EVENT frames matching the topic fan out to subscribers as {macula_event, SubRef, Topic, Payload, Meta}. On disconnect each subscriber receives a single {macula_event_gone, SubRef, Reason}.

Lifecycle

  1. start_link/1 — spawn worker, schedule connect.
  2. connect_now/1 (cast) — build connect opts, call macula_peering:connect/1, store the worker pid.
  3. Peering handshake completes → {macula_peering, connected, Pid, PeerNodeId} arrives → state moves to connected.
  4. call/4 from caller → build CALL frame (signing happens inside peering), store {from, deadline_timer} keyed by CALL id, send frame via macula_peering:send_frame/2.
  5. RESULT or ERROR arrives as {macula_peering, frame, Pid, Frame} → look up call_id, cancel timer, reply to caller.
  6. {macula_peering, disconnected, Pid, Reason} → fail all pending calls with {error, {disconnected, Reason}}, notify all subscribers via macula_event_gone, stop the client (caller is responsible for restart / reconnect).
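
Because the client stops on disconnect and leaves restart to its owner (step 6), one possible arrangement is to run it under an OTP supervisor. The following is a minimal sketch and not part of macula: the module name station_client_sup, the child id, and the restart intensity values are assumptions chosen for illustration.

```erlang
-module(station_client_sup).
-behaviour(supervisor).
-export([start_link/1, init/1]).

%% Opts is a macula_station_client:opts() map and is passed through
%% unchanged to macula_station_client:start_link/1.
start_link(Opts) ->
    supervisor:start_link(?MODULE, Opts).

init(Opts) ->
    %% Assumption: at most 5 restarts per 60 seconds is acceptable here.
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 60},
    Child = #{id => station_client,
              start => {macula_station_client, start_link, [Opts]},
              %% The exit reason used on disconnect is not documented,
              %% so restart regardless of how the client stopped.
              restart => permanent,
              shutdown => 5000,
              type => worker},
    {ok, {SupFlags, [Child]}}.
```

A restarted client is a new process with a new pid, so callers would need to look it up again, and subscribers that received macula_event_gone would need to subscribe again.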

Call reply taxonomy

<table><tr><th>Inbound frame</th><th>call/4 returns</th></tr><tr><td>RESULT(payload={error, Reason})</td><td>{ok, {error, Reason}}</td></tr><tr><td>RESULT(payload=Value)</td><td>{ok, Value}</td></tr><tr><td>ERROR(code=C, name=N)</td><td>{error, {call_error, C, N}}</td></tr><tr><td>(deadline elapses)</td><td>{error, timeout}</td></tr><tr><td>(connection drops)</td><td>{error, {disconnected, Reason}}</td></tr></table>
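
The table above can be turned into a single pattern match on the return value of call/4. The sketch below is illustrative only: the module name call_outcome and the outcome tags are hypothetical. The one ordering constraint is that {ok, {error, Reason}} has to be matched before the general {ok, Value} clause.

```erlang
-module(call_outcome).
-export([classify/1]).

%% Classify a macula_station_client:call/4 return value.
%% The outcome tags on the right are hypothetical names for this sketch.
classify({ok, {error, Reason}}) ->
    %% RESULT frame whose payload is itself an error tuple.
    {in_band_error, Reason};
classify({ok, Value}) ->
    {value, Value};
classify({error, {call_error, Code, Name}}) ->
    %% ERROR frame from the station.
    {call_error, Code, Name};
classify({error, timeout}) ->
    timeout;
classify({error, {disconnected, Reason}}) ->
    {disconnected, Reason};
classify({error, Other}) ->
    %% Any error shape not listed in the table.
    {other_error, Other}.
```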

Realm field

Outbound CALL and SUBSCRIBE frames carry a 32-byte realm id. Stations deployed today advertise an empty realms list (realm-agnostic infrastructure) and do not enforce a realm match on inbound frames: the dispatch path verifies the signature and looks up the procedure / topic, nothing more. Callers can therefore pass any 32-byte value; this module defaults to all zeros when no realm is configured.
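
As a small illustration of the default described above, the following sketch picks a realm out of an options map and falls back to 32 zero bytes. The module name zero_realm is hypothetical and this is not the module's actual implementation.

```erlang
-module(zero_realm).
-export([realm/1]).

%% Return the configured realm, or 32 zero bytes when none is set.
realm(#{realm := <<_:256>> = Realm}) ->
    Realm;
realm(#{realm := Bad}) ->
    %% Anything that is not exactly 32 bytes is rejected in this sketch.
    error({bad_realm, Bad});
realm(Opts) when is_map(Opts) ->
    <<0:256>>.
```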

Types

opts/0

-type opts() ::
          #{seed := url() | #{host := binary() | string(), port := inet:port_number()},
            identity => macula_identity:key_pair(),
            realm => <<_:256>>,
            capabilities => non_neg_integer(),
            alpn => [binary()],
            connect_timeout_ms => non_neg_integer()}.
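
For orientation, here is a minimal sketch of building an opts() map using the host/port form of seed. Only seed is required by the type; the module name station_opts_example and the 5000 ms connect timeout are assumptions for illustration.

```erlang
-module(station_opts_example).
-export([opts/2]).

%% Build an opts() map with the mandatory seed key and one optional key.
opts(Host, Port)
  when (is_binary(Host) orelse is_list(Host)),
       is_integer(Port), Port >= 0, Port =< 65535 ->
    #{seed => #{host => Host, port => Port},
      %% Assumption: 5 seconds is a reasonable handshake budget.
      connect_timeout_ms => 5000}.
```

The result can be handed to start_link/1, for example macula_station_client:start_link(station_opts_example:opts(<<"station.example.net">>, 4433)), where the host name and port are placeholders.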

url/0

-type url() :: binary() | string().

Functions

call(Pid, Procedure, Payload, TimeoutMs)

-spec call(pid(), binary(), term(), pos_integer()) -> {ok, term()} | {error, term()}.

Issue a CALL frame and block until the station replies, the deadline elapses, or the connection drops.

Procedure is the V2 procedure name, e.g. <<"_dht.find_records_by_type">>. Payload is any term that macula_frame:call/1 accepts (typically a map).

code_change(OldVsn, S, Extra)

find_record(Pid, Key)

-spec find_record(pid(), <<_:256>>) -> {ok, map()} | {error, not_found | term()}.

Convenience wrapper for _dht.find_record. Looks up a record by its macula_record:storage_key/1 (32-byte BLAKE3 digest). Returns {error, not_found} when no record exists at the key. Callers SHOULD verify the returned record's signature with macula_record:verify/1 before trusting its payload.
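
The lookup-then-verify advice can be sketched as below. The return shape of macula_record:verify/1 is not stated here, so the sketch takes a caller-supplied predicate VerifyFun that returns true or false; the module name verified_lookup and the bad_signature reason are hypothetical.

```erlang
-module(verified_lookup).
-export([find_verified/3, check/2]).

%% Look up Key and only return the record if VerifyFun accepts it.
find_verified(Client, Key, VerifyFun)
  when byte_size(Key) =:= 32, is_function(VerifyFun, 1) ->
    check(macula_station_client:find_record(Client, Key), VerifyFun).

%% Pure half of the sketch: apply the predicate to a find_record result.
check({ok, Record}, VerifyFun) ->
    case VerifyFun(Record) of
        true  -> {ok, Record};
        false -> {error, bad_signature}
    end;
check({error, _} = Err, _VerifyFun) ->
    Err.
```

In practice VerifyFun would wrap macula_record:verify/1 and map its result to a boolean.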

find_record(Pid, Key, TimeoutMs)

-spec find_record(pid(), <<_:256>>, pos_integer()) -> {ok, map()} | {error, not_found | term()}.

find_records_by_type(Pid, Type)

-spec find_records_by_type(pid(), 0..255) -> {ok, [map()]} | {error, term()}.

Convenience wrapper for _dht.find_records_by_type. Returns the decoded list of signed records (CBOR-decoded maps as produced by macula_record).

find_records_by_type(Pid, Type, TimeoutMs)

-spec find_records_by_type(pid(), 0..255, pos_integer()) -> {ok, [map()]} | {error, term()}.

handle_call(Req, From, State)

handle_cast(Msg, S)

handle_info(Other, State)

init(Opts)

is_connected(Pid)

-spec is_connected(pid()) -> boolean().

peer_node_id(Pid)

-spec peer_node_id(pid()) -> {ok, macula_identity:pubkey()} | {error, not_connected}.

put_record(Pid, Record)

-spec put_record(pid(), map()) -> ok | {error, term()}.

Convenience wrapper for _dht.put_record. The record must be a fully-signed macula_record:record() map (build via macula_record:envelope/3,4 + macula_record:sign/2). Returns ok on success, {error, Reason} on RPC failure or unexpected reply.

Stations replicate the put across the K-nearest peers in their Kademlia routing table, so a single put_record/2 call against any one connected station propagates to the rest of the DHT.
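
Since one connected station is enough to reach the rest of the DHT, publishing several records needs only a single client. The sketch below puts a list of already signed records one by one and reports which ones failed; the module name put_batch and the result shape are assumptions for illustration.

```erlang
-module(put_batch).
-export([put_all/2, put_all_with/2]).

%% Put every record through one client.
put_all(Client, Records) ->
    put_all_with(fun(R) -> macula_station_client:put_record(Client, R) end,
                 Records).

%% PutFun returns ok | {error, Reason}, like put_record/2.
%% Result: {NumberStored, [{Record, Reason}]} with failures in input order.
put_all_with(PutFun, Records) when is_function(PutFun, 1) ->
    {OkCount, Failed} =
        lists:foldl(
          fun(Record, {Ok, Acc}) ->
                  case PutFun(Record) of
                      ok              -> {Ok + 1, Acc};
                      {error, Reason} -> {Ok, [{Record, Reason} | Acc]}
                  end
          end, {0, []}, Records),
    {OkCount, lists:reverse(Failed)}.
```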

put_record(Pid, Record, TimeoutMs)

-spec put_record(pid(), map(), pos_integer()) -> ok | {error, term()}.

start_link(Opts)

-spec start_link(opts()) -> {ok, pid()} | {error, term()}.

Start a station-client connected to seed. Returns once the gen_server is alive; the QUIC handshake completes asynchronously. Use is_connected/1 to poll readiness or just issue call/4 (which blocks the caller until ready or until its timeout elapses).
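
Polling is_connected/1 can be sketched as a small deadline loop. This is an illustration only: the module name wait_connected and the 50 ms poll interval are assumptions, and the readiness check is passed in as a fun so the loop itself does not depend on a live client.

```erlang
-module(wait_connected).
-export([await/2, await_with/3]).

%% Wait until the client reports connected, or give up after TimeoutMs.
await(Client, TimeoutMs) ->
    await_with(fun() -> macula_station_client:is_connected(Client) end,
               TimeoutMs, 50).

%% Generic form: ReadyFun returns true once the condition holds.
await_with(ReadyFun, TimeoutMs, IntervalMs) when is_function(ReadyFun, 0) ->
    Deadline = erlang:monotonic_time(millisecond) + TimeoutMs,
    loop(ReadyFun, Deadline, IntervalMs).

loop(ReadyFun, Deadline, IntervalMs) ->
    case ReadyFun() of
        true ->
            ok;
        false ->
            case erlang:monotonic_time(millisecond) >= Deadline of
                true ->
                    {error, timeout};
                false ->
                    timer:sleep(IntervalMs),
                    loop(ReadyFun, Deadline, IntervalMs)
            end
    end.
```

Waiting like this is mainly useful before subscribe/3, which returns {error, not_connected} rather than blocking.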

stop(Pid)

-spec stop(pid()) -> ok.

subscribe(Client, Topic, Subscriber)

-spec subscribe(pid(), binary(), pid()) -> {ok, reference()} | {error, not_connected}.

Subscribe to a peering pubsub topic. Sends a SUBSCRIBE frame to the connected station and registers Subscriber as the delivery pid for inbound EVENT frames matching Topic.

Returns {ok, SubRef} once the SUBSCRIBE frame is sent. Stations do not acknowledge SUBSCRIBE — the contract is best-effort, mirroring the existing peering pubsub semantics.

Subscriber receives one of:

  • {macula_event, SubRef, Topic, Payload, Meta} — every time an EVENT frame arrives for Topic. Meta is a map with publisher, seq, and delivered_via fields.
  • {macula_event_gone, SubRef, Reason} — once, when the connection drops or the client stops. The subscription map is cleared on the same transition.

The client monitors Subscriber; if it dies the subscription is torn down (best-effort UNSUBSCRIBE on the wire).
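
A subscriber process typically handles the two message shapes above in a receive loop. The sketch below subscribes the calling process and accumulates payloads until the subscription is gone; the module name event_loop and the choice to collect payloads in a list are assumptions made to keep the example small.

```erlang
-module(event_loop).
-export([consume/2, collect/2]).

%% Subscribe the calling process to Topic and collect events until
%% the subscription ends.
consume(Client, Topic) ->
    case macula_station_client:subscribe(Client, Topic, self()) of
        {ok, SubRef} ->
            collect(SubRef, []);
        {error, not_connected} = Err ->
            Err
    end.

%% Selective receive on SubRef: messages for other subscriptions
%% stay in the mailbox.
collect(SubRef, Acc) ->
    receive
        {macula_event, SubRef, _Topic, Payload, _Meta} ->
            collect(SubRef, [Payload | Acc]);
        {macula_event_gone, SubRef, Reason} ->
            {gone, Reason, lists:reverse(Acc)}
    end.
```

A long-running consumer would process each payload as it arrives instead of accumulating an unbounded list.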

terminate(Reason, State)

unsubscribe(Client, SubRef)

-spec unsubscribe(pid(), reference()) -> ok.

Drop a subscription. Sends a best-effort UNSUBSCRIBE frame to the station and clears local bookkeeping. Always returns ok, even when SubRef is unknown — unsubscribe is idempotent.
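
Because unsubscribe/2 is idempotent, it fits naturally in a try ... after block that scopes a subscription to one piece of work. The following is a sketch under assumptions: the module name scoped_subscription is hypothetical, and the client process is assumed to still be alive when the after clause runs.

```erlang
-module(scoped_subscription).
-export([with_subscription/3, flush/1]).

%% Run Fun(SubRef) with the calling process subscribed to Topic,
%% then always unsubscribe and drop leftover messages.
with_subscription(Client, Topic, Fun) when is_function(Fun, 1) ->
    case macula_station_client:subscribe(Client, Topic, self()) of
        {ok, SubRef} ->
            try
                Fun(SubRef)
            after
                %% Assumption: Client is still alive at this point.
                ok = macula_station_client:unsubscribe(Client, SubRef),
                flush(SubRef)
            end;
        {error, not_connected} = Err ->
            Err
    end.

%% Drop messages for SubRef already queued in the mailbox.
%% Returns how many were discarded.
flush(SubRef) ->
    flush(SubRef, 0).

flush(SubRef, N) ->
    receive
        {macula_event, SubRef, _Topic, _Payload, _Meta} -> flush(SubRef, N + 1);
        {macula_event_gone, SubRef, _Reason}            -> flush(SubRef, N + 1)
    after 0 ->
        N
    end.
```

The flush step covers events that were delivered to the mailbox before the UNSUBSCRIBE took effect.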