Klife.Consumer.ConsumerGroup behaviour (Klife v1.0.0)
Defines a consumer group.
A consumer group coordinates the consumption of one or more topics across a set of cooperating member processes. Klife implements the new KIP-848 rebalance protocol introduced in Kafka 4.0, so partition assignment, heartbeats, and rebalances are handled by the broker coordinator and do not require any user-managed state machine.
Each use Klife.Consumer.ConsumerGroup defines one consumer group module. The
module is started under your supervision tree and, once running, the broker
coordinator assigns it topic-partitions. For each assigned partition, the group
starts a dedicated consumer process that fetches records and delivers them to
your handle_record_batch/4 callback.
Defining a consumer group
defmodule MyApp.MyConsumerGroup do
  use Klife.Consumer.ConsumerGroup,
    client: MyApp.MyClient,
    group_name: "my_group_name",
    topics: [
      [name: "my_topic_1"],
      [name: "my_topic_2"]
    ]

  @impl true
  def handle_record_batch(_topic, _partition, _group_name, records) do
    Enum.each(records, &IO.inspect(&1.value))
    :commit
  end
end

use Klife.Consumer.ConsumerGroup
When you use Klife.Consumer.ConsumerGroup, it will extend your module in two ways:
- Implement the Klife.Consumer.ConsumerGroup behaviour, requiring at least the handle_record_batch/4 callback.
- Define it as a GenServer that runs the consumer group machinery (heartbeats, assignment handling, fetcher and committer lifecycle) and can be started under your application's supervision tree.
All options passed to use are merged with the options passed to start_link/1,
with start_link/1 arguments taking precedence. This allows the most common
configuration to live alongside the module definition while still enabling
per-instance overrides at startup time.
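For example, the module defined above could be started under a supervisor with a per-instance override of its group name. This is a minimal sketch assuming the generated module accepts the usual {module, opts} child-spec form; the override value is illustrative:

# In your application supervisor. Options given here take precedence
# over the ones passed to `use Klife.Consumer.ConsumerGroup`.
children = [
  {MyApp.MyConsumerGroup, group_name: "my_group_name_staging"}
]

Supervisor.start_link(children, strategy: :one_for_one)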
Configuration options
- :client (atom/0) - Required. The name of the Klife client to be used by the consumer group.
- :topics (list of Klife.Consumer.ConsumerGroup.TopicConfig configurations) - Required. List of topic configurations that will be handled by the consumer group.
- :group_name (String.t/0) - Required. Name of the consumer group.
- :instance_id (String.t/0) - Value to identify the consumer across restarts (static membership). See KIP-345.
- :rebalance_timeout_ms (non_neg_integer/0) - The maximum time in milliseconds that the Kafka broker coordinator will wait for the member to revoke its partitions. The default value is 30000.
- :fetch_strategy ({:exclusive, fetcher_options} or {:shared, fetcher_name}) - Defines which Klife.Consumer.Fetcher is used by topics in this group that do not override the strategy in their own Klife.Consumer.ConsumerGroup.TopicConfig.
  - {:exclusive, fetcher_options}: starts a new fetcher dedicated to this group. fetcher_options follows the Klife.Consumer.Fetcher configuration (:name is managed automatically and cannot be set here).
  - {:shared, fetcher_name}: reuses an existing fetcher (one already started by the client). All topics that do not override their strategy share it.
  Defaults to {:shared, <client default fetcher>}.
- :committers_count (pos_integer/0) - How many committer processes will be started for the consumer group. The default value is 1.
- :default_topic_config (keyword/0) - Klife.Consumer.ConsumerGroup.TopicConfig that will serve as default for every topic on the group that does not explicitly set it. The default value is []. The TopicConfig options are:
  - :fetch_strategy ({:exclusive, fetcher_options} or {:shared, fetcher_name}) - Overrides the fetch strategy defined at the consumer group level for this topic. The override semantics differ slightly from the group-level option:
    - {:exclusive, fetcher_options}: starts a new fetcher dedicated to this single topic (not shared with other topics in the group).
    - {:shared, fetcher_name}: uses the given pre-existing fetcher only for this topic.
    If not set, the topic uses the fetcher selected by the consumer group's :fetch_strategy.
  - :isolation_level - May override the isolation level defined on the consumer group. The default value is :read_committed.
  - :offset_reset_policy - Defines from which offset the consumer will start processing records when no previously committed offset is found. The default value is :latest.
  - :fetch_max_bytes (non_neg_integer/0) - The maximum amount of bytes to fetch in a single request. Must be lower than the fetcher's max_bytes_per_request config. The default value is 1000000.
  - :max_queue_size (non_neg_integer/0) - The maximum number of record batches that the consumer can keep on its internal queue. Defaults to 5 if :handler_max_batch_size is :dynamic, otherwise defaults to 20.
  - :fetch_interval_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process. The consumer always tries to optimize fetch wait times by issuing requests before its internal queue is empty (the current threshold is 1 batch), so this option only applies after a fetch request returns empty. The default value is 1000.
  - :handler_cooldown_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before handling new records. Can be overridden for one cycle by the handler return value. The default value is 0.
  - :handler_max_unacked_commits (non_neg_integer/0) - Controls how many records can be waiting for commit confirmation before the consumer stops processing new records. When this limit is reached, processing pauses until confirmations are received. Set it to 0 to process records one batch at a time, so each processing cycle must be fully committed before starting the next. The default value is 0.
  - :handler_max_batch_size - The maximum amount of records that will be delivered to the handler in each processing cycle. If :dynamic, all records retrieved in the fetch request are delivered as one single batch to the handler. If a positive integer, retrieved records are chunked into the provided size. The default value is :dynamic.
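To illustrate how group-level and topic-level options combine, a configuration along these lines could be used (a sketch only; the group, instance, and topic names are illustrative):

use Klife.Consumer.ConsumerGroup,
  client: MyApp.MyClient,
  group_name: "orders_processor",
  # static membership: keep the same identity across restarts (KIP-345)
  instance_id: "orders_processor_node_1",
  rebalance_timeout_ms: 60_000,
  topics: [
    [name: "orders", offset_reset_policy: :earliest],
    [name: "order_events", handler_max_batch_size: 100]
  ]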
Topic configuration
Each entry of :topics is a Klife.Consumer.ConsumerGroup.TopicConfig. The
group's :default_topic_config provides defaults applied to every topic that
does not explicitly override them:
use Klife.Consumer.ConsumerGroup,
  client: MyApp.MyClient,
  group_name: "my_group",
  default_topic_config: [
    handler_max_unacked_commits: 100,
    offset_reset_policy: :earliest
  ],
  topics: [
    [name: "topic_a"],
    [name: "topic_b", offset_reset_policy: :latest]
  ]

Above, topic_a inherits both defaults, while topic_b overrides :offset_reset_policy and inherits :handler_max_unacked_commits.
Delivery semantics
Klife provides at-least-once delivery: every record is guaranteed to be
delivered to handle_record_batch/4 at least once, but may be delivered
more than once after a hard failure (member crash, network split, liveness
timeout). Handlers must be idempotent to tolerate duplicates.
Graceful partition revocation does not cause reprocessing: the consumer waits for every in-flight record to be committed before releasing the partition. Reprocessing only occurs when the consumer is unable to commit before losing the assignment.
The window of records exposed to reprocessing is bounded by
:handler_max_unacked_commits (a TopicConfig option):
- 0 (default): each batch must be fully committed before the next is delivered. Reprocessing on failure is limited to the records of the most recent in-flight batch.
- N > 0: up to N processed records may be waiting for commit at any time. This unlocks commit pipelining and higher throughput, but on a hard failure those records may be redelivered.
Retrying records
Klife's per-record :commit/:retry control on handle_record_batch/4
is more granular than what Kafka itself tracks: Kafka stores a single
committed offset per partition for each consumer group, and on resume the
consumer always picks up from there.
This gap leaves room for a misuse pattern where a batch returns :commit
for a record and :retry for one with a lower offset in the same
partition. For example, committing record 2 while asking record 1
to be retried. While the consumer is running, Klife keeps 1 in its
in-process queue and re-delivers it on the next cycle. But if the
consumer restarts or the partition is revoked before 1 is committed,
the broker only knows about the commit of 2: the retry queue is lost
and 1 would be silently skipped. To prevent this, Klife detects the
pattern and raises.
To stay aligned with Kafka semantics, never retry a record whose offset is lower than any record committed in the same batch. When a record fails, choose one of:
- Retry the failing record together with every successful record after it, accepting that the successful ones will be reprocessed once the failing one eventually succeeds.
- Commit the failing record and route it elsewhere (e.g. a dead-letter topic) so processing can move forward.
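As a sketch of the second option, the handler below forwards each failed record to a dead-letter topic and then commits it so the partition keeps moving. It assumes the client module exposes Klife's produce/1 returning {:ok, record} on success; the "_dlq" topic suffix is illustrative:

@impl true
def handle_record_batch(topic, _partition, _group_name, records) do
  Enum.map(records, fn record ->
    case process(record) do
      :ok ->
        {:commit, record}

      {:error, _reason} ->
        # Route the failure to a dead-letter topic so processing can move
        # forward, then commit the original record.
        dlq_record = %Klife.Record{topic: topic <> "_dlq", value: record.value}
        {:ok, _} = MyApp.MyClient.produce(dlq_record)
        {:commit, record}
    end
  end)
end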
Fetch strategies
Every partition consumer fetches records through a Klife.Consumer.Fetcher,
and which fetcher serves a topic is decided in two layers: the consumer
group's :fetch_strategy sets a default for all its topics, and any
TopicConfig can override that default for itself.
The same two strategies, {:shared, fetcher_name} and
{:exclusive, fetcher_options}, are accepted at both levels, but the
scope of :exclusive differs:
- {:exclusive, opts} at the group level starts one new fetcher dedicated to the consumer group. Every topic that does not override the strategy shares it.
- {:exclusive, opts} at the topic level starts one new fetcher dedicated to that single topic, not shared with anything else.
{:shared, fetcher_name} behaves identically at both levels: it reuses an
existing fetcher (one already started by the client) for whatever scope it
is set on.
When the group's :fetch_strategy is unset, it falls back to
{:shared, <client default fetcher>}, so topics in the group share the
client's default fetcher with every other consumer of it.
Override at the group level when the entire group needs an isolated fetch
pipeline (e.g. its own batchers or max_bytes_per_request). Override at
the topic level when a single noisy or latency-sensitive topic should not
share its fetcher with the rest of the group.
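For instance, a group could run on its own dedicated fetcher while one heavy topic gets a fetcher of its own. This is a sketch; the topic names and fetcher options are illustrative:

use Klife.Consumer.ConsumerGroup,
  client: MyApp.MyClient,
  group_name: "my_group",
  # group level: one new fetcher dedicated to this group
  fetch_strategy: {:exclusive, max_bytes_per_request: 5_000_000},
  topics: [
    [name: "topic_a"],
    [name: "topic_b"],
    # topic-level override: this topic gets its own fetcher,
    # not shared with topic_a and topic_b
    [name: "noisy_topic", fetch_strategy: {:exclusive, max_bytes_per_request: 1_000_000}]
  ]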
How many committers?
Offset commits are dispatched by dedicated committer processes shared across the partitions of the group. A single committer is enough for most workloads.
Before raising :committers_count, check whether the apparent bottleneck is
committer dispatch or simply commit RTT pacing the handler (see
Tuning for throughput).
Raising :handler_max_unacked_commits decouples handler cadence from commit
latency and resolves most apparent "commit bottleneck" symptoms without
adding parallelism.
Increase :committers_count only when the committer process itself can't
keep up, typically a group with many actively committing partitions where
profiling shows the committer as the bottleneck. This comes at the cost of
per-commit batching efficiency, since partitions are split across committers.
Summary
Callbacks
Called when a consumer process is started for a topic-partition after assignment.
Called when a consumer process is stopped for a topic-partition.
Called for each batch of records delivered to the consumer.
Types
@type action() :: :commit | :retry
@type callback_opts() :: [{:handler_cooldown_ms, non_neg_integer()}]
Callbacks
@callback handle_consumer_start( topic :: String.t(), partition :: integer(), group_name :: String.t() ) :: :ok
Called when a consumer process is started for a topic-partition after assignment.
@callback handle_consumer_stop( topic :: String.t(), partition :: integer(), group_name :: String.t(), reason :: term() ) :: :ok
Called when a consumer process is stopped for a topic-partition.
The reason reflects why the consumer terminated and depends on the trigger:
- On a partition revocation initiated by the group, the reason is {:shutdown, {:assignment_revoked, topic_id, partition_index}}.
- On a normal group shutdown, it follows the standard GenServer termination semantics (e.g. :normal, :shutdown).
- On unexpected termination (callback raise, liveness timeout, internal error), it carries the underlying exit reason.
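A minimal implementation of these two lifecycle callbacks could simply log the events (a sketch using Elixir's standard Logger; only handle_record_batch/4 is strictly required by the behaviour):

require Logger

@impl true
def handle_consumer_start(topic, partition, group_name) do
  Logger.info("#{group_name}: started consumer for #{topic}/#{partition}")
  :ok
end

@impl true
def handle_consumer_stop(topic, partition, group_name, reason) do
  Logger.info("#{group_name}: stopped consumer for #{topic}/#{partition}: #{inspect(reason)}")
  :ok
end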
@callback handle_record_batch(
            topic :: String.t(),
            partition :: integer(),
            group_name :: String.t(),
            [Klife.Record.t()]
          ) ::
            action()
            | {action(), callback_opts()}
            | [{action(), Klife.Record.t()}]
            | {[{action(), Klife.Record.t()}], callback_opts()}
Called for each batch of records delivered to the consumer.
Receives the topic name, partition, group name, and a list of Klife.Record structs.
The return value controls what happens with the records.
Batch-level control
Return a single action to apply to the entire batch:
- :commit: commits all records in the batch.
- :retry: retries the entire batch.
- {action, callback_opts}: same as above, with options.
Per-record control
Return a list of {action, record} tuples for fine-grained control. Records
tagged with :commit have their offsets advanced; records tagged with :retry
are re-delivered in the next batch.
The list must not commit any record at a higher offset than a retried one in the same batch — Klife raises in that case. See Retrying records in the moduledoc for the rationale and safe patterns. The example below applies the standard "stop on first failure" shape:
def handle_record_batch(_topic, _partition, _group_name, records) do
  case Enum.split_while(records, fn record -> process(record) == :ok end) do
    {_ok, []} ->
      :commit

    {ok, retry} ->
      Enum.map(ok, &{:commit, &1}) ++ Enum.map(retry, &{:retry, &1})
  end
end

Callback options
Both batch-level and per-record return values can be wrapped in a tuple with
callback_opts as the second element. The available options:
- handler_cooldown_ms: overrides the topic-level handler_cooldown_ms for the next processing cycle only. Useful for implementing backoff on transient failures.
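For example, a handler could back off on a transient failure by retrying the whole batch with a one-off cooldown. This is a sketch; process_all/1 and the error value stand in for whatever processing the handler performs:

@impl true
def handle_record_batch(_topic, _partition, _group_name, records) do
  case process_all(records) do
    :ok ->
      :commit

    # Transient failure: retry the whole batch, but wait 5s before the
    # next processing cycle instead of the topic-level cooldown.
    {:error, :service_unavailable} ->
      {:retry, handler_cooldown_ms: 5_000}
  end
end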