Sagents.Subscriber (Sagents v0.8.0-rc.1)
Copy MarkdownConsumer-side helpers for Sagents.Publisher producers.
Captures the boilerplate of:
- Subscribing to a producer by id rather than pid.
- Monitoring the producer so we know when it dies.
- Watching
Phoenix.Presencearrivals for that id (to handle crash-restart with a new pid, or Horde migration to a new node). - Re-subscribing automatically when the producer reappears.
- Cleaning up on caller exit (best-effort — the producer's monitor on the caller will clean us up anyway).
Targets two consumer shapes:
- Plain GenServer / process — call
subscribe_to_agent/2and friends directly; pair withhandle_publisher_down/3andhandle_presence_diff/3from yourhandle_info/2. - LiveView (or anything with a socket) —
use Sagents.Subscriberinjectshandle_info/2clauses for:DOWNand Phoenix presence diffs that delegate to those helpers.
Subscription handle
Each active subscription is tracked in the caller's local subs map (kept on
the LiveView socket under socket.private.sagents_subs, or threaded through
manually for plain-process callers).
Map shape:
%{
{:agent, agent_id} => %{
channel: :main | :debug,
server_pid: pid() | nil,
monitor_ref: reference() | nil,
state: :subscribed | :pending
},
{:filesystem, scope_key} => %{...}
}States:
:subscribed— we have a live subscription onserver_pid, monitored.:pending— we want to subscribe but the producer isn't running. Will retry on the next presence arrival.
Departure vs arrival
Monitors are reliable for departure (:DOWN fires within a scheduler tick of
process death, even cross-node when the connection drops). Phoenix.Presence is
reliable for arrival (presence_diff with joins). We never depend on
presence_diff.leaves — if it's delayed, we keep the :subscribed state
until :DOWN fires, which it will.
Summary
Functions
Handle a Phoenix.Presence diff for the agent presence topic.
Handle a :DOWN from one of the producer pids we subscribed to.
Returns the Phoenix.Presence topic string the agent presence layer uses.
Subscribe the calling process to an agent's main channel, threading the subscription map through.
Subscribe the calling process to filesystem change events for scope_key.
Unsubscribe from an agent. Tears down monitor and pending state.
Unsubscribe from a filesystem.
Types
Functions
Handle a Phoenix.Presence diff for the agent presence topic.
joins whose key matches a :pending agent subscription triggers a
re-subscribe. Returns the (possibly updated) subs map.
Handle a :DOWN from one of the producer pids we subscribed to.
Returns {:matched, new_subs} if the ref belonged to a tracked subscription
(now flipped to :pending), otherwise :no_match.
@spec presence_topic() :: String.t()
Returns the Phoenix.Presence topic string the agent presence layer uses.
Subscribe to this topic with Phoenix.PubSub.subscribe/2 to receive
presence_diff broadcasts that drive auto-resubscription.
Subscribe the calling process to an agent's main channel, threading the subscription map through.
Returns the updated subs map. If the agent is not currently running, the
subscription is recorded in :pending state and will become live as soon
as the agent appears in Phoenix.Presence on the agent presence topic.
Subscribe the calling process to filesystem change events for scope_key.
Unsubscribe from an agent. Tears down monitor and pending state.
Unsubscribe from a filesystem.