Amarula.Connection (amarula v0.1.0)

The per-connection process that owns the entire server conversation: the WebSocketClient, the Noise cipher (frame encode/decode and counters), IQ correlation, login/handshake, send dispatch, and server-notification handling.

It is also the consumer's endpoint — the pid Amarula.connect/2 returns. Consumer calls (connect, send_text, group_*, …) land here directly, and consumer events go straight to the connection's parent_pid as {:whatsapp, type, data} (no relay process, no subscriber registry).

Per-send work stays small: Connection only frames, writes, and correlates (acks, IQ replies). The heavy work (USync/bundle waits and Signal encryption) runs on the per-recipient ConversationSender, which hands back a ready stanza for Connection to relay.
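Because consumer events arrive as plain {:whatsapp, type, data} messages in the parent_pid's mailbox, a consumer can handle them with an ordinary receive. The sketch below is hypothetical (EventLoopSketch is not part of Amarula); only the {:whatsapp, type, data} envelope and the :connection_update type come from these docs, and the shape of data is left opaque.

```elixir
defmodule EventLoopSketch do
  # Hypothetical consumer helper: must run in the connection's parent_pid,
  # since that is the process the events are sent to.
  def next_event(timeout \\ 5_000) do
    receive do
      # Connection lifecycle updates (e.g. ones carrying a qr during pairing).
      {:whatsapp, :connection_update, data} -> {:connection_update, data}
      # Any other event type is passed through untouched.
      {:whatsapp, type, data} -> {:other, type, data}
    after
      timeout -> :no_event
    end
  end
end
```

A real consumer would call next_event/1 in a loop (or match the same tuples in a GenServer's handle_info/2).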

Types

t()

@type t() :: %Amarula.Connection{
  auth_creds: Amarula.Protocol.Auth.AuthUtils.auth_creds() | nil,
  config: Amarula.Protocol.Socket.Types.socket_config(),
  conn: term(),
  connection_state: Amarula.Protocol.Socket.Types.connection_state(),
  connection_timeout_timer: reference() | nil,
  handshake_state:
    Amarula.Protocol.Socket.ConnectionValidator.handshake_state() | nil,
  instance_id: reference() | nil,
  keep_alive_timer: reference() | nil,
  last_error: term() | nil,
  last_recv_time: non_neg_integer() | nil,
  max_retries: non_neg_integer(),
  message_counter: non_neg_integer(),
  message_epoch: non_neg_integer(),
  message_tag_prefix: String.t(),
  msg_retry_counts: %{required(String.t()) => non_neg_integer()},
  noise_state: Amarula.Protocol.Crypto.NoiseHandler.noise_state() | nil,
  parent_pid: pid() | nil,
  pending_acks: %{
    required(String.t()) =>
      {GenServer.from(), (:ok -> term()), reference(), String.t()}
  },
  pending_iqs: %{required(String.t()) => {atom(), reference()}},
  pending_sends: %{
    required(String.t()) => %{
      msg_id: String.t(),
      text: String.t(),
      target_jid: String.t(),
      devices: [map()]
    }
  },
  qr_refs: [String.t()],
  qr_timer: reference() | nil,
  retry_count: non_neg_integer(),
  retry_delay: non_neg_integer(),
  retry_timer: reference() | nil,
  sender_monitors: %{required(String.t()) => reference()},
  server_response_timeout_timer: reference() | nil,
  waiting_for_server_response: boolean(),
  websocket_client: pid() | nil
}

Functions

assert_lid_sessions(pid, lids)

@spec assert_lid_sessions(GenServer.server(), [String.t()]) :: :ok

Asynchronously (via a cast) request a forced refresh of the sessions for newly mapped LIDs. Returns :ok immediately.

cache_sent_message(pid, msg_id, recipient_jid, message)

@spec cache_sent_message(GenServer.server(), String.t(), String.t(), struct()) :: :ok

Remember a just-sent message (id → content + recipient) so it can be re-encrypted and resent if the recipient sends a type="retry" receipt. The cache is a bounded LRU; the ConversationSender calls this after a successful relay.
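A bounded LRU of this kind can be sketched with a map plus a recency-ordered key list. This is a hypothetical illustration, not Amarula's implementation: SentCacheSketch, the capacity, and the {recipient_jid, message} value shape are all assumptions.

```elixir
defmodule SentCacheSketch do
  # Hypothetical bounded LRU: entries maps msg_id -> {recipient_jid, message};
  # order lists msg_ids from most to least recently used.
  defstruct capacity: 256, entries: %{}, order: []

  def new(capacity), do: %__MODULE__{capacity: capacity}

  # Insert (or refresh) an entry; evict the least recently used one when full.
  def put(%__MODULE__{} = cache, msg_id, recipient_jid, message) do
    order = [msg_id | List.delete(cache.order, msg_id)]
    entries = Map.put(cache.entries, msg_id, {recipient_jid, message})

    if length(order) > cache.capacity do
      {evicted, kept} = List.pop_at(order, -1)
      %{cache | entries: Map.delete(entries, evicted), order: kept}
    else
      %{cache | entries: entries, order: order}
    end
  end

  # Look an entry up (e.g. on a retry receipt) and mark it as recently used.
  def get(%__MODULE__{} = cache, msg_id) do
    case Map.fetch(cache.entries, msg_id) do
      {:ok, value} -> {value, %{cache | order: [msg_id | List.delete(cache.order, msg_id)]}}
      :error -> {nil, cache}
    end
  end
end
```

The list keeps the sketch short at the cost of O(n) updates; a production cache would more likely pair the map with an ordered structure such as :gb_trees.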

canonical_jid(pid \\ __MODULE__, jid)

@spec canonical_jid(GenServer.server(), String.t()) :: String.t()

Canonicalize jid to its phone-number identity. If jid is a LID (<n>@lid) with a stored PN mapping, returns the equivalent <pn>@s.whatsapp.net (preserving any device). Any other jid — already a PN, a group, or a LID with no known mapping — is returned unchanged.

This is the public entry point to the LID↔PN mapping the library maintains internally; consumers no longer need to reach into Protocol.Signal.*.
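The canonicalization rule can be restated as a pure function. This sketch is hypothetical: it assumes the <user>:<device>@server jid form for device suffixes, and a plain map from LID user to PN user stands in for the mapping Amarula actually keeps in its Storage seam.

```elixir
defmodule CanonicalJidSketch do
  # Hypothetical pure version of the LID -> PN rule. `mappings` is a stand-in
  # for the stored LID -> PN mapping (LID user part => PN user part).
  def canonical_jid(jid, mappings) do
    case String.split(jid, "@", parts: 2) do
      [user_part, "lid"] ->
        {user, device} = split_device(user_part)

        case Map.fetch(mappings, user) do
          {:ok, pn} -> pn <> device <> "@s.whatsapp.net"
          # LID with no known mapping: unchanged.
          :error -> jid
        end

      # PN, group, or anything else: unchanged.
      _ ->
        jid
    end
  end

  # "123:4" -> {"123", ":4"}; "123" -> {"123", ""}
  defp split_device(user_part) do
    case String.split(user_part, ":", parts: 2) do
      [user, device] -> {user, ":" <> device}
      [user] -> {user, ""}
    end
  end
end
```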

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

connect(pid \\ __MODULE__)

Connects to the WebSocket server.

decompress_frame(frame)

@spec decompress_frame(binary()) :: binary()

Strip a decoded frame's 1-byte prefix, inflating when compressed. Bit 1 (0x02) of the prefix means the remainder is zlib-compressed (as in Baileys' decompressingIfRequired); the server compresses larger frames. Public for testability.
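The prefix handling reduces to a single bit test. A minimal sketch, assuming (as the description says) that the compressed remainder is a zlib stream, which Erlang's :zlib.uncompress/1 reads; FrameSketch is a hypothetical module, not the library's code.

```elixir
defmodule FrameSketch do
  import Bitwise

  # The first byte is a flags byte; if bit 1 (0x02) is set, the rest of the
  # frame is zlib-compressed, otherwise it is returned as-is.
  # An empty binary raises FunctionClauseError.
  def decompress_frame(<<prefix, rest::binary>>) do
    if (prefix &&& 0x02) != 0 do
      :zlib.uncompress(rest)
    else
      rest
    end
  end
end
```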

disconnect(pid \\ __MODULE__)

Disconnects from the WebSocket server.

fetch_history(pid \\ __MODULE__, oldest_key, oldest_ts, count)

Ask the phone for older history of a chat (PEER_DATA_OPERATION on-demand).

get_auth_creds(pid)

Returns the current auth creds, which carry me.id, me.lid, and account once logged in.

get_conn(pid \\ __MODULE__)

@spec get_conn(GenServer.server()) :: Amarula.Conn.t()

Returns the connection's %Amarula.Conn{}: its storage scope and profile. Library code that needs to read or write the Storage seam (sessions, LID mappings) from the caller's process holds only the pid; this call hands it the %Conn{} that those stores are keyed by. Internal: consumers should use the higher-level facade calls instead.

get_connection_state(pid \\ __MODULE__)

Gets the current connection state.

group_metadata(pid \\ __MODULE__, group)

@spec group_metadata(GenServer.server(), Amarula.jid()) ::
  {:ok, Amarula.Group.t()} | {:error, term()}

Fetch one group's metadata. group is an Address or the @g.us jid string.

group_op(pid \\ __MODULE__, iq, transform)

@spec group_op(GenServer.server(), Amarula.Protocol.Binary.Node.t(), (term() -> term())) ::
  term()

Run a group management op: send the IQ produced by a Groups.Ops.<builder> and apply transform to the reply. transform receives either {:ok, node} or {:error, node} and its return value becomes the result of the call. Used by the Amarula group_* API.
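As an illustration of the expected shape, transform is a two-clause anonymous function. This one is hypothetical: what a real transform extracts from node depends on the Groups.Ops builder used, so here it only normalizes the outcome.

```elixir
# Hypothetical transform for group_op/3: it receives {:ok, node} for a
# successful reply or {:error, node} for an error reply.
transform = fn
  # Success: discard the reply node and report plain :ok.
  {:ok, _node} -> :ok
  # Failure: pass the error through so the caller can inspect it.
  {:error, node} -> {:error, node}
end
```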

list_groups(pid \\ __MODULE__)

@spec list_groups(GenServer.server()) :: {:ok, [Amarula.Group.t()]} | {:error, term()}

Fetch all groups we participate in.

make_socket(conn, opts \\ [])

@spec make_socket(
  Amarula.Conn.t(),
  keyword()
) :: {:ok, pid()} | {:error, {:already_running, pid()}} | {:error, term()}

Start a connection instance and return its pid, which is the consumer's handle. Starts the per-connection supervision tree (Registry, caches, sender supervisor, and this Connection) and returns the Connection child pid, so public API calls on that pid (connect, send_text, ...) land here directly.

opts may carry :parent_pid, the process that receives events (defaults to the caller).

mark_read(pid \\ __MODULE__, message_ids, jid, participant \\ nil)

@spec mark_read(
  GenServer.server(),
  [String.t(), ...],
  Amarula.jid(),
  Amarula.jid() | nil
) :: :ok

Send a read receipt for message_ids in chat jid (optional participant).

own_chat?(pid \\ __MODULE__, msg)

@spec own_chat?(GenServer.server(), Amarula.Msg.t()) :: boolean()

Whether msg is a message in our own chat (the "Message Yourself" chat) — i.e. it's from_me and addressed to our own account.

This is the check a self-chat command channel needs (for example, driving an agent by messaging yourself), and it handles the LID/PN duality for you. WhatsApp may address the self chat by either our PN (me.id, always present) or our LID (me.lid, present once the server has sent it), so own_chat? matches msg.to against both of our own identities (ignoring the device) rather than forcing a single normalized form.

On a single connection there is no feedback loop: a reply this connection sends to the self chat is delivered to our other devices but not back to us (the send path excludes our own sending device), so you don't need to filter out your own sends. Only when two connections run on the same account do their self-chat sends reach each other; deduplicate those across connections using the msg_id returned by the send.
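The matching rule can be sketched as a pure function over plain maps. This is hypothetical: OwnChatSketch is not part of Amarula, and the real Amarula.Msg struct and creds may be shaped differently; only the rule itself (from_me, and msg.to equal to our PN or LID with the device ignored) comes from the description.

```elixir
defmodule OwnChatSketch do
  # Hypothetical restatement of own_chat?: from_me and addressed to either
  # our PN (me.id) or our LID (me.lid), ignoring any device suffix.
  def own_chat?(%{from_me: true, to: to}, me) do
    ours = [me[:id], me[:lid]] |> Enum.reject(&is_nil/1) |> Enum.map(&bare/1)
    bare(to) in ours
  end

  def own_chat?(_msg, _me), do: false

  # "15551234567:12@s.whatsapp.net" -> "15551234567@s.whatsapp.net"
  # Assumes a well-formed user[:device]@server jid.
  defp bare(jid) do
    [user_part, server] = String.split(jid, "@", parts: 2)
    [user | _device] = String.split(user_part, ":", parts: 2)
    user <> "@" <> server
  end
end
```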

presence_subscribe(pid \\ __MODULE__, jid)

@spec presence_subscribe(GenServer.server(), Amarula.jid()) :: :ok

Subscribe to a contact's presence.

query_iq(pid, node, timeout \\ 25000)

Send an IQ and block until the matching websocket reply arrives.

Returns {:ok, node} on a type="result" reply, {:error, node} on an error reply, or {:error, :timeout} if no reply arrives within timeout (25000 ms by default). The caller (a ConversationSender) blocks; Connection keeps owning the socket and just routes the reply back. This is the only correlation primitive the send path needs; no continuation logic lives here.
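The blocking-call-with-correlation pattern can be sketched as a small GenServer that parks the caller's `from` under the IQ id and replies when the matching reply (or a timer) arrives. This is a hypothetical illustration, not Amarula's code: the real pending_iqs map stores {atom(), reference()}, and here deliver/4 stands in for the websocket handing Connection an incoming IQ.

```elixir
defmodule IqCorrelatorSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks the caller until a matching reply or the server-side timeout.
  # The call timeout is padded so the server's own timer fires first.
  def query(pid, id, timeout \\ 25_000) do
    GenServer.call(pid, {:query, id, timeout}, timeout + 1_000)
  end

  # Stand-in for an incoming IQ reply arriving from the socket.
  def deliver(pid, id, type, node), do: send(pid, {:incoming_iq, id, type, node})

  @impl true
  def init(:ok), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_call({:query, id, timeout}, from, state) do
    # A real implementation would frame and write the IQ here.
    timer = Process.send_after(self(), {:iq_timeout, id}, timeout)
    {:noreply, put_in(state.pending[id], {from, timer})}
  end

  @impl true
  def handle_info({:incoming_iq, id, type, node}, state) do
    case Map.pop(state.pending, id) do
      # Unknown or already timed-out id: drop it.
      {nil, _} ->
        {:noreply, state}

      {{from, timer}, pending} ->
        Process.cancel_timer(timer)
        reply = if type == "result", do: {:ok, node}, else: {:error, node}
        GenServer.reply(from, reply)
        {:noreply, %{state | pending: pending}}
    end
  end

  def handle_info({:iq_timeout, id}, state) do
    case Map.pop(state.pending, id) do
      # Reply already delivered; a stale timer message is ignored.
      {nil, _} ->
        {:noreply, state}

      {{from, _timer}, pending} ->
        GenServer.reply(from, {:error, :timeout})
        {:noreply, %{state | pending: pending}}
    end
  end
end
```

Keeping the caller blocked in GenServer.call while the owning process stays free to read the socket is what lets one process serve many concurrent IQs.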

relay_stanza(pid, node)

@spec relay_stanza(GenServer.server(), Amarula.Protocol.Binary.Node.t()) :: :ok

Frame and send a stanza over the websocket (fire-and-forget; no IQ reply awaited). Used by the send path to relay the final <message>.

request_pairing_code(pid \\ __MODULE__, phone, opts \\ [])

@spec request_pairing_code(GenServer.server(), String.t(), keyword()) ::
  {:ok, String.t()} | {:error, term()}

Request a link-code (phone-number) pairing code for phone (digits only, E.164 without +).

Call this during the QR window while unregistered (for example, on the first :connection_update event that carries a qr). Returns {:ok, code} with an 8-character code the user types into WhatsApp → Linked Devices → "Link with phone number". The server later pushes a link_code_companion_reg notification, which the connection finishes internally; after the usual 515 restart, it logs in.

Pass custom_code: "ABCD2345" to use a fixed 8-char code instead of a random one.
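Since the phone argument must be E.164 digits without "+", user input usually needs cleaning first. A hypothetical helper (PairingPhoneSketch is not part of Amarula) that strips non-digits and applies E.164's 15-digit maximum:

```elixir
defmodule PairingPhoneSketch do
  # Hypothetical helper: turn a human-entered number into E.164 digits
  # without "+". It cannot add a missing country code, and it does not
  # strip an international dialing prefix such as "00".
  def normalize(phone) when is_binary(phone) do
    digits = String.replace(phone, ~r/\D/, "")

    # E.164 numbers have at most 15 digits.
    if digits != "" and byte_size(digits) <= 15 do
      {:ok, digits}
    else
      {:error, :invalid_phone}
    end
  end
end
```

The result would then be passed as request_pairing_code(pid, digits), or request_pairing_code(pid, digits, custom_code: "ABCD2345") for a fixed code.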

request_resend(pid \\ __MODULE__, message_key)

Ask the phone to re-deliver a message by key (PEER_DATA_OPERATION resend).

send_chatstate(pid \\ __MODULE__, jid, type)

@spec send_chatstate(
  GenServer.server(),
  Amarula.jid(),
  :composing | :recording | :paused
) :: :ok

Send a chat-state to jid (:composing/:recording/:paused).

send_contact(pid \\ __MODULE__, jid, display_name, vcard)

Send a contact (display_name + vCard string) to jid.

send_contacts(pid \\ __MODULE__, jid, display_name, pairs)

Send multiple contacts to jid. The top-level display_name is separate from the per-contact names: pairs is [{display_name, vcard}, ...], one entry per contact.
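The pairs list can be assembled with a small helper. This is a hypothetical sketch: the vCard 3.0 layout and the waid= TEL parameter (as seen in Baileys' contact examples) are assumptions, not something these docs specify, and the name is not escaped.

```elixir
defmodule ContactPairsSketch do
  # Hypothetical helper building one {display_name, vcard} entry for
  # send_contacts/4. `digits` is the contact's number as E.164 digits.
  def pair(name, digits) do
    vcard =
      Enum.join(
        [
          "BEGIN:VCARD",
          "VERSION:3.0",
          "FN:" <> name,
          # waid= carries the WhatsApp number; the part after ":" is the display form.
          "TEL;type=CELL;waid=" <> digits <> ":+" <> digits,
          "END:VCARD"
        ],
        "\n"
      )

    {name, vcard}
  end
end
```

For example, pairs = [ContactPairsSketch.pair("Jane", "15551234567"), ContactPairsSketch.pair("Sam", "15557654321")] could then be passed to send_contacts(pid, jid, "2 contacts", pairs).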

send_edit(pid \\ __MODULE__, target_key, new_text)

Edit a previously-sent message, replacing its text with new_text.

send_image(pid \\ __MODULE__, jid, data, opts \\ [])

Convenience wrapper for send_media(pid, :image, jid, data, opts).

send_location(pid \\ __MODULE__, jid, lat, lng, opts \\ [])

Send the location at lat/lng to jid. opts: :name, :address, :url, :is_live.

send_media(pid \\ __MODULE__, type, jid, data, opts \\ [])

Send a media message to jid. type is :image/:video/:audio/:document/:sticker; data is the raw bytes. opts may carry :mimetype plus per-type extras. Encrypts, uploads, and sends the media. Returns {:ok, msg_id} or {:error, reason}.

send_message(pid \\ __MODULE__, jid, message)

Send a pre-built %Proto.Message{} to jid (1:1 or group). Used for reactions, edits, deletes and media. Returns {:ok, msg_id}.

send_poll(pid \\ __MODULE__, jid, name, options, opts \\ [])

Send a poll to jid. Returns {:ok, msg_id, message_secret} — keep the secret to tally incoming votes. opts: :selectable, :announcement, :message_secret.

send_reaction(pid \\ __MODULE__, target_key, emoji)

React to a message with emoji (pass "" to remove the reaction). target_key is the %Proto.MessageKey{} of the message being reacted to.

send_revoke(pid \\ __MODULE__, target_key)

Delete a message for everyone (revoke). target_key is its %Proto.MessageKey{}.

send_text(pid \\ __MODULE__, jid, text)

Send a 1:1/group text message to jid. Encrypts and relays (fetching the recipient's prekey bundle first if we have no session). Returns {:ok, msg_id}.

set_presence(pid \\ __MODULE__, type)

@spec set_presence(GenServer.server(), :available | :unavailable) ::
  :ok | {:error, term()}

Send global presence (:available/:unavailable). Requires me.name to be set.

start_link(conn, opts \\ [])

Starts the connection — the per-connection process that owns the websocket, the noise cipher, IQ correlation, login, sends, and consumer-event delivery.

opts:

  • :name — registered name (default __MODULE__)
  • :parent_pid — process to receive {:whatsapp, type, data} events

stop(pid)

@spec stop(pid()) :: :ok | {:error, :not_found}

Stop the whole connection tree (this Connection, its caches + sender supervisor), freeing the profile registration. Unlike disconnect/1 (which only closes the websocket, leaving the supervised tree up to reconnect), this releases the profile so it can be started again elsewhere. Returns :ok | {:error, :not_found}.

update_auth_creds(pid, new_creds)

Updates authentication credentials.

Called when credentials are updated (e.g., after successful pairing).

wipe_credentials(pid \\ __MODULE__)

@spec wipe_credentials(GenServer.server()) :: :ok | {:error, term()}

Unlink this companion server-side, wipe ALL local storage, then disconnect. Destructive.