macula (macula v4.2.1)

View Source

Macula SDK — Public API for mesh applications.

This is the main entry point for applications using the Macula SDK.

Apps connect via connect/2, which returns a macula_client pool that internally wraps N peering links to N stations. publish/4,5, subscribe/4,5, unsubscribe/2, call/5, advertise/5, unadvertise/3, call_stream/5, advertise_stream/5, and unadvertise_stream/3 route through the pool with realm-per-call semantics. See macula_pubsub for the slice module of the publish/subscribe surface.

LOCAL streaming (call_stream/2,3, open_stream/3,4, advertise_stream/2,3, unadvertise_stream/1) dispatches in-process via macula_stream_local — for unit tests and same-BEAM pairs.

Erlang distribution over the mesh ships via join_mesh/1 (V2 pool carrier) or join_dist_relay/1 (dedicated dist relay). See macula_dist_relay / macula_dist_system.

Summary

Functions

Abort the stream with an error frame.

Advertise a procedure handler on a V2 pool. Fans out to every healthy link and stores in pool state for replay on link respawn. See macula_client:advertise/4.

Advertise a LOCAL in-process streaming procedure (default: server_stream).

Advertise a LOCAL in-process streaming procedure with mode.

Advertise a streaming procedure on a V2 pool. Fans out to every healthy link and stores in pool state for replay on link respawn. See macula_client:advertise_stream/5.

Wait for the terminal reply (client-stream / bidi).

Issue a CALL frame against a V2 pool. First-success across the pool's healthy links. See macula_client:call/5.

Open a LOCAL in-process server-stream call. Used for unit tests and same-BEAM dispatch via macula_stream_local.

Open a LOCAL in-process server-stream call with options.

Open a streaming RPC against a V2 pool. Picks the first currently-healthy link and opens the stream there; the returned stream is sticky-to-link (errors with peer_down if the link dies; caller re-opens). See macula_client:call_stream/5.

OTP child spec to drop a V2 pool into a caller's supervision tree.

Stop a V2 pool. Every subscriber receives a final {macula_event_gone, SubRef, pool_closed} message.

Half-close the write side; recv still drains.

Close a V1 stream (both sides). Renamed from close/1 in 3.11.0 because close/1 now refers to the V2 pool surface.

Connect to the Macula relay mesh and return a pool handle.

Ensure this node is running in distributed mode.

Fetch a record from the mesh DHT by its macula_record:storage_key/1.

Return every record of a given type currently visible from the pool's connected stations.

Get the Erlang cluster cookie.

Enable Erlang distribution over a dedicated dist relay (macula-io/macula-dist-relay).

Join the Macula relay mesh with Erlang distribution.

Subscribe to node up/down events.

Open a LOCAL in-process client-stream or bidi call. Used for unit tests and same-BEAM dispatch via macula_stream_local.

Open a LOCAL in-process stream with explicit mode.

Publish to (Realm, Topic) on Pool. Equivalent to publish/5 with empty opts.

Publish to (Realm, Topic) on Pool with options. See macula_pubsub:publish/5 for honored opts.

Store a signed record in the mesh DHT via a V2 pool.

Receive the next chunk (blocks).

Send a binary chunk on the stream.

Send a chunk with explicit encoding.

Set the Erlang cluster cookie.

Server-side: emit the terminal reply value.

Aggregate health snapshot of a V2 pool. Suitable for /health or /status endpoints; not for hot-loop polling. See macula_client:status/1 for the full shape.

Subscribe Subscriber to (Realm, Topic) on Pool. Equivalent to subscribe/5 with empty opts.

Subscribe Subscriber to (Realm, Topic) on Pool with options. See macula_pubsub:subscribe/5.

Subscribe with a callback function. The SDK spawns a small receiver process internally and invokes the callback once per inbound event. See macula_pubsub:subscribe_callback/4.

Subscribe to live record-stored events filtered by type.

Stop advertising a procedure on a V2 pool.

Stop advertising a LOCAL streaming procedure.

Stop advertising a streaming procedure on a V2 pool.

Unsubscribe from node up/down events.

Drop a pool subscription. Idempotent.

Types

pool/0

-type pool() :: macula_client:pool().

procedure/0

-type procedure() :: binary().

realm/0

-type realm() :: <<_:256>>.

32-byte realm tag.

record/0

-type record() :: macula_record:record().

record_key/0

-type record_key() :: <<_:256>>.

DHT storage key — macula_record:storage_key/1 output.

record_type/0

-type record_type() :: macula_record:type_tag().

stream/0

-type stream() :: pid().

stream_handler/0

-type stream_handler() :: fun((stream(), term()) -> any()).

stream_mode/0

-type stream_mode() :: server_stream | client_stream | bidi.

topic/0

-type topic() :: binary().

Functions

abort(Stream, Code, Message)

-spec abort(stream(), binary(), binary()) -> ok.

Abort the stream with an error frame.

advertise(Pool, Realm, Procedure, Handler, Opts)

-spec advertise(pool(), realm(), procedure(), macula_client:handler(), map()) -> ok | {error, term()}.

