Exchange correlation, protocol dispatch, ACK management, and MRP integration.
Pure functional module: the caller threads the state through every call. It bridges the gap between SecureChannel (encrypt/decrypt) and the protocol handlers (IM, PASE).
Device usage
handler = fn(opcode, request) -> IM.Router.handle(device, opcode, request) end
mgr = ExchangeManager.new(handler: handler)
# Incoming frame already decrypted by SecureChannel.open
{actions, mgr} = ExchangeManager.handle_message(mgr, proto_header, message_counter)
# Caller executes actions: {:reply, proto}, {:schedule_mrp, ...}, {:ack, ...}
Summary
Functions
handle_message: Process an incoming decrypted message.
handle_timeout: Handle an MRP retransmission timer firing.
initiate: Initiate an outgoing exchange (initiator side).
new: Create a new ExchangeManager.
Types
@type action() ::
  {:reply, MatterEx.Protocol.MessageCodec.ProtoHeader.t()}
  | {:schedule_mrp, non_neg_integer(), non_neg_integer(), non_neg_integer()}
  | {:ack, non_neg_integer()}
@type exchange() :: %{role: :initiator | :responder, protocol: atom()}
@type t() :: %MatterEx.ExchangeManager{
  exchanges: %{required(non_neg_integer()) => exchange()},
  handler: (atom(), struct() -> struct()) | nil,
  mrp: MatterEx.Protocol.MRP.t(),
  next_exchange_id: non_neg_integer(),
  pending_acks: [non_neg_integer()],
  pending_chunks: term(),
  pending_subscribe_responses: term(),
  timed_exchanges: %{required(non_neg_integer()) => integer()}
}
Functions
@spec handle_message(t(), MatterEx.Protocol.MessageCodec.ProtoHeader.t(), non_neg_integer()) :: {[action()], t()}
Process an incoming decrypted message.
Takes the ProtoHeader from SecureChannel.open and the message_counter
from the message header (needed for ACK piggybacking).
Returns {actions, updated_state} where actions is a list of:
{:reply, proto} - send this ProtoHeader (caller seals with SecureChannel)
{:schedule_mrp, exchange_id, attempt, timeout_ms} - schedule retransmit timer
{:ack, message_counter} - send standalone ACK for this counter
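A caller might execute the returned actions with a small dispatcher like the sketch below. The module name ActionRunner and the callback keys :send_proto, :start_timer and :send_ack are hypothetical and not part of MatterEx; a real caller would seal replies with SecureChannel and plug in its own transport and timers.

```elixir
defmodule ActionRunner do
  # Hypothetical helper, not part of MatterEx: walks an action list shaped like
  # the one returned by handle_message and hands each action to a callback.
  def run(actions, callbacks) do
    Enum.map(actions, fn
      # Reply payload: a real caller would seal it with SecureChannel, then send.
      {:reply, proto} ->
        callbacks.send_proto.(proto)

      # Retransmit timer: arm a timer that later leads to a handle_timeout call.
      {:schedule_mrp, exchange_id, attempt, timeout_ms} ->
        callbacks.start_timer.(exchange_id, attempt, timeout_ms)

      # Standalone ACK for the given message counter.
      {:ack, message_counter} ->
        callbacks.send_ack.(message_counter)
    end)
  end
end

# Stub callbacks that only record what they were asked to do.
callbacks = %{
  send_proto: fn proto -> {:sent, proto} end,
  start_timer: fn id, attempt, ms -> {:timer, id, attempt, ms} end,
  send_ack: fn counter -> {:acked, counter} end
}

results = ActionRunner.run([{:ack, 7}, {:schedule_mrp, 1, 0, 300}], callbacks)
IO.inspect(results)
```

Keeping the side effects behind callbacks mirrors the pure design of the module: the manager only describes what should happen, and the caller decides how.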
@spec handle_timeout(t(), non_neg_integer(), non_neg_integer()) ::
  {:retransmit, MatterEx.Protocol.MessageCodec.ProtoHeader.t(), t()}
  | {:retransmit_frame, binary(), non_neg_integer(), t()}
  | {:give_up, non_neg_integer(), t()}
  | {:already_acked, t()}
Handle an MRP retransmission timer firing.
Returns:
{:retransmit, proto, state} - resend this ProtoHeader
{:retransmit_frame, frame, exchange_id, state} - resend the original sealed frame
{:give_up, exchange_id, state} - max retransmissions reached
{:already_acked, state} - exchange was already acknowledged
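The four return shapes can be handled with a single case expression. The sketch below is an illustration only: TimeoutRunner and the result tags :seal_and_send, :send_raw, :exchange_failed and :noop are hypothetical names, and the idea that a retransmitted ProtoHeader is sealed again before sending is an assumption, not something this page states.

```elixir
defmodule TimeoutRunner do
  # Hypothetical helper, not part of MatterEx: maps a handle_timeout result to
  # a {what_to_do, state} pair for the caller's loop.
  def step(result) do
    case result do
      # Resend the ProtoHeader (assumption: the caller seals it again first).
      {:retransmit, proto, state} ->
        {{:seal_and_send, proto}, state}

      # Resend the original sealed frame bytes unchanged.
      {:retransmit_frame, frame, exchange_id, state} ->
        {{:send_raw, exchange_id, frame}, state}

      # Maximum retransmissions reached: report the failed exchange.
      {:give_up, exchange_id, state} ->
        {{:exchange_failed, exchange_id}, state}

      # The exchange was acknowledged before the timer fired: nothing to do.
      {:already_acked, state} ->
        {:noop, state}
    end
  end
end

# A plain map stands in for the manager state in this sketch.
{todo, _state} = TimeoutRunner.step({:give_up, 42, %{}})
IO.inspect(todo)
```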
@spec initiate(t(), non_neg_integer(), atom(), binary(), keyword()) :: {MatterEx.Protocol.MessageCodec.ProtoHeader.t(), [action()], t()}
Initiate an outgoing exchange (initiator side).
Assigns the next exchange_id, builds a ProtoHeader with initiator: true,
and records it in MRP if reliable: true (default).
Returns {proto, actions, updated_state}.
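To illustrate how a pure function can allocate an exchange_id and hand the updated state back to the caller, here is a toy model. It is not the MatterEx implementation: ToyExchanges, the 16-bit wrap-around and the 300 ms timeout are assumptions made for the example, and it returns the bare id where the real function returns a ProtoHeader.

```elixir
defmodule ToyExchanges do
  # Toy model, not the MatterEx implementation: allocates the next exchange_id
  # and returns the updated state instead of mutating anything.
  defstruct next_exchange_id: 0, exchanges: %{}

  def initiate(%__MODULE__{} = state, protocol, opts \\ []) do
    # reliable: true is the documented default.
    reliable = Keyword.get(opts, :reliable, true)
    id = state.next_exchange_id

    # Record the exchange with the :initiator role, as in the exchange() type.
    exchanges = Map.put(state.exchanges, id, %{role: :initiator, protocol: protocol})

    # Assumption: IDs wrap at 16 bits, the width of a Matter exchange ID.
    next_id = rem(id + 1, 0x10000)

    # Assumption: a reliable send asks the caller to arm a retransmit timer;
    # the 300 ms timeout is a placeholder value.
    actions = if reliable, do: [{:schedule_mrp, id, 0, 300}], else: []

    {id, actions, %{state | next_exchange_id: next_id, exchanges: exchanges}}
  end
end

# The caller rebinds state after every call.
state = %ToyExchanges{}
{id_a, actions_a, state} = ToyExchanges.initiate(state, :im)
{id_b, actions_b, state} = ToyExchanges.initiate(state, :pase, reliable: false)
IO.inspect({id_a, actions_a, id_b, actions_b, state.next_exchange_id})
```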
new/1 creates a new ExchangeManager.
Options:
:handler - fn(opcode_atom, request_struct) -> response_struct. Called for IM request dispatch.
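A handler with this shape can be an ordinary two-argument function captured with &. In the sketch below the module ToyHandler, the opcode atom :read_request and the three structs are hypothetical stand-ins, since this page does not list the real opcodes or IM structs.

```elixir
defmodule ToyHandler do
  # Hypothetical request/response structs standing in for the real IM structs.
  defmodule ReadRequest do
    defstruct [:path]
  end

  defmodule ReportData do
    defstruct [:value]
  end

  defmodule StatusResponse do
    defstruct [:status]
  end

  # Matches the documented shape: (opcode_atom, request_struct) -> response_struct.
  def handle(:read_request, %ReadRequest{path: path}) do
    %ReportData{value: {:stub_value_for, path}}
  end

  # Fallback for opcodes this toy handler does not know.
  def handle(_opcode, _request) do
    %StatusResponse{status: :unsupported}
  end
end

# The captured function is what would be passed as the :handler option.
handler = &ToyHandler.handle/2
response = handler.(:read_request, %ToyHandler.ReadRequest{path: [0, 6, 0]})
IO.inspect(response)
```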