KubeMQ.Client (kubemq v1.0.1)

Copy Markdown View Source

Main KubeMQ client GenServer — primary API surface for all messaging patterns.

Owns a KubeMQ.Connection process and a DynamicSupervisor for subscriptions. All operations are delegated to the appropriate pattern modules and the transport layer.

Quick Start

{:ok, client} = KubeMQ.Client.start_link(address: "localhost:50000", client_id: "my-app")
:ok = KubeMQ.Client.send_event(client, %KubeMQ.Event{channel: "test", body: "hello"})
KubeMQ.Client.close(client)

Supervision

children = [
  {KubeMQ.Client, address: "localhost:50000", client_id: "my-app", name: MyApp.KubeMQ}
]
Supervisor.start_link(children, strategy: :one_for_one)

Configuration

See KubeMQ.Config for the full NimbleOptions schema. All timeouts are in milliseconds.

Architecture Note

The Client GenServer serializes all operations through a single mailbox. This is the standard OTP pattern for state management and connection lifecycle. For high-throughput scenarios, use the streaming APIs (EventStreamHandle, EventStoreStreamHandle, QueueUpstreamHandle) which bypass the Client GenServer and communicate directly with the gRPC transport.

Reconnection Behavior

The client automatically reconnects when the gRPC connection is lost.

State Machine

:connecting  :ready  (disconnect)  :reconnecting  :ready
                                                        
                                                      :closed (on close/1 or max attempts)

Operation Behavior During Reconnection

Operation TypeDuring Reconnection
Unary sendsBuffered (up to reconnect_buffer_size)
Stream sends:stream_broken error
SubscriptionsAuto-re-established on reconnect
PingReturns :transient error
Channel managementBuffered or :transient error

Configuration

Reconnection is controlled by the :reconnect_policy option:

reconnect_policy: [
  enabled: true,           # Enable/disable auto-reconnect
  initial_delay: 1_000,    # First retry delay (ms)
  max_delay: 30_000,       # Max retry delay (ms)
  max_attempts: 0,         # 0 = unlimited
  multiplier: 2.0          # Exponential backoff multiplier
]

Lifecycle Callbacks

Monitor connection state changes:

KubeMQ.Client.start_link(
  address: "localhost:50000",
  client_id: "my-app",
  on_connected: fn -> Logger.info("Connected") end,
  on_disconnected: fn -> Logger.warning("Disconnected") end,
  on_reconnecting: fn -> Logger.info("Reconnecting...") end,
  on_reconnected: fn -> Logger.info("Reconnected") end,
  on_closed: fn -> Logger.info("Closed") end
)

Summary

Functions

Acknowledge all messages on a queue channel.

Returns a child specification for use in supervision trees.

Close the client, stopping all subscriptions and the underlying connection.

Returns true if the underlying connection is in the :ready state.

Returns the current connection state atom.

Create a channel of the specified type.

Create a commands channel. See create_channel/3 for errors.

Create an events channel. See create_channel/3 for errors.

Create an events store channel. See create_channel/3 for errors.

Create a queries channel. See create_channel/3 for errors.

Create a queues channel. See create_channel/3 for errors.

Delete a channel of the specified type.

Delete a commands channel. See delete_channel/3 for errors.

Delete an events channel. See delete_channel/3 for errors.

Delete an events store channel. See delete_channel/3 for errors.

Delete a queries channel. See delete_channel/3 for errors.

Delete a queues channel. See delete_channel/3 for errors.

List channels of the specified type, optionally filtered by search pattern.

List commands channels. See list_channels/3 for errors.

List events channels. See list_channels/3 for errors.

List events store channels. See list_channels/3 for errors.

List queries channels. See list_channels/3 for errors.

List queues channels. See list_channels/3 for errors.

Ping the KubeMQ server and return server information.

Ping the KubeMQ server, raising KubeMQ.Error on failure.

Poll a queue for messages via the downstream stream API.

Purge all messages from a queue channel (ack all pending messages).

Open a queue upstream (send) stream.

Receive queue messages (pull mode).

Send a command and wait for a response.

Send a command, raising KubeMQ.Error on failure.

Send a command response (for manual response handling).

Send a single event (fire-and-forget).

Send a single event, raising KubeMQ.Error on failure.

Send a persistent event to Events Store.

Send a persistent event, raising KubeMQ.Error on failure.

Open a bidirectional event store stream with awaitable confirmation.

Open a bidirectional event stream. Returns a handle for sending events.

Open a bidirectional event stream, raising KubeMQ.Error on failure.

Send a query and wait for a response.

Send a query, raising KubeMQ.Error on failure.

Send a query response (for manual response handling).

Send a single queue message.

Send a single queue message, raising KubeMQ.Error on failure.

