OffBroadwayKlife
A Broadway connector for Apache Kafka, built on Klife.
It bridges Klife's consumer group manual mode with Broadway.
Installation
def deps do
[
{:off_broadway_klife, "~> 0.1.0"},
{:klife, "~> 1.1"}
]
end
Usage
Configure and start a Klife.Client (see the Klife docs); this is your Kafka connection.
Define the Broadway pipeline, pointing the producer at the client:
defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {OffBroadwayKlife.Producer,
           client: MyApp.KafkaClient,
           group_name: "my-broadway-group",
           topics: [[name: "orders"], [name: "events"]],
           receive_interval: 500},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data, label: "got record")
    message
  end
end
Start the client before the pipeline in your supervision tree. The producer starts its built-in Klife consumer group on the client and supervises it as part of the pipeline, so there is no consumer group to define or add yourself.
children = [MyApp.KafkaClient, MyApp.Pipeline]
Message format
:message_format controls the shape of each Broadway.Message:
:klife (default): message.data is the full Klife.Record, the same struct used across Klife's produce/fetch APIs (value, key, headers as a map, timestamp, consumer_attempts, batch_attributes, …). message.metadata is empty. This format is lossless and consistent with the rest of Klife; prefer it for new pipelines.
:broadway_kafka: message.data is the raw value and message.metadata mirrors broadway_kafka: %{topic, partition, offset, key, ts, headers}, with headers as {key, value} tuples. With this format, OffBroadwayKlife.Producer can replace the producer of an existing broadway_kafka pipeline without any change to handle_message/3.
Routing, batching, acknowledgement and offset commits are identical either way; only the user-facing message shape changes.
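The difference between the two formats can be sketched in plain Elixir. This is a hypothetical illustration, not the connector's conversion code: a plain map stands in for a Klife.Record, its topic, partition and offset fields are assumptions, and the FormatSketch module name is made up.

```elixir
defmodule FormatSketch do
  # Hypothetical: a plain map stands in for a Klife.Record.

  # :klife format: the whole record becomes data; metadata stays empty.
  def to_klife_shape(record), do: %{data: record, metadata: %{}}

  # :broadway_kafka format: the raw value becomes data, and the metadata
  # follows broadway_kafka, with the header map turned into {key, value} tuples.
  def to_broadway_kafka_shape(record) do
    %{
      data: record.value,
      metadata: %{
        topic: record.topic,
        partition: record.partition,
        offset: record.offset,
        key: record.key,
        ts: record.timestamp,
        headers: Map.to_list(record.headers)
      }
    }
  end
end

record = %{
  topic: "orders",
  partition: 0,
  offset: 42,
  key: "order-1",
  value: ~s({"id": 1}),
  headers: %{"source" => "web"},
  timestamp: 1_700_000_000_000
}

IO.inspect(FormatSketch.to_klife_shape(record), label: ":klife")
IO.inspect(FormatSketch.to_broadway_kafka_shape(record), label: ":broadway_kafka")
```

The :klife shape keeps every field of the record in one place. The :broadway_kafka shape exists so that handle_message/3 code written against broadway_kafka metadata keeps working.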
Delivery semantics
Klife provides at-least-once delivery, and this connector preserves that guarantee. An offset is committed only once it and every lower delivered offset on the same partition have been acknowledged by Broadway, so out-of-order acknowledgements never advance the committed offset past an unprocessed record.
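The commit rule above can be sketched for a single partition as "take delivered offsets in ascending order while each one is acknowledged". This is a minimal sketch under assumptions: the CommitSketch module, its inputs (an ordered list of delivered offsets and a MapSet of acknowledged ones) and its return value are hypothetical. Failed messages count as acknowledged here, matching the failure handling described in this section.

```elixir
defmodule CommitSketch do
  # delivered: offsets handed to Broadway for one partition, ascending.
  #            Gaps are allowed (e.g. on compacted topics).
  # acked:     MapSet of offsets Broadway has acknowledged, successful or
  #            failed, in whatever order the acks arrived.
  # Returns the highest offset that is safe to commit, or nil if none is.
  def committable(delivered, acked) do
    delivered
    |> Enum.take_while(&MapSet.member?(acked, &1))
    |> List.last()
  end
end

delivered = [10, 11, 12, 13, 14]

# 14 was acknowledged before 13, which is still in flight.
acked = MapSet.new([10, 12, 11, 14])

IO.inspect(CommitSketch.committable(delivered, acked), label: "safe to commit")
```

Offset 14 is acknowledged, but it cannot be committed until 13 is, because a crash in between would otherwise lose 13. Kafka's convention is to commit the offset of the next record to read, so a client following it would typically send the returned offset plus one.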
Because Kafka tracks a single committed offset per partition, the connector
cannot commit past a failed message while leaving that message uncommitted.
Failed messages therefore advance the committed offset just like successful
ones. Handle failures explicitly in the handle_failed/2 Broadway callback
(e.g. by publishing them to a dead-letter topic) instead of expecting them
to block the partition.
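@impl true
def handle_failed(messages, _context) do
  Enum.each(messages, fn %Broadway.Message{data: _record} ->
    # With the default :klife format, data is the full Klife.Record.
    # Publish it to your dead-letter topic here.
    :ok
  end)

  # handle_failed/2 must return the messages it was given.
  messages
end
Producer concurrency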
The Klife consumer group is started once and keeps a single group membership
no matter how many producers run. You may set producer concurrency above 1:
each assigned {topic, partition} is claimed by exactly one producer via a
stable hash, so producers share the pull/commit load without overlap, without
adding group members, and without a coordinator process. For maximum
concurrency, raise it up to the number of partitions you expect this group
member to be assigned; any producer beyond that stays idle.
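The claiming scheme can be pictured with a small sketch. It assumes :erlang.phash2/2 as the stable hash; the connector's actual hash function is not documented here, and the ClaimSketch module is hypothetical. Each {topic, partition} maps to one producer index, and producers that receive nothing stay idle.

```elixir
defmodule ClaimSketch do
  # Assumption: :erlang.phash2/2 as the stable hash. The same
  # {topic, partition} always maps to the same 0-based producer index.
  def owner({_topic, _partition} = tp, producer_count),
    do: :erlang.phash2(tp, producer_count)

  # Group assigned partitions by owning producer. Every producer index
  # appears, so idle producers show up with an empty list.
  def distribute(assigned, producer_count) when producer_count > 0 do
    empty = Map.new(0..(producer_count - 1), &{&1, []})

    Enum.reduce(assigned, empty, fn tp, acc ->
      Map.update!(acc, owner(tp, producer_count), &[tp | &1])
    end)
  end
end

assigned = for p <- 0..2, do: {"orders", p}

# 4 producers, 3 assigned partitions: at least one producer stays idle.
IO.inspect(ClaimSketch.distribute(assigned, 4))
```

Because the hash depends only on the partition and the producer count, every producer can compute the same assignment independently, without any coordination. With a plain hash like this one, two partitions can also land on the same producer, so in this sketch some producers may idle even below the partition count.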
Ordering
Kafka orders records per topic-partition, and the connector preserves that
ordering end to end. Each partition is pulled by exactly one producer, which
emits it in offset order, and the connector sets Broadway's :partition_by so
that all records of a given {topic, partition} are always handled by the same
processor (and batcher) stage. Different partitions are still processed
concurrently.
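The effect of routing by {topic, partition} can be simulated without Broadway. This is a hypothetical sketch in the spirit of Broadway's :partition_by option, not the connector's actual key function. Records arrive interleaved across two partitions, each in offset order, and are grouped into processor "stages" by a hash of their partition. Enum.group_by/3 preserves input order within each group.

```elixir
defmodule OrderingSketch do
  # Hypothetical key: identical for every record of a {topic, partition},
  # so all of them are routed to the same stage.
  def partition_key(%{topic: t, partition: p}), do: :erlang.phash2({t, p})

  # Route records to one of `processors` stages. Within each stage the
  # input order (and therefore per-partition offset order) is kept.
  def route(records, processors) do
    Enum.group_by(records, &rem(partition_key(&1), processors))
  end
end

# Interleaved stream: offsets 0..3 from partitions 0 and 1.
records =
  for offset <- 0..3, partition <- [0, 1] do
    %{topic: "orders", partition: partition, offset: offset}
  end

routed = OrderingSketch.route(records, 10)

# Per stage, show the offsets seen for each {topic, partition}.
for {stage, recs} <- routed do
  per_partition = Enum.group_by(recs, &{&1.topic, &1.partition}, & &1.offset)
  IO.inspect(per_partition, label: "stage #{stage}")
end
```

The two partitions may or may not hash to the same stage. Either way, every record of one partition goes to a single stage, and that stage sees the partition's offsets in ascending order.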