macula_client (macula v4.2.6)
macula_client — the canonical pool client.
Holds one peering link per configured station and routes operations with replication, subscription replay, and inbound-event dedup. Apps don't manage individual macula_station_link workers; they call macula_client (or the macula facade, which re-exports the public surface).
Per PLAN_V2_PARITY Q2 §1: pool is the canonical client handle. A single-station link is an internal worker only.
Lifecycle
{ok, Pool} = macula_client:connect(Seeds, Opts).
ok = macula_client:publish(Pool, Realm, Topic, Payload, #{}).
{ok, Sub} = macula_client:subscribe(Pool, Realm, Topic, self(), #{}).
receive {macula_event, Sub, Topic, Payload, Meta} -> ... end.
ok = macula_client:unsubscribe(Pool, Sub).
ok = macula_client:close(Pool).
Replication
publish/5 fans the PUBLISH frame to replication_factor (default 1) currently-spawned links. **Partial success counts as success** per PLAN_V2_PARITY §5.1.1: the call returns ok as soon as one link accepts the frame; the others are best-effort. When zero links are spawned the call returns {error, {transient, no_healthy_station}}.
subscribe/5 applies to every spawned link. The pool delivers a deduped event stream to the consumer regardless of which link relayed any given EVENT.
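A minimal usage sketch of the replication behaviour described above. The seed strings, realm, and topic are placeholders; only the connect/publish shapes shown in the Lifecycle section are assumed:

```erlang
%% Sketch only: seeds, realm, and topic are placeholder values.
Realm = binary:copy(<<$r>>, 32),                  % 32-byte per-call realm
{ok, Pool} = macula_client:connect(
               [<<"stn-a.example:4433">>, <<"stn-b.example:4433">>],
               #{replication_factor => 2}),
case macula_client:publish(Pool, Realm, <<"telemetry/tick">>, <<"payload">>, #{}) of
    ok ->
        ok;  % at least one link accepted the frame; the second copy is best-effort
    {error, {transient, no_healthy_station}} ->
        %% zero links spawned: back off and retry, or surface the error
        retry_later
end.
```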
Dedup
Inbound EVENT frames are keyed by (Realm, Publisher, Seq) in an ETS table owned by the pool. The table is swept every dedup_sweep_ms (default 30s) for entries older than dedup_window_ms (default 60s).
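The dedup check can be pictured as an ets:insert_new/2 on the (Realm, Publisher, Seq) key. This is an illustrative sketch of the idea, not the pool's actual internal code:

```erlang
%% Illustrative sketch (not macula_client's internals): an EVENT is delivered
%% only the first time its (Realm, Publisher, Seq) key is seen in the window.
maybe_deliver(Tab, Realm, Publisher, Seq, Deliver) ->
    Key = {Realm, Publisher, Seq},
    case ets:insert_new(Tab, {Key, erlang:monotonic_time(millisecond)}) of
        true  -> Deliver();   % first sighting: fan out to local subscribers
        false -> duplicate    % already relayed by another link: drop
    end.
```

Storing the insertion timestamp alongside the key is what lets the periodic sweep discard entries older than dedup_window_ms.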
Replay
When a link's process dies the pool monitor fires; the pool schedules a respawn after ?LINK_RESPAWN_DELAY_MS (1s). On respawn, the pool re-issues every currently-tracked (Realm, Topic) subscription against the new link via the internal macula_client_replay helper.
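Schematically, the replay step amounts to re-issuing each tracked subscription against the fresh link. The macula_station_link:subscribe/3 call below is a hypothetical worker-level entry point used only for illustration:

```erlang
%% Schematic only: macula_station_link:subscribe/3 is a hypothetical
%% worker-level call; the real replay goes through macula_client_replay.
replay_subscriptions(NewLinkPid, TrackedSubs) ->
    lists:foreach(
      fun({Realm, Topic}) ->
              macula_station_link:subscribe(NewLinkPid, Realm, Topic)
      end,
      TrackedSubs).
```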
Types
-type opts() :: #{identity => macula_identity:key_pair(),
                  replication_factor => pos_integer(),
                  capabilities => non_neg_integer(),
                  alpn => [binary()],
                  connect_timeout_ms => pos_integer(),
                  dedup_window_ms => non_neg_integer(),
                  dedup_sweep_ms => pos_integer()}.
-type pool() :: pid().
-type seed() :: binary() | string() | #{host := binary() | string(), port := inet:port_number()}.
-type status() :: #{seeds := [seed()], healthy_links := non_neg_integer(), failed_links := non_neg_integer(), self_node_id := macula_identity:pubkey(), subscriptions := non_neg_integer()}.
Functions
Advertise a procedure handler on every healthy link. Stored in pool state so links respawned later replay the advertisement. Returns ok when at least one link accepted the registration.
-spec advertise_stream(pool(), <<_:256>>, binary(), macula_frame:stream_mode(), stream_handler()) -> ok | {error, term()}.
Advertise a streaming procedure handler on every healthy link. Stored in pool state so links respawned later replay the advertisement. Returns ok when at least one link accepted the registration.
Issue a CALL frame against the pool. Tries each healthy link in turn and returns the first non-error reply. Returns {error, no_healthy_station} when no link has completed its CONNECT/HELLO handshake.
Realm is per-call (32 bytes). Different realms can share a single pool with no extra plumbing.
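A sketch of two realms sharing one pool, assuming a call/5 shape that mirrors publish/5 (Pool, Realm, Procedure, Args, Opts); the procedure names are examples:

```erlang
%% Sketch: two 32-byte realms multiplexed over the same pool.
%% Assumes call/5 mirrors the publish/5 argument order shown above.
RealmA = binary:copy(<<$a>>, 32),
RealmB = binary:copy(<<$b>>, 32),
{ok, ReplyA} = macula_client:call(Pool, RealmA, <<"math.add">>, #{a => 1, b => 2}, #{}),
{ok, ReplyB} = macula_client:call(Pool, RealmB, <<"kv.get">>, <<"some-key">>, #{}).
```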
Open a streaming RPC against the pool. Picks the first currently-healthy link and opens the stream there; the returned stream pid is sticky — if the underlying link dies, the stream errors with {error, peer_down} and the caller must re-open.
Returns {error, no_healthy_station} when no link has completed its CONNECT/HELLO handshake. Realm (32 bytes) and Procedure name the remote endpoint. Args is the opening payload; Opts accepts mode (default server_stream), owner (default the calling pid), and deadline_ms.
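A sketch of opening a stream with the documented options. The function name open_stream and its arity are assumptions for illustration (the exact exported name is not shown above); the option keys mode, owner, and deadline_ms come from the text:

```erlang
%% Sketch: open_stream/5 is an assumed name/arity, not confirmed above.
{ok, Stream} = macula_client:open_stream(Pool, Realm, <<"logs.tail">>,
                                         #{since => <<"now">>},
                                         #{mode => server_stream,      % default
                                           owner => self(),            % default
                                           deadline_ms => 30000}),
%% The stream pid is sticky: on link death it errors with {error, peer_down}
%% and the caller must re-open against the pool.
```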
-spec child_spec(term(), [seed()], opts()) -> supervisor:child_spec().
OTP child spec — drop the pool into a caller's supervision tree. Id is the supervisor child id.
-spec close(pool()) -> ok.
Stop the pool. Every subscriber receives a final {macula_event_gone, SubRef, pool_closed} message; every link terminates with the pool.
Spawn a pool with one link per seed. Returns immediately; link handshakes complete asynchronously. Publish/subscribe block until at least one link is connected (or fail with {error, {transient, no_healthy_station}} on the publish path).
Publish a frame to replication_factor currently-spawned links. Partial success = success. Realm is per-call (32 bytes).
Aggregate health snapshot of the pool. Single round-trip to the pool's gen_server plus one is_connected probe per spawned link (each capped at 1s). Suitable for /health or /status endpoints; not for hot-loop polling.
Counts:
healthy_links — links whose worker pid is alive and whose CONNECT/HELLO handshake has completed.
failed_links — every other configured seed (link not yet spawned, dead, or still handshaking).
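A sketch of a /health handler built on this snapshot, assuming a status/1 entry point that returns the status() map typed above; the 0-healthy-means-unhealthy threshold is the caller's policy, not the library's:

```erlang
%% Sketch: assumes status/1 returns the status() map typed above.
%% Threshold policy (any healthy link => up) is a caller-side choice.
health(Pool) ->
    #{healthy_links := Healthy, failed_links := Failed} =
        macula_client:status(Pool),
    case Healthy of
        0 -> {503, #{healthy => 0, failed => Failed}};
        _ -> {200, #{healthy => Healthy, failed => Failed}}
    end.
```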
Subscribe Subscriber to (Realm, Topic). The pool subscribes every currently-spawned link and dedupes inbound events before fan-out. Returns {ok, SubRef}; Subscriber receives {macula_event, SubRef, Topic, Payload, Meta} for each delivered event and {macula_event_gone, SubRef, Reason} once when the pool closes or the subscriber pid dies.
Drop a previously-advertised procedure on every healthy link and remove it from the pool's replay state. Idempotent.
Drop a streaming procedure on every healthy link and remove it from the pool's replay state. Idempotent.
Drop a subscription. Idempotent — unknown SubRef is a no-op. The wire-level link subscription persists for the pool's lifetime (one wire sub per (Realm, Topic) multiplexed across local consumers); Phase 4 will tighten this.