Sagents.Publisher.State (Sagents v0.8.0-rc.2)
Copy MarkdownSubscriber bookkeeping for a Sagents.Publisher producer.
Tracks subscribers across one or more channels and maintains a reverse
index from monitor ref → {pid, channel} so {:DOWN, ref, ...} messages
can be cleaned up in O(1) without iterating channels.
Subscriber lookup by pid within a channel is also O(1), so duplicate subscribe calls dedupe trivially (returning the existing monitor ref rather than starting a second monitor).
Summary
Functions
Add a subscriber pid to a channel.
Total number of subscriptions across all channels (useful for tests/metrics).
Build a fresh publisher state with the given channel atoms pre-initialized to empty subscriber maps.
Remove a subscriber pid from a channel.
Remove a subscriber by monitor ref (for :DOWN cleanup).
Bulk-seed subscribers from a list of {channel, pid} tuples.
Whether a pid is subscribed to a channel.
List subscriber pids for a channel.
Types
Functions
Add a subscriber pid to a channel.
Idempotent — if the pid is already subscribed to the channel, the existing monitor ref is returned and no new monitor is set up.
Returns {ref, new_state}.
@spec count(t()) :: non_neg_integer()
Total number of subscriptions across all channels (useful for tests/metrics).
Build a fresh publisher state with the given channel atoms pre-initialized to empty subscriber maps.
Channels are auto-created on first subscribe, but pre-declaring them makes the supported set explicit at the host module level.
Remove a subscriber pid from a channel.
Demonitors the existing monitor (with :flush to drop any in-flight DOWN).
No-op if the pid is not subscribed.
Remove a subscriber by monitor ref (for :DOWN cleanup).
Returns {:ok, new_state} if the ref was tracked, :error otherwise.
Bulk-seed subscribers from a list of {channel, pid} tuples.
Useful in producer init/1 to enroll subscribers passed as
:initial_subscribers before the GenServer starts handling messages.
This closes the race between "agent started" and "subscriber missed
initial events" (e.g., the :status_changed and :node_transferred
broadcasts emitted from handle_continue/2).
Each entry establishes a monitor exactly as add/3 would. Duplicates
within the list are deduped per-channel — the existing monitor is
reused for the second occurrence.
Returns the updated state.
Whether a pid is subscribed to a channel.
List subscriber pids for a channel.