emqtt (emqtt v1.15.1)

View Source

Summary

Functions

start_link() deprecated

Types

binary_host/0

-type binary_host() :: binary().

call/0

-type call() ::
          #call{id :: callid(), from :: gen_statem:from(), req :: term(), ts :: erlang:timestamp()}.

callid/0

-type callid() :: #callid{op :: opname(), via :: via(), packet_id :: packet_id() | undefined}.

client/0

-type client() :: pid() | atom().

conn_mod/0

-type conn_mod() :: emqtt_sock | emqtt_ws | emqtt_quic.

custom_auth_callbacks/0

-type custom_auth_callbacks() ::
          #{init := fun(() -> custom_auth_state()) | {function(), list()},
            handle_auth := custom_auth_handle_fn()}.

custom_auth_handle_fn/0

-type custom_auth_handle_fn() ::
          fun((custom_auth_state(), _Reason :: atom(), properties()) ->
                  {continue, _OutAuthPacket, custom_auth_state()} | {stop, _Reason :: term()}).

custom_auth_state/0

-type custom_auth_state() :: term().

expire_at/0

-type expire_at() :: non_neg_integer() | infinity.
In milliseconds.

host/0

-type host() :: inet:ip_address() | inet:hostname() | binary_host().

inflight_publish/0

-type inflight_publish() ::
          {publish,
           via(),
           #mqtt_msg{qos :: emqtt:qos(),
                     retain :: boolean(),
                     dup :: boolean(),
                     packet_id :: undefined | emqtt:packet_id(),
                     topic :: emqtt:topic(),
                     props :: emqtt:properties() | undefined,
                     payload :: binary()},
           sent_at(),
           expire_at(),
           mfas() | undefined}.

inflight_pubrel/0

-type inflight_pubrel() :: {pubrel, via(), packet_id(), sent_at(), expire_at()}.

mfas/0

-type mfas() :: {module(), atom(), list()} | {function(), list()}.

mqtt_msg/0

-opaque mqtt_msg()

msg_handler/0

-type msg_handler() ::
          #{publish => fun((_Publish :: map()) -> any()) | mfas(),
            pubrel => fun((_PubRel :: map()) -> any()) | mfas(),
            connected => fun((_Properties :: term()) -> any()) | mfas(),
            disconnected => fun(({reason_code(), _Properties :: term()}) -> any()) | mfas()}.

opname/0

-type opname() :: connect | subscribe | unsubscribe | ping.

option/0

-type option() ::
          {name, atom()} |
          {owner, pid()} |
          {msg_handler, msg_handler()} |
          {host, host()} |
          {hosts, [{host(), inet:port_number()}]} |
          {port, inet:port_number()} |
          {tcp_opts, [gen_tcp:option()]} |
          {ssl, boolean()} |
          {ssl_opts, [ssl:tls_client_option()]} |
          {quic_opts, {_, _}} |
          {ws_path, string()} |
          {connect_timeout, pos_integer()} |
          {bridge_mode, boolean()} |
          {clientid, iodata()} |
          {clean_start, boolean()} |
          {username, iodata()} |
          {password, iodata() | emqtt_secret:t(binary())} |
          {proto_ver, v3 | v4 | v5} |
          {keepalive, non_neg_integer()} |
          {max_inflight, pos_integer()} |
          {retry_interval, timeout()} |
          {will_topic, iodata()} |
          {will_payload, iodata()} |
          {will_retain, boolean()} |
          {will_qos, qos()} |
          {will_props, properties()} |
          {auto_ack, boolean() | never} |
          {ack_timeout, pos_integer()} |
          {force_ping, boolean()} |
          {low_mem, boolean()} |
          {reconnect, reconnect()} |
          {reconnect_timeout, pos_integer()} |
          {retry_calls_on_reconnect, boolean()} |
          {with_qoe_metrics, boolean()} |
          {properties, properties()} |
          {nst, binary()} |
          {custom_auth_callbacks, custom_auth_callbacks()}.

packet_id/0

-type packet_id() :: 0..65535.

payload/0

-type payload() :: iodata().

pendings/0

-type pendings() :: queue:queue(publish_req()).

properties/0

-type properties() :: #{atom() => term()}.

publish_reply/0

-type publish_reply() ::
          #{packet_id := packet_id(),
            reason_code := reason_code(),
            reason_code_name := atom(),
            properties => undefined | properties()}.

publish_req/0

-type publish_req() ::
          {publish,
           via(),
           #mqtt_msg{qos :: emqtt:qos(),
                     retain :: boolean(),
                     dup :: boolean(),
                     packet_id :: undefined | emqtt:packet_id(),
                     topic :: emqtt:topic(),
                     props :: emqtt:properties() | undefined,
                     payload :: binary()},
           expire_at(),
           mfas() | undefined}.

publish_success/0

-type publish_success() :: ok | {ok, publish_reply()}.

pubopt/0

-type pubopt() :: {retain, boolean()} | {qos, qos() | qos_name()}.

