Sagents.Publisher.State (Sagents v0.8.0-rc.1)

Copy Markdown

Subscriber bookkeeping for a Sagents.Publisher producer.

Tracks subscribers across one or more channels and maintains a reverse index from monitor ref → {pid, channel} so {:DOWN, ref, ...} messages can be cleaned up in O(1) without iterating channels.

Subscriber lookup by pid within a channel is also O(1), so duplicate subscribe calls dedupe trivially (returning the existing monitor ref rather than starting a second monitor).

Summary

Types

A channel identifier.

t()

Functions

Add a subscriber pid to a channel.

Total number of subscriptions across all channels (useful for tests/metrics).

Build a fresh publisher state with the given channel atoms pre-initialized to empty subscriber maps.

Remove a subscriber pid from a channel.

Remove a subscriber by monitor ref (for :DOWN cleanup).

Bulk-seed subscribers from a list of {channel, pid} tuples.

Whether a pid is subscribed to a channel.

List subscriber pids for a channel.

Types

channel()

@type channel() :: atom()

A channel identifier.

t()

@type t() :: %Sagents.Publisher.State{
  channels: %{required(channel()) => %{required(pid()) => reference()}},
  monitors: %{required(reference()) => {pid(), channel()}}
}

Functions

add(state, channel, pid)

@spec add(t(), channel(), pid()) :: {reference(), t()}

Add a subscriber pid to a channel.

Idempotent — if the pid is already subscribed to the channel, the existing monitor ref is returned and no new monitor is set up.

Returns {ref, new_state}.

count(state)

@spec count(t()) :: non_neg_integer()

Total number of subscriptions across all channels (useful for tests/metrics).

new(channels \\ [:main])

@spec new([channel()]) :: t()

Build a fresh publisher state with the given channel atoms pre-initialized to empty subscriber maps.

Channels are auto-created on first subscribe, but pre-declaring them makes the supported set explicit at the host module level.

remove_pid(state, channel, pid)

@spec remove_pid(t(), channel(), pid()) :: t()

Remove a subscriber pid from a channel.

Demonitors the existing monitor (with :flush to drop any in-flight DOWN). No-op if the pid is not subscribed.

remove_ref(state, ref)

@spec remove_ref(t(), reference()) :: {:ok, t()} | :error

Remove a subscriber by monitor ref (for :DOWN cleanup).

Returns {:ok, new_state} if the ref was tracked, :error otherwise.

seed(state, entries)

@spec seed(t(), [{channel(), pid()}]) :: t()

Bulk-seed subscribers from a list of {channel, pid} tuples.

Useful in producer init/1 to enroll subscribers passed as :initial_subscribers before the GenServer starts handling messages. This closes the race between "agent started" and "subscriber missed initial events" (e.g., the :status_changed and :node_transferred broadcasts emitted from handle_continue/2).

Each entry establishes a monitor exactly as add/3 would. Duplicates within the list are deduped per-channel — the existing monitor is reused for the second occurrence.

Returns the updated state.

subscribed?(state, channel, pid)

@spec subscribed?(t(), channel(), pid()) :: boolean()

Whether a pid is subscribed to a channel.

subscribers(state, channel)

@spec subscribers(t(), channel()) :: [pid()]

List subscriber pids for a channel.