macula_pubsub (macula v4.1.0)


Pubsub surface for the V2 SDK.

Thin delegation over macula_client (the pool). The pool owns the link state machine, replication, replay, and dedup; this module is the named public entry point for pubsub. In practice, consumers more often call the macula facade, which re-exports the same functions.

Realm-per-call

Per PLAN_V2_PARITY Q2 §2: every call carries its own 32-byte realm tag. There is no connect-time default realm. A single pool can multiplex any number of realms with no extra plumbing.
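As a rough illustration of realm-per-call, the sketch below publishes one payload to several realms through a single pool. The module name realm_demo and both helpers are hypothetical, and hashing a realm name with SHA-256 is only one convenient way to obtain 32 bytes; it is an assumption, not macula's documented derivation of realm tags.

```erlang
-module(realm_demo).
-export([realm_tag/1, publish_to_realms/4]).

%% ASSUMPTION: SHA-256 over a human-readable name is used here purely to
%% produce a 32-byte binary. How real realm tags are derived is not stated
%% in this page.
-spec realm_tag(binary()) -> <<_:256>>.
realm_tag(Name) when is_binary(Name) ->
    crypto:hash(sha256, Name).

%% Publish the same payload to every realm in Realms over one pool.
%% Each call carries its own realm tag; the pool itself has no default realm.
%% Returns a list of {Realm, Result} pairs so the caller can inspect failures.
-spec publish_to_realms(term(), [<<_:256>>], binary(), term()) ->
          [{<<_:256>>, ok | {error, term()}}].
publish_to_realms(Pool, Realms, Topic, Payload) ->
    [{Realm, macula_pubsub:publish(Pool, Realm, Topic, Payload)}
     || Realm <- Realms].
```

A caller might build the list with realm_demo:realm_tag(<<"orders">>) and realm_demo:realm_tag(<<"billing">>) and pass both to publish_to_realms/4 with the same Pool.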

Quick start

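  %% Seeds, ConnectOpts, Realm (a 32-byte binary), Topic, and Payload are bound by the caller.
  {ok, Pool} = macula:connect(Seeds, ConnectOpts),
  %% Subscribe before publishing so the subscription exists when the event is sent.
  {ok, Sub}   = macula_pubsub:subscribe(Pool, Realm, Topic, self()),
  ok          = macula_pubsub:publish(Pool, Realm, Topic, Payload),
  receive
      %% Sub, Topic, and Payload are already bound, so only the matching event is accepted.
      {macula_event, Sub, Topic, Payload, _Meta} -> ok
  end,
  ok          = macula_pubsub:unsubscribe(Pool, Sub).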

See docs/guides/PUBSUB_GUIDE.md for a full guide and docs/migrations/V1_TO_V2_PUBSUB.md for the breaking changes from the pre-3.11.0 facade.

Summary

Functions

publish(Pool, Realm, Topic, Payload)

Publish to (Realm, Topic) on Pool. Equivalent to publish/5 with empty opts.

publish(Pool, Realm, Topic, Payload, Opts)

Publish to (Realm, Topic) on Pool.

subscribe(Pool, Realm, Topic, Subscriber)

Subscribe Subscriber to (Realm, Topic) via Pool. Equivalent to subscribe/5 with empty opts.

subscribe(Pool, Realm, Topic, Subscriber, Opts)

Subscribe Subscriber to (Realm, Topic) via Pool.

subscribe_callback(Pool, Realm, Topic, Callback)

Subscribe with a callback function instead of a receiver pid. Spawns a small receiver process internally that drives the callback for every inbound event. The receiver monitors the caller; if the caller dies, the receiver follows and the subscription is cleaned up by the pool's standard subscriber-DOWN path.

unsubscribe(Pool, SubRef)

Drop a subscription. Idempotent: an unknown SubRef is a no-op.

Types

callback/0

-type callback() :: fun((Topic :: binary(), Payload :: term(), Meta :: map()) -> any()).

Functions

publish(Pool, Realm, Topic, Payload)

-spec publish(macula_client:pool(), <<_:256>>, binary(), term()) -> ok | {error, term()}.

Publish to (Realm, Topic) on Pool. Equivalent to publish/5 with empty opts.

publish(Pool, Realm, Topic, Payload, Opts)

-spec publish(macula_client:pool(), <<_:256>>, binary(), term(), map()) -> ok | {error, term()}.

Publish to (Realm, Topic) on Pool.

Opts currently honored:

  • timeout_ms — gen_server call timeout (default 5_000). Most apps leave this as default.

Returns ok as soon as one configured station accepts the PUBLISH frame (partial success = success, per PLAN_V2_PARITY §5.1.1). Returns {error, {transient, no_healthy_station}} when the pool has no spawned links; the caller may retry.
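Because the no_healthy_station error is described as transient and retryable, a caller may want a small retry wrapper. The following is a minimal sketch, not part of macula: the module publish_retry, the attempt count, and the doubling backoff are all illustrative assumptions. Only the documented {error, {transient, no_healthy_station}} shape is retried; every other result is returned unchanged.

```erlang
-module(publish_retry).
-export([publish_with_retry/5, retry/3]).

%% Publish with an explicit timeout_ms and retry on the documented
%% transient error. Attempts must be a positive integer.
publish_with_retry(Pool, Realm, Topic, Payload, Attempts) ->
    Opts = #{timeout_ms => 5000},
    Publish = fun() ->
                  macula_pubsub:publish(Pool, Realm, Topic, Payload, Opts)
              end,
    %% ASSUMPTION: 100 ms initial backoff, doubled per attempt, is an
    %% arbitrary choice for illustration.
    retry(Publish, Attempts, 100).

%% Generic helper: call Fun up to Attempts times, sleeping BackoffMs
%% (doubling each time) between tries while the result is the transient
%% no_healthy_station error. The last result is returned as-is.
retry(Fun, Attempts, BackoffMs) when is_integer(Attempts), Attempts > 0 ->
    case Fun() of
        {error, {transient, no_healthy_station}} when Attempts > 1 ->
            timer:sleep(BackoffMs),
            retry(Fun, Attempts - 1, BackoffMs * 2);
        Result ->
            Result
    end.
```

Keeping retry/3 separate from the publish call makes the backoff logic usable, and checkable, without a live pool.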

subscribe(Pool, Realm, Topic, Subscriber)

-spec subscribe(macula_client:pool(), <<_:256>>, binary(), pid()) -> {ok, reference()}.

Subscribe Subscriber to (Realm, Topic) via Pool. Equivalent to subscribe/5 with empty opts.

subscribe(Pool, Realm, Topic, Subscriber, Opts)

-spec subscribe(macula_client:pool(), <<_:256>>, binary(), pid(), map()) -> {ok, reference()}.

Subscribe Subscriber to (Realm, Topic) via Pool.

Returns {ok, SubRef}. Subscriber subsequently receives {macula_event, SubRef, Topic, Payload, Meta} for each delivered event; Meta is a map carrying realm, publisher, seq, and delivered_via. Subscriber also receives {macula_event_gone, SubRef, Reason} once when the subscription terminates (pool close, subscriber pid death).

Opts is a forward-compatible map; Phase 1 honors no subscribe-time options. Future phases (history replay, server-side filters) will add named keys.
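The message protocol described for subscribe/5 can be handled with an ordinary receive loop. The sketch below is one possible shape, with hypothetical names (sub_loop, start/3, loop/2). It assumes the Meta keys are atoms such as seq; the key type is not stated here, so the code falls back to undefined when the key is absent.

```erlang
-module(sub_loop).
-export([start/3, loop/2]).

%% Spawn a process that subscribes itself and then handles events
%% until the subscription goes away.
start(Pool, Realm, Topic) ->
    spawn(fun() ->
              {ok, SubRef} =
                  macula_pubsub:subscribe(Pool, Realm, Topic, self()),
              loop(SubRef, 0)
          end).

%% Handle messages for one subscription. SubRef is already bound, so
%% messages for other subscriptions are left in the mailbox.
%% Returns {stopped, Reason, Count} once macula_event_gone arrives.
loop(SubRef, Count) ->
    receive
        {macula_event, SubRef, Topic, Payload, Meta} ->
            %% ASSUMPTION: seq is stored under the atom key 'seq'.
            Seq = maps:get(seq, Meta, undefined),
            io:format("event ~p on ~p (seq=~p)~n", [Payload, Topic, Seq]),
            loop(SubRef, Count + 1);
        {macula_event_gone, SubRef, Reason} ->
            {stopped, Reason, Count}
    end.
```

Matching on the bound SubRef in both clauses lets one process hold several subscriptions and run a separate loop or dispatch table per ref.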

subscribe_callback(Pool, Realm, Topic, Callback)

-spec subscribe_callback(macula_client:pool(), <<_:256>>, binary(), callback()) ->
                            {ok, reference()} | {error, term()}.

Subscribe with a callback function instead of a receiver pid. Spawns a small receiver process internally that drives the callback for every inbound event. The receiver monitors the caller; if the caller dies, the receiver follows and the subscription is cleaned up by the pool's standard subscriber-DOWN path.

A crashing callback does NOT kill the receiver — the exception is logged and the next event is delivered. This is intentional: a transient bug in event handler N should not lose events N+1..M.

Caller cleanup: invoke unsubscribe(Pool, SubRef) with the returned ref. The receiver shuts down on the resulting macula_event_gone message.
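A callback subscription and its cleanup might look like the sketch below. The module callback_demo and its helpers are hypothetical. The callback only forwards each event to a collector pid, which keeps the handler short and unlikely to crash; the shape of the forwarded tuple is an illustrative choice.

```erlang
-module(callback_demo).
-export([forwarding_callback/1, start/3, stop/2]).

%% Build a callback matching the callback() type: it takes Topic,
%% Payload, and Meta, and forwards them to Collector as a tagged tuple.
forwarding_callback(Collector) when is_pid(Collector) ->
    fun(Topic, Payload, Meta) ->
        Collector ! {event, Topic, Payload, Meta},
        ok
    end.

%% Subscribe with the forwarding callback; events arrive at the caller
%% as {event, Topic, Payload, Meta}. Returns {ok, SubRef} or
%% {error, Reason}, as in the subscribe_callback/4 spec.
start(Pool, Realm, Topic) ->
    Callback = forwarding_callback(self()),
    macula_pubsub:subscribe_callback(Pool, Realm, Topic, Callback).

%% Caller cleanup: drop the subscription with the ref from start/3.
stop(Pool, SubRef) ->
    macula_pubsub:unsubscribe(Pool, SubRef).
```

Since the internal receiver monitors the caller, calling stop/2 is only needed when the caller outlives its interest in the topic.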

unsubscribe(Pool, SubRef)

-spec unsubscribe(macula_client:pool(), reference()) -> ok.

Drop a subscription. Idempotent: an unknown SubRef is a no-op.