qos/0

-type qos() :: 0 | 1 | 2.

qos_name/0

-type qos_name() :: qos0 | at_most_once | qos1 | at_least_once | qos2 | exactly_once.

reason_code/0

-type reason_code() :: 0..255.

reconnect/0

-type reconnect() :: infinity | non_neg_integer().

sent_at/0

-type sent_at() :: non_neg_integer().
In milliseconds.

state/0

-type state() ::
          #state{name :: atom(),
                 owner :: undefined | pid(),
                 msg_handler :: undefined | msg_handler(),
                 host :: host(),
                 port :: inet:port_number(),
                 hosts :: [{host(), inet:port_number()}],
                 conn_mod :: conn_mod(),
                 socket ::
                     undefined |
                     ssl:sslsocket() |
                     inet:socket() |
                     emqtt_ws:connection() |
                     emqtt_quic:quic_sock(),
                 sock_opts :: [emqtt_sock:option() | emqtt_ws:option()],
                 connect_timeout :: pos_integer(),
                 bridge_mode :: boolean(),
                 clientid :: binary(),
                 clean_start :: boolean(),
                 username :: binary() | undefined,
                 password :: undefined | emqtt_secret:t(binary()),
                 proto_ver :: version(),
                 proto_name :: iodata(),
                 keepalive :: non_neg_integer(),
                 keepalive_timer :: undefined | tref(),
                 awaiting_pingresp :: boolean(),
                 force_ping :: boolean(),
                 paused :: boolean(),
                 will_msg :: undefined | mqtt_msg(),
                 properties :: properties(),
                 pending_calls :: [call()],
                 subscriptions :: map(),
                 inflight :: emqtt_inflight:inflight(inflight_publish() | inflight_pubrel()),
                 awaiting_rel :: map(),
                 auto_ack :: boolean() | never,
                 ack_timeout :: pos_integer(),
                 ack_timer :: tref() | undefined,
                 retry_interval :: pos_integer(),
                 retry_timer :: tref() | undefined,
                 session_present :: undefined | boolean(),
                 last_packet_id :: undefined | packet_id(),
                 low_mem :: boolean(),
                 parse_state :: undefined | emqtt_frame:parse_state(),
                 reconnect :: reconnect(),
                 reconnect_timeout :: pos_integer(),
                 qoe :: boolean() | map(),
                 nst :: undefined | binary(),
                 pendings :: pendings(),
                 extra :: map()}.

subopt/0

-type subopt() :: {rh, 0 | 1 | 2} | {rap, boolean()} | {nl, boolean()} | {qos, qos() | qos_name()}.

subscribe_ret/0

-type subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}.

topic/0

-type topic() :: binary().

tref/0

-type tref() :: reference().

version/0

-type version() :: 3 | 4 | 5.

via/0

-type via() ::
          default |
          {new_data_stream, quicer:stream_opts()} |
          {new_req_stream, quicer:stream_opts()} |
          {logic_stream_id, non_neg_integer(), quicer:stream_opts()} |
          inet:socket() |
          emqtt_quic:quic_sock() |
          socket_reconnecting.

Functions

callback_mode()

code_change(OldVsn, OldState, OldData, Extra)

connect(Client)

-spec connect(client()) -> {ok, properties() | undefined} | {error, term()}.

connected(EventType, SubReq, State)

disconnect(Client)

-spec disconnect(client()) -> ok | {error, any()}.

disconnect(Client, ReasonCode)

-spec disconnect(client(), reason_code()) -> ok | {error, any()}.

disconnect(Client, ReasonCode, Properties)

-spec disconnect(client(), reason_code(), properties()) -> ok | {error, any()}.

handle_event(EventType, Event, StateName, State)

info(Client)

info(Client, Attr)

-spec info(client(), n_queued) -> _NumQueuedMessages :: non_neg_integer();
          (client(), n_inflight) -> _NumInflightMessages :: non_neg_integer();
          (client(), max_inflight) -> _MaxInflightMessages :: pos_integer() | infinity.

init(_)

initialized(EventType, Via0, State)

open_quic_connection(Client)

-spec open_quic_connection(client()) -> ok | {error, term()}.

pause(Client)

ping(Client)

-spec ping(client()) -> pong.

puback(Client, PacketId)

puback(Client, PacketId, ReasonCode)

puback(Client, PacketId, ReasonCode, Properties)

pubcomp(Client, PacketId)

pubcomp(Client, PacketId, ReasonCode)

pubcomp(Client, PacketId, ReasonCode, Properties)

publish(Client, Mqtt_msg)

-spec publish(client(),
              #mqtt_msg{qos :: emqtt:qos(),
                        retain :: boolean(),
                        dup :: boolean(),
                        packet_id :: undefined | emqtt:packet_id(),
                        topic :: emqtt:topic(),
                        props :: emqtt:properties() | undefined,
                        payload :: binary()}) ->
                 publish_success() | {error, term()}.

