Amarula.Protocol.Messages.ConversationSender (amarula v0.1.0)

View Source

Per-recipient send process. One ConversationSender exists per recipient JID; all sends to that recipient funnel through it and run one at a time, which serializes Signal ratchet advance for that recipient's session (no per-address lock needed). Different recipients run in parallel under the DynamicSupervisor.

A send is a branchless pipe of ctx -> ctx steps that block on IQ round-trips through Connection (the sole websocket owner):

ctx
|> resolve_devices()   # device-list cache, else a USync query
|> ensure_sessions()   # session files, else a prekey-bundle fetch
|> encrypt()           # per device; plain vs DSM; advance ratchet
|> relay()             # frame + send the <participants> stanza

Each step that needs server data calls Connection.query_iq/2, which blocks until the matching websocket reply arrives. A step failure crashes the process (the DynamicSupervisor reaps it); the pipe carries no error branches.

Lifecycle & registry presence

Identity. A sender's identity is its recipient JID. It is registered in the per-instance Registry under {registry, recipient_jid} — so at most one sender per recipient exists at a time, and deliver/2 is a find-or-start on that key.

Birth (lazy). Started on the first deliver/2 to a recipient with no live sender: find-or-start via Registry.lookup → else DynamicSupervisor.start_child. The {:error, {:already_started, pid}} branch makes the start race-safe even though, in practice, only Connection calls deliver (single process, so no concurrent start for the same recipient).

Registration. Automatic, via the :via name in start_link — the Registry registers the pid on start and auto-unregisters it on death (it monitors the pid). So a dead sender's key vanishes; there are never stale registry entries to reap.

Life. Serializes all sends to its recipient (one pipe at a time → ordered ratchet advance, no per-address lock). Different recipients run in parallel. Holds no durable state: sessions/keys live in Storage, the consumer's from is parked in Connection. So a sender is cheap to lose and cheap to respawn.

Death (three ways).

  1. Idle. After @idle_timeout_ms with an empty mailbox it {:stop, :normal}s.
  2. Crash. A raise in the pipe (Signal error, USync blowup, bad bundle) kills it. restart: :temporary ⇒ the supervisor does NOT restart it; the next deliver respawns a fresh one. In-flight + queued sends are lost.
  3. Shutdown. The connection's supervision tree going down takes it too. All three auto-unregister the Registry key (the Registry's pid monitor).

Rebirth. The next deliver/2 to that recipient starts a fresh sender — no carried state; it re-reads sessions from Storage.

Crash ⇒ parked-send recovery. Because the consumer's from lives in Connection (not in the dying sender), Connection monitors each sender and, on its :DOWN, fails every parked send for that recipient with {:error, {:sender_crashed, reason}} — promptly, instead of letting the caller hang to the ack-timeout. A :normal idle-stop fails nothing (no in-flight sends). See Amarula.Connection (ensure_sender_monitor / the :DOWN handler) and docs/plans/SENDER_CRASH_FIX.plan.md.

Summary

Functions

Returns a specification to start this module under a supervisor.

Hand a message to the recipient's sender (start-or-lookup), asynchronously. The send runs on the per-recipient process — serialized per recipient, parallel across recipients — so the CALLING process (Connection) is not blocked.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

deliver(opts, msg)

@spec deliver(
  keyword(),
  map()
) :: {:ok, pid()} | {:error, term()}

Hand a message to the recipient's sender (start-or-lookup), asynchronously. The send runs on the per-recipient process — serialized per recipient, parallel across recipients — so the CALLING process (Connection) is not blocked.

msg carries :msg_id. The sender does NOT reply the consumer (ack-on-send, Design 2): it runs the pipe and reports the PIPE result back to Connection (state.cm), which owns the parked consumer from and replies it at ack time:

  • relay succeeded (frame written) → {:send_relayed, msg_id} — Connection keeps the parked entry and awaits the server's <ack>.
  • pipe failed (not_on_whatsapp / IQ timeout / encrypt error / plugin halt) → {:send_failed, msg_id, reason} — no frame went out, so Connection replies the parked caller the failure immediately.

Returns {:ok, pid} — the (started or reused) sender pid, so Connection can monitor it and fail the recipient's parked sends if it crashes mid-pipe — or {:error, reason} if the sender could not be started (e.g. :max_children). A start failure is a recoverable send failure: Connection maps it to a {:send_failed, msg_id, reason} for the parked caller rather than crashing.

start_link(opts)