KubeMQ.Broadway.EventsStore (kubemq v1.0.1)

Copy Markdown View Source

Broadway producer for KubeMQ Events Store (Persistent Pub/Sub).

Wraps `KubeMQ.Client.subscribe_to_events_store/3` and delivers each `KubeMQ.EventStoreReceive` as a `Broadway.Message`.

Usage

defmodule MyStorePipeline do
  @moduledoc """
  Example Broadway pipeline fed by `KubeMQ.Broadway.EventsStore`.

  Subscribes to the `"orders"` channel as part of the `"processor"` group,
  starting from the first stored event, and prints each received event.
  """

  use Broadway

  @doc """
  Starts the pipeline, registered under this module's name.

  The argument is ignored; all settings are fixed in the function body.
  """
  def start_link(_opts) do
    # Options handed to the KubeMQ Events Store producer.
    producer_opts = [
      client: MyApp.KubeMQ,
      channel: "orders",
      group: "processor",
      start_at: :start_from_first
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {KubeMQ.Broadway.EventsStore, producer_opts}],
      processors: [default: [concurrency: 2]]
    )
  end

  # Prints the sequence and channel of the event carried in `message.data`,
  # then returns the message unchanged.
  @impl true
  def handle_message(_processor, message, _context) do
    received = message.data
    IO.puts("Sequence #{received.sequence}: #{received.channel}")
    message
  end
end

Summary

Types

option()

# One entry of the producer's keyword list, i.e. the second element of the
# `module: {KubeMQ.Broadway.EventsStore, [...]}` tuple in the usage example.
#
#   :client          - a `GenServer.server()` reference, e.g. `MyApp.KubeMQ`
#   :channel         - a string, e.g. "orders"
#   :group           - a string, e.g. "processor"
#   :start_at        - a `KubeMQ.EventsStoreType.start_position()`,
#                      e.g. `:start_from_first`
#   :max_buffer_size - a positive integer; not shown in the usage example.
#                      NOTE(review): presumably caps how many events the
#                      producer buffers - confirm against the implementation.
@type option() ::
  {:client, GenServer.server()}
  | {:channel, String.t()}
  | {:group, String.t()}
  | {:start_at, KubeMQ.EventsStoreType.start_position()}
  | {:max_buffer_size, pos_integer()}

Functions

handle_continue(atom, state)