OffBroadway.EMQTT.Producer (off_broadway_emqtt v0.3.0)

Copy Markdown View Source

A Broadway producer for MQTT using emqtt.

Each producer instance maintains its own MQTT connection with protocol-level backpressure via max_inflight and delayed acknowledgements.

Producer options

  • :shared_group (String.t/0) - Shared subscription group name. When set, topics are subscribed as $share/{group}/{topic} which distributes messages across all producer instances. Required when using concurrency > 1.

  • :max_inflight (pos_integer/0) - Maximum unACKed QoS 1/2 messages per producer. This is the primary backpressure mechanism - the broker will not send more messages until existing ones are acknowledged. The default value is 100.

  • :on_success - Action when Broadway message succeeds. :ack sends PUBACK to broker. The default value is :ack.

  • :on_failure - Action when Broadway message fails.

    • :noop - don't ACK, broker will redeliver after timeout (for QoS 1/2)
    • :ack - ACK anyway, message won't be redelivered The default value is :noop.
  • :topics - Required. List of {topic, qos} tuples to subscribe to. Use QoS 1 or 2 for reliable delivery.

  • :message_handler - A module that implements the OffBroadway.EMQTT.MessageHandler behaviour The default value is OffBroadway.EMQTT.MessageHandler.

  • :config (non-empty keyword/0) - Required. Configuration options that will be sent to the :emqtt process.

    • :host - Required. The host of the MQTT broker

    • :port (pos_integer/0) - The port to connect to The default value is 1883.

    • :username (String.t/0) - Username to authenticate with

    • :password (String.t/0) - Password to authenticate with

    • :ssl (boolean/0) - Whether to use SSL

    • :ssl_opts (keyword/0)

      • :cacertfile (String.t/0) - Path to CA certificate file

      • :server_name_indication - Server name indication

      • :verify - Verify mode The default value is :verify_peer.

      • :certfile (String.t/0) - Path to client certificate file

      • :keyfile (String.t/0) - Path to client key file

    • :ws_path (String.t/0) - The path to the resource.

    • :connect_timeout (pos_integer/0) - The timeout in seconds for the connection The default value is 60.

    • :bridge_mode (boolean/0) - Enable bridge mode or not. The default value is false.

    • :clientid (String.t/0) - Specify the client identifier. Each producer will append '_N' where N is the producer index. The default value is "emqtt-aa-0-c-383-4ba7789f2a418ef5100b".

    • :clean_start (boolean/0) - Whether the broker should discard any existing session on connect. Defaults to false so the broker redelivers unACKed QoS 1/2 messages after a producer restart. Set to true only if you explicitly want a fresh session each time and are willing to lose in-flight messages on restart. The default value is false.

    • :proto_ver - The MQTT protocol version to use. The default value is :v4.

    • :keepalive (pos_integer/0) - The maximum time interval in seconds that is permitted to elapse between the client finishes transmitting one MQTT Control Packet and starts sending the next.

    • :retry_interval (pos_integer/0) - Interval in seconds to retry sending packets that have not received a response. The default value is 30.

    • :will_topic (String.t/0) - Topic of will message, sent by broker on unexpected disconnect.

    • :will_payload (String.t/0) - The payload of the will message.

    • :will_retain (boolean/0) - Whether the will message should be published as retained. The default value is false.

    • :will_qos - The QoS level of the will message. The default value is 0.

    • :ack_timeout (pos_integer/0) - The timeout in seconds for the ack package. The default value is 30.

    • :force_ping (boolean/0) - If true, ping is sent regardless of other packet activity. The default value is false.

    • :custom_auth_callbacks - A map of custom authentication callback MFAs for MQTT v5.

    • :reconnect - Number of reconnect attempts after a disconnection (0 = no reconnect, :infinity = unlimited). NOTE: emqtt reconnects the TCP connection but does NOT re-subscribe. You must set clean_start: false so the broker restores the session and redelivers subscriptions. Without clean_start: false, messages will silently stop arriving after a reconnect.

    • :reconnect_timeout (pos_integer/0) - Time in seconds to wait between reconnect attempts. Requires reconnect to be set.

    • :low_mem (boolean/0) - Enable low memory mode. Reduces memory usage at the cost of some performance.

    • :properties (map/0) - MQTT properties to include in the CONNECT packet (MQTT v5 only). Keys must be atoms matching MQTT property names, e.g. %{:"Receive-Maximum" => 2}.

