Central message orchestration — the single entry point for processing raw binary frames from any transport (BLE, UDP).
Pure functional module: the caller threads the handler state through every call. Incoming frames take one of two paths:
- Plaintext (session_id=0): PASE commissioning handshake
- Encrypted (session_id>0): IM messages over established sessions
Example
handler = MessageHandler.new(
  device: MyDevice,
  passcode: 20202021,
  salt: salt,
  iterations: 1000,
  local_session_id: 1
)

# Process incoming frame from transport
{actions, handler} = MessageHandler.handle_frame(handler, raw_frame)

# Caller executes actions
for action <- actions do
  case action do
    {:send, frame} -> transport_send(frame)
    {:schedule_mrp, sid, eid, attempt, ms} -> schedule_timer(sid, eid, attempt, ms)
    {:session_established, sid} -> log_session(sid)
    # Handle the remaining action() variants ({:send, id, frame} and
    # {:session_closed, sid}) as the transport requires; without a
    # fallback clause they would raise a CaseClauseError.
    _other -> :ok
  end
end
Types
@type action() ::
  {:send, binary()}
  | {:send, non_neg_integer(), binary()}
  | {:schedule_mrp, non_neg_integer(), non_neg_integer(), non_neg_integer(), non_neg_integer()}
  | {:session_established, non_neg_integer()}
  | {:session_closed, non_neg_integer()}

@type group_key_entry() :: %{
  group_id: non_neg_integer(),
  session_id: non_neg_integer(),
  encrypt_key: binary()
}

@type session_entry() :: %{
  session: MatterEx.Session.t(),
  exchange_mgr: MatterEx.ExchangeManager.t(),
  subscription_mgr: MatterEx.IM.SubscriptionManager.t(),
  exchange_to_sub: %{required(non_neg_integer()) => non_neg_integer()}
}

@type t() :: %MatterEx.MessageHandler{
  case_states: %{required(non_neg_integer()) => MatterEx.CASE.t()},
  device: module() | nil,
  group_keys: %{required(non_neg_integer()) => group_key_entry()},
  pase: MatterEx.PASE.t() | nil,
  plaintext_counter: MatterEx.Protocol.Counter.t(),
  sessions: %{required(non_neg_integer()) => session_entry()}
}
Functions
Check all sessions for subscriptions that are due for periodic reports.
For each due subscription, reads the current attribute values, compares them with the last reported values, and sends a ReportData if any value has changed.
Returns {actions, updated_state}.
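The compare-and-report step can be sketched as follows. This is only an illustration and not MatterEx's implementation: the module name ReportDiff, the function changed/2, and the representation of attribute values as maps keyed by attribute path are all assumptions.

```elixir
defmodule ReportDiff do
  # Hypothetical helper, not part of MatterEx.
  # `current` and `last_reported` are assumed to be maps of
  # attribute path => value. Returns the entries of `current` whose
  # value is new or differs from the one in `last_reported`.
  def changed(current, last_reported) do
    current
    |> Enum.reject(fn {path, value} ->
      Map.fetch(last_reported, path) == {:ok, value}
    end)
    |> Map.new()
  end
end
```

An empty result would mean that nothing needs to be reported for that subscription.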
@spec close_session(t(), non_neg_integer()) :: {[action()], t()}
Close a session, removing it and all its subscriptions.
Returns {actions, updated_state}, where actions may include {:session_closed, session_id}.
Process an incoming raw binary frame.
Peeks at the session_id in the message header to choose the plaintext (PASE) or encrypted (IM) path.
Returns {actions, updated_state}.
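The header peek can be sketched with binary pattern matching. The sketch assumes the Matter message header layout, in which one byte of message flags is followed by a 16-bit little-endian session ID. The FramePeek module and its path/1 function are hypothetical and not part of MatterEx, and the sketch ignores any further distinctions (such as group sessions) that the real handler may make.

```elixir
defmodule FramePeek do
  # Hypothetical helper, not part of MatterEx.
  # Byte 0 holds the message flags; bytes 1-2 hold the session ID
  # (little-endian). Session ID 0 selects the plaintext path.
  def path(<<_flags::8, session_id::little-unsigned-16, _rest::binary>>) do
    if session_id == 0, do: :plaintext, else: :encrypted
  end

  # Anything shorter than three bytes cannot carry a session ID.
  def path(_too_short), do: :error
end
```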
@spec handle_mrp_timeout(t(), non_neg_integer(), non_neg_integer(), non_neg_integer()) :: {action() | [action()] | nil, t()}
Handle an MRP retransmission timer for a session's exchange.
Returns {result, updated_state}, where result is a single action, a list of actions, or nil.
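Because the spec allows the first element to be a single action, a list of actions, or nil, callers may find it convenient to normalize it into a list before dispatching. A minimal sketch, assuming a hypothetical ActionList.normalize/1 helper built on the standard List.wrap/1:

```elixir
defmodule ActionList do
  # Hypothetical helper, not part of MatterEx.
  # List.wrap/1 maps nil to [], leaves lists unchanged,
  # and wraps any other term in a one-element list.
  def normalize(result), do: List.wrap(result)
end
```

The normalized list can then go through the same dispatch loop as the actions returned for an incoming frame.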
Create a new MessageHandler.
Required options:
- :passcode - commissioning passcode (integer)
- :salt - PBKDF2 salt (binary)
- :iterations - PBKDF2 iterations (integer)
- :local_session_id - session ID for PASE (integer)
Optional:
- :device - device module for IM routing
- :noc - Node Operational Certificate (binary, for CASE)
- :private_key - ECDSA private key (binary, for CASE)
- :ipk - Identity Protection Key (binary, for CASE)
- :node_id - node ID (integer, for CASE)
- :fabric_id - fabric ID (integer, for CASE)
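A sketch of assembling the required options before creating a handler. The HandlerOpts module and the choice of a 32-byte random salt are assumptions made for illustration; the code only checks that the required keys are present and does not call into MatterEx.

```elixir
defmodule HandlerOpts do
  # Hypothetical helper, not part of MatterEx.
  @required [:passcode, :salt, :iterations, :local_session_id]

  # Returns the required keys that are absent from `opts`.
  def missing(opts), do: Enum.reject(@required, &Keyword.has_key?(opts, &1))
end

opts = [
  passcode: 20202021,
  # Assumed salt length; generated with Erlang's :crypto module.
  salt: :crypto.strong_rand_bytes(32),
  iterations: 1000,
  local_session_id: 1
]

# An empty list means every required option is present.
[] = HandlerOpts.missing(opts)
```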
Update CASE state with new credentials (e.g. after commissioning).
Accepts the CASE-related keyword options of new/1: :noc, :private_key, :ipk, :node_id, :fabric_id.
@spec update_group_keys(t(), [group_key_entry()]) :: t()
Update group keys from GroupKeyManagement cluster.
Accepts a list of %{group_id, session_id, encrypt_key} entries.
Indexes by session_id for fast lookup on incoming group messages.
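The indexing step can be sketched with Map.new/2. The GroupKeyIndex module is hypothetical, and how MatterEx treats duplicate session IDs is not stated here; in this sketch a later entry with the same session_id replaces an earlier one.

```elixir
defmodule GroupKeyIndex do
  # Hypothetical helper, not part of MatterEx.
  # Builds %{session_id => entry} so that an incoming group message
  # can find its key with a single map lookup.
  def by_session_id(entries) do
    Map.new(entries, fn entry -> {entry.session_id, entry} end)
  end
end

entries = [
  %{group_id: 1, session_id: 100, encrypt_key: <<1, 2, 3>>},
  %{group_id: 2, session_id: 200, encrypt_key: <<4, 5, 6>>}
]

index = GroupKeyIndex.by_session_id(entries)
```

Looking up index[200] would then return the entry for group 2 without scanning the list.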