Advertise a procedure handler on a V2 pool. Fans out to every healthy link and stores in pool state for replay on link respawn. See macula_client:advertise/4.

await_reply(Stream)

-spec await_reply(stream()) -> {ok, term()} | {error, term()}.

Wait for the terminal reply (client-stream / bidi).

await_reply(Stream, Timeout)

-spec await_reply(stream(), timeout()) -> {ok, term()} | {error, term()}.

call(Pool, Realm, Procedure, Payload, TimeoutMs)

-spec call(pool(), realm(), procedure(), term(), pos_integer()) -> {ok, term()} | {error, term()}.

Issue a CALL frame against a V2 pool. First-success across the pool's healthy links. See macula_client:call/5.

call_stream(Procedure, Args)

-spec call_stream(procedure(), term()) -> {ok, stream()} | {error, term()}.

Open a LOCAL in-process server-stream call. Used for unit tests and same-BEAM dispatch via macula_stream_local.

call_stream(Procedure, Args, Opts)

-spec call_stream(procedure(), term(), map()) -> {ok, stream()} | {error, term()}.

Open a LOCAL in-process server-stream call with options.

call_stream(Pool, Realm, Procedure, Args, Opts)

-spec call_stream(pool(), realm(), procedure(), term(), map()) -> {ok, stream()} | {error, term()}.

Open a streaming RPC against a V2 pool. Picks the first currently-healthy link and opens the stream there; the returned stream is sticky-to-link (errors with peer_down if the link dies; caller re-opens). See macula_client:call_stream/5.

child_spec(Id, Seeds, Opts)

OTP child spec to drop a V2 pool into a caller's supervision tree.

close(Pool)

-spec close(pool()) -> ok.

Stop a V2 pool. Every subscriber receives a final {macula_event_gone, SubRef, pool_closed} message.

close_send(Stream)

-spec close_send(stream()) -> ok.

Half-close the write side; recv still drains.

close_stream(Stream)

-spec close_stream(stream()) -> ok.

Close a V1 stream (both sides). Renamed from close/1 in 3.11.0 because close/1 now refers to the V2 pool surface.

connect(Seeds, Opts)

-spec connect([macula_client:seed()], macula_client:opts()) -> {ok, pool()} | {error, term()}.

Connect to the Macula relay mesh and return a pool handle.

