Broadway producer for KubeMQ Queues.
Polls queue messages via `KubeMQ.Client.poll_queue/2` on demand and delivers
each `KubeMQ.QueueMessage` as a `Broadway.Message`. Includes a built-in
`Broadway.Acknowledger` that maps Broadway ack/reject to queue ack/nack.
Usage
defmodule MyQueuePipeline do
  # Example Broadway pipeline whose producer is KubeMQ.Broadway.Queues,
  # reading from the "tasks" channel through the MyApp.KubeMQ client.
  use Broadway

  # Starts the pipeline, registered under this module's name.
  # The incoming options are ignored; everything is configured inline.
  def start_link(_opts) do
    # Options handed to the KubeMQ queue producer.
    queue_opts = [
      client: MyApp.KubeMQ,
      channel: "tasks",
      max_items: 10,
      wait_timeout: 5_000,
      auto_ack: false
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {KubeMQ.Broadway.Queues, queue_opts}],
      processors: [default: [concurrency: 2]]
    )
  end

  # Prints the channel of the message payload and returns the message
  # unchanged. The payload is read from `message.data`.
  @impl true
  def handle_message(_processor, message, _context) do
    channel = message.data.channel
    IO.puts("Processing: #{channel}")
    message
  end
end
Summary
Types
# One `{key, value}` entry of the option list; the usage example passes these
# keys to the producer — presumably the full accepted set, confirm against the
# module's option handling.
@type option() ::
        {:client, GenServer.server()}
        | {:channel, String.t()}
        | {:max_items, pos_integer()}
        | {:wait_timeout, pos_integer()}
        | {:auto_ack, boolean()}