MqttX.Server.Router (MqttX v0.10.0)


Topic router for MQTT servers.

The router maintains a trie-based subscription table and provides efficient topic matching for message routing. Matching runs in O(L + K) time, where L is the number of levels in the topic and K is the number of matching subscriptions. The previous implementation performed an O(N) linear scan, where N is the total number of subscriptions.
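To make the trie idea concrete, here is a minimal, self-contained sketch of wildcard matching over a trie keyed by topic level. It is not the router's actual implementation: the module name TrieSketch, the node shape, and the function names are all hypothetical, and the sketch ignores details such as `$`-prefixed topics and subscription options.

```elixir
defmodule TrieSketch do
  # Hypothetical node shape: children keyed by topic level, plus the
  # subscribers whose filter ends exactly at this node.
  def new, do: %{children: %{}, subs: []}

  # Insert a filter such as "sensors/+/temp" for a subscriber.
  def insert(trie, filter, sub), do: do_insert(trie, String.split(filter, "/"), sub)

  defp do_insert(trie_node, [], sub), do: %{trie_node | subs: [sub | trie_node.subs]}

  defp do_insert(trie_node, [level | rest], sub) do
    child = Map.get(trie_node.children, level, new())
    %{trie_node | children: Map.put(trie_node.children, level, do_insert(child, rest, sub))}
  end

  # Collect every subscriber whose filter matches the (wildcard-free) topic.
  def match(trie, topic), do: do_match(trie, String.split(topic, "/"))

  # Topic exhausted: exact matches, plus "parent/#", which also matches "parent".
  defp do_match(trie_node, []), do: trie_node.subs ++ hash_subs(trie_node)

  # At each level, follow the literal child and the "+" child, and collect
  # any "#" filter that ends here.
  defp do_match(trie_node, [level | rest]) do
    hash_subs(trie_node) ++ descend(trie_node, level, rest) ++ descend(trie_node, "+", rest)
  end

  defp descend(trie_node, key, rest) do
    case Map.fetch(trie_node.children, key) do
      {:ok, child} -> do_match(child, rest)
      :error -> []
    end
  end

  defp hash_subs(trie_node) do
    case Map.fetch(trie_node.children, "#") do
      {:ok, child} -> child.subs
      :error -> []
    end
  end
end
```

The walk only visits children that can still match, so the work depends on the topic's depth and the wildcard branches present rather than on the total number of stored filters.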

Usage

router = MqttX.Server.Router.new()
router = MqttX.Server.Router.subscribe(router, "sensors/+/temp", client_ref, qos: 1)
router = MqttX.Server.Router.subscribe(router, "alerts/#", client_ref, qos: 0)

matches = MqttX.Server.Router.match(router, "sensors/room1/temp")
# => [{client_ref, %{qos: 1}}]

Shared Subscriptions (MQTT 5.0)

Shared subscriptions allow load balancing messages across multiple clients. Use the $share/group_name/topic_filter format:

router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client1, qos: 1)
router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client2, qos: 1)

# Messages to "jobs/task1" are delivered to client1 or client2 (round-robin)
matches = MqttX.Server.Router.match(router, "jobs/task1")
# => [{client1, %{qos: 1}}] or [{client2, %{qos: 1}}]

Summary

Functions

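client_count(router)

Get the number of unique clients with subscriptions.

count(router)

Get the total number of subscriptions.

match(router, topic, publisher \\ nil)

Find all matching subscriptions for a topic.

match_and_advance(router, topic, publisher \\ nil)

Find all matching subscriptions and advance round-robin for shared groups.

new()

Create a new empty router.

subscribe(router, filter, client, opts \\ [])

Add a subscription to the router.

subscriptions_for(router, client)

Get all subscriptions for a client.

unsubscribe(router, filter, client)

Remove a subscription from the router.

unsubscribe_all(router, client)

Remove all subscriptions for a client.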

Types

shared_group()

@type shared_group() :: %{
  filter: MqttX.Topic.normalized_topic(),
  members: [{term(), map()}],
  index: non_neg_integer()
}

subscription()

@type subscription() :: %{
  filter: MqttX.Topic.normalized_topic(),
  client: term(),
  qos: 0 | 1 | 2,
  opts: map()
}

t()

@type t() :: %MqttX.Server.Router{
  by_client: %{
    required(term()) => [
      {:normal, MqttX.Topic.normalized_topic()}
      | {:shared, binary(), MqttX.Topic.normalized_topic()}
    ]
  },
  count: non_neg_integer(),
  shared_groups: %{required(binary()) => shared_group()},
  trie: map()
}

Functions

client_count(router)

@spec client_count(t()) :: non_neg_integer()

Get the number of unique clients with subscriptions.

count(router)

@spec count(t()) :: non_neg_integer()

Get the total number of subscriptions.

match(router, topic, publisher \\ nil)

@spec match(t(), binary() | MqttX.Topic.normalized_topic(), term()) :: [
  {term(), map()}
]

Find all matching subscriptions for a topic.

Returns a list of {client, opts} tuples for each matching subscription.

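For shared subscriptions, only one client per group is selected, based on the group's current round-robin position. match/3 returns only the list of matches and leaves the router unchanged; use match_and_advance/3 to obtain a router whose round-robin position has moved forward.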

The optional publisher parameter enables no_local filtering: subscriptions with no_local: true are excluded when the publisher matches the subscriber.
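The no_local rule can be illustrated with a small, self-contained sketch that filters a list of {client, opts} tuples. The module NoLocalSketch and its filter/2 function are hypothetical and only mirror the behaviour described above; the library's internal handling may differ.

```elixir
defmodule NoLocalSketch do
  # `matches` is a list of {client, opts} tuples, the shape returned by match/3.
  # A match is dropped only when a publisher was given, the subscriber is that
  # publisher, and the subscription was stored with no_local: true.
  def filter(matches, publisher) do
    Enum.reject(matches, fn {client, opts} ->
      publisher != nil and client == publisher and Map.get(opts, :no_local, false)
    end)
  end
end
```

With a nil publisher (the default for match/3) nothing is filtered, which is why the parameter is optional.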

match_and_advance(router, topic, publisher \\ nil)

@spec match_and_advance(t(), binary() | MqttX.Topic.normalized_topic(), term()) ::
  {[{term(), map()}], t()}

Find all matching subscriptions and advance round-robin for shared groups.

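This behaves like match/3, but it also returns an updated router in which the round-robin index has advanced for every matched shared group. Pass the returned router to subsequent calls so that successive messages rotate across group members.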

Returns {matches, updated_router}.
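A rough sketch of how a round-robin pick over a map shaped like shared_group() could work is shown below. RoundRobinSketch and pick_and_advance/1 are hypothetical names, the string used for :filter is a placeholder rather than a real normalized topic, and wrapping the index with rem/2 is an assumption about how the index is kept in range.

```elixir
defmodule RoundRobinSketch do
  # `group` follows the shared_group() shape:
  #   %{filter: _, members: [{client, opts}], index: non_neg_integer}

  # An empty group has nobody to deliver to.
  def pick_and_advance(%{members: []} = group), do: {nil, group}

  # Pick the member at the current index, then return the group with the
  # index moved to the next member (wrapping around at the end).
  def pick_and_advance(%{members: members, index: index} = group) do
    size = length(members)
    chosen = Enum.at(members, rem(index, size))
    {chosen, %{group | index: rem(index + 1, size)}}
  end
end
```

Because the group is an immutable value, the caller has to keep the returned group, in the same way that match_and_advance/3 returns an updated router alongside the matches.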

new()

@spec new() :: t()

Create a new empty router.

subscribe(router, filter, client, opts \\ [])

@spec subscribe(t(), binary() | MqttX.Topic.normalized_topic(), term(), keyword()) ::
  t()

Add a subscription to the router.

Options

  • :qos - Maximum QoS level (default: 0)
  • Any additional options are stored with the subscription

Shared Subscriptions

Use $share/group_name/topic_filter format for shared subscriptions:

router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client, qos: 1)
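For illustration, the $share/group_name/topic_filter format can be split into its parts as sketched below. SharedFilterSketch is a hypothetical, self-contained module and not part of MqttX; the :normal and :shared tags mirror the entries in the by_client field of t(), and the error tuple for malformed input is an assumption.

```elixir
defmodule SharedFilterSketch do
  # Split "$share/<group>/<filter>" into its group name and topic filter.
  def parse("$share/" <> rest) do
    case String.split(rest, "/", parts: 2) do
      [group, filter] when group != "" and filter != "" ->
        {:shared, group, filter}

      _ ->
        # Missing group or filter, e.g. "$share/workers".
        {:error, :invalid_shared_filter}
    end
  end

  # Anything without the "$share/" prefix is an ordinary filter.
  def parse(filter) when is_binary(filter), do: {:normal, filter}
end
```

Splitting with parts: 2 keeps any further "/" separators inside the topic filter, so "$share/workers/jobs/#" yields the group "workers" and the filter "jobs/#".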

subscriptions_for(router, client)

@spec subscriptions_for(t(), term()) :: [subscription()]

Get all subscriptions for a client.

unsubscribe(router, filter, client)

@spec unsubscribe(t(), binary() | MqttX.Topic.normalized_topic(), term()) :: t()

Remove a subscription from the router.

unsubscribe_all(router, client)

@spec unsubscribe_all(t(), term()) :: t()

Remove all subscriptions for a client.