Sagents.Publisher (Sagents v0.8.0-rc.6)
Direct, monitored process subscriptions for GenServers that publish events.
This is the producer side of the Sagents publish/subscribe transport. It
provides per-server event channels (AgentServer, FileSystemServer) with a
point-to-point send/2 between the producer GenServer and a small set of
subscriber pids it tracks itself.
Why not Phoenix.PubSub?
Phoenix.PubSub is a cluster-aware broadcast bus. With a clustered adapter
every published message is serialized and shipped to every node, where each
node's local PubSub server filters by topic. For per-agent event channels this
is a lot of cross-node noise for what is, at the receiver, almost always a
single in-node process.
Direct send/2 also works transparently across nodes when the pid is remote,
but it sends one message per known subscriber instead of broadcasting to every node.
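To make the contrast concrete, here is a minimal, self-contained sketch of point-to-point fan-out in plain Elixir. It is not the Sagents implementation: the DirectFanout module and its functions are hypothetical, and subscriber monitoring is left out. It only illustrates that a producer holding a set of pids issues exactly one send/2 per subscriber of the chosen channel.

```elixir
defmodule DirectFanout do
  # Hypothetical illustration, not part of Sagents.
  # Subscribers are kept as a map of channel => MapSet of pids.

  def new(channels), do: Map.new(channels, &{&1, MapSet.new()})

  def subscribe(subs, channel, pid) when is_pid(pid) do
    Map.update!(subs, channel, &MapSet.put(&1, pid))
  end

  # One send/2 per tracked pid. send/2 accepts local and remote pids alike.
  def broadcast(subs, channel, message) do
    subs |> Map.fetch!(channel) |> Enum.each(&send(&1, message))
    subs
  end
end

subs =
  [:main, :debug]
  |> DirectFanout.new()
  |> DirectFanout.subscribe(:main, self())

# Delivered to self(), the only :main subscriber.
DirectFanout.broadcast(subs, :main, {:my_server, :started})
# Nobody subscribed to :debug, so nothing is sent.
DirectFanout.broadcast(subs, :debug, {:my_server, :trace})

receive do
  {:my_server, event} -> IO.inspect(event, label: "received")
after
  100 -> IO.puts("nothing received")
end
```

The cost of a broadcast in this shape grows with the number of subscribers on the channel, not with the number of nodes in the cluster.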
Producer side
A producer GenServer embeds a Sagents.Publisher.State struct in its own
state and uses this module. The use macro installs the handle_call/3
clauses that handle subscribe/unsubscribe.
    defmodule MyServer do
      use GenServer

      use Sagents.Publisher,
        channels: [:main, :debug],
        state_field: :publisher

      defstruct [..., publisher: Sagents.Publisher.State.new([:main, :debug])]

      def init(opts) do
        {:ok, %__MODULE__{publisher: Sagents.Publisher.State.new([:main, :debug])}}
      end

      # Inside any handler that needs to broadcast:
      defp emit(state, event) do
        Sagents.Publisher.broadcast(state.publisher, :main, {:my_server, event})
        state
      end
    end

Consumer side
Subscribers locate the producer by name (typically through a
Sagents.ProcessRegistry via_tuple), call Sagents.Publisher.subscribe/3,
and receive the wrapped events as ordinary process messages. The producer
monitors each subscriber so that it can drop the subscription when the subscriber dies.
Typical entry points
Most consumers should not call Publisher.subscribe/3 directly — there are
higher-level wrappers that take an agent id (or filesystem scope key) and
resolve the via-tuple internally:
- Sagents.AgentServer.subscribe/3: subscribe(agent_id, channel \\ :main, subscriber_pid \\ nil). Covers the common case of subscribing self() to :main, plus :debug (used by sagents_live_debugger) and foreign-pid subscribers (e.g. a bridge GenServer proxying events to another transport).
- Sagents.FileSystemServer.subscribe/1: same shape for filesystem change events.
- Sagents.Subscriber: for hosts (LiveView, GenServer) that need crash recovery and Phoenix.Presence-driven re-subscription. Wraps the AgentServer/FileSystemServer entry points and threads a subs map through the host's state.
Reach for Publisher.subscribe/3 directly only when implementing a new
producer module that needs to expose its own subscribe/N shorthand.
Returned subscription handle
subscribe/3 returns {:ok, server_pid, monitor_ref}, where monitor_ref is the
reference of the producer's monitor on the subscriber. To detect producer
death, the subscriber is expected to call Process.monitor/1 on server_pid and
to re-subscribe once the producer is back, as signalled by a Presence arrival.
Most consumers should use Sagents.Subscriber rather than calling this module
directly.
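For the cases where calling this module directly is warranted, the returned handle can be used roughly as follows. This is a minimal sketch, not code from Sagents: EventConsumer and its state fields are hypothetical, the {:my_server, event} message shape is borrowed from the producer example above, and only subscribe/3 with its two documented return values is taken from this page. Running it requires the sagents dependency.

```elixir
defmodule EventConsumer do
  # Hypothetical consumer. Only Sagents.Publisher.subscribe/3 and its
  # documented return values come from the documentation.
  use GenServer

  def start_link(producer), do: GenServer.start_link(__MODULE__, producer)

  @impl true
  def init(producer) do
    # The subscriber pid is omitted, so it defaults to self().
    case Sagents.Publisher.subscribe(producer, :main) do
      {:ok, server_pid, _producer_monitor_ref} ->
        # Our own monitor on the producer, so we learn when it dies.
        server_ref = Process.monitor(server_pid)
        {:ok, %{producer: producer, server_ref: server_ref, events: []}}

      {:error, :process_not_found} ->
        # Producer not running yet; start unsubscribed.
        {:ok, %{producer: producer, server_ref: nil, events: []}}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{server_ref: ref} = state) do
    # The producer died. Re-subscription is left to whatever signals that
    # the producer is back (the text mentions Presence arrival).
    {:noreply, %{state | server_ref: nil}}
  end

  def handle_info({:my_server, event}, state) do
    # Events arrive as ordinary process messages.
    {:noreply, %{state | events: [event | state.events]}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

Keeping the consumer's own monitor reference in state lets the :DOWN clause match only the producer's death and ignore unrelated monitors.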
Summary
Functions
__using__/1: Inject subscribe/unsubscribe handlers into the host GenServer.
broadcast/3: Broadcast a message to every subscriber of a channel.
handle_down/3: Handle a {:DOWN, ref, :process, pid, reason} info message.
subscribe/3: Subscribe a pid to a channel on the named producer.
unsubscribe/3: Unsubscribe a pid from a channel.
Types
@type channel() :: atom()
A channel identifier used to partition subscribers within a single producer.
Functions
Inject subscribe/unsubscribe handlers into the host GenServer.
Options:
- :state_field (required): the field name on the host's state struct that holds the %Sagents.Publisher.State{} value.
- :channels (optional): list of channel atoms the host supports. Defaults to [:main]. Used only for documentation/validation; the runtime data structure is created by Sagents.Publisher.State.new/1 from the host's init/1.
The macro injects two handle_call/3 clauses that match on
{:__publisher__, channel, :subscribe | :unsubscribe, pid}. These clauses
must come before any catch-all handle_call/3 clause in the host.
The macro also injects a default no-op on_subscribed/3 callback that the
host can override to send a state snapshot to a newly registered subscriber.
This is how late subscribers (e.g. a LiveView mounting after the producer
has already broadcast its initial state) get synced to the current state
without relying on out-of-band polling.
    def on_subscribed(:main, subscriber_pid, state) do
      send(subscriber_pid, {:agent, {:status_changed, state.status, state.data}})
      state
    end

    def on_subscribed(_channel, _pid, state), do: state

The callback runs inside the subscribe handle_call after registration
but before the reply, so the snapshot is guaranteed to arrive at the
subscriber before any later broadcasts on the same channel.
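The ordering argument can be checked with a small stand-in written in plain Elixir. SnapshotDemo below is hypothetical and does not use the Sagents macro; it only mimics the described sequence (register, send snapshot, reply) to show that a subscriber sees the snapshot ahead of later broadcasts from the same producer.

```elixir
defmodule SnapshotDemo do
  # Hypothetical stand-in for a producer; not generated by Sagents.Publisher.
  use GenServer

  def start_link(status), do: GenServer.start_link(__MODULE__, status)
  def subscribe(server), do: GenServer.call(server, {:subscribe, self()})
  def set_status(server, status), do: GenServer.call(server, {:set_status, status})

  @impl true
  def init(status), do: {:ok, %{status: status, subscribers: []}}

  @impl true
  def handle_call({:subscribe, pid}, _from, state) do
    # 1. Register the subscriber.
    state = %{state | subscribers: [pid | state.subscribers]}
    # 2. Send the snapshot while still inside the call.
    send(pid, {:snapshot, state.status})
    # 3. Only now reply; any broadcast is handled after this clause returns.
    {:reply, :ok, state}
  end

  def handle_call({:set_status, status}, _from, state) do
    Enum.each(state.subscribers, &send(&1, {:status_changed, status}))
    {:reply, :ok, %{state | status: status}}
  end
end

{:ok, server} = SnapshotDemo.start_link(:idle)
:ok = SnapshotDemo.subscribe(server)
:ok = SnapshotDemo.set_status(server, :running)

# Take the next message from the mailbox, or :timeout if there is none.
next = fn ->
  receive do
    msg -> msg
  after
    100 -> :timeout
  end
end

first = next.()
second = next.()
IO.inspect({first, second})
```

The guarantee rests on two facts: the producer handles one message at a time, and messages sent from one process to another are delivered in the order they were sent.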
@spec broadcast(Sagents.Publisher.State.t(), channel(), term()) :: Sagents.Publisher.State.t()
Broadcast a message to every subscriber of a channel.
This is a fire-and-forget send/2 per subscriber. No filtering on
source pid — the producer is the source by construction.
Returns the publisher state unchanged.
@spec handle_down(Sagents.Publisher.State.t(), reference(), pid()) :: {:matched, Sagents.Publisher.State.t()} | :no_match
Handle a {:DOWN, ref, :process, pid, reason} info message.
If the ref belongs to a tracked subscriber, returns
{:matched, new_publisher_state}. Otherwise returns :no_match so the
host can delegate to its own DOWN handlers (e.g. for Task monitors).
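A host's handle_info/2 can use the two return values to route :DOWN messages. The following is a sketch under stated assumptions: DownAwareHost, its map-shaped state, and the tasks field are hypothetical, and only handle_down/3 with its documented return values is taken from this page. Running it requires the sagents dependency.

```elixir
defmodule DownAwareHost do
  # Hypothetical host. Only Sagents.Publisher.handle_down/3 and its return
  # values are taken from the documentation.
  use GenServer

  @impl true
  def init(publisher), do: {:ok, %{publisher: publisher, tasks: %{}}}

  @impl true
  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    case Sagents.Publisher.handle_down(state.publisher, ref, pid) do
      {:matched, publisher} ->
        # A tracked subscriber died; keep the updated publisher state.
        {:noreply, %{state | publisher: publisher}}

      :no_match ->
        # Not a subscriber monitor; fall through to the host's own
        # bookkeeping (here, a hypothetical map of task monitors).
        {:noreply, %{state | tasks: Map.delete(state.tasks, ref)}}
    end
  end
end
```

Asking the publisher first keeps subscriber cleanup in one place while leaving every other monitor to the host.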
@spec subscribe(GenServer.server(), channel(), pid() | nil) :: {:ok, pid(), reference()} | {:error, :process_not_found}
Subscribe a pid to a channel on the named producer.
server may be a pid, a registered name atom, or a :via tuple.
The subscriber defaults to self().
Returns {:ok, server_pid, monitor_ref} on success, where monitor_ref
is the ref the producer uses to monitor this subscriber. The subscriber
may also Process.monitor/1 the returned server_pid to detect
producer death.
Returns {:error, :process_not_found} if the producer is not running.
@spec unsubscribe(GenServer.server(), channel(), pid() | nil) :: :ok
Unsubscribe a pid from a channel.
Always returns :ok, even if the producer is no longer running
(the subscriber would have been cleaned up anyway).