Backpressure

This producer uses MQTT's max_inflight setting for backpressure. The broker will not send more than max_inflight unacknowledged QoS 1/2 messages. Messages are only acknowledged after Broadway successfully processes them.

For QoS 0 messages, there is no protocol-level backpressure. Consider using QoS 1 for high-throughput scenarios.

Shared Subscriptions

When using concurrency > 1, you must configure a shared_group to distribute messages across producer instances. Without shared subscriptions, each producer would receive all messages (duplicates).

Reconnection

By default, if the MQTT connection is lost the producer stops and Broadway's supervisor restarts it, which creates a fresh connection and re-subscribes to all topics.

You can instead configure emqtt's built-in reconnect via config: [reconnect: :infinity, ...]. If you do this, you MUST also set clean_start: false. Otherwise the broker discards the session on reconnect and no messages will arrive after the reconnect completes.

Telemetry

This library exposes the following telemetry events:

  • [:off_broadway_emqtt, :producer, :init] - Dispatched when a producer instance has finished init/1 and scheduled its first connection attempt.

    • measurement: %{time: System.system_time}
    • metadata: %{broadway_name: term, producer_index: integer}
  • [:off_broadway_emqtt, :producer, :terminate] - Dispatched when a producer instance is terminating (Broadway shutdown, crash, or supervisor restart).

    • measurement: %{time: System.system_time}
    • metadata: %{broadway_name: term, producer_index: integer, client_id: string | nil, reason: term}

  • [:off_broadway_emqtt, :connection, :up] - Dispatched when connected to broker.

    • measurement: %{time: System.system_time}
    • metadata: %{client_id: string, producer_index: integer}
  • [:off_broadway_emqtt, :connection, :down] - Dispatched when connection lost, including when the initial connect fails and when emqtt signals a disconnect during reconnect. See "Common connection.down reasons" below.

    • measurement: %{time: System.system_time}
    • metadata: %{client_id: string, producer_index: integer, reason: term}
  • [:off_broadway_emqtt, :subscription, :success] - Dispatched for each topic the broker granted a subscription on. granted_qos is the actual QoS the broker assigned, which may be lower than requested.

    • measurement: %{time: System.system_time}
    • metadata: %{client_id: string, producer_index: integer, topic: string, granted_qos: 0..2}
  • [:off_broadway_emqtt, :subscription, :error] - Dispatched when subscribing to a topic fails (either a transport error or a SUBACK reason code >= 128).

    • measurement: %{time: System.system_time}
    • metadata: %{client_id: string, producer_index: integer, topic: string, reason: term}
  • [:off_broadway_emqtt, :receive_message, :start] - Dispatched when a PUBLISH arrives from the broker, before Broadway dispatch. Pairs with receive_message.ack for end-to-end latency measurement.

    • measurement: %{time: System.system_time, count: 1}
    • metadata: %{client_id: string, producer_index: integer, topic: string, qos: integer}
  • [:off_broadway_emqtt, :receive_message, :ack] - Dispatched when acknowledging a message to the MQTT broker.

    • measurement: %{time: System.system_time, count: 1}
    • metadata: %{topic: string, qos: integer, status: :on_success | :on_failure}

Common connection.down reasons

The reason field carries the raw error returned by emqtt or the MQTT broker. Common values a consumer may want to pattern-match on:

  • Authentication failures
    • :bad_username_or_password (MQTT 3.1.1 CONNACK code 4)
    • :not_authorized (MQTT 3.1.1 code 5, MQTT 5 code 135)
    • {:unacceptable_protocol_version, _}
  • TLS/certificate failures
    • {:tls_alert, {:unknown_ca, _}}
    • {:tls_alert, {:bad_certificate, _}}
    • {:tls_alert, {:handshake_failure, _}} - often SNI or hostname mismatch
    • {:tls_alert, {:certificate_expired, _}}
  • Network/transport failures
    • :econnrefused - broker not listening on host:port
    • :nxdomain - DNS resolution failed
    • :timeout - CONNACK did not arrive within connect_timeout
    • :closed / :tcp_closed - broker closed the socket
  • Server/session failures (MQTT 5 reason codes surface as integers or atoms)
    • :server_unavailable, :server_busy, :quota_exceeded

When emqtt's built-in reconnect is enabled, connection.down fires with the CONNACK reason code integer, not an atom.