Send a batch of queue messages.

Send a batch of queue messages, raising KubeMQ.Error on failure.

Start a KubeMQ client process linked to the caller.

Subscribe to incoming commands on a channel.

Subscribe to commands, raising KubeMQ.Error on failure.

Subscribe to events on a channel.

Subscribe to events, raising KubeMQ.Error on failure.

Subscribe to Events Store on a channel.

Subscribe to incoming queries on a channel.

Subscribe to queries, raising KubeMQ.Error on failure.

Types

channel_type()

@type channel_type() :: :events | :events_store | :commands | :queries | :queues

t()

@type t() :: GenServer.server()

Functions

ack_all_queue_messages(client, channel, opts \\ [])

@spec ack_all_queue_messages(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.QueueAckAllResult.t()} | {:error, KubeMQ.Error.t()}

Acknowledge all messages on a queue channel.

Options

  • :wait_timeout - Wait timeout in ms (default 5_000)

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for use in supervision trees.

close(client)

@spec close(t()) :: :ok

Close the client, stopping all subscriptions and the underlying connection.

Always succeeds (calls GenServer.stop/2).

connected?(client)

@spec connected?(t()) :: boolean()

Returns true if the underlying connection is in the :ready state.

Always succeeds — returns a boolean.

connection_state(client)

@spec connection_state(t()) :: :connecting | :ready | :reconnecting | :closed

Returns the current connection state atom.

Always succeeds — returns :connecting, :ready, :reconnecting, or :closed.

create_channel(client, name, type)

@spec create_channel(t(), String.t(), channel_type()) ::
  :ok | {:error, KubeMQ.Error.t()}

Create a channel of the specified type.

Errors

  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

create_commands_channel(client, name)

@spec create_commands_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Create a commands channel. See create_channel/3 for errors.

create_events_channel(client, name)

@spec create_events_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Create an events channel. See create_channel/3 for errors.

create_events_store_channel(client, name)

@spec create_events_store_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Create an events store channel. See create_channel/3 for errors.

create_queries_channel(client, name)

@spec create_queries_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Create a queries channel. See create_channel/3 for errors.

create_queues_channel(client, name)

@spec create_queues_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Create a queues channel. See create_channel/3 for errors.

delete_channel(client, name, type)

@spec delete_channel(t(), String.t(), channel_type()) ::
  :ok | {:error, KubeMQ.Error.t()}

Delete a channel of the specified type.

Errors

  • :transient — temporary network/server issue (retryable)
  • :not_found — channel does not exist
  • :client_closed — client has been closed

delete_commands_channel(client, name)

@spec delete_commands_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Delete a commands channel. See delete_channel/3 for errors.

delete_events_channel(client, name)

@spec delete_events_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Delete an events channel. See delete_channel/3 for errors.

delete_events_store_channel(client, name)

@spec delete_events_store_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Delete an events store channel. See delete_channel/3 for errors.

delete_queries_channel(client, name)

@spec delete_queries_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Delete a queries channel. See delete_channel/3 for errors.

delete_queues_channel(client, name)

@spec delete_queues_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Delete a queues channel. See delete_channel/3 for errors.

list_channels(client, type, search \\ "")

