KubeMQ.Broadway.Queues (kubemq v1.0.1)

Copy Markdown View Source

Broadway producer for KubeMQ Queues.

Polls queue messages via KubeMQ.Client.poll_queue/2 on demand and delivers each KubeMQ.QueueMessage as a Broadway.Message. Includes a built-in Broadway.Acknowledger that maps Broadway ack/reject to queue ack/nack.

Usage

defmodule MyQueuePipeline do
  # Example Broadway pipeline whose producer is KubeMQ.Broadway.Queues.
  use Broadway

  # Starts the pipeline under this module's name. The argument is ignored;
  # all producer settings are fixed below.
  def start_link(_opts) do
    # Options handed to the KubeMQ queue producer.
    queue_opts = [
      client: MyApp.KubeMQ,
      channel: "tasks",
      max_items: 10,
      wait_timeout: 5_000,
      auto_ack: false
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {KubeMQ.Broadway.Queues, queue_opts}],
      processors: [default: [concurrency: 2]]
    )
  end

  # Prints the channel of the message payload, then returns the message
  # unchanged.
  @impl true
  def handle_message(_processor, message, _context) do
    # tap/2 runs the side effect and hands back `message` untouched.
    tap(message, fn msg ->
      queue_msg = msg.data
      IO.puts("Processing: #{queue_msg.channel}")
    end)
  end
end

Summary

Types

option()

# One `{key, value}` entry of the keyword list given to the producer, i.e. the
# second element of `module: {KubeMQ.Broadway.Queues, opts}`.
#
#   * `:client`       - the KubeMQ client process (pid or registered name)
#   * `:channel`      - name of the queue channel to poll
#   * `:max_items`    - positive integer; presumably the per-poll message cap
#                       (TODO confirm against poll_queue/2)
#   * `:wait_timeout` - positive integer; unit not shown here, presumably
#                       milliseconds (TODO confirm)
#   * `:auto_ack`     - boolean flag; exact semantics not shown here
@type option ::
        {:client, GenServer.server()}
        | {:channel, String.t()}
        | {:max_items, pos_integer()}
        | {:wait_timeout, pos_integer()}
        | {:auto_ack, boolean()}