A Broadway producer that consumes from Kafka through Klife's consumer group manual mode.
Klife runs the full consumer-group machinery (heartbeats, rebalances, fetching and buffering),
while this producer drives delivery: it pulls buffered batches on demand, turns each Klife.Record
into a Broadway.Message, and commits offsets back as Broadway acknowledges them.
Usage
Define a Klife.Client (see Klife's docs for client configuration) and make
sure it is started in your supervision tree before the pipeline:

    defmodule MyApp.KafkaClient do
      use Klife.Client, otp_app: :my_app
    end

Then start Broadway with this producer, pointing it at the client:

    defmodule MyApp.Pipeline do
      use Broadway

      def start_link(_opts) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [
            module:
              {OffBroadwayKlife.Producer,
               client: MyApp.KafkaClient,
               group_name: "my-broadway-group",
               topics: [[name: "orders"], [name: "events"]],
               receive_interval: 500},
            concurrency: 1
          ],
          processors: [default: [concurrency: 10]]
        )
      end

      @impl true
      def handle_message(_processor, message, _context) do
        IO.inspect(message.data)
        message
      end
    end

The producer starts its built-in consumer group on the client and supervises it as part of the pipeline, so there is no
consumer group to define or add to your supervision tree. The built-in group
is bound to the client on first start, which means all :client pipelines in
a node must use the same Klife client.
Using a consumer group module
To run Klife's consumer lifecycle callbacks (handle_consumer_start/3 and
handle_consumer_stop/4) alongside Broadway — or to consume through more
than one Klife client — define a consumer group module and pass it as
:consumer_group instead of :client (the two options are mutually
exclusive):

    defmodule MyApp.KafkaConsumerGroup do
      use Klife.Consumer.ConsumerGroup, client: MyApp.KafkaClient

      @impl true
      def handle_consumer_start(_topic, _partition, _group_name) do
        # e.g. emit telemetry
        :ok
      end
    end

    producer: [
      module:
        {OffBroadwayKlife.Producer,
         consumer_group: MyApp.KafkaConsumerGroup,
         group_name: "my-broadway-group",
         topics: [[name: "orders"]]}
    ]

Do not implement handle_record_batch/4: Broadway drives fetching and
committing through Klife's manual mode, so that callback never runs. The
producer starts the group in either case — do not add it to your supervision
tree.
Options
- :client - A module that uses Klife.Client. The producer starts the default consumer group module on this client (a single group membership) and drives it in manual mode. All :client pipelines in a node must use the same client. Exactly one of :client or :consumer_group must be set.
- :consumer_group - A module that uses Klife.Consumer.ConsumerGroup, as an alternative to :client for when you also want the group's lifecycle callbacks. Started once by the producer (a single group membership) and driven in manual mode.
- :group_name (String.t/0) - Required. The Kafka consumer group name.
- :topics (list of keyword/0) - Required. List of Klife.Consumer.ConsumerGroup.TopicConfig keyword lists, e.g. [[name: "orders"], [name: "events", fetch_max_bytes: 500_000]]. Every topic is forced into mode: :manual.
- :receive_interval (non_neg_integer/0) - Milliseconds to wait before polling Klife again after a poll returned no records. The default value is 1000.
- :message_format - Shape of the emitted Broadway.Messages. See the "Message format" section below. The default value is :klife.
- :fetch_strategy (term/0) - Forwarded to the consumer group. See Klife.Consumer.ConsumerGroup.
- :committers_count (pos_integer/0) - Forwarded to the consumer group. See Klife.Consumer.ConsumerGroup.
- :default_topic_config (keyword/0) - Forwarded to the consumer group. See Klife.Consumer.ConsumerGroup.
- :instance_id (String.t/0) - Forwarded to the consumer group (static membership). See Klife.Consumer.ConsumerGroup.
- :rebalance_timeout_ms (non_neg_integer/0) - Forwarded to the consumer group. See Klife.Consumer.ConsumerGroup.
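
As an illustration of how these options combine, the fragment below builds a producer option list using only the options documented above. The concrete values (interval, instance id) are placeholders chosen for the example, not recommendations.

```elixir
# Illustrative option list; every key is one of the documented options,
# every value is a placeholder.
producer_opts = [
  client: MyApp.KafkaClient,
  group_name: "my-broadway-group",
  topics: [[name: "orders"], [name: "events", fetch_max_bytes: 500_000]],
  receive_interval: 250,
  message_format: :broadway_kafka,
  instance_id: "my-app-node-1"
]

# The list is then passed as the second element of the :module tuple.
producer = [module: {OffBroadwayKlife.Producer, producer_opts}, concurrency: 1]
IO.inspect(producer)
```

Note that the list sets :client and leaves :consumer_group out, since exactly one of the two must be present.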
Message format
:message_format controls the shape of each Broadway.Message:
- :klife (default) - message.data is the full Klife.Record struct, the same type used across Klife's produce and fetch APIs. It carries everything (value, key, headers as maps, timestamp, consumer_attempts, batch_attributes, ...), and message.metadata is empty. Prefer this for new pipelines: it is lossless and consistent with the rest of Klife.

      def handle_message(_, %{data: %Klife.Record{value: value}} = msg, _), do: msg

- :broadway_kafka - message.data is the raw value and message.metadata mirrors broadway_kafka: %{topic, partition, offset, key, ts, headers} with headers as {key, value} tuples. Use this to drop OffBroadwayKlife.Producer into an existing broadway_kafka pipeline without changing handle_message/3.

Either way, routing, batching, acknowledgement and offset commits are identical — only the user-facing message shape changes.
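
To make the :broadway_kafka shape concrete, here is a minimal sketch of a helper that pattern-matches on it. The module name, the "trace-id" header and the sample values are hypothetical; the helper takes any map with :data and :metadata keys, so it is exercised here with a plain map rather than a real Broadway.Message.

```elixir
defmodule MessageShapeSketch do
  # Hypothetical helper, not part of the connector. A map pattern also
  # matches structs, so a Broadway.Message emitted with
  # message_format: :broadway_kafka would match this clause as well.
  def describe(%{data: value, metadata: %{topic: topic, partition: partition, offset: offset} = meta}) do
    # In this format headers are a list of {key, value} tuples.
    trace_id =
      case List.keyfind(Map.get(meta, :headers, []), "trace-id", 0) do
        {_key, id} -> id
        nil -> nil
      end

    %{source: "#{topic}/#{partition}@#{offset}", value: value, trace_id: trace_id}
  end
end

message = %{
  data: "order-created",
  metadata: %{
    topic: "orders",
    partition: 2,
    offset: 41,
    key: "o-1",
    ts: 0,
    headers: [{"trace-id", "abc"}]
  }
}

IO.inspect(MessageShapeSketch.describe(message))
```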
Producer concurrency
The Klife consumer group is started once and keeps a single membership no
matter how many producers run. You may set producer concurrency > 1: each
assigned {topic, partition} is claimed by exactly one producer via a stable
hash, so producers share the pull/commit load with no overlap, no extra group
members, and no coordinator process overhead. For maximum concurrency, raise it
up to the number of partitions you expect to be assigned to this member of the
group; any producers beyond that number stay idle.
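
The claiming scheme can be pictured with a small sketch. It assumes a hash based on :erlang.phash2/2; the connector's actual hash function is not specified here and may differ, and the module and function names are hypothetical.

```elixir
defmodule PartitionOwnerSketch do
  # Assumption: ownership is derived from :erlang.phash2/2, which is
  # deterministic, so the same {topic, partition} always maps to the same
  # producer index in 0..(producer_count - 1).
  def owner(topic, partition, producer_count) when producer_count > 0 do
    :erlang.phash2({topic, partition}, producer_count)
  end

  # Groups the assigned partitions by the producer index that claims them.
  def split(assigned, producer_count) do
    Enum.group_by(assigned, fn {topic, partition} ->
      owner(topic, partition, producer_count)
    end)
  end
end

assigned = for partition <- 0..5, do: {"orders", partition}
claims = PartitionOwnerSketch.split(assigned, 3)
IO.inspect(claims)
```

Whatever the hash, the property that matters is that every assigned partition appears under exactly one producer index, so no two producers pull or commit for the same partition.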
Ordering
Kafka guarantees ordering per topic-partition. The connector preserves it end
to end: each partition is pulled by exactly one producer, which emits its
records in offset order, and it sets Broadway's :partition_by so that every
record of a given {topic, partition} is always routed to the same processor
(and batcher) stage. Records of different partitions still process
concurrently.
When you use a batcher, each batch also holds a single partition's records:
the connector defaults :batch_key to {topic, partition}, so a
handle_batch/4 call maps to one partition's contiguous offset range.
You may override :batch_key (or :batcher) in handle_message/3:
- Coarsening it (e.g. to the topic, or leaving it :default) packs records from several partitions into one handle_batch call, giving fuller batches and fewer round-trips when a node owns many low-volume partitions. Per-partition ordering is still preserved; the batch just interleaves partitions, so group by partition inside handle_batch if your logic needs to.
- Refining it (a sub-partition key) or routing to multiple :batchers re-splits a partition across independently-flushing batches, which gives up strict per-partition ordering. Use it only when per-key (not per-partition) ordering is enough, or to fan message types out to different sinks (e.g. a dead-letter batcher).
Offset commits stay correct in every case regardless of how records are batched.
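
When a coarsened batch key mixes partitions, regrouping inside handle_batch could look like the sketch below. It assumes each message exposes topic and partition under metadata, as in the :broadway_kafka format; with the :klife format the same values would have to be read from the record in message.data instead. The module name and sample batch are hypothetical, and plain maps stand in for Broadway.Message structs.

```elixir
defmodule BatchGroupingSketch do
  # Splits a mixed batch into per-partition lists. Enum.group_by/2 keeps the
  # relative order of elements inside each group, so offsets that were
  # ascending per partition in the batch stay ascending in each list.
  def by_partition(messages) do
    Enum.group_by(messages, fn %{metadata: %{topic: topic, partition: partition}} ->
      {topic, partition}
    end)
  end
end

batch = [
  %{data: "a", metadata: %{topic: "orders", partition: 0, offset: 7}},
  %{data: "b", metadata: %{topic: "orders", partition: 1, offset: 3}},
  %{data: "c", metadata: %{topic: "orders", partition: 0, offset: 8}}
]

grouped = BatchGroupingSketch.by_partition(batch)
IO.inspect(Map.keys(grouped))
```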
Because the connector manages :partition_by, you must not set it yourself —
doing so raises. Scale out across partitions with processor/batcher
concurrency.
Delivery semantics
Klife provides at-least-once delivery and this producer preserves it: an offset is only committed once it and every lower delivered offset on the same partition have been acknowledged by Broadway.
Because Kafka tracks a single committed offset per partition, a failed
message cannot be skipped while committing past it. Both successful and
failed messages therefore advance the offset; handle failures explicitly via
Broadway.handle_failed/2 (for example, by producing to a dead-letter
topic) rather than relying on them blocking the partition.
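
The commit rule described above amounts to tracking a contiguous acknowledged prefix per partition. The following is a minimal sketch of that idea, not the connector's implementation; the module and function names are hypothetical, and "acknowledged" covers both successful and failed messages.

```elixir
defmodule CommitWatermarkSketch do
  # delivered: offsets handed to Broadway for one partition, ascending.
  # acked: MapSet of offsets Broadway has acknowledged so far.
  # Returns the highest offset such that it and every lower delivered offset
  # are acknowledged, or nil while the lowest delivered offset is pending.
  def committable(delivered, acked) do
    delivered
    |> Enum.take_while(fn offset -> MapSet.member?(acked, offset) end)
    |> List.last()
  end
end

delivered = [10, 11, 12, 13, 14]
acked = MapSet.new([10, 11, 13])

# Offset 13 is acknowledged, but 12 is still in flight, so the result is 11.
IO.inspect(CommitWatermarkSketch.committable(delivered, acked))
```

Once offset 12 is acknowledged, whether it succeeded or failed, the committable offset in this sketch moves to 13, which is why failures need to be handled in Broadway.handle_failed/2 rather than by expecting the partition to stall.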