publish(Client, Topic, Payload)

-spec publish(client(), topic(), payload()) -> publish_success() | {error, term()}.

publish(Client, Topic, Payload, QoS)

-spec publish(client(), topic(), payload(), qos() | qos_name() | [pubopt()]) ->
                 publish_success() | {error, term()}.

publish(Client, Topic, Properties, Payload, Opts)

-spec publish(client(), topic(), properties(), payload(), [pubopt()]) ->
                 publish_success() | {error, term()}.

publish_async(Client, Topic, Payload, Callback)

publish_async(Client, Topic, Payload, QoS, Callback)

-spec publish_async(client(), topic(), payload(), qos() | qos_name() | [pubopt()], mfas()) -> ok;
                   (client(),
                    via(),
                    topic() |
                    #mqtt_msg{qos :: emqtt:qos(),
                              retain :: boolean(),
                              dup :: boolean(),
                              packet_id :: undefined | emqtt:packet_id(),
                              topic :: emqtt:topic(),
                              props :: emqtt:properties() | undefined,
                              payload :: binary()},
                    payload() | timeout(),
                    mfas()) ->
                       ok.

publish_async(Client, Via, Topic, Payload, QoS, Callback)

-spec publish_async(client(), via(), topic(), payload(), qos() | qos_name() | [pubopt()], mfas()) -> ok.

publish_async(Client, Topic, Properties, Payload, Opts, Timeout, Callback)

-spec publish_async(client(), topic(), properties(), payload(), [pubopt()], timeout(), mfas()) -> ok.

publish_async(Client, Via, Topic, Properties, Payload, Opts, Timeout, Callback)

-spec publish_async(client(), via(), topic(), properties(), payload(), [pubopt()], timeout(), mfas()) ->
                       ok.

publish_via(Client, Via, Mqtt_msg)

-spec publish_via(client(),
                  via(),
                  #mqtt_msg{qos :: emqtt:qos(),
                            retain :: boolean(),
                            dup :: boolean(),
                            packet_id :: undefined | emqtt:packet_id(),
                            topic :: emqtt:topic(),
                            props :: emqtt:properties() | undefined,
                            payload :: binary()}) ->
                     publish_success() | {error, term()}.

publish_via(Client, Via, Topic, Properties, Payload, Opts)

-spec publish_via(client(), via(), topic(), properties(), payload(), [pubopt()]) ->
                     publish_success() | {error, term()}.

pubrec(Client, PacketId)

pubrec(Client, PacketId, ReasonCode)

pubrec(Client, PacketId, ReasonCode, Properties)

pubrel(Client, PacketId)

pubrel(Client, PacketId, ReasonCode)

pubrel(Client, PacketId, ReasonCode, Properties)

quic_connect(Client)

-spec quic_connect(client()) -> {ok, properties()} | {error, term()}.

quic_mqtt_connect(Client)

-spec quic_mqtt_connect(client()) -> ok | {error, term()}.

random_client_id()

reason_code_name(Code)

reconnect(EventType, _, State)

resume(Client)

start_data_stream(Client, StreamOpts)

-spec start_data_stream(client(), quicer:stream_opts()) -> {ok, via()} | {error, any()}.

start_link()

This function is deprecated since version 1.13.1.
-spec start_link() -> gen_statem:start_ret().

start_link(Options)

-spec start_link(map() | [option()]) -> gen_statem:start_ret().

status(Client)

-spec status(client()) -> initialized | connected | waiting_for_connack | reconnect.

stop(Client)

subscribe(Client, Topic)

-spec subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}]) ->
                   subscribe_ret().

subscribe(Client, Topic, QoS)

-spec subscribe(client(), topic(), qos() | qos_name() | [subopt()]) -> subscribe_ret();
               (client(), properties(), [{topic(), qos() | [subopt()]}]) -> subscribe_ret().

subscribe(Client, Properties, Topic, QoS)

-spec subscribe(client(), properties(), topic(), qos() | qos_name() | [subopt()]) -> subscribe_ret().

subscribe_via(Client, Via, Properties, Topics)

-spec subscribe_via(client(), via(), properties(), [{topic(), subopt()}]) -> subscribe_ret().

subscriptions(Client)

sync_publish_result(Caller, Mref, Result)

terminate(Reason, StateName, State)

unsubscribe(Client, Topic)

-spec unsubscribe(client(), topic() | [topic()]) -> subscribe_ret().

unsubscribe(Client, Properties, Topic)

-spec unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret().

unsubscribe_via(Client, Via, Topic)

-spec unsubscribe_via(client(), via(), topic() | [topic()]) -> subscribe_ret().

unsubscribe_via(Client, Via, Properties, Topic)

-spec unsubscribe_via(client(), via(), properties(), topic() | [topic()]) -> subscribe_ret().

waiting_for_connack(EventType, Mqtt_packet, State)

ws_connect(Client)

-spec ws_connect(client()) -> {ok, properties() | undefined} | {error, term()}.