Sagents.Publisher (Sagents v0.8.0-rc.6)

Direct, monitored process subscriptions for GenServers that publish events.

This is the producer side of the Sagents publish/subscribe transport. It provides per-server event channels (AgentServer, FileSystemServer) with a point-to-point send/2 between the producer GenServer and a small set of subscriber pids it tracks itself.

Why not Phoenix.PubSub?

Phoenix.PubSub is a cluster-aware broadcast bus. With a clustered adapter every published message is serialized and shipped to every node, where each node's local PubSub server filters by topic. For per-agent event channels, that is a lot of cross-node traffic when the receiver is almost always a single process on the same node.

Direct send/2 works transparently across nodes when the pid is remote, but it sends one message per known subscriber instead of broadcasting to every node.

Producer side

A producer GenServer embeds a Sagents.Publisher.State struct in its own state and uses this module. The use macro installs the handle_call/3 clauses that handle subscribe/unsubscribe.

defmodule MyServer do
  use GenServer
  use Sagents.Publisher,
    channels: [:main, :debug],
    state_field: :publisher

  defstruct [..., publisher: Sagents.Publisher.State.new([:main, :debug])]

  def init(opts) do
    {:ok, %__MODULE__{publisher: Sagents.Publisher.State.new([:main, :debug])}}
  end

  # Inside any handler that needs to broadcast:
  defp emit(state, event) do
    Sagents.Publisher.broadcast(state.publisher, :main, {:my_server, event})
    state
  end
end

Consumer side

Subscribers locate the producer by name (typically via Sagents.ProcessRegistry via_tuple), call Sagents.Publisher.subscribe/3, and receive the wrapped events as ordinary process messages. The producer monitors each subscriber to clean up on death.

Typical entry points

Most consumers should not call Publisher.subscribe/3 directly — there are higher-level wrappers that take an agent id (or filesystem scope key) and resolve the via-tuple internally:

  • Sagents.AgentServer.subscribe/3, called as subscribe(agent_id, channel \\ :main, subscriber_pid \\ nil). Covers the common case of subscribing self() to :main, plus :debug (used by sagents_live_debugger) and foreign-pid subscribers (e.g. a bridge GenServer proxying events to another transport).
  • Sagents.FileSystemServer.subscribe/1 — same shape for filesystem change events.
  • Sagents.Subscriber — for hosts (LiveView, GenServer) that need crash-recovery + Phoenix.Presence-driven re-subscription. Wraps the AgentServer/FileSystemServer entry points and threads a subs map through the host's state.

Reach for Publisher.subscribe/3 directly only when implementing a new producer module that needs to expose its own subscribe/N shorthand.

Returned subscription handle

subscribe/3 returns {:ok, server_pid, monitor_ref}, where monitor_ref is the producer's monitor on the subscriber. To detect server death, the subscriber is expected to call Process.monitor/1 on server_pid and trigger re-subscription when Presence reports the server's arrival.

Most consumers should use Sagents.Subscriber rather than calling this module directly.

Types

channel()

@type channel() :: atom()

A channel identifier used to partition subscribers within a single producer.

Functions

__using__(opts)

(macro)

Inject subscribe/unsubscribe handlers into the host GenServer.

Options:

  • :state_field (required) — the field name on the host's state struct that holds the %Sagents.Publisher.State{} value.
  • :channels (optional) — list of channel atoms the host supports. Defaults to [:main]. Used only for documentation/validation; the runtime data structure is created by Sagents.Publisher.State.new/1 from the host's init/1.

The macro injects two handle_call/3 clauses that match on {:__publisher__, channel, :subscribe | :unsubscribe, pid}. These clauses must come before any catch-all handle_call/3 clause in the host.

The macro also injects a default no-op on_subscribed/3 callback that the host can override to send a state snapshot to a newly registered subscriber. This is how late subscribers (e.g. a LiveView mounting after the producer has already broadcast its initial state) get synced to the current state without relying on out-of-band polling.

def on_subscribed(:main, subscriber_pid, state) do
  send(subscriber_pid, {:agent, {:status_changed, state.status, state.data}})
  state
end

def on_subscribed(_channel, _pid, state), do: state

The callback runs inside the subscribe handle_call after registration but before the reply, so the snapshot is guaranteed to arrive at the subscriber before any later broadcasts on the same channel.
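The ordering guarantee can be illustrated with a stdlib-only model. This is a minimal sketch, not the Sagents implementation: the module SnapshotSketch, its functions, and its message tags are all hypothetical. It relies only on the fact that messages sent from one process to another arrive in the order they were sent.

```elixir
defmodule SnapshotSketch do
  # Hypothetical stand-in for a producer; not part of Sagents.
  use GenServer

  def start_link(status), do: GenServer.start_link(__MODULE__, status)
  def subscribe(server), do: GenServer.call(server, {:subscribe, self()})
  def set_status(server, status), do: GenServer.call(server, {:set_status, status})

  @impl true
  def init(status), do: {:ok, %{status: status, subscribers: MapSet.new()}}

  @impl true
  def handle_call({:subscribe, pid}, _from, state) do
    state = %{state | subscribers: MapSet.put(state.subscribers, pid)}
    # The snapshot is sent after registration but before the reply, so it is
    # queued ahead of any broadcast this server sends later.
    send(pid, {:snapshot, state.status})
    {:reply, :ok, state}
  end

  def handle_call({:set_status, status}, _from, state) do
    Enum.each(state.subscribers, &send(&1, {:status_changed, status}))
    {:reply, :ok, %{state | status: status}}
  end
end

# Usage: subscribe late, then trigger a broadcast.
{:ok, server} = SnapshotSketch.start_link(:idle)
:ok = SnapshotSketch.subscribe(server)
:ok = SnapshotSketch.set_status(server, :running)

next = fn ->
  receive do
    {tag, _} = msg when tag in [:snapshot, :status_changed] -> msg
  after
    100 -> :timeout
  end
end

# The snapshot is read first, the later broadcast second.
IO.inspect({next.(), next.()})
```

Because the snapshot and the broadcasts come from the same process, the subscriber needs no sequence numbers to order them.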

broadcast(pub, channel, message)

Broadcast a message to every subscriber of a channel.

This is a fire-and-forget send/2 per subscriber. No filtering on source pid — the producer is the source by construction.

Returns the publisher state unchanged.
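As a rough model of this behaviour, the sketch below fans a message out with one send/2 per tracked pid and hands the subscriber set back untouched. It uses only the standard library; FanoutSketch and the MapSet representation are assumptions for illustration and say nothing about how Sagents.Publisher.State is actually structured.

```elixir
defmodule FanoutSketch do
  # Hypothetical stand-in; the real publisher state is a
  # %Sagents.Publisher.State{} whose internals are not shown here.

  # Sends `message` once to each tracked pid and returns the set unchanged.
  def broadcast(subscribers, message) do
    Enum.each(subscribers, fn pid -> send(pid, message) end)
    subscribers
  end
end

# Usage: the calling process tracks itself and receives the wrapped event.
subscribers = MapSet.new([self()])
FanoutSketch.broadcast(subscribers, {:my_server, :tick})

receive do
  {:my_server, event} -> IO.inspect(event, label: "received")
after
  100 -> IO.puts("no event received")
end
```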

handle_down(pub, ref, pid)

@spec handle_down(Sagents.Publisher.State.t(), reference(), pid()) ::
  {:matched, Sagents.Publisher.State.t()} | :no_match

Handle a {:DOWN, ref, :process, pid, reason} info message.

If the ref belongs to a tracked subscriber, returns {:matched, new_publisher_state}. Otherwise returns :no_match so the host can delegate to its own DOWN handlers (e.g. for Task monitors).
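The matched/no-match contract can be sketched with a plain map standing in for the publisher state. Everything here is hypothetical (the DownRoutingSketch module, the ref-to-pid map, the subs field); it only shows the shape of a host handle_info/2 clause that tries the publisher first and falls through to its own DOWN handling otherwise.

```elixir
defmodule DownRoutingSketch do
  # Stand-in for the publisher state: a map of monitor ref => subscriber pid.
  # This is an assumption for illustration, not the Sagents data structure.
  def handle_down(subs, ref, pid) do
    case Map.fetch(subs, ref) do
      {:ok, ^pid} -> {:matched, Map.delete(subs, ref)}
      _ -> :no_match
    end
  end

  # Shape of the host's handle_info/2 clause for :DOWN messages.
  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    case handle_down(state.subs, ref, pid) do
      {:matched, subs} ->
        {:noreply, %{state | subs: subs}}

      :no_match ->
        # Not a subscriber monitor: the host would delegate to its own
        # DOWN handling here (e.g. Task monitors).
        {:noreply, state}
    end
  end
end

# Usage: monitor a short-lived process and route its DOWN message.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)
state = %{subs: %{ref => pid}}

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} = down ->
    {:noreply, new_state} = DownRoutingSketch.handle_info(down, state)
    IO.inspect(new_state.subs, label: "remaining subscribers")
after
  1_000 -> IO.puts("no DOWN message received")
end
```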

subscribe(server, channel \\ :main, subscriber_pid \\ nil)

@spec subscribe(GenServer.server(), channel(), pid() | nil) ::
  {:ok, pid(), reference()} | {:error, :process_not_found}

Subscribe a pid to a channel on the named producer.

server may be a pid, a registered name atom, or a :via tuple. When subscriber_pid is nil, the subscriber defaults to self().

Returns {:ok, server_pid, monitor_ref} on success, where monitor_ref is the ref the producer uses to monitor this subscriber. The subscriber may also Process.monitor/1 the returned server_pid to detect producer death.

Returns {:error, :process_not_found} if the producer is not running.
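One way such a return contract could be produced is sketched below using only the standard library. SubscribeSketch is a hypothetical module, and its use of GenServer.whereis/1 plus a caught exit is an assumption about a plausible approach, not a description of how Sagents resolves the producer. The sketch handles local pids and names only.

```elixir
defmodule SubscribeSketch do
  # Hypothetical stand-in for a producer; not part of Sagents.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def subscribe(server, subscriber_pid \\ nil) do
    subscriber = subscriber_pid || self()

    # GenServer.whereis/1 accepts a pid, a registered name, or a :via tuple.
    case GenServer.whereis(server) do
      nil ->
        {:error, :process_not_found}

      server_pid when is_pid(server_pid) ->
        try do
          {:ok, ref} = GenServer.call(server_pid, {:subscribe, subscriber})
          {:ok, server_pid, ref}
        catch
          # The producer died between lookup and call.
          :exit, _ -> {:error, :process_not_found}
        end
    end
  end

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, pid}, _from, subs) do
    # The producer monitors the subscriber and returns that ref.
    ref = Process.monitor(pid)
    {:reply, {:ok, ref}, Map.put(subs, ref, pid)}
  end
end

# Usage: subscribe by registered name, then try a name nobody registered.
{:ok, server} = SubscribeSketch.start_link(name: :subscribe_sketch_demo)
{:ok, ^server, ref} = SubscribeSketch.subscribe(:subscribe_sketch_demo)
IO.inspect(is_reference(ref), label: "got monitor ref")
IO.inspect(SubscribeSketch.subscribe(:subscribe_sketch_missing), label: "missing producer")
```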

unsubscribe(server, channel \\ :main, subscriber_pid \\ nil)

@spec unsubscribe(GenServer.server(), channel(), pid() | nil) :: :ok

Unsubscribe a pid from a channel.

Always returns :ok, including when the producer is no longer running (the subscriber would have been cleaned up anyway).
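The tolerant return value can be modelled by catching the exit that GenServer.call/2 raises when the target process is gone. This is a sketch under assumptions: UnsubscribeSketch and its message shape are hypothetical, and Sagents may implement the same contract differently.

```elixir
defmodule UnsubscribeSketch do
  # Hypothetical stand-in for a producer; not part of Sagents.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def unsubscribe(server, subscriber_pid \\ nil) do
    subscriber = subscriber_pid || self()

    try do
      GenServer.call(server, {:unsubscribe, subscriber})
      :ok
    catch
      # A dead or unregistered producer has no subscriber left to remove.
      :exit, _ -> :ok
    end
  end

  @impl true
  def init(:ok), do: {:ok, MapSet.new()}

  @impl true
  def handle_call({:unsubscribe, pid}, _from, subs) do
    {:reply, :ok, MapSet.delete(subs, pid)}
  end
end

# Usage: unsubscribing works against a live producer and a stopped one.
{:ok, server} = UnsubscribeSketch.start_link()
:ok = UnsubscribeSketch.unsubscribe(server)
:ok = GenServer.stop(server)
IO.inspect(UnsubscribeSketch.unsubscribe(server), label: "after producer stopped")
```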