Response from polling queue messages, with transaction management methods.
Tracks transaction state to prevent double-ack/nack. Transaction functions delegate to the downstream GenServer that owns the bidirectional stream.
Fields
transaction_id (String.t()) - Server-assigned transaction identifier.
messages ([KubeMQ.QueueMessage.t()]) - List of received queue messages.
is_error (boolean()) - Whether the poll itself returned an error.
error (String.t() | nil) - Error message if is_error is true.
state (state()) - Transaction state: :pending, :acked, :nacked, or :expired. Default: :pending.
downstream_pid (pid() | nil) - PID of the downstream GenServer managing the bidirectional stream.
Usage
{:ok, poll} = KubeMQ.Client.poll_queue(client, channel: "orders", max_items: 10)
# Process messages...
{:ok, _} = KubeMQ.PollResponse.ack_all(poll)
Summary
Functions
ack_all/1 - Acknowledge all messages in this poll response.
ack_range/2 - Acknowledge a specific range of messages by their sequence numbers.
active_offsets/1 - Get the list of active (unacknowledged) message offsets in this transaction.
nack_all/1 - Reject (negative-acknowledge) all messages in this poll response.
nack_range/2 - Reject a specific range of messages by their sequence numbers.
new/1 - Create a new PollResponse struct.
requeue_all/2 - Requeue all messages to a different channel.
requeue_range/3 - Requeue a specific range of messages to a different channel.
transaction_status/1 - Check whether the transaction is still active on the server.
Types
Functions
@spec ack_all(t()) :: {:ok, t()} | {:error, KubeMQ.Error.t()}
Acknowledge all messages in this poll response.
Transitions the response state from :pending to :acked. Returns
{:error, %KubeMQ.Error{}} if the transaction is no longer :pending
(for example, already acked or nacked) or if the downstream process
is not alive.
Errors
:validation - transaction is not in :pending state
:stream_broken - downstream process is not alive
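The state check described above can be sketched as follows. This is a minimal illustration, not KubeMQ's implementation: PollSketch is a hypothetical stand-in struct, the error tuples stand in for %KubeMQ.Error{}, and the order of the two checks is an assumption.

```elixir
defmodule PollSketch do
  # Hypothetical stand-in for the documented struct (illustration only).
  defstruct transaction_id: "", state: :pending, downstream_pid: nil

  # Reject anything that is not :pending, which blocks a second ack or nack.
  def ack_all(%__MODULE__{state: state}) when state != :pending do
    {:error, {:validation, "transaction is #{state}, expected pending"}}
  end

  # Only proceed while the process owning the stream is alive.
  def ack_all(%__MODULE__{downstream_pid: pid} = poll) do
    if is_pid(pid) and Process.alive?(pid) do
      # A real client would send the ack over the stream at this point.
      {:ok, %{poll | state: :acked}}
    else
      {:error, {:stream_broken, "downstream process is not alive"}}
    end
  end
end

# Usage: the second call fails because the state is no longer :pending.
pid = spawn(fn -> Process.sleep(:infinity) end)
{:ok, acked} = PollSketch.ack_all(%PollSketch{transaction_id: "tx-1", downstream_pid: pid})
{:error, {:validation, _message}} = PollSketch.ack_all(acked)
```

Returning the updated struct is what lets the caller's next call see the new state, which is why ack_all/1 returns {:ok, t()} rather than :ok.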
@spec ack_range(t(), sequences :: [integer()]) :: :ok | {:error, KubeMQ.Error.t()}
Acknowledge a specific range of messages by their sequence numbers.
Unlike ack_all/1, this does not transition the overall transaction state,
allowing partial acknowledgment within a poll response.
Errors
:validation - transaction is not in :pending state
:stream_broken - downstream process is not alive
@spec active_offsets(t()) :: {:ok, [integer()]} | {:error, KubeMQ.Error.t()}
Get the list of active (unacknowledged) message offsets in this transaction.
Errors
:validation - transaction is not in :pending state
:stream_broken - downstream process is not alive
@spec nack_all(t()) :: {:ok, t()} | {:error, KubeMQ.Error.t()}
Reject (negative-acknowledge) all messages in this poll response.
Transitions the response state from :pending to :nacked, making
the messages available for redelivery.
Errors
:validation - transaction is not in :pending state
:stream_broken - downstream process is not alive
@spec nack_range(t(), sequences :: [integer()]) :: :ok | {:error, KubeMQ.Error.t()}
Reject a specific range of messages by their sequence numbers.
Rejected messages become available for redelivery. Does not transition the overall transaction state.
Errors
:validation - transaction is not in :pending state
:stream_broken - downstream process is not alive
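Partial acknowledgment usually starts by deciding which sequence numbers succeeded and which failed. The sketch below shows one way to build the two lists that ack_range/2 and nack_range/2 take. RangeSketch, the handler, and the :sequence and :body keys are hypothetical names used for illustration; the actual fields of KubeMQ.QueueMessage are not described here.

```elixir
defmodule RangeSketch do
  # Runs `handler` on every message and returns {to_ack, to_nack},
  # two lists of sequence numbers in their original order.
  # The :sequence key is an assumed field name.
  def partition(messages, handler) when is_function(handler, 1) do
    {ok, failed} = Enum.split_with(messages, fn msg -> handler.(msg) == :ok end)
    {Enum.map(ok, & &1.sequence), Enum.map(failed, & &1.sequence)}
  end
end

messages = [
  %{sequence: 1, body: "a"},
  %{sequence: 2, body: ""},
  %{sequence: 3, body: "c"}
]

# Hypothetical handler: an empty body counts as a processing failure.
handler = fn %{body: body} -> if body == "", do: {:error, :empty}, else: :ok end

{to_ack, to_nack} = RangeSketch.partition(messages, handler)
# to_ack could then be passed to ack_range/2 and to_nack to nack_range/2.
```

Because neither range function changes the overall transaction state, both calls can be made against the same poll response.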
Create a new PollResponse struct.
Options
:transaction_id - Transaction ID string (default: "")
:messages - List of KubeMQ.QueueMessage structs (default: [])
:is_error - Whether the poll returned an error (default: false)
:error - Error message string if is_error is true
:state - Transaction state atom (default: :pending)
:downstream_pid - PID of the downstream GenServer
Examples
iex> poll = KubeMQ.PollResponse.new(transaction_id: "tx-1", messages: [])
iex> poll.state
:pending
iex> poll.transaction_id
"tx-1"
@spec requeue_all(t(), channel :: String.t()) :: {:ok, t()} | {:error, KubeMQ.Error.t()}
Requeue all messages to a different channel.
Moves all messages from this poll response to the channel given as the
channel argument and transitions the state to :acked.
Errors
:validation - transaction is not in :pending state, or requeue channel is invalid
:stream_broken - downstream process is not alive
@spec requeue_range(t(), sequences :: [integer()], channel :: String.t()) :: :ok | {:error, KubeMQ.Error.t()}
Requeue a specific range of messages to a different channel.
Moves the messages identified by sequences to the channel given as the
channel argument. Does not transition the overall transaction state.
Errors
:validation - transaction is not in :pending state, or requeue channel is invalid
:stream_broken - downstream process is not alive
@spec transaction_status(t()) :: {:ok, boolean()} | {:error, KubeMQ.Error.t()}
Check whether the transaction is still active on the server.
Returns {:ok, true} if the transaction is active, {:ok, false} if expired.
Errors
:stream_broken - downstream process is not alive
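A caller will typically branch on the three possible results. The sketch below maps each documented return shape to a follow-up action. StatusSketch and the :continue, :repoll, and :reconnect atoms are hypothetical names chosen for illustration, not part of KubeMQ.

```elixir
defmodule StatusSketch do
  # Transaction still active: keep processing and ack later.
  def next_step({:ok, true}), do: :continue

  # Transaction expired: acks would be rejected, so poll again.
  def next_step({:ok, false}), do: :repoll

  # Downstream process gone (for example :stream_broken): re-establish the stream.
  def next_step({:error, _reason}), do: :reconnect
end

# Usage, with the result of transaction_status/1 passed in directly:
StatusSketch.next_step({:ok, true})
```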