Broadway producer for KubeMQ Events Store (Persistent Pub/Sub).
Wraps `KubeMQ.Client.subscribe_to_events_store/3` and delivers each
`KubeMQ.EventStoreReceive` as a `Broadway.Message`.
Usage
defmodule MyStorePipeline do
  # Example Broadway pipeline whose producer is KubeMQ.Broadway.EventsStore.
  use Broadway

  # Starts the pipeline under this module's name; the argument is ignored.
  def start_link(_opts) do
    # Options handed to the KubeMQ.Broadway.EventsStore producer.
    producer_opts = [
      client: MyApp.KubeMQ,
      channel: "orders",
      group: "processor",
      start_at: :start_from_first
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: {KubeMQ.Broadway.EventsStore, producer_opts}],
      processors: [default: [concurrency: 2]]
    )
  end

  # Prints one line per received event and passes the message on unchanged.
  @impl true
  def handle_message(_processor, message, _context) do
    # The received event is carried in the message's `data` field.
    message.data
    |> summarize()
    |> IO.puts()

    message
  end

  # Builds the line printed for a single event.
  defp summarize(received), do: "Sequence #{received.sequence}: #{received.channel}"
end
Summary
Types
# A single producer option, expressed as a `{key, value}` pair.
@type option() ::
        {:client, GenServer.server()}
        | {:channel, String.t()}
        | {:group, String.t()}
        | {:start_at, KubeMQ.EventsStoreType.start_position()}
        | {:max_buffer_size, pos_integer()}