KubeMQ.PollResponse (kubemq v1.0.1)


Response from polling queue messages, with functions for managing the poll transaction.

Tracks transaction state so that a response cannot be acknowledged or rejected twice. The transaction functions delegate to the downstream GenServer that owns the bidirectional stream.

Fields

  • transaction_id (String.t()) — Server-assigned transaction identifier.
  • messages ([KubeMQ.QueueMessage.t()]) — List of received queue messages.
  • is_error (boolean()) — Whether the poll itself returned an error.
  • error (String.t() | nil) — Error message if is_error is true.

  • state (state()) — Transaction state: :pending, :acked, :nacked, or :expired. Default: :pending.
  • downstream_pid (pid() | nil) — PID of the downstream GenServer managing the bidi stream.

Usage

{:ok, poll} = KubeMQ.Client.poll_queue(client, channel: "orders", max_items: 10)
# Process messages...
{:ok, _} = KubeMQ.PollResponse.ack_all(poll)
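
The usage above assumes the poll succeeds and every message is processed. A slightly fuller sketch might check `is_error` first and nack the batch when processing fails. `PollExample`, `poll_and_settle/2`, and `handler` are hypothetical names, and the `{:error, _}` return from `poll_queue` is an assumption, since this page only shows the success case.

```elixir
defmodule PollExample do
  # Hypothetical helper, not part of KubeMQ: polls once and settles the batch.
  # `client` is assumed to be an already-started KubeMQ client and `handler`
  # a one-argument function that returns :ok when a message was processed.
  def poll_and_settle(client, handler) do
    case KubeMQ.Client.poll_queue(client, channel: "orders", max_items: 10) do
      {:ok, %{is_error: true, error: reason}} ->
        # The poll itself failed, so there is nothing to ack or nack.
        {:error, reason}

      {:ok, poll} ->
        # Enum.all?/2 stops at the first message the handler does not accept.
        if Enum.all?(poll.messages, fn msg -> handler.(msg) == :ok end) do
          KubeMQ.PollResponse.ack_all(poll)
        else
          # Release the whole batch for redelivery.
          KubeMQ.PollResponse.nack_all(poll)
        end

      # Assumed error shape for a failed call.
      {:error, _} = error ->
        error
    end
  end
end
```

Because a nack releases the entire batch, messages handled before the failure are delivered again, so the handler in this sketch should tolerate duplicates.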

Summary

Functions

ack_all(poll)

Acknowledge all messages in this poll response.

ack_range(poll, sequences)

Acknowledge a specific range of messages by their sequence numbers.

active_offsets(poll)

Get the list of active (unacknowledged) message offsets in this transaction.

nack_all(poll)

Reject (negative-acknowledge) all messages in this poll response.

nack_range(poll, sequences)

Reject a specific range of messages by their sequence numbers.

new(opts \\ [])

Create a new PollResponse struct.

requeue_all(poll, requeue_channel)

Requeue all messages to a different channel.

requeue_range(poll, sequences, requeue_channel)

Requeue a specific range of messages to a different channel.

transaction_status(poll)

Check whether the transaction is still active on the server.

Types

state()

@type state() :: :pending | :acked | :nacked | :expired

t()

@type t() :: %KubeMQ.PollResponse{
  downstream_pid: pid() | nil,
  error: String.t() | nil,
  is_error: boolean(),
  messages: [KubeMQ.QueueMessage.t()],
  state: state(),
  transaction_id: String.t()
}

Functions

ack_all(poll)

@spec ack_all(t()) :: {:ok, t()} | {:error, KubeMQ.Error.t()}

Acknowledge all messages in this poll response.

Transitions the response state from :pending to :acked. Returns {:error, %KubeMQ.Error{}} if the transaction is no longer in the :pending state (for example, already acked or nacked) or if the downstream process is not alive.

Errors

  • :validation — transaction is not in :pending state
  • :stream_broken — downstream process is not alive
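
To illustrate the state tracking, the sketch below acks a poll and then tries to nack the struct that `ack_all/1` returned. Based on the errors listed above, the second call is expected to be rejected with a `:validation` error. `SettleOnce` and `ack_then_retry/1` are hypothetical names, and the sketch matches only on the `{:error, _}` shape because the fields of `KubeMQ.Error` are not described on this page.

```elixir
defmodule SettleOnce do
  # Hypothetical helper, not part of KubeMQ.
  def ack_then_retry(poll) do
    with {:ok, acked} <- KubeMQ.PollResponse.ack_all(poll) do
      # `acked` carries the :acked state, so a second settlement attempt
      # should return an error instead of reaching the server again.
      case KubeMQ.PollResponse.nack_all(acked) do
        {:error, _error} -> {:ok, acked}
        {:ok, _} -> {:error, :unexpected_double_settle}
      end
    end
  end
end
```

Structs are immutable in Elixir, so the guard presumably relies on the caller continuing with the returned struct rather than the original `poll`.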

ack_range(poll, sequences)

@spec ack_range(t(), sequences :: [integer()]) :: :ok | {:error, KubeMQ.Error.t()}

Acknowledge a specific range of messages by their sequence numbers.

Unlike ack_all/1, this does not transition the overall transaction state, allowing partial acknowledgment within a poll response.

Errors

  • :validation — transaction is not in :pending state
  • :stream_broken — downstream process is not alive

active_offsets(poll)

@spec active_offsets(t()) :: {:ok, [integer()]} | {:error, KubeMQ.Error.t()}

Get the list of active (unacknowledged) message offsets in this transaction.

Errors

  • :validation — transaction is not in :pending state
  • :stream_broken — downstream process is not alive
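
One possible use of `active_offsets/1` is to see what remains in the transaction after a partial acknowledgment. The sketch below is a minimal illustration; `Remaining` and `ack_and_report/2` are hypothetical names, and whether the returned offsets correspond one-to-one to message sequence numbers is not stated on this page.

```elixir
defmodule Remaining do
  # Hypothetical helper, not part of KubeMQ: acks `sequences`, then asks
  # which offsets are still active in the transaction.
  def ack_and_report(poll, sequences) do
    # ack_range/2 does not change the transaction state, so the same `poll`
    # struct is still :pending and can be passed to active_offsets/1.
    with :ok <- KubeMQ.PollResponse.ack_range(poll, sequences),
         {:ok, offsets} <- KubeMQ.PollResponse.active_offsets(poll) do
      {:ok, %{acked: length(sequences), still_active: offsets}}
    end
  end
end
```

Any `{:error, error}` from either call falls through the `with` unchanged and is returned to the caller.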

nack_all(poll)

@spec nack_all(t()) :: {:ok, t()} | {:error, KubeMQ.Error.t()}

Reject (negative-acknowledge) all messages in this poll response.

Transitions the response state from :pending to :nacked, making the messages available for redelivery.

Errors

  • :validation — transaction is not in :pending state
  • :stream_broken — downstream process is not alive

nack_range(poll, sequences)

@spec nack_range(t(), sequences :: [integer()]) :: :ok | {:error, KubeMQ.Error.t()}

Reject a specific range of messages by their sequence numbers.

Rejected messages become available for redelivery. Does not transition the overall transaction state.

Errors

  • :validation — transaction is not in :pending state
  • :stream_broken — downstream process is not alive
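
Since neither `ack_range/2` nor `nack_range/2` changes the transaction state, the two can be combined to settle a batch message by message. The following is a sketch under assumptions: `PartialSettle` is a hypothetical module, and the caller supplies `{sequence, result}` pairs, because this page does not show how a sequence number is read from a `KubeMQ.QueueMessage`.

```elixir
defmodule PartialSettle do
  # Splits {sequence, result} pairs into {sequences_to_ack, sequences_to_nack}.
  def split(results) do
    {ok, failed} = Enum.split_with(results, fn {_seq, result} -> result == :ok end)
    {Enum.map(ok, &elem(&1, 0)), Enum.map(failed, &elem(&1, 0))}
  end

  # Hypothetical helper: acks the sequences that succeeded and nacks the rest.
  # Returns :ok, or the first {:error, error} reported by KubeMQ.PollResponse.
  def settle(poll, results) do
    {to_ack, to_nack} = split(results)

    with :ok <- ack_unless_empty(poll, to_ack),
         :ok <- nack_unless_empty(poll, to_nack) do
      :ok
    end
  end

  # Skip the call entirely when there is nothing to send.
  defp ack_unless_empty(_poll, []), do: :ok
  defp ack_unless_empty(poll, seqs), do: KubeMQ.PollResponse.ack_range(poll, seqs)

  defp nack_unless_empty(_poll, []), do: :ok
  defp nack_unless_empty(poll, seqs), do: KubeMQ.PollResponse.nack_range(poll, seqs)
end
```

For example, `PartialSettle.split([{1, :ok}, {2, :error}, {3, :ok}])` yields `{[1, 3], [2]}`, so sequences 1 and 3 would be acked and sequence 2 released for redelivery.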

new(opts \\ [])

@spec new(keyword()) :: t()

Create a new PollResponse struct.

Options

  • :transaction_id — Transaction ID string (default: "")
  • :messages — List of KubeMQ.QueueMessage structs (default: [])
  • :is_error — Whether the poll returned an error (default: false)
  • :error — Error message string if is_error is true
  • :state — Transaction state atom (default: :pending)
  • :downstream_pid — PID of the downstream GenServer

Examples

iex> poll = KubeMQ.PollResponse.new(transaction_id: "tx-1", messages: [])
iex> poll.state
:pending
iex> poll.transaction_id
"tx-1"

requeue_all(poll, requeue_channel)

@spec requeue_all(t(), channel :: String.t()) ::
  {:ok, t()} | {:error, KubeMQ.Error.t()}

Requeue all messages to a different channel.

Moves all messages from this poll response to requeue_channel and transitions the state to :acked.

Errors

  • :validation — transaction is not in :pending state, or requeue channel is invalid
  • :stream_broken — downstream process is not alive
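
A common use for requeueing is routing a batch that cannot be processed to a separate channel for later inspection. The sketch below shows one way this might look; `DeadLetter`, `process/2`, and the channel name `"orders.failed"` are hypothetical, and `handler` is assumed to raise when a message cannot be handled.

```elixir
defmodule DeadLetter do
  # Hypothetical channel name, chosen for illustration only.
  @dead_letter_channel "orders.failed"

  # Hypothetical helper, not part of KubeMQ: runs `handler` over every message
  # and moves the whole batch to the dead-letter channel if the handler raises.
  def process(poll, handler) do
    try do
      Enum.each(poll.messages, handler)
    rescue
      _exception ->
        KubeMQ.PollResponse.requeue_all(poll, @dead_letter_channel)
    else
      # Enum.each/2 returned normally, so every message was handled.
      _ -> KubeMQ.PollResponse.ack_all(poll)
    end
  end
end
```

Either branch returns `{:ok, poll}` or `{:error, error}`, matching the specs of `ack_all/1` and `requeue_all/2`.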

requeue_range(poll, sequences, requeue_channel)

@spec requeue_range(t(), sequences :: [integer()], channel :: String.t()) ::
  :ok | {:error, KubeMQ.Error.t()}

Requeue a specific range of messages to a different channel.

Moves messages identified by sequences to requeue_channel. Does not transition the overall transaction state.

Errors

  • :validation — transaction is not in :pending state, or requeue channel is invalid
  • :stream_broken — downstream process is not alive

transaction_status(poll)

@spec transaction_status(t()) :: {:ok, boolean()} | {:error, KubeMQ.Error.t()}

Check whether the transaction is still active on the server.

Returns {:ok, true} if the transaction is active, {:ok, false} if expired.

Errors

  • :stream_broken — downstream process is not alive
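
After long-running work, it may be useful to confirm the transaction is still active before settling it. A minimal sketch, where `SafeAck`, `ack_if_active/1`, and the `{:error, :expired}` return value are hypothetical and not part of KubeMQ:

```elixir
defmodule SafeAck do
  # Hypothetical helper: acks only if the server still reports the
  # transaction as active.
  def ack_if_active(poll) do
    case KubeMQ.PollResponse.transaction_status(poll) do
      {:ok, true} -> KubeMQ.PollResponse.ack_all(poll)
      # The transaction has expired, so an ack would have no effect.
      {:ok, false} -> {:error, :expired}
      {:error, _} = error -> error
    end
  end
end
```

The check and the ack are separate calls, so the transaction can still expire between them; the result of `ack_all/1` should be handled as well.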