Seeds is a list of relay endpoints (URL binaries/strings or #{host, port} maps). The pool spawns one peering link per seed and routes ops with replication, replay, and event dedup. Returns immediately; link handshakes complete asynchronously.

Honored opts (full reference: macula_client:opts()):

  • identity — pool's Ed25519 keypair; auto-generated if absent.
  • replication_factor — links per PUBLISH (default 1).
  • capabilities — per-link bitfield (default 0).
  • alpn — QUIC ALPN list (default [<<"macula">>]).
  • connect_timeout_ms — per-link CONNECT/HELLO deadline (default 30_000).
  • dedup_window_ms, dedup_sweep_ms — inbound-EVENT dedup tunables.

Legacy opts silently dropped (with a one-shot logger:notice): relays (use the Seeds positional argument), realm (V2 is realm-per-call), site (no V2 analog), connections (one link per seed; add more seeds to grow the pool).

See macula_client for the canonical pool implementation and macula_pubsub for the slice module.

ensure_distributed()

-spec ensure_distributed() -> ok | {error, term()}.

Ensure this node is running in distributed mode.

find_record(Pool, Key)

-spec find_record(pool(), record_key()) -> {ok, record()} | {error, not_found | term()}.

Fetch a record from the mesh DHT by its macula_record:storage_key/1.

Returns {error, not_found} when no record exists at the key. The returned record's signature should be verified via macula_record:verify/1 before its payload is trusted.

find_records_by_type(Pool, Type)

-spec find_records_by_type(pool(), record_type()) -> {ok, [record()]} | {error, term()}.

Return every record of a given type currently visible from the pool's connected stations.

Coverage depends on each station's view of the DHT — a single station sees its local replicas plus whatever its peers have gossiped. Aggregating across the full mesh requires querying multiple stations and deduplicating by record key.

get_cookie()

-spec get_cookie() -> atom().

Get the Erlang cluster cookie.

join_dist_relay(Opts)

-spec join_dist_relay(map()) -> ok | {error, term()}.

Enable Erlang distribution over a dedicated dist relay (macula-io/macula-dist-relay).

Different from join_mesh/1: - Connects to a dist relay (port 4434, ALPN macula-dist), NOT the pub/sub station mesh - No mesh_client, no pub/sub subscriptions — only dist traffic - Uses raw QUIC stream routing with no MessagePack overhead

Options: - url (required): &lt;&lt;"quic://relay.example.com:4434"&gt;&gt;

After this returns ok, standard OTP distribution (rpc:call/4, gen_server:call/3 across nodes, pg groups, etc.) works across firewalls via the dist relay.

join_mesh(Opts)

-spec join_mesh(map()) -> ok | {error, term()}.

Join the Macula relay mesh with Erlang distribution.

After calling this, standard OTP distribution works across firewalls. Opts takes:

  • relays (required) — list of seed URLs for the V2 pool.
  • identity — V2 pool's macula_identity:key_pair(). Default: auto-generated.

Internally builds a V2 macula_client:pool() and registers it with macula_dist_relay as the carrier for _dist.tunnel.* traffic. Dist tunnel frames travel under the all-zeros realm (protocol-internal infrastructure, not bound to any user realm).

monitor_nodes()

-spec monitor_nodes() -> ok.

Subscribe to node up/down events.

open_stream(Procedure, Args, Opts)

-spec open_stream(procedure(), term(), map()) -> {ok, stream()} | {error, term()}.

Open a LOCAL in-process client-stream or bidi call. Used for unit tests and same-BEAM dispatch via macula_stream_local.

open_stream(Procedure, Args, Opts, Mode)

-spec open_stream(procedure(), term(), map(), stream_mode()) -> {ok, stream()} | {error, term()}.

Open a LOCAL in-process stream with explicit mode.

publish(Pool, Realm, Topic, Payload)

-spec publish(pool(), realm(), topic(), term()) -> ok | {error, term()}.

Publish to (Realm, Topic) on Pool. Equivalent to publish/5 with empty opts.

publish(Pool, Realm, Topic, Payload, Opts)

-spec publish(pool(), realm(), topic(), term(), map()) -> ok | {error, term()}.

Publish to (Realm, Topic) on Pool with options. See macula_pubsub:publish/5 for honored opts.

put_record(Pool, Record)

-spec put_record(pool(), record()) -> ok | {error, term()}.

Store a signed record in the mesh DHT via a V2 pool.

Build the record via the typed constructors in macula_record (node_record/3,4, content_announcement/3,4, tombstone/3,4, realm_directory/3,4, procedure_advertisement/3,4, etc.) then sign it with macula_record:sign/2. The relay validates the signature on receipt; an invalid signature returns {error, bad_signature}. Successful stores propagate to the K-nearest peers in the DHT under the record's macula_record:storage_key/1.

recv(Stream)

-spec recv(stream()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

Receive the next chunk (blocks).

recv(Stream, Timeout)

-spec recv(stream(), timeout()) -> {chunk, binary()} | {data, term()} | eof | {error, term()}.

send(Stream, Bin)

-spec send(stream(), binary()) -> ok | {error, term()}.

Send a binary chunk on the stream.

send(Stream, Body, Encoding)

-spec send(stream(), binary() | term(), raw | msgpack) -> ok | {error, term()}.

Send a chunk with explicit encoding.

set_cookie(Cookie)

-spec set_cookie(atom() | binary()) -> ok.

Set the Erlang cluster cookie.

set_reply(Stream, Result)

-spec set_reply(stream(), term()) -> ok.

Server-side: emit the terminal reply value.

status(Pool)

-spec status(pool()) -> {ok, macula_client:status()}.

Aggregate health snapshot of a V2 pool. Suitable for /health or /status endpoints; not for hot-loop polling. See macula_client:status/1 for the full shape.

subscribe(Pool, Realm, Topic, Subscriber)

-spec subscribe(pool(), realm(), topic(), pid()) -> {ok, reference()}.

Subscribe Subscriber to (Realm, Topic) on Pool. Equivalent to subscribe/5 with empty opts.

subscribe(Pool, Realm, Topic, Subscriber, Opts)

-spec subscribe(pool(), realm(), topic(), pid(), map()) -> {ok, reference()}.

Subscribe Subscriber to (Realm, Topic) on Pool with options. See macula_pubsub:subscribe/5.

subscribe_callback(Pool, Realm, Topic, Callback)

-spec subscribe_callback(pool(), realm(), topic(), macula_pubsub:callback()) ->
                            {ok, reference()} | {error, term()}.

Subscribe with a callback function. The SDK spawns a small receiver process internally and invokes the callback once per inbound event. See macula_pubsub:subscribe_callback/4.

subscribe_records(Pool, Type, Callback)

-spec subscribe_records(pool(), record_type(), fun((record()) -> any())) ->
                           {ok, reference()} | {error, term()}.

Subscribe to live record-stored events filtered by type.

The callback receives each newly-stored record of the given type. Returns a subscription reference for unsubscribe_records/2. Topic shape is _dht.records.<type>.stored, rendered with the type tag as a decimal integer for log friendliness.

unadvertise(Pool, Realm, Procedure)

-spec unadvertise(pool(), realm(), procedure()) -> ok.

Stop advertising a procedure on a V2 pool.

unadvertise_stream(Procedure)

-spec unadvertise_stream(procedure()) -> ok.

Stop advertising a LOCAL streaming procedure.

unadvertise_stream(Pool, Realm, Procedure)

-spec unadvertise_stream(pool(), realm(), procedure()) -> ok.

Stop advertising a streaming procedure on a V2 pool.

unmonitor_nodes()

-spec unmonitor_nodes() -> ok.

Unsubscribe from node up/down events.

unsubscribe(Pool, SubRef)

-spec unsubscribe(pool(), reference()) -> ok.

Drop a pool subscription. Idempotent.

unsubscribe_records(Pool, Ref)

-spec unsubscribe_records(pool(), reference()) -> ok.

Cancel a subscribe_records/3 subscription.