@spec list_channels(t(), channel_type(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List channels of the specified type, optionally filtered by search pattern.

Errors

  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

list_commands_channels(client, search \\ "")

@spec list_commands_channels(t(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List commands channels. See list_channels/3 for errors.

list_events_channels(client, search \\ "")

@spec list_events_channels(t(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List events channels. See list_channels/3 for errors.

list_events_store_channels(client, search \\ "")

@spec list_events_store_channels(t(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List events store channels. See list_channels/3 for errors.

list_queries_channels(client, search \\ "")

@spec list_queries_channels(t(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List queries channels. See list_channels/3 for errors.

list_queues_channels(client, search \\ "")

@spec list_queues_channels(t(), String.t()) ::
  {:ok, [KubeMQ.ChannelInfo.t()]} | {:error, KubeMQ.Error.t()}

List queues channels. See list_channels/3 for errors.

ping(client)

@spec ping(t()) :: {:ok, KubeMQ.ServerInfo.t()} | {:error, KubeMQ.Error.t()}

Ping the KubeMQ server and return server information.

Errors

  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed

ping!(client)

@spec ping!(t()) :: KubeMQ.ServerInfo.t()

Ping the KubeMQ server, raising KubeMQ.Error on failure.

Same error conditions as ping/1 — see its ## Errors section.

poll_queue(client, opts)

@spec poll_queue(
  t(),
  keyword()
) :: {:ok, KubeMQ.PollResponse.t()} | {:error, KubeMQ.Error.t()}

Poll a queue for messages via the downstream stream API.

Options

  • :channel - Required. Queue channel name
  • :max_items - Max items (1–1024, default 1)
  • :wait_timeout - Wait timeout in ms (default 5_000)
  • :auto_ack - Auto-acknowledge (default false)

Errors

  • :validation — channel is empty, invalid max_items, or invalid wait_timeout
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed

purge_queue_channel(client, name)

@spec purge_queue_channel(t(), String.t()) :: :ok | {:error, KubeMQ.Error.t()}

Purge all messages from a queue channel (ack all pending messages).

Errors

  • :transient — temporary network/server issue (retryable)
  • :not_found — channel does not exist
  • :client_closed — client has been closed

queue_upstream(client)

@spec queue_upstream(t()) ::
  {:ok, KubeMQ.QueueUpstreamHandle.t()} | {:error, KubeMQ.Error.t()}

Open a queue upstream (send) stream.

Errors

  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

receive_queue_messages(client, channel, opts \\ [])

@spec receive_queue_messages(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.QueueReceiveResult.t()} | {:error, KubeMQ.Error.t()}

Receive queue messages (pull mode).

Options

  • :max_messages - Max messages to receive (1–1024, default 1)
  • :wait_timeout - Wait timeout in ms (default 5_000)
  • :is_peek - Peek without consuming (default false)

Errors

  • :validation — channel is empty, invalid max_messages, or invalid wait_timeout
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed

send_command(client, command)

@spec send_command(t(), KubeMQ.Command.t()) ::
  {:ok, KubeMQ.CommandResponse.t()} | {:error, KubeMQ.Error.t()}

Send a command and wait for a response.

Errors

  • :validation — channel is empty, missing body/metadata, or invalid timeout
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_command!(client, command)

@spec send_command!(t(), KubeMQ.Command.t()) :: KubeMQ.CommandResponse.t()

Send a command, raising KubeMQ.Error on failure.

Same error conditions as send_command/2 — see its ## Errors section.

send_command_response(client, reply)

@spec send_command_response(t(), KubeMQ.CommandReply.t()) ::
  :ok | {:error, KubeMQ.Error.t()}

Send a command response (for manual response handling).

Errors

  • :validation — missing required response fields
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

send_event(client, event)

@spec send_event(t(), KubeMQ.Event.t()) :: :ok | {:error, KubeMQ.Error.t()}

Send a single event (fire-and-forget).

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_event!(client, event)

@spec send_event!(t(), KubeMQ.Event.t()) :: :ok

Send a single event, raising KubeMQ.Error on failure.

Same error conditions as send_event/2 — see its ## Errors section.

send_event_store(client, event)

@spec send_event_store(t(), KubeMQ.EventStore.t()) ::
  {:ok, KubeMQ.EventStoreResult.t()} | {:error, KubeMQ.Error.t()}

Send a persistent event to Events Store.

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_event_store!(client, event)

@spec send_event_store!(t(), KubeMQ.EventStore.t()) :: KubeMQ.EventStoreResult.t()

Send a persistent event, raising KubeMQ.Error on failure.

Same error conditions as send_event_store/2 — see its ## Errors section.

send_event_store_stream(client)

@spec send_event_store_stream(t()) ::
  {:ok, KubeMQ.EventStoreStreamHandle.t()} | {:error, KubeMQ.Error.t()}

Open a bidirectional event store stream with awaitable confirmation.

Errors

  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_event_stream(client)

@spec send_event_stream(t()) ::
  {:ok, KubeMQ.EventStreamHandle.t()} | {:error, KubeMQ.Error.t()}

Open a bidirectional event stream. Returns a handle for sending events.

Errors

  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_event_stream!(client)

@spec send_event_stream!(t()) :: KubeMQ.EventStreamHandle.t()

Open a bidirectional event stream, raising KubeMQ.Error on failure.

Same error conditions as send_event_stream/1 — see its ## Errors section.

send_query(client, query)

@spec send_query(t(), KubeMQ.Query.t()) ::
  {:ok, KubeMQ.QueryResponse.t()} | {:error, KubeMQ.Error.t()}

Send a query and wait for a response.

Errors

  • :validation — channel is empty, missing body/metadata, invalid timeout, or invalid cache params
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_query!(client, query)

@spec send_query!(t(), KubeMQ.Query.t()) :: KubeMQ.QueryResponse.t()

Send a query, raising KubeMQ.Error on failure.

Same error conditions as send_query/2 — see its ## Errors section.

send_query_response(client, reply)

@spec send_query_response(t(), KubeMQ.QueryReply.t()) ::
  :ok | {:error, KubeMQ.Error.t()}

Send a query response (for manual response handling).

Errors

  • :validation — missing required response fields
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

send_queue_message(client, msg)

@spec send_queue_message(t(), KubeMQ.QueueMessage.t()) ::
  {:ok, KubeMQ.QueueSendResult.t()} | {:error, KubeMQ.Error.t()}

Send a single queue message.

Errors

  • :validation — channel is empty or missing body/metadata
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_queue_message!(client, msg)

@spec send_queue_message!(t(), KubeMQ.QueueMessage.t()) :: KubeMQ.QueueSendResult.t()

Send a single queue message, raising KubeMQ.Error on failure.

Same error conditions as send_queue_message/2 — see its ## Errors section.

send_queue_messages(client, msgs)

@spec send_queue_messages(t(), [KubeMQ.QueueMessage.t()]) ::
  {:ok, KubeMQ.QueueBatchResult.t()} | {:error, KubeMQ.Error.t()}

Send a batch of queue messages.

Errors

  • :validation — empty batch or any message has an empty channel
  • :transient — temporary network/server issue (retryable)
  • :timeout — operation timed out (retryable)
  • :client_closed — client has been closed
  • :buffer_full — reconnect buffer overflow

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

send_queue_messages!(client, msgs)

@spec send_queue_messages!(t(), [KubeMQ.QueueMessage.t()]) ::
  KubeMQ.QueueBatchResult.t()

Send a batch of queue messages, raising KubeMQ.Error on failure.

Same error conditions as send_queue_messages/2 — see its ## Errors section.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a KubeMQ client process linked to the caller.

Options

See KubeMQ.Config for the full schema. Required: :client_id.

Errors

  • :validation — invalid configuration (NimbleOptions validation failure)
  • :transient — unable to establish initial connection

Examples

{:ok, client} = KubeMQ.Client.start_link(
  address: "localhost:50000",
  client_id: "my-app"
)

subscribe_to_commands(client, channel, opts \\ [])

@spec subscribe_to_commands(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.Subscription.t()} | {:error, KubeMQ.Error.t()}

Subscribe to incoming commands on a channel.

Options

  • :group - Consumer group name
  • :on_command - Required. fn %CommandReceive{} -> %CommandReply{} end
  • :on_error - Error callback

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

subscribe_to_commands!(client, channel, opts \\ [])

@spec subscribe_to_commands!(t(), String.t(), keyword()) :: KubeMQ.Subscription.t()

Subscribe to commands, raising KubeMQ.Error on failure.

Same error conditions as subscribe_to_commands/3 — see its ## Errors section.

subscribe_to_events(client, channel, opts \\ [])

@spec subscribe_to_events(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.Subscription.t()} | {:error, KubeMQ.Error.t()}

Subscribe to events on a channel.

Options

  • :group - Consumer group name
  • :on_event - Callback fn %EventReceive{} -> :ok end
  • :on_error - Error callback fn %Error{} -> :ok end
  • :notify - PID to receive {:kubemq_event, event} messages

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

subscribe_to_events!(client, channel, opts \\ [])

@spec subscribe_to_events!(t(), String.t(), keyword()) :: KubeMQ.Subscription.t()

Subscribe to events, raising KubeMQ.Error on failure.

Same error conditions as subscribe_to_events/3 — see its ## Errors section.

subscribe_to_events_store(client, channel, opts \\ [])

@spec subscribe_to_events_store(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.Subscription.t()} | {:error, KubeMQ.Error.t()}

Subscribe to Events Store on a channel.

Options

  • :start_at - Required. Start position (see KubeMQ.start_position())
  • :group - Consumer group name
  • :on_event - Callback fn %EventStoreReceive{} -> :ok end
  • :on_error - Error callback
  • :notify - PID to receive {:kubemq_event_store, event} messages

Errors

  • :validation — channel is empty or start position is invalid
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

subscribe_to_queries(client, channel, opts \\ [])

@spec subscribe_to_queries(t(), String.t(), keyword()) ::
  {:ok, KubeMQ.Subscription.t()} | {:error, KubeMQ.Error.t()}

Subscribe to incoming queries on a channel.

Options

  • :group - Consumer group name
  • :on_query - Required. fn %QueryReceive{} -> %QueryReply{} end
  • :on_error - Error callback

Errors

  • :validation — channel is empty or contains whitespace
  • :transient — temporary network/server issue (retryable)
  • :client_closed — client has been closed

See the "Reconnection Behavior" section for how this operation behaves during connection loss and recovery.

subscribe_to_queries!(client, channel, opts \\ [])

@spec subscribe_to_queries!(t(), String.t(), keyword()) :: KubeMQ.Subscription.t()

Subscribe to queries, raising KubeMQ.Error on failure.

Same error conditions as subscribe_to_queries/3 — see its ## Errors section.