Amarula.Protocol.Messages.ConversationSender (amarula v0.1.0)
Per-recipient send process. One ConversationSender exists per recipient JID;
all sends to that recipient funnel through it and run one at a time, which
serializes Signal ratchet advance for that recipient's session (no per-address
lock needed). Different recipients run in parallel under the DynamicSupervisor.
A send is a branchless pipe of ctx -> ctx steps that block on IQ round-trips
through Connection (the sole websocket owner):
    ctx
    |> resolve_devices()   # device-list cache, else a USync query
    |> ensure_sessions()   # session files, else a prekey-bundle fetch
    |> encrypt()           # per device; plain vs DSM; advance ratchet
    |> relay()             # frame + send the <participants> stanza

Each step that needs server data calls Connection.query_iq/2, which
blocks until the matching websocket reply arrives. A step failure crashes the
process (the DynamicSupervisor reaps it); the pipe carries no error branches.
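A minimal sketch of the pipe shape described above, not the module's actual code. The step bodies are hypothetical placeholders (a fixed device list, a fake "encryption" tag); only the step names and the ctx -> ctx, raise-on-failure shape come from the description.

```elixir
defmodule PipeSketch do
  # Each step takes a ctx map and returns an updated ctx. There are no
  # {:ok, _} / {:error, _} branches: a failing step simply raises.
  def run(ctx) do
    ctx
    |> resolve_devices()
    |> ensure_sessions()
    |> encrypt()
    |> relay()
  end

  # Assumption: a fixed two-device list stands in for the cache / USync query.
  # A non-binary recipient matches no clause and raises FunctionClauseError.
  defp resolve_devices(%{recipient: jid} = ctx) when is_binary(jid) do
    Map.put(ctx, :devices, ["#{jid}:0", "#{jid}:1"])
  end

  # Assumption: every device gets a ready-made session; no bundle fetch here.
  defp ensure_sessions(%{devices: devices} = ctx) do
    Map.put(ctx, :sessions, Map.new(devices, &{&1, :established}))
  end

  # Placeholder "encryption": tags the body per device, no real Signal work.
  defp encrypt(%{sessions: sessions, body: body} = ctx) do
    payloads = Map.new(sessions, fn {device, _state} -> {device, {:enc, body}} end)
    Map.put(ctx, :payloads, payloads)
  end

  # Placeholder relay: records how many per-device payloads would be framed.
  defp relay(%{payloads: payloads} = ctx) do
    Map.put(ctx, :relayed, map_size(payloads))
  end
end
```

Because no step returns an error tuple, the pipe reads top to bottom with no `case` or `with`; the cost is that any failure takes the whole process down, which is what the supervision setup is designed to absorb.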
Lifecycle & registry presence
Identity. A sender's identity is its recipient JID. It is registered in
the per-instance Registry under {registry, recipient_jid} — so at most one
sender per recipient exists at a time, and deliver/2 is a find-or-start on
that key.
Birth (lazy). Started on the first deliver/2 to a recipient with no live
sender: find-or-start via Registry.lookup → else DynamicSupervisor.start_child.
The {:error, {:already_started, pid}} branch makes the start race-safe even
though, in practice, only Connection calls deliver (single process, so no
concurrent start for the same recipient).
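The find-or-start described above can be sketched as follows. This is an illustration under assumptions, not the module's source: `FindOrStartSketch`, its nested `Worker`, and the registry and supervisor names passed in are all hypothetical. The `Registry`, `DynamicSupervisor`, and `GenServer` calls are the standard library ones.

```elixir
defmodule FindOrStartSketch do
  defmodule Worker do
    # Hypothetical stand-in for the sender process.
    use GenServer, restart: :temporary

    def start_link({registry, jid}) do
      # The :via tuple registers this pid under `jid` in `registry` on start.
      GenServer.start_link(__MODULE__, jid, name: {:via, Registry, {registry, jid}})
    end

    @impl true
    def init(jid), do: {:ok, jid}
  end

  # Returns {:ok, pid} for an existing or freshly started worker.
  def find_or_start(registry, sup, jid) do
    case Registry.lookup(registry, jid) do
      [{pid, _value}] ->
        {:ok, pid}

      [] ->
        case DynamicSupervisor.start_child(sup, {Worker, {registry, jid}}) do
          {:ok, pid} -> {:ok, pid}
          # Lost a start race: someone registered the key first; reuse that pid.
          {:error, {:already_started, pid}} -> {:ok, pid}
          {:error, reason} -> {:error, reason}
        end
    end
  end
end
```

To try it, start a `Registry` with `keys: :unique` and a `DynamicSupervisor` with `strategy: :one_for_one`, then call `find_or_start/3` twice with the same JID: both calls should return the same pid.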
Registration. Automatic, via the :via name in start_link — the Registry
registers the pid on start and auto-unregisters it on death (it monitors the
pid). So a dead sender's key vanishes; there are never stale registry entries to
reap.
Life. Serializes all sends to its recipient (one pipe at a time → ordered
ratchet advance, no per-address lock). Different recipients run in parallel.
Holds no durable state: sessions/keys live in Storage, the consumer's from
is parked in Connection. So a sender is cheap to lose and cheap to respawn.
Death (three ways).
- Idle. After @idle_timeout_ms with an empty mailbox it {:stop, :normal}s.
- Crash. A raise in the pipe (Signal error, USync blowup, bad bundle) kills it. restart: :temporary ⇒ the supervisor does NOT restart it; the next deliver respawns a fresh one. In-flight + queued sends are lost.
- Shutdown. The connection's supervision tree going down takes it too.
All three auto-unregister the Registry key (the Registry's pid monitor).
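The idle and crash paths can be illustrated with a small GenServer. This sketch assumes the idle timer is the built-in GenServer timeout; the real module may arm it differently. The module name and the 50 ms value are made up for the example.

```elixir
defmodule IdleStopSketch do
  # restart: :temporary lands in the generated child_spec/1, so a supervisor
  # never restarts this process, whatever its exit reason.
  use GenServer, restart: :temporary

  # Arbitrary value for the sketch; the real @idle_timeout_ms is not shown here.
  @idle_timeout_ms 50

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, %{sent: 0}, @idle_timeout_ms}

  @impl true
  def handle_cast({:send, _msg}, state) do
    # Returning the timeout again re-arms the idle timer after each message.
    {:noreply, %{state | sent: state.sent + 1}, @idle_timeout_ms}
  end

  @impl true
  def handle_info(:timeout, state) do
    # No message arrived within @idle_timeout_ms: stop cleanly.
    {:stop, :normal, state}
  end
end
```

A process monitoring this server sees a `:DOWN` with reason `:normal` on the idle path and the raised exception's reason on the crash path, which is how an observer can tell the two apart.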
Rebirth. The next deliver/2 to that recipient starts a fresh sender — no
carried state; it re-reads sessions from Storage.
Crash ⇒ parked-send recovery. Because the consumer's from lives in
Connection (not in the dying sender), Connection monitors each sender and,
on its :DOWN, fails every parked send for that recipient with
{:error, {:sender_crashed, reason}} — promptly, instead of letting the caller
hang to the ack-timeout. A :normal idle-stop fails nothing (no in-flight
sends). See Amarula.Connection (ensure_sender_monitor / the :DOWN handler)
and docs/plans/SENDER_CRASH_FIX.plan.md.
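A rough model of the recovery described above, under stated assumptions. `ParkedSendsSketch` is a hypothetical stand-in for the Connection side, and the shapes of its `parked` and `monitors` maps are invented for the example. Only the monitor, the `:DOWN` handling, and the `{:error, {:sender_crashed, reason}}` reply come from the text.

```elixir
defmodule ParkedSendsSketch do
  # parked:   msg_id => {from, recipient_jid}
  # monitors: monitor ref => recipient_jid
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks the caller until the parked send is resolved.
  def send_via(conn, sender, jid, msg_id) do
    GenServer.call(conn, {:park, sender, jid, msg_id})
  end

  @impl true
  def init(:ok), do: {:ok, %{parked: %{}, monitors: %{}}}

  @impl true
  def handle_call({:park, sender, jid, msg_id}, from, state) do
    state = ensure_monitor(state, sender, jid)
    # No reply yet: the caller's `from` stays parked here, not in the sender.
    {:noreply, put_in(state.parked[msg_id], {from, jid})}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    {jid, monitors} = Map.pop(state.monitors, ref)
    {failed, kept} = Enum.split_with(state.parked, fn {_id, {_from, j}} -> j == jid end)

    # Fail every send parked for the dead sender's recipient right away.
    for {_id, {from, _jid}} <- failed do
      GenServer.reply(from, {:error, {:sender_crashed, reason}})
    end

    {:noreply, %{state | parked: Map.new(kept), monitors: monitors}}
  end

  # One monitor per recipient, created the first time its sender is seen.
  defp ensure_monitor(state, sender, jid) do
    if jid in Map.values(state.monitors) do
      state
    else
      ref = Process.monitor(sender)
      put_in(state.monitors[ref], jid)
    end
  end
end
```

Keeping `from` in the long-lived process is the design point: the dying sender has nothing to hand over, and the monitor turns its exit into a prompt reply instead of a caller-side timeout.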
Functions
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
deliver/2
Hand a message to the recipient's sender (start-or-lookup), asynchronously. The send runs on the per-recipient process (serialized per recipient, parallel across recipients), so the CALLING process (Connection) is not blocked.
msg carries :msg_id. The sender does NOT reply to the consumer (ack-on-send,
Design 2): it runs the pipe and reports the PIPE result back to Connection
(state.cm), which owns the parked consumer from and replies to it at ack time:
- relay succeeded (frame written) → {:send_relayed, msg_id}: Connection keeps the parked entry and awaits the server's <ack>.
- pipe failed (not_on_whatsapp / IQ timeout / encrypt error / plugin halt) → {:send_failed, msg_id, reason}: no frame went out, so Connection replies to the parked caller with the failure immediately.
Returns {:ok, pid} — the (started or reused) sender pid, so Connection can
monitor it and fail the recipient's parked sends if it crashes mid-pipe — or
{:error, reason} if the sender could not be started (e.g. :max_children).
A start failure is a recoverable send failure: Connection maps it to a
{:send_failed, msg_id, reason} for the parked caller rather than crashing.
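The result handling above can be modelled as pure functions over the parked map. This is a hypothetical sketch: `DeliverResultSketch`, the `msg_id => from` map shape, and the `{:error, reason}` reply given to the caller are assumptions, since the text does not show what the parked caller actually receives.

```elixir
defmodule DeliverResultSketch do
  # Each function returns {replies, parked}: the {from, reply} pairs to send
  # now, and the parked map that remains afterwards.

  # Sender found or started: nothing to reply yet, the pipe will report later.
  def on_deliver({:ok, _pid}, _msg_id, parked), do: {[], parked}

  # Start failure (e.g. :max_children) is treated like a failed pipe.
  def on_deliver({:error, reason}, msg_id, parked) do
    on_pipe({:send_failed, msg_id, reason}, parked)
  end

  # Frame written: keep the entry parked until the server's <ack> arrives.
  def on_pipe({:send_relayed, _msg_id}, parked), do: {[], parked}

  # No frame went out: release the parked caller with the failure.
  def on_pipe({:send_failed, msg_id, reason}, parked) do
    case Map.pop(parked, msg_id) do
      {nil, parked} -> {[], parked}
      {from, parked} -> {[{from, {:error, reason}}], parked}
    end
  end
end
```

Routing the start failure through the same clause as a pipe failure keeps a single place where a parked caller is released on error.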