Amarula.Protocol.Socket.IQ (amarula v0.1.0)

IQ request/response correlation, extracted from Connection as a pure module over the pending-IQ map. CM remains the only process and owns the socket, the timers, and the tracked-kind continuations; this module only decides what to do with a reply or a timeout and returns an effect for CM to perform.

An outbound IQ is registered with one of two intents, keyed by its id:

  • {:tracked, kind, timer} - an internal bootstrap step; on reply, CM runs the continuation for kind (login/prekey/digest/app-state/…).
  • {:waiter, from, timer} or {:waiter, from, timer, transform} - a caller blocked in query_iq; on reply, CM calls GenServer.reply/2, optionally mapping the result through transform first.

resolve/2 and timeout/2 return {new_pending, effect} where effect is:

  • {:reply, from, result} - call GenServer.reply(from, result)
  • {:reply, from, transform.(result)} - the same reply, with the waiter's transform already applied to the result
  • {:tracked, kind, result, timer} - cancel timer, then run the continuation for kind
  • {:cancel_and_reply, timer, from, result} - cancel timer, then reply
  • :none - nothing to do (no entry registered for this id)

Timer cancellation is surfaced as part of the effect so this module performs no side effects.

Summary

Functions

resolve(pending, node)

Resolve an incoming IQ reply node. Returns {pending, effect}. The result is {:ok, node} for type=result, else {:error, node}.

timeout(pending, id)

Resolve an IQ timeout for id. Returns {pending, effect}.

track(pending, id, kind, timer)

Register a tracked IQ (internal bootstrap step) by id.

wait(pending, id, from, timer, transform)

Register a blocking waiter (query_iq), optionally with a result transform.

Types

effect()

@type effect() ::
  {:reply, GenServer.from(), term(), reference()}
  | {:tracked, kind(), term(), reference()}
  | :none
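
As a rough illustration of how a caller such as CM might act on an effect() value, the sketch below pattern-matches the three shapes in the typespec above. EffectSketch, perform/2, and the run_kind callback are hypothetical names, not part of Amarula. The sketch also assumes the timer reference came from Process.send_after/3, and it does not cover the {:cancel_and_reply, ...} or three-element {:reply, ...} shapes mentioned in the module description.

```elixir
defmodule EffectSketch do
  # Hypothetical helper, not Amarula code. `run_kind` is an assumed
  # 2-arity callback standing in for CM's tracked-kind continuations.

  # Waiter reply: cancel the pending timeout, then unblock the caller.
  def perform({:reply, from, result, timer}, _run_kind) do
    Process.cancel_timer(timer)
    GenServer.reply(from, result)
  end

  # Tracked bootstrap step: cancel the timeout, then run the continuation.
  def perform({:tracked, kind, result, timer}, run_kind) do
    Process.cancel_timer(timer)
    run_kind.(kind, result)
  end

  # No entry was registered for this id: nothing to do.
  def perform(:none, _run_kind), do: :ok
end
```

Keeping the side effects in one place like this is what lets the correlation logic itself stay pure and easy to test.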

entry()

@type entry() ::
  {:tracked, kind(), reference()}
  | {:waiter, GenServer.from(), reference()}
  | {:waiter, GenServer.from(), reference(), (term() -> term())}

kind()

@type kind() :: atom()

pending()

@type pending() :: %{optional(String.t()) => entry()}
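
To make the shape of pending() concrete, here is a small hand-built map with one entry of each entry() variant. The ids, the :login kind, and the transform are made up for illustration, and make_ref/0 stands in for real timer references and for the tag inside GenServer.from().

```elixir
# Illustrative values only; none of these ids or kinds come from Amarula.
from = {self(), make_ref()}

pending = %{
  # Internal bootstrap step, tracked by kind.
  "iq-1" => {:tracked, :login, make_ref()},
  # Caller blocked in query_iq; the result is passed through unchanged.
  "iq-2" => {:waiter, from, make_ref()},
  # Caller blocked in query_iq; the result is mapped through a transform.
  "iq-3" => {:waiter, from, make_ref(), fn result -> {:wrapped, result} end}
}
```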

Functions

resolve(pending, node)

@spec resolve(pending(), Amarula.Protocol.Binary.Node.t()) :: {pending(), effect()}

Resolve an incoming IQ reply node. Returns {pending, effect}, where the result carried in the effect is {:ok, node} when the node has type=result and {:error, node} otherwise.
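
The behaviour described above could be sketched roughly as follows. This is not the module's actual implementation: ResolveSketch is a hypothetical name, a plain map with an :attrs key stands in for Amarula.Protocol.Binary.Node.t() (whose fields are not shown here), and the effect tuples follow the effect() typespec.

```elixir
defmodule ResolveSketch do
  # Hypothetical re-creation of the documented behaviour, not Amarula code.
  # Assumes the node carries its "id" and "type" in an attrs map.
  def resolve(pending, %{attrs: %{"id" => id} = attrs} = node) do
    # type=result becomes {:ok, node}; anything else becomes {:error, node}.
    result = if attrs["type"] == "result", do: {:ok, node}, else: {:error, node}

    # Remove the entry for this id (if any) and describe what the caller should do.
    case Map.pop(pending, id) do
      {nil, unchanged} ->
        {unchanged, :none}

      {{:tracked, kind, timer}, rest} ->
        {rest, {:tracked, kind, result, timer}}

      {{:waiter, from, timer}, rest} ->
        {rest, {:reply, from, result, timer}}

      {{:waiter, from, timer, transform}, rest} ->
        {rest, {:reply, from, transform.(result), timer}}
    end
  end
end
```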

timeout(pending, id)

@spec timeout(pending(), String.t()) :: {pending(), effect()}

Resolve an IQ timeout for id. Returns {pending, effect}.
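
A timeout can be handled with the same pop-and-describe pattern. The sketch below is a guess at the general shape only: TimeoutSketch is a hypothetical name, the {:error, :timeout} result is an assumption (the documentation does not say what result a timeout produces), and it also assumes a waiter's transform is not applied on timeout.

```elixir
defmodule TimeoutSketch do
  # Hypothetical sketch, not Amarula code.
  # ASSUMPTION: a timed-out IQ yields {:error, :timeout}.
  @timeout_result {:error, :timeout}

  def timeout(pending, id) do
    case Map.pop(pending, id) do
      # The reply already arrived (or the id was never registered).
      {nil, unchanged} ->
        {unchanged, :none}

      {{:tracked, kind, timer}, rest} ->
        {rest, {:tracked, kind, @timeout_result, timer}}

      {{:waiter, from, timer}, rest} ->
        {rest, {:reply, from, @timeout_result, timer}}

      # ASSUMPTION: the transform is skipped for timeouts.
      {{:waiter, from, timer, _transform}, rest} ->
        {rest, {:reply, from, @timeout_result, timer}}
    end
  end
end
```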

track(pending, id, kind, timer)

@spec track(pending(), String.t(), kind(), reference()) :: pending()

Register a tracked IQ (internal bootstrap step) by id.

wait(pending, id, from, timer, transform)

@spec wait(
  pending(),
  String.t(),
  GenServer.from(),
  reference(),
  (term() -> term()) | nil
) :: pending()

Register a blocking waiter (query_iq), optionally with a result transform; pass nil for transform to leave the result unchanged.
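
Registration itself is plausibly just a map insert. The sketch below shows one way track/4 and wait/5 could build the entry() tuples; RegisterSketch is a hypothetical name, and storing the three-element waiter tuple when transform is nil is an assumption inferred from the entry() type. The "iq-7" id and the {:iq_timeout, id} message are likewise made up.

```elixir
defmodule RegisterSketch do
  # Hypothetical sketch, not Amarula code.

  # Tracked IQ: remember which bootstrap step this id belongs to.
  def track(pending, id, kind, timer),
    do: Map.put(pending, id, {:tracked, kind, timer})

  # Waiter without a transform (ASSUMPTION: nil maps to the 3-tuple entry).
  def wait(pending, id, from, timer, nil),
    do: Map.put(pending, id, {:waiter, from, timer})

  # Waiter whose result will be mapped through `transform`.
  def wait(pending, id, from, timer, transform) when is_function(transform, 1),
    do: Map.put(pending, id, {:waiter, from, timer, transform})
end

# Usage: the owning process starts the timer and passes the reference in,
# so the registration functions stay free of side effects.
from = {self(), make_ref()}
timer = Process.send_after(self(), {:iq_timeout, "iq-7"}, 5_000)
pending = RegisterSketch.wait(%{}, "iq-7", from, timer, nil)
```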