Sagents.Subscriber (Sagents v0.8.0-rc.6)

Copy Markdown

Consumer-side helpers for Sagents.Publisher producers.

Captures the boilerplate of:

  1. Subscribing to a producer by id rather than pid.
  2. Monitoring the producer so we know when it dies.
  3. Watching Phoenix.Presence arrivals for that id (to handle crash-restart with a new pid, or Horde migration to a new node).
  4. Re-subscribing automatically when the producer reappears.
  5. Cleaning up on caller exit (best-effort — the producer's monitor on the caller will clean us up anyway).

Targets two consumer shapes:

  • Plain GenServer / process — call subscribe_to_agent/2 and friends directly; pair with handle_publisher_down/3 and handle_presence_diff/3 from your handle_info/2.
  • LiveView (or anything with a socket)use Sagents.Subscriber injects handle_info/2 clauses for :DOWN and Phoenix presence diffs that delegate to those helpers.

Subscription handle

Each active subscription is tracked in the caller's local subs map (kept on the LiveView socket under socket.private.sagents_subs, or threaded through manually for plain-process callers).

Map shape:

%{
  {:agent, agent_id} => %{
    channel: :main | :debug,
    server_pid: pid() | nil,
    monitor_ref: reference() | nil,
    state: :subscribed | :pending
  },
  {:filesystem, scope_key} => %{...}
}

States:

  • :subscribed — we have a live subscription on server_pid, monitored.
  • :pending — we want to subscribe but the producer isn't running. Will retry on the next presence arrival.

Departure vs arrival

Monitors are reliable for departure (:DOWN fires within a scheduler tick of process death, even cross-node when the connection drops). Phoenix.Presence is reliable for arrival (presence_diff with joins). We never depend on presence_diff.leaves — if it's delayed, we keep the :subscribed state until :DOWN fires, which it will.

Summary

Functions

Handle a Phoenix.Presence diff for the agent presence topic.

Handle a :DOWN from one of the producer pids we subscribed to.

Returns the Phoenix.Presence topic string the agent presence layer uses.

Subscribe the calling process to an agent's main channel, threading the subscription map through.

Subscribe the calling process to filesystem change events for scope_key.

Unsubscribe from an agent. Tears down monitor and pending state.

Unsubscribe from a filesystem.

Types

sub_entry()

@type sub_entry() :: %{
  channel: atom(),
  server_pid: pid() | nil,
  monitor_ref: reference() | nil,
  state: :subscribed | :pending
}

sub_key()

@type sub_key() :: {:agent, String.t()} | {:filesystem, term()}

subs()

@type subs() :: %{required(sub_key()) => sub_entry()}

Functions

handle_presence_diff(subs, arg2, arg3)

@spec handle_presence_diff(subs(), String.t(), map()) :: subs()

Handle a Phoenix.Presence diff for the agent presence topic.

joins whose key matches a :pending agent subscription triggers a re-subscribe. Returns the (possibly updated) subs map.

handle_publisher_down(subs, ref, reason)

@spec handle_publisher_down(subs(), reference(), term()) ::
  {:matched, subs()} | :no_match

Handle a :DOWN from one of the producer pids we subscribed to.

Returns {:matched, new_subs} if the ref belonged to a tracked subscription (now flipped to :pending), otherwise :no_match.

presence_topic()

@spec presence_topic() :: String.t()

Returns the Phoenix.Presence topic string the agent presence layer uses.

Subscribe to this topic with Phoenix.PubSub.subscribe/2 to receive presence_diff broadcasts that drive auto-resubscription.

subscribe_to_agent(subs, agent_id, channel \\ :main)

@spec subscribe_to_agent(subs(), String.t(), :main | :debug) :: subs()

Subscribe the calling process to an agent's main channel, threading the subscription map through.

Returns the updated subs map. If the agent is not currently running, the subscription is recorded in :pending state and will become live as soon as the agent appears in Phoenix.Presence on the agent presence topic.

subscribe_to_filesystem(subs, scope_key, channel \\ :main)

@spec subscribe_to_filesystem(subs(), term(), :main) :: subs()

Subscribe the calling process to filesystem change events for scope_key.

unsubscribe_from_agent(subs, agent_id)

@spec unsubscribe_from_agent(subs(), String.t()) :: subs()

Unsubscribe from an agent. Tears down monitor and pending state.

unsubscribe_from_filesystem(subs, scope_key)

@spec unsubscribe_from_filesystem(subs(), term()) :: subs()

Unsubscribe from a filesystem.