emqtt (emqtt v1.15.1)
View SourceSummary
Functions
start_link()
deprecated
Types
-type binary_host() :: binary().
-type call() :: #call{id :: callid(), from :: gen_statem:from(), req :: term(), ts :: erlang:timestamp()}.
-type conn_mod() :: emqtt_sock | emqtt_ws | emqtt_quic.
-type custom_auth_callbacks() :: #{init := fun(() -> custom_auth_state()) | {function(), list()}, handle_auth := custom_auth_handle_fn()}.
-type custom_auth_handle_fn() :: fun((custom_auth_state(), _Reason :: atom(), properties()) -> {continue, _OutAuthPacket, custom_auth_state()} | {stop, _Reason :: term()}).
-type custom_auth_state() :: term().
-type expire_at() :: non_neg_integer() | infinity.
-type host() :: inet:ip_address() | inet:hostname() | binary_host().
-type inflight_publish() :: {publish, via(), #mqtt_msg{qos :: emqtt:qos(), retain :: boolean(), dup :: boolean(), packet_id :: undefined | emqtt:packet_id(), topic :: emqtt:topic(), props :: emqtt:properties() | undefined, payload :: binary()}, sent_at(), expire_at(), mfas() | undefined}.
-opaque mqtt_msg()
-type opname() :: connect | subscribe | unsubscribe | ping.
-type option() :: {name, atom()} | {owner, pid()} | {msg_handler, msg_handler()} | {host, host()} | {hosts, [{host(), inet:port_number()}]} | {port, inet:port_number()} | {tcp_opts, [gen_tcp:option()]} | {ssl, boolean()} | {ssl_opts, [ssl:tls_client_option()]} | {quic_opts, {_, _}} | {ws_path, string()} | {connect_timeout, pos_integer()} | {bridge_mode, boolean()} | {clientid, iodata()} | {clean_start, boolean()} | {username, iodata()} | {password, iodata() | emqtt_secret:t(binary())} | {proto_ver, v3 | v4 | v5} | {keepalive, non_neg_integer()} | {max_inflight, pos_integer()} | {retry_interval, timeout()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} | {will_qos, qos()} | {will_props, properties()} | {auto_ack, boolean() | never} | {ack_timeout, pos_integer()} | {force_ping, boolean()} | {low_mem, boolean()} | {reconnect, reconnect()} | {reconnect_timeout, pos_integer()} | {retry_calls_on_reconnect, boolean()} | {with_qoe_metrics, boolean()} | {properties, properties()} | {nst, binary()} | {custom_auth_callbacks, custom_auth_callbacks()}.
-type packet_id() :: 0..65535.
-type payload() :: iodata().
-type pendings() :: queue:queue(publish_req()).
-type publish_reply() :: #{packet_id := packet_id(), reason_code := reason_code(), reason_code_name := atom(), properties => undefined | properties()}.
-type publish_req() :: {publish, via(), #mqtt_msg{qos :: emqtt:qos(), retain :: boolean(), dup :: boolean(), packet_id :: undefined | emqtt:packet_id(), topic :: emqtt:topic(), props :: emqtt:properties() | undefined, payload :: binary()}, expire_at(), mfas() | undefined}.
-type publish_success() :: ok | {ok, publish_reply()}.
-type qos() :: 0 | 1 | 2.
-type qos_name() :: qos0 | at_most_once | qos1 | at_least_once | qos2 | exactly_once.
-type reason_code() :: 0..255.
-type reconnect() :: infinity | non_neg_integer().
-type sent_at() :: non_neg_integer().
-type state() :: #state{name :: atom(), owner :: undefined | pid(), msg_handler :: undefined | msg_handler(), host :: host(), port :: inet:port_number(), hosts :: [{host(), inet:port_number()}], conn_mod :: conn_mod(), socket :: undefined | ssl:sslsocket() | inet:socket() | emqtt_ws:connection() | emqtt_quic:quic_sock(), sock_opts :: [emqtt_sock:option() | emqtt_ws:option()], connect_timeout :: pos_integer(), bridge_mode :: boolean(), clientid :: binary(), clean_start :: boolean(), username :: binary() | undefined, password :: undefined | emqtt_secret:t(binary()), proto_ver :: version(), proto_name :: iodata(), keepalive :: non_neg_integer(), keepalive_timer :: undefined | tref(), awaiting_pingresp :: boolean(), force_ping :: boolean(), paused :: boolean(), will_msg :: undefined | mqtt_msg(), properties :: properties(), pending_calls :: [call()], subscriptions :: map(), inflight :: emqtt_inflight:inflight(inflight_publish() | inflight_pubrel()), awaiting_rel :: map(), auto_ack :: boolean() | never, ack_timeout :: pos_integer(), ack_timer :: tref() | undefined, retry_interval :: pos_integer(), retry_timer :: tref() | undefined, session_present :: undefined | boolean(), last_packet_id :: undefined | packet_id(), low_mem :: boolean(), parse_state :: undefined | emqtt_frame:parse_state(), reconnect :: reconnect(), reconnect_timeout :: pos_integer(), qoe :: boolean() | map(), nst :: undefined | binary(), pendings :: pendings(), extra :: map()}.
-type subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}.
-type topic() :: binary().
-type tref() :: reference().
-type version() :: 3 | 4 | 5.
-type via() :: default | {new_data_stream, quicer:stream_opts()} | {new_req_stream, quicer:stream_opts()} | {logic_stream_id, non_neg_integer(), quicer:stream_opts()} | inet:socket() | emqtt_quic:quic_sock() | socket_reconnecting.
Functions
-spec connect(client()) -> {ok, properties() | undefined} | {error, term()}.
-spec disconnect(client(), reason_code()) -> ok | {error, any()}.
-spec disconnect(client(), reason_code(), properties()) -> ok | {error, any()}.
-spec info(client(), n_queued) -> _NumQueuedMessages :: non_neg_integer(); (client(), n_inflight) -> _NumInflightMessages :: non_neg_integer(); (client(), max_inflight) -> _MaxInflightMessages :: pos_integer() | infinity.
-spec ping(client()) -> pong.
-spec publish(client(), #mqtt_msg{qos :: emqtt:qos(), retain :: boolean(), dup :: boolean(), packet_id :: undefined | emqtt:packet_id(), topic :: emqtt:topic(), props :: emqtt:properties() | undefined, payload :: binary()}) -> publish_success() | {error, term()}.
-spec publish(client(), topic(), payload()) -> publish_success() | {error, term()}.
-spec publish(client(), topic(), properties(), payload(), [pubopt()]) -> publish_success() | {error, term()}.
-spec publish_async(client(), topic(), payload(), qos() | qos_name() | [pubopt()], mfas()) -> ok; (client(), via(), topic() | #mqtt_msg{qos :: emqtt:qos(), retain :: boolean(), dup :: boolean(), packet_id :: undefined | emqtt:packet_id(), topic :: emqtt:topic(), props :: emqtt:properties() | undefined, payload :: binary()}, payload() | timeout(), mfas()) -> ok.
-spec publish_via(client(), via(), #mqtt_msg{qos :: emqtt:qos(), retain :: boolean(), dup :: boolean(), packet_id :: undefined | emqtt:packet_id(), topic :: emqtt:topic(), props :: emqtt:properties() | undefined, payload :: binary()}) -> publish_success() | {error, term()}.
-spec publish_via(client(), via(), topic(), properties(), payload(), [pubopt()]) -> publish_success() | {error, term()}.
-spec quic_connect(client()) -> {ok, properties()} | {error, term()}.
-spec start_data_stream(client(), quicer:stream_opts()) -> {ok, via()} | {error, any()}.
This function is deprecated. 1.13.1.
-spec start_link() -> gen_statem:start_ret().
-spec start_link(map() | [option()]) -> gen_statem:start_ret().
-spec status(client()) -> initialized | connected | waiting_for_connack | reconnect.
-spec subscribe(client(), topic(), qos() | qos_name() | [subopt()]) -> subscribe_ret(); (client(), properties(), [{topic(), qos() | [subopt()]}]) -> subscribe_ret().
-spec subscribe(client(), properties(), topic(), qos() | qos_name() | [subopt()]) -> subscribe_ret().
-spec subscribe_via(client(), via(), properties(), [{topic(), subopt()}]) -> subscribe_ret().
-spec unsubscribe(client(), topic() | [topic()]) -> subscribe_ret().
-spec unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret().
-spec unsubscribe_via(client(), via(), topic() | [topic()]) -> subscribe_ret().
-spec unsubscribe_via(client(), via(), properties(), topic() | [topic()]) -> subscribe_ret().
-spec ws_connect(client()) -> {ok, properties() | undefined} | {error, term()}.