MatterEx.ExchangeManager (matter_ex v0.3.0)


Exchange correlation, protocol dispatch, ACK management, and MRP integration.

Pure functional module: the caller threads state through every call. It bridges the gap between SecureChannel (encrypt/decrypt) and the protocol handlers (IM, PASE).

Device usage

handler = fn(opcode, request) -> IM.Router.handle(device, opcode, request) end
mgr = ExchangeManager.new(handler: handler)

# Incoming frame already decrypted by SecureChannel.open
{actions, mgr} = ExchangeManager.handle_message(mgr, proto_header, message_counter)

# Caller executes actions: {:reply, proto}, {:schedule_mrp, ...}, {:ack, ...}

Summary

Functions

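handle_message(state, proto, message_counter)

Process an incoming decrypted message.

handle_timeout(state, message_counter, attempt)

Handle an MRP retransmission timer firing.

initiate(state, protocol_id, opcode_name, payload, opts \\ [])

Initiate an outgoing exchange (initiator side).

new(opts \\ [])

Create a new ExchangeManager.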

Types

action()

@type action() ::
  {:reply, MatterEx.Protocol.MessageCodec.ProtoHeader.t()}
  | {:schedule_mrp, non_neg_integer(), non_neg_integer(), non_neg_integer()}
  | {:ack, non_neg_integer()}

exchange()

@type exchange() :: %{role: :initiator | :responder, protocol: atom()}

t()

@type t() :: %MatterEx.ExchangeManager{
  exchanges: %{required(non_neg_integer()) => exchange()},
  handler: (atom(), struct() -> struct()) | nil,
  mrp: MatterEx.Protocol.MRP.t(),
  next_exchange_id: non_neg_integer(),
  pending_acks: [non_neg_integer()],
  pending_chunks: term(),
  pending_subscribe_responses: term(),
  timed_exchanges: %{required(non_neg_integer()) => integer()}
}

Functions

handle_message(state, proto, message_counter)

@spec handle_message(
  t(),
  MatterEx.Protocol.MessageCodec.ProtoHeader.t(),
  non_neg_integer()
) ::
  {[action()], t()}

Process an incoming decrypted message.

Takes the ProtoHeader from SecureChannel.open and the message_counter from the message header (needed for ACK piggybacking).

Returns {actions, updated_state} where actions is a list of:

  • {:reply, proto} — send this ProtoHeader (caller seals with SecureChannel)
  • {:schedule_mrp, exchange_id, attempt, timeout_ms} — schedule retransmit timer
  • {:ack, message_counter} — send standalone ACK for this counter
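Because the module is pure, the caller has to carry out each returned action itself. The following is a minimal sketch of such a dispatcher, not part of matter_ex: the module name `ActionRunnerSketch`, the `send_reply` and `send_ack` callbacks, and the `{:mrp_timeout, exchange_id, attempt}` timer message are all assumptions made for illustration.

```elixir
defmodule ActionRunnerSketch do
  # Hypothetical helper, not part of matter_ex.
  # `send_reply` and `send_ack` are caller-supplied 1-arity functions.
  # `timer_pid` is the process that should receive the retransmit timer message.
  def run(actions, send_reply, send_ack, timer_pid \\ self()) do
    Enum.each(actions, fn
      {:reply, proto} ->
        # The caller is expected to seal `proto` with SecureChannel before sending.
        send_reply.(proto)

      {:schedule_mrp, exchange_id, attempt, timeout_ms} ->
        # The message shape is an assumption; any shape the caller can
        # later route to handle_timeout/3 would do.
        Process.send_after(timer_pid, {:mrp_timeout, exchange_id, attempt}, timeout_ms)

      {:ack, message_counter} ->
        send_ack.(message_counter)
    end)
  end
end
```

A caller would typically invoke `ActionRunnerSketch.run(actions, send_reply, send_ack)` right after `handle_message/3` and keep the updated manager state for the next frame.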

handle_timeout(state, message_counter, attempt)

@spec handle_timeout(t(), non_neg_integer(), non_neg_integer()) ::
  {:retransmit, MatterEx.Protocol.MessageCodec.ProtoHeader.t(), t()}
  | {:retransmit_frame, binary(), non_neg_integer(), t()}
  | {:give_up, non_neg_integer(), t()}
  | {:already_acked, t()}

Handle an MRP retransmission timer firing.

Returns:

  • {:retransmit, proto, state} — resend this ProtoHeader
  • {:retransmit_frame, frame, exchange_id, state} — resend the original sealed frame
  • {:give_up, exchange_id, state} — max retransmissions reached
  • {:already_acked, state} — exchange was already acknowledged
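The four return shapes above map naturally onto function clauses. This is a sketch only: `TimeoutResultSketch` and the effect atoms it returns (`:seal_and_send`, `:send_raw`, `:exchange_failed`, `:noop`) are hypothetical names, chosen to show one way a caller might branch on the result.

```elixir
defmodule TimeoutResultSketch do
  # Hypothetical helper, not part of matter_ex.
  # Turns a handle_timeout/3 result into {effect, state} for the caller.

  # A ProtoHeader still has to be sealed before it goes on the wire.
  def interpret({:retransmit, proto, state}), do: {{:seal_and_send, proto}, state}

  # An already sealed frame can be resent as-is.
  def interpret({:retransmit_frame, frame, _exchange_id, state}), do: {{:send_raw, frame}, state}

  # Retransmissions are exhausted; the caller decides how to report the failure.
  def interpret({:give_up, exchange_id, state}), do: {{:exchange_failed, exchange_id}, state}

  # Nothing to do: the peer acknowledged before the timer fired.
  def interpret({:already_acked, state}), do: {:noop, state}
end
```

Keeping the branching in a pure function like this leaves the actual socket and timer work in the calling process, which matches the state-threading style of the module.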

initiate(state, protocol_id, opcode_name, payload, opts \\ [])

Initiate an outgoing exchange (initiator side).

Assigns the next exchange_id, builds a ProtoHeader with initiator: true, and records it in MRP if reliable: true (default).

Returns {proto, actions, updated_state}.
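To make the bookkeeping concrete, here is a toy model of the exchange ID allocation described above. It is not the library's implementation: `InitiateSketch` uses plain maps instead of the real structs, omits MRP entirely (so `actions` is always empty), and the 16-bit wraparound is an assumption based on the width of the Matter exchange ID field.

```elixir
defmodule InitiateSketch do
  # Toy model, not matter_ex code. State is a plain map rather than
  # the %MatterEx.ExchangeManager{} struct.
  def new, do: %{next_exchange_id: 1, exchanges: %{}}

  def initiate(state, protocol, opcode, payload) do
    id = state.next_exchange_id

    # Stand-in for a ProtoHeader; the field names are illustrative.
    proto = %{
      exchange_id: id,
      protocol: protocol,
      opcode: opcode,
      initiator: true,
      payload: payload
    }

    state = %{
      state
      | # Assumed wraparound at 16 bits.
        next_exchange_id: rem(id + 1, 0x10000),
        exchanges: Map.put(state.exchanges, id, %{role: :initiator, protocol: protocol})
    }

    # MRP bookkeeping is left out, so no actions are produced here.
    {proto, [], state}
  end
end
```

The recorded entry has the same shape as the `exchange()` type, which is what lets a later incoming message on that exchange ID be correlated with the request that opened it.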

new(opts \\ [])

@spec new(keyword()) :: t()

Create a new ExchangeManager.

Options:

  • :handler - fn(opcode_atom, request_struct) -> response_struct. Called for IM request dispatch.