macula_station_client (macula v3.10.3)
View SourceStation-client — outbound RPC + pubsub over macula_peering.
A macula_station_client is a gen_server that owns one macula_peering connection to a single station endpoint. It drives the CONNECT/HELLO handshake as the client side, then exposes two surfaces over the same peering pipe:
- Request/response —
call/4sends a CALL frame and matches inbound RESULT/ERROR frames against pending callers using the 16-byte CALL id. Convenience wrappers cover_dht.put_record,_dht.find_record, and_dht.find_records_by_type. - Streaming subscribe —
subscribe/3sends a SUBSCRIBE frame and registers a delivery pid. Inbound EVENT frames matching the topic fan out to subscribers as{macula_event, SubRef, Topic, Payload, Meta}. On disconnect each subscriber receives a single{macula_event_gone, SubRef, Reason}.
Lifecycle
start_link/1— spawn worker, schedule connect.connect_now/1(cast) — build connect opts, callmacula_peering:connect/1, store the worker pid.- Peering handshake completes →
{macula_peering, connected, Pid, PeerNodeId}arrives → state moves toconnected. call/4from caller → build CALL frame, sign happens inside peering, store{from, deadline_timer}` keyed by CALL id, send frame via `macula_peering:send_frame/2.- RESULT or ERROR arrives as
{macula_peering, frame, Pid, Frame}→ look upcall_id, cancel timer, reply to caller. {macula_peering, disconnected, Pid, Reason}→ fail all pending calls with{error, {disconnected, Reason}}, notify all subscribers viamacula_event_gone, stop the client (caller is responsible for restart / reconnect).
Call reply taxonomy
<table><tr><th>Inbound frame</th><th>call/4 returns</th></tr><tr><td>RESULT(payload={error, Reason})</td><td>{ok, {error, Reason}}</td></tr><tr><td>RESULT(payload=Value)</td><td>{ok, Value}</td></tr><tr><td>ERROR(code=C, name=N)</td><td>{error, {call_error, C, N}}</td></tr><tr><td>(deadline elapses)</td><td>{error, timeout}</td></tr><tr><td>(connection drops)</td><td>{error, {disconnected, Reason}}</td></tr></table>
Realm field
Outbound CALL and SUBSCRIBE frames carry a 32-byte realm id. Stations deployed today advertise an empty realms list (realm-agnostic infrastructure) and do not enforce a realm match on inbound frames — the dispatch path verifies the signature and looks up the procedure / topic, nothing more. Callers therefore pass any 32-byte value; this module defaults to all-zeros when no realm is configured.
Summary
Functions
Issue a CALL frame and block until the station replies, the deadline elapses, or the connection drops.
Convenience wrapper for _dht.find_record. Looks up a record by its macula_record:storage_key/1 (32-byte BLAKE3 digest). Returns {error, not_found} when no record exists at the key. Callers SHOULD verify the returned record's signature with macula_record:verify/1 before trusting its payload.
Convenience wrapper for _dht.find_records_by_type. Returns the decoded list of signed records (CBOR-decoded maps as produced by macula_record).
Convenience wrapper for _dht.put_record. The record must be a fully-signed macula_record:record() map (build via macula_record:envelope/3,4 + macula_record:sign/2). Returns ok on success, {error, Reason} on RPC failure or unexpected reply.
Start a station-client connected to seed. Returns once the gen_server is alive; the QUIC handshake completes asynchronously. Use is_connected/1 to poll readiness or just issue call/4 (which blocks the caller until ready or until its timeout elapses).
Subscribe to a peering pubsub topic. Sends a SUBSCRIBE frame to the connected station and registers Subscriber as the delivery pid for inbound EVENT frames matching Topic.
Drop a subscription. Sends a best-effort UNSUBSCRIBE frame to the station and clears local bookkeeping. Always returns ok, even when SubRef is unknown — unsubscribe is idempotent.
Types
-type opts() :: #{seed := url() | #{host := binary() | string(), port := inet:port_number()}, identity => macula_identity:key_pair(), realm => <<_:256>>, capabilities => non_neg_integer(), alpn => [binary()], connect_timeout_ms => non_neg_integer()}.
Functions
Issue a CALL frame and block until the station replies, the deadline elapses, or the connection drops.
Procedure is the V2 procedure name, e.g. <<"_dht.find_records_by_type">>. Payload is any term that macula_frame:call/1 accepts (typically a map).
Convenience wrapper for _dht.find_record. Looks up a record by its macula_record:storage_key/1 (32-byte BLAKE3 digest). Returns {error, not_found} when no record exists at the key. Callers SHOULD verify the returned record's signature with macula_record:verify/1 before trusting its payload.
-spec find_record(pid(), <<_:256>>, pos_integer()) -> {ok, map()} | {error, not_found | term()}.
Convenience wrapper for _dht.find_records_by_type. Returns the decoded list of signed records (CBOR-decoded maps as produced by macula_record).
-spec find_records_by_type(pid(), 0..255, pos_integer()) -> {ok, [map()]} | {error, term()}.
-spec peer_node_id(pid()) -> {ok, macula_identity:pubkey()} | {error, not_connected}.
Convenience wrapper for _dht.put_record. The record must be a fully-signed macula_record:record() map (build via macula_record:envelope/3,4 + macula_record:sign/2). Returns ok on success, {error, Reason} on RPC failure or unexpected reply.
Stations replicate the put across the K-nearest peers in their Kademlia routing table, so a single put_record/2 call against any one connected station propagates to the rest of the DHT.
-spec put_record(pid(), map(), pos_integer()) -> ok | {error, term()}.
Start a station-client connected to seed. Returns once the gen_server is alive; the QUIC handshake completes asynchronously. Use is_connected/1 to poll readiness or just issue call/4 (which blocks the caller until ready or until its timeout elapses).
-spec stop(pid()) -> ok.
Subscribe to a peering pubsub topic. Sends a SUBSCRIBE frame to the connected station and registers Subscriber as the delivery pid for inbound EVENT frames matching Topic.
Returns {ok, SubRef} once the SUBSCRIBE frame is sent. Stations do not acknowledge SUBSCRIBE — the contract is best-effort, mirroring the existing peering pubsub semantics.
Subscriber receives one of:
{macula_event, SubRef, Topic, Payload, Meta}— every time an EVENT frame arrives forTopic.Metais a map withpublisher,seq, anddelivered_viafields.{macula_event_gone, SubRef, Reason}— once, when the connection drops or the client stops. The subscription map is cleared on the same transition.
The client monitors Subscriber; if it dies the subscription is torn down (best-effort UNSUBSCRIBE on the wire).
Drop a subscription. Sends a best-effort UNSUBSCRIBE frame to the station and clears local bookkeeping. Always returns ok, even when SubRef is unknown — unsubscribe is idempotent.