Klife.Consumer.ConsumerGroup behaviour (Klife v1.1.0)


Defines a consumer group.

A consumer group coordinates the consumption of one or more topics across a set of cooperating member processes. Klife implements the new KIP-848 rebalance protocol introduced in Kafka 4.0, so partition assignment, heartbeats, and rebalances are handled by the broker coordinator and do not require any user-managed state machine.

Each use Klife.Consumer.ConsumerGroup defines one consumer group module. The module is started under your supervision tree and, once running, the broker coordinator assigns it topic-partitions. For each assigned partition, the group starts a dedicated consumer process that fetches records and delivers them to your handle_record_batch/4 callback.

Defining a consumer group

defmodule MyApp.MyConsumerGroup do
  use Klife.Consumer.ConsumerGroup,
    client: MyApp.MyClient,
    group_name: "my_group_name",
    topics: [
      [name: "my_topic_1"],
      [name: "my_topic_2"]
    ]

  @impl true
  def handle_record_batch(_topic, _partition, _group_name, records) do
    Enum.each(records, &IO.inspect(&1.value))
    :commit
  end
end

use Klife.Consumer.ConsumerGroup

When you use Klife.Consumer.ConsumerGroup, it will extend your module in two ways:

  • Implement the Klife.Consumer.ConsumerGroup behaviour, requiring at least the handle_record_batch/4 callback.

  • Define it as a GenServer that runs the consumer group machinery (heartbeats, assignment handling, fetcher and committer lifecycle) and can be started under your application's supervision tree.

All options passed to use are merged with the options passed to start_link/1, with start_link/1 arguments taking precedence. This allows the most common configuration to live alongside the module definition while still enabling per-instance overrides at startup time.
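The precedence rule behaves like Keyword.merge/2, where keys from the second list replace the same keys in the first. The sketch below only models the documented merge. MergeSketch is a hypothetical module, not part of Klife, and Klife's internal merge may differ in details such as key order.

```elixir
defmodule MergeSketch do
  # `use` options act as defaults; start_link/1 options replace them key by key.
  # Keyword.merge/2 keeps the right-hand value for any duplicated key.
  def effective_opts(use_opts, start_link_opts) do
    Keyword.merge(use_opts, start_link_opts)
  end
end

use_opts = [client: MyApp.MyClient, group_name: "my_group_name", committers_count: 1]
start_link_opts = [group_name: "my_group_name_eu"]

MergeSketch.effective_opts(use_opts, start_link_opts)
# => [client: MyApp.MyClient, committers_count: 1, group_name: "my_group_name_eu"]
```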

Consumer group module bindings

A consumer group module is bound very differently to the group_name it runs under and to the Klife.Client it talks to: the group name can vary freely from one start to the next, while the client is fixed for the life of the module.

Group name: one module, many groups

A module is not tied to a single group name. The :group_name may be set on use or passed to start_link/1 (which takes precedence), and you can start the same module more than once with a different :group_name each time. Each instance runs as an independent group member with its own process, keyed by {client, module, group_name}, and its own assignments, consumer, and committer processes. This lets a single module definition serve several independently tracked groups (each group has its own committed offsets in Kafka) without defining a new module per group.
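Starting the same module under several group names mainly requires a distinct child id per instance, because a supervisor refuses two children with the same :id. A minimal sketch, assuming the start_link/1 generated by use accepts the keyword options listed under Configuration options. MyApp.GroupSpecs is a hypothetical helper, and MyApp.MyConsumerGroup is the module from the first example.

```elixir
defmodule MyApp.GroupSpecs do
  # One child spec per group name for the same consumer group module.
  # Supervisors reject duplicate ids, so {module, group_name} is used as the id.
  def child_specs(cg_mod, group_names, extra_opts \\ []) do
    for group_name <- group_names do
      %{
        id: {cg_mod, group_name},
        start: {cg_mod, :start_link, [Keyword.put(extra_opts, :group_name, group_name)]}
      }
    end
  end
end

# In your application's start/2:
#
#   children = MyApp.GroupSpecs.child_specs(MyApp.MyConsumerGroup, ["orders_eu", "orders_us"])
#   Supervisor.start_link(children, strategy: :one_for_one)
```

Each resulting instance then commits its own offsets in Kafka under its own group name.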

Client: one module, one client

Every consumer group module is bound to exactly one Klife.Client, exposed through the generated klife_client/0. There are two ways to establish the binding:

  • At compile time, passing :client to use as in the example above. klife_client/0 is then a constant function.

  • At start time, omitting :client from use and passing it to start_link/1 instead. On the first start the module is bound to that client through a persistent term whose key is {Klife.Consumer.ConsumerGroup, YourModule}, and klife_client/0 reads it (returning nil before the first start). This exists so libraries can ship a single consumer group module that works with whatever client the host application configures (e.g. broadway_klife). Do not write to that persistent term yourself.

Unlike the group name, this binding is permanent: starting the module with a :client different from the one it is bound to raises an ArgumentError, since the running machinery and the module wrappers would otherwise disagree about which client to talk to. To consume through a different client, define another consumer group module.
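The bind-once rule can be modeled with :persistent_term in a few lines. This is an illustration of the documented behaviour only: BindOnceSketch and its key are hypothetical, it is not how Klife stores its binding, and Klife's own persistent term must still never be written by user code.

```elixir
defmodule BindOnceSketch do
  # Hypothetical key, deliberately different from the one Klife uses.
  defp key(mod), do: {__MODULE__, mod}

  # Binds `mod` to `client` on the first call. Later calls succeed only if
  # they name the same client; a different client raises ArgumentError.
  def bind!(mod, client) do
    case :persistent_term.get(key(mod), nil) do
      nil ->
        :persistent_term.put(key(mod), client)
        :ok

      ^client ->
        :ok

      other ->
        raise ArgumentError,
              "#{inspect(mod)} is bound to #{inspect(other)}, got #{inspect(client)}"
    end
  end

  # Mirrors klife_client/0 before the first start: nil until bound.
  def bound_client(mod), do: :persistent_term.get(key(mod), nil)
end
```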

Configuration options

  • :client (atom/0) - Required. The name of the klife client to be used by the consumer group. May be set on use (binding the module at compile time) or passed to start_link/1 (binding it on first start). See "Consumer group module bindings" in the moduledoc.

  • :topics (List of Klife.Consumer.ConsumerGroup.TopicConfig configurations) - Required. List of topic configurations that will be handled by the consumer group.

  • :group_name (String.t/0) - Required. Name of the consumer group.

  • :instance_id (String.t/0) - Value that identifies the consumer across restarts (static membership). See KIP-345.

  • :rebalance_timeout_ms (non_neg_integer/0) - The maximum time in milliseconds that the Kafka broker coordinator will wait for the member to revoke its partitions. The default value is 30000.

  • :fetch_strategy ({:exclusive, fetcher_options} or {:shared, fetcher_name}) - Defines which Klife.Consumer.Fetcher is used by topics in this group that do not override the strategy in their own Klife.Consumer.ConsumerGroup.TopicConfig.

    • {:exclusive, fetcher_options}: starts a new fetcher dedicated to this group. fetcher_options follows Klife.Consumer.Fetcher configuration (:name is managed automatically and cannot be set here).
    • {:shared, fetcher_name}: reuses an existing fetcher (one already started by the client). All topics that do not override their strategy share it.

    Defaults to {:shared, <client default fetcher>}.

  • :committers_count (pos_integer/0) - Number of committer processes started for the consumer group. The default value is 1.

  • :default_topic_config (keyword/0) - Klife.Consumer.ConsumerGroup.TopicConfig options that serve as defaults for every topic in the group that does not set them explicitly. The default value is [].

    • :fetch_strategy ({:exclusive, fetcher_options} or {:shared, fetcher_name}) - Overrides the fetch strategy defined at the consumer group level for this topic.

      The override semantics differ slightly from the group-level option:

      • {:exclusive, fetcher_options}: starts a new fetcher dedicated to this single topic (not shared with other topics in the group).
      • {:shared, fetcher_name}: uses the given pre-existing fetcher only for this topic.

      If not set, the topic uses the fetcher selected by the consumer group's :fetch_strategy.

    • :isolation_level - May override the isolation level defined on the consumer group. The default value is :read_committed.

    • :offset_reset_policy - Defines the offset from which the consumer starts processing records when no previously committed offset is found. The default value is :latest.

    • :fetch_max_bytes (non_neg_integer/0) - The maximum number of bytes to fetch in a single request. Must be lower than the fetcher's max_bytes_per_request. The default value is 1000000.

    • :max_queue_size (non_neg_integer/0) - The maximum number of record batches that the consumer can keep on its internal queue. Defaults to 5 if :handler_max_batch_size is :dynamic, otherwise defaults to 20.

    • :fetch_interval_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process.

      The consumer reduces fetch wait times by issuing requests before its internal queue is empty (the current threshold is 1 batch). Therefore this option only sets the wait time after a fetch request returns empty.

      The default value is 1000.

    • :handler_cooldown_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before handling new records. Can be overridden for one cycle by the handler's return value.

      Ignored when :mode is :manual: pacing between pulls is the external driver's responsibility.

      The default value is 0.

    • :handler_max_unacked_commits (non_neg_integer/0) - Controls how many records can be waiting for commit confirmation before the consumer stops processing new records.

      When this limit is reached, processing pauses until confirmations are received.

      Set it to 0 to process records one batch at a time: each processing cycle must be fully committed before the next one starts.

      Ignored when :mode is :manual: the external driver controls how many uncommitted batches it pulls, so the consumer never throttles pulls on pending commits.

      The default value is 0.

    • :handler_max_batch_size - The maximum number of records delivered to the handler in each processing cycle. If :dynamic, all records retrieved in the fetch request are delivered to the handler as a single batch. If a positive integer, retrieved records are chunked into batches of that size.

      In :manual mode this still applies: it controls the size of each batch returned by pull/5 (chunking happens at fetch time, before the queue).

      The default value is :dynamic.

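    • :mode - Controls how records are delivered to user code. With :auto, records are delivered to handle_record_batch/4. With :manual, they stay buffered until pulled with pull/5 and committed with commit/6.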

      Manual mode exists so external pipelines (e.g. a Broadway producer) can reuse Klife's fetch, buffering, membership and rebalance machinery while driving processing themselves.

      The default value is :auto.
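The interaction between :handler_max_batch_size and the :max_queue_size default fits in a few lines. BatchSizeSketch is hypothetical and uses integers in place of Klife.Record structs; it restates the documented rules rather than Klife's internals.

```elixir
defmodule BatchSizeSketch do
  # :handler_max_batch_size: :dynamic delivers the whole fetch as one batch,
  # a positive integer chunks it into batches of at most that size.
  def to_batches(records, :dynamic), do: [records]

  def to_batches(records, size) when is_integer(size) and size > 0 do
    Enum.chunk_every(records, size)
  end

  # Documented :max_queue_size default, which depends on the batch size mode.
  def default_max_queue_size(:dynamic), do: 5
  def default_max_queue_size(size) when is_integer(size) and size > 0, do: 20
end

BatchSizeSketch.to_batches([1, 2, 3, 4, 5], 2)
# => [[1, 2], [3, 4], [5]]
```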

Topic configuration

Each entry of :topics is a Klife.Consumer.ConsumerGroup.TopicConfig. The group's :default_topic_config provides defaults applied to every topic that does not explicitly override them:

use Klife.Consumer.ConsumerGroup,
  client: MyApp.MyClient,
  group_name: "my_group",
  default_topic_config: [
    handler_max_unacked_commits: 100,
    offset_reset_policy: :earliest
  ],
  topics: [
    [name: "topic_a"],
    [name: "topic_b", offset_reset_policy: :latest]
  ]

Above, topic_a inherits both defaults, while topic_b overrides :offset_reset_policy and inherits :handler_max_unacked_commits.
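Resolving a topic's effective options behaves like merging each topic entry over :default_topic_config, with the topic's own keys winning. TopicDefaultsSketch is a hypothetical illustration of that rule, not Klife's code.

```elixir
defmodule TopicDefaultsSketch do
  # Per-topic keys win over :default_topic_config, key by key.
  def resolve(default_topic_config, topics) do
    Enum.map(topics, &Keyword.merge(default_topic_config, &1))
  end
end

defaults = [handler_max_unacked_commits: 100, offset_reset_policy: :earliest]
topics = [[name: "topic_a"], [name: "topic_b", offset_reset_policy: :latest]]

TopicDefaultsSketch.resolve(defaults, topics)
# => [
#      [handler_max_unacked_commits: 100, offset_reset_policy: :earliest, name: "topic_a"],
#      [handler_max_unacked_commits: 100, name: "topic_b", offset_reset_policy: :latest]
#    ]
```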

Delivery semantics

Klife provides at-least-once delivery: every record is guaranteed to be delivered to handle_record_batch/4 at least once, but may be delivered more than once after a hard failure (member crash, network split, liveness timeout). Handlers must be idempotent to tolerate duplicates.

Graceful partition revocation does not cause reprocessing: the consumer waits for every in-flight record to be committed before releasing the partition. Reprocessing only occurs when the consumer is unable to commit before losing the assignment.

The window of records exposed to reprocessing is bounded by :handler_max_unacked_commits (a TopicConfig option):

  • 0 (default): each batch must be fully committed before the next is delivered. Reprocessing on failure is limited to the records of the most recent in-flight batch.
  • N > 0: up to N processed records may be waiting for commit at any time. This unlocks commit pipelining and higher throughput, but on a hard failure those records may be redelivered.
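One common way to make a handler idempotent is to remember which {topic, partition, offset} keys have already been applied and skip them on redelivery. This is a sketch under stated assumptions: IdempotentSketch is hypothetical, records are plain maps with :offset and :value standing in for Klife.Record (check the struct for its actual fields), and the seen set lives in memory only for illustration. A real handler would keep it in durable storage, for example behind a unique constraint in its database.

```elixir
defmodule IdempotentSketch do
  # Applies `apply_fun` to each record whose {topic, partition, offset} key
  # is not in `seen` yet, and returns the updated set. A redelivered record
  # is skipped, so its side effect runs only once.
  def process(records, topic, partition, seen, apply_fun) do
    Enum.reduce(records, seen, fn record, acc ->
      key = {topic, partition, record.offset}

      if MapSet.member?(acc, key) do
        acc
      else
        apply_fun.(record)
        MapSet.put(acc, key)
      end
    end)
  end
end
```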

Retrying records

Klife's per-record :commit/:retry control on handle_record_batch/4 is more granular than what Kafka itself tracks: Kafka stores a single committed offset per partition for each consumer group, and on resume the consumer always picks up from there.

This gap leaves room for a misuse pattern where a batch returns :commit for a record and :retry for one with a lower offset in the same partition, for example committing the record at offset 2 while asking for the record at offset 1 to be retried. While the consumer is running, Klife keeps offset 1 in its in-process queue and re-delivers it on the next cycle. But if the consumer restarts or the partition is revoked before offset 1 is committed, the broker only knows about the commit of offset 2: the retry queue is lost and offset 1 would be silently skipped. To prevent this, Klife detects the pattern and raises.

To stay aligned with Kafka semantics, never retry a record whose offset is lower than any record committed in the same batch. When a record fails, choose one of:

  • Retry the failing record together with every successful record after it, accepting that the successful ones will be reprocessed once the failing one eventually succeeds.
  • Commit the failing record and route it elsewhere (e.g. a dead-letter topic) so processing can move forward.
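The ordering rule can be checked on {action, offset} pairs before returning from the handler. RetryOrderSketch is a hypothetical helper for a single partition's batch; Klife performs its own check and raises, so this only illustrates which shapes are safe.

```elixir
defmodule RetryOrderSketch do
  # `results` holds {action, offset} pairs for one partition's batch.
  # Returns :ok when safe, or an error naming the first :commit whose offset
  # is higher than the lowest :retry offset (the shape Klife rejects).
  def check(results) do
    retry_offsets = for {:retry, offset} <- results, do: offset
    commit_after_retry(results, retry_offsets)
  end

  defp commit_after_retry(_results, []), do: :ok

  defp commit_after_retry(results, retry_offsets) do
    min_retry = Enum.min(retry_offsets)

    case Enum.find(results, fn {action, offset} -> action == :commit and offset > min_retry end) do
      nil -> :ok
      {:commit, offset} -> {:error, {:commit_after_retry, offset, min_retry}}
    end
  end
end
```

Both safe patterns pass this check: retrying from the failing record onward leaves every :commit below the lowest :retry, and committing the failing record after routing it elsewhere leaves no :retry at all.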

Fetch strategies

Every partition consumer fetches records through a Klife.Consumer.Fetcher, and which fetcher serves a topic is decided in two layers: the consumer group's :fetch_strategy sets a default for all its topics, and any TopicConfig can override that default for itself.

The same two strategies, {:shared, fetcher_name} and {:exclusive, fetcher_options}, are accepted at both levels, but the scope of :exclusive differs:

  • {:exclusive, opts} at the group level starts one new fetcher dedicated to the consumer group. Every topic that does not override the strategy shares it.
  • {:exclusive, opts} at the topic level starts one new fetcher dedicated to that single topic, not shared with anything else.

{:shared, fetcher_name} behaves identically at both levels: it reuses an existing fetcher (one already started by the client) for whatever scope it is set on.

When the group's :fetch_strategy is unset, it falls back to {:shared, <client default fetcher>}, so topics in the group share the client's default fetcher with every other consumer of it.

Override at the group level when the entire group needs an isolated fetch pipeline (e.g. its own batchers or max_bytes_per_request). Override at the topic level when a single noisy or latency-sensitive topic should not share its fetcher with the rest of the group.
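The two-layer resolution can be written as a small decision function. FetchStrategySketch and its :exclusive_per_group / :exclusive_per_topic result tags are hypothetical labels used only to make the scope of :exclusive visible; Klife does not expose such a function.

```elixir
defmodule FetchStrategySketch do
  # group_strategy / topic_strategy: nil, {:shared, name} or {:exclusive, opts}.
  # client_default: name of the client's default fetcher.
  def resolve(group_strategy, topic_strategy, client_default) do
    case {topic_strategy, group_strategy} do
      # A topic-level override always wins and is scoped to this topic only.
      {{:shared, name}, _} -> {:shared, name}
      {{:exclusive, opts}, _} -> {:exclusive_per_topic, opts}
      # No topic override: use the group-level strategy.
      {nil, {:shared, name}} -> {:shared, name}
      {nil, {:exclusive, opts}} -> {:exclusive_per_group, opts}
      # Nothing set anywhere: share the client's default fetcher.
      {nil, nil} -> {:shared, client_default}
    end
  end
end
```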

How many committers?

Offset commits are dispatched by dedicated committer processes shared across the partitions of the group. A single committer is enough for most workloads.

Before raising :committers_count, check whether the apparent bottleneck is committer dispatch or simply commit RTT pacing the handler (see Tuning for throughput). Raising :handler_max_unacked_commits decouples handler cadence from commit latency and resolves most apparent "commit bottleneck" symptoms without adding parallelism.

Increase :committers_count only when the committer process itself can't keep up, typically a group with many actively committing partitions where profiling shows the committer as the bottleneck. This comes at the cost of per-commit batching efficiency, since partitions are split across committers.
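To see why more committers reduce per-commit batching, picture partitions being spread over committers. The hashing below is an assumption chosen for illustration: CommitterSplitSketch is hypothetical, and Klife's actual distribution scheme is not described here. With N committers, each commit request can only batch roughly 1/N of the group's partitions.

```elixir
defmodule CommitterSplitSketch do
  # Hypothetical: assign each {topic, partition} to one of `count` committers
  # by hashing. Returns %{committer_index => [{topic, partition}, ...]}.
  def split(partitions, count) when is_integer(count) and count > 0 do
    Enum.group_by(partitions, fn tp -> :erlang.phash2(tp, count) end)
  end
end

partitions = for p <- 0..11, do: {"orders", p}

# One committer: a single commit request can cover all 12 partitions.
CommitterSplitSketch.split(partitions, 1)

# Three committers: each request covers only the partitions hashed to it.
CommitterSplitSketch.split(partitions, 3)
```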

Summary

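Callbacks

handle_consumer_start(topic, partition, group_name): Called when a consumer process is started for a topic-partition after assignment.

handle_consumer_stop(topic, partition, group_name, reason): Called when a consumer process is stopped for a topic-partition.

handle_record_batch(topic, partition, group_name, list): Called for each batch of records delivered to the consumer.

Functions

assigned_partitions(client_name, cg_mod, group_name): Returns the {topic_name, partition} tuples currently assigned to this node for the given consumer group.

commit(client_name, cg_mod, group_name, topic_name, partition, offset): Commits an offset for a :manual mode topic-partition consumer.

pull(client_name, cg_mod, group_name, topic_name, partition, opts \\ []): Pulls the next buffered batch of records from a :manual mode topic-partition consumer.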

Types

action()

@type action() :: :commit | :retry

callback_opts()

@type callback_opts() :: [{:handler_cooldown_ms, non_neg_integer()}]

Callbacks

handle_consumer_start(topic, partition, group_name)

(optional)
@callback handle_consumer_start(
  topic :: String.t(),
  partition :: integer(),
  group_name :: String.t()
) :: :ok

Called when a consumer process is started for a topic-partition after assignment.

handle_consumer_stop(topic, partition, group_name, reason)

(optional)
@callback handle_consumer_stop(
  topic :: String.t(),
  partition :: integer(),
  group_name :: String.t(),
  reason :: term()
) :: :ok

Called when a consumer process is stopped for a topic-partition.

The reason reflects why the consumer terminated and depends on the trigger:

  • On a partition revocation initiated by the group, the reason is {:shutdown, {:assignment_revoked, topic_id, partition_index}}.
  • On a normal group shutdown, it follows the standard GenServer termination semantics (e.g. :normal, :shutdown).
  • On unexpected termination (callback raise, liveness timeout, internal error), it carries the underlying exit reason.
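A handle_consumer_stop/4 implementation usually only needs to tell these cases apart. StopReasonSketch is a hypothetical helper; treating other {:shutdown, term} reasons as orderly follows the usual OTP convention and is an assumption beyond the cases listed above.

```elixir
defmodule StopReasonSketch do
  # Partition revoked by the group coordinator.
  def classify({:shutdown, {:assignment_revoked, _topic_id, _partition_index}}), do: :revoked
  # Orderly group shutdown (standard GenServer exit reasons).
  def classify(reason) when reason in [:normal, :shutdown], do: :group_shutdown
  # Assumption: other {:shutdown, term} reasons are treated as orderly too.
  def classify({:shutdown, _term}), do: :group_shutdown
  # Anything else: callback raise, liveness timeout or internal error.
  def classify(_reason), do: :crashed
end

# Possible use inside your consumer group module (with `require Logger`):
#
#   @impl true
#   def handle_consumer_stop(topic, partition, _group_name, reason) do
#     if StopReasonSketch.classify(reason) == :crashed do
#       Logger.warning("consumer #{topic}/#{partition} crashed: #{inspect(reason)}")
#     end
#
#     :ok
#   end
```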

handle_record_batch(topic, partition, group_name, list)

(optional)
@callback handle_record_batch(
  topic :: String.t(),
  partition :: integer(),
  group_name :: String.t(),
  [Klife.Record.t()]
) ::
  action()
  | {action(), callback_opts()}
  | [{action(), Klife.Record.t()}]
  | {[{action(), Klife.Record.t()}], callback_opts()}

Called for each batch of records delivered to the consumer.

Receives the topic name, partition, group name, and a list of Klife.Record structs. The return value controls what happens with the records.

Batch-level control

Return a single action to apply to the entire batch:

  • :commit: commits all records in the batch.
  • :retry: retries the entire batch.
  • {action, callback_opts}: same as above, with options.

Per-record control

Return a list of {action, record} tuples for fine-grained control. Records tagged with :commit have their offsets advanced; records tagged with :retry are re-delivered in the next batch.

The list must not commit any record at a higher offset than a retried one in the same batch; Klife raises in that case. See Retrying records in the moduledoc for the rationale and safe patterns. The example below applies the standard "stop on first failure" shape:

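def handle_record_batch(_topic, _partition, _group_name, records) do
  # process/1 is your own function; it returns :ok on success.
  # split_while/2 stops at the first failure, so later records are not processed.
  case Enum.split_while(records, fn record -> process(record) == :ok end) do
    # Every record succeeded: commit the whole batch.
    {_ok, []} ->
      :commit

    # Commit the successful prefix; retry the first failure and everything after it.
    {ok, retry} ->
      Enum.map(ok, &{:commit, &1}) ++ Enum.map(retry, &{:retry, &1})
  end
end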

Callback options

Both batch-level and per-record return values can be wrapped in a tuple with callback_opts as the second element. The available options:

  • handler_cooldown_ms: overrides the topic-level handler_cooldown_ms for the next processing cycle only. Useful for implementing backoff on transient failures.
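A minimal backoff sketch, assuming your handler tracks a retry attempt counter itself. BackoffSketch, the attempt argument and the 100 ms / 10 s bounds are all hypothetical. On a transient failure it retries the whole batch and uses handler_cooldown_ms to delay only the next cycle.

```elixir
defmodule BackoffSketch do
  @base_ms 100
  @max_ms 10_000

  # Success: commit the whole batch, no cooldown override.
  def batch_result(:ok, _attempt), do: :commit

  # Transient failure: retry the batch and wait base * 2^attempt ms
  # (capped at @max_ms) before the next processing cycle only.
  def batch_result({:error, :transient}, attempt) when is_integer(attempt) and attempt >= 0 do
    delay = min(@base_ms * Integer.pow(2, attempt), @max_ms)
    {:retry, [handler_cooldown_ms: delay]}
  end
end
```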

Functions

assigned_partitions(client_name, cg_mod, group_name)

@spec assigned_partitions(
  client :: atom(),
  cg_mod :: module(),
  group_name :: String.t()
) :: [{String.t(), integer()}]

Returns the {topic_name, partition} tuples currently assigned to this node for the given consumer group.

commit(client_name, cg_mod, group_name, topic_name, partition, offset)

@spec commit(
  client :: atom(),
  cg_mod :: module(),
  group_name :: String.t(),
  topic :: String.t(),
  partition :: integer(),
  offset :: integer()
) :: :ok

Commits an offset for a :manual mode topic-partition consumer.

Only valid for topic-partitions whose Klife.Consumer.ConsumerGroup.TopicConfig has mode: :manual. The commit is dispatched through the consumer group's shared Klife.Consumer.Committer and is fire-and-forget from the caller's perspective: there is no synchronous confirmation that the offset reached the coordinator. The returned :ok only means the commit request was queued.

The committed offset must not exceed the highest offset returned by pull/5 for this partition. Committing an offset that was never pulled marks unprocessed records as consumed, so they would be skipped after a restart or rebalance.

pull(client_name, cg_mod, group_name, topic_name, partition, opts \\ [])

@spec pull(
  client :: atom(),
  cg_mod :: module(),
  group_name :: String.t(),
  topic :: String.t(),
  partition :: integer(),
  opts :: [{:owner, pid()}]
) :: {:ok, [Klife.Record.t()] | :empty} | {:error, :restarting}

Pulls the next buffered batch of records from a :manual mode topic-partition consumer.

Only valid for topic-partitions whose Klife.Consumer.ConsumerGroup.TopicConfig has mode: :manual.

Pulled records must be committed via commit/6 once processed. Before releasing the partition, the consumer's revoke handshake waits until all pulled offsets are committed or a timeout expires, so pulled records left uncommitted slow down rebalances.

Ownership

Pulled records exist only in the receiving process memory until their offsets are committed, so the consumer treats them as leased to an owner process:

  • The owner defaults to the calling process and can be overridden with the :owner option (e.g. pull from a helper process while a long-lived process owns the lease and issues the commits).
  • The consumer monitors the owner. If the owner dies while the lease has uncommitted offsets, the consumer restarts and the records are redelivered from the last committed offset (at-least-once is preserved). If the owner dies with everything committed, the lease is simply released.
  • One owner per topic-partition at a time: pulling from a new owner while a previous owner still has uncommitted offsets restarts the consumer the same way, returning {:error, :restarting} to the new owner.

Options

  • :owner - pid that owns the pulled records lease. Defaults to the caller.
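A minimal driver loop for one :manual partition might look like the sketch below. ManualDrainSketch is hypothetical. pull_fun and commit_fun are closures so the loop can be exercised without a broker, and the wiring in the trailing comment assumes Klife.Record exposes an :offset field and that commit/6 takes the offset of the last processed record, which fits the constraint documented for commit/6 but is not spelled out there. Because pull_fun runs in the calling process, that process owns the lease and issues the commits, matching the default described under Ownership.

```elixir
defmodule ManualDrainSketch do
  # Pulls batches until the buffer is empty, processing each record and then
  # committing the highest offset of the batch.
  # Returns :empty when nothing is buffered, or :restarting when the consumer
  # is restarting, so the caller can decide when to poll again.
  def drain(pull_fun, commit_fun, process_fun) do
    case pull_fun.() do
      {:ok, empty} when empty in [:empty, []] ->
        :empty

      {:ok, records} ->
        Enum.each(records, process_fun)
        # Never commit beyond the highest offset that was pulled.
        last_offset = records |> Enum.map(& &1.offset) |> Enum.max()
        :ok = commit_fun.(last_offset)
        drain(pull_fun, commit_fun, process_fun)

      {:error, :restarting} ->
        :restarting
    end
  end
end

# Hypothetical wiring against the functions documented here:
#
#   alias Klife.Consumer.ConsumerGroup
#   args = [MyApp.MyClient, MyApp.MyConsumerGroup, "my_group_name", "my_topic_1", 0]
#   pull = fn -> apply(ConsumerGroup, :pull, args) end
#   commit = fn offset -> apply(ConsumerGroup, :commit, args ++ [offset]) end
#   ManualDrainSketch.drain(pull, commit, &IO.inspect(&1.value))
```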