Klife.Consumer.ConsumerGroup.TopicConfig (Klife v1.0.0)
Defines a topic configuration for a Klife.Consumer.ConsumerGroup.
Each entry of the consumer group's :topics option is a TopicConfig keyword
list, controlling how a single topic is consumed inside the group.
Configuration options
- `:name` (`String.t/0`) - Required. Name of the topic the consumer group will consume records from.
- `:fetch_strategy` (`{:exclusive, fetcher_options}` or `{:shared, fetcher_name}`) - Overrides the fetch strategy defined at the consumer group level for this topic. The override semantics differ slightly from the group-level option:
  - `{:exclusive, fetcher_options}`: starts a new fetcher dedicated to this single topic (not shared with other topics in the group).
  - `{:shared, fetcher_name}`: uses the given pre-existing fetcher only for this topic.

  If not set, the topic uses the fetcher selected by the consumer group's `:fetch_strategy`.
- `:isolation_level` - May override the isolation level defined on the consumer group. The default value is `:read_committed`.
- `:offset_reset_policy` - Defines from which offset the consumer will start processing records when no previously committed offset is found. The default value is `:latest`.
- `:fetch_max_bytes` (`non_neg_integer/0`) - The maximum number of bytes to fetch in a single request. Must be lower than the fetcher config `max_bytes_per_request`. The default value is `1000000`.
- `:max_queue_size` (`non_neg_integer/0`) - The maximum number of record batches that the consumer can keep on its internal queue. Defaults to 5 if `:handler_max_batch_size` is `:dynamic`, otherwise defaults to 20.
- `:fetch_interval_ms` (`non_neg_integer/0`) - Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process. The consumer always tries to optimize fetch request wait times by issuing requests before its internal queue is empty (the current threshold is 1 batch), so this option only applies to the wait after a fetch request returns empty. The default value is `1000`.
- `:handler_cooldown_ms` (`non_neg_integer/0`) - Time in milliseconds that the consumer will wait before handling new records. Can be overridden for one cycle by the handler return value. The default value is `0`.
- `:handler_max_unacked_commits` (`non_neg_integer/0`) - Controls how many records can be waiting for commit confirmation before the consumer stops processing new records. When this limit is reached, processing pauses until confirmations are received. Set it to 0 to process records one batch at a time, so each processing cycle must be fully committed before the next starts. The default value is `0`.
- `:handler_max_batch_size` - The maximum number of records delivered to the handler in each processing cycle. If `:dynamic`, all records retrieved in the fetch request are delivered to the handler as a single batch. If a positive integer, retrieved records are chunked into batches of the given size. The default value is `:dynamic`.
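A topic entry combining several of these options might look like the following sketch. The option names come from the list above; the topic name and fetcher name are hypothetical, chosen only for illustration:

```elixir
# Hypothetical entry for the consumer group's :topics option.
# "payments.events" and :my_fetcher are made-up names.
[
  name: "payments.events",
  fetch_strategy: {:shared, :my_fetcher},
  isolation_level: :read_committed,
  offset_reset_policy: :latest,
  fetch_max_bytes: 1_000_000,
  handler_max_batch_size: 500
]
```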
Configuration inheritance
Each option resolves in the following order, from highest to lowest precedence:
- The value set directly on the topic entry inside `:topics`.
- The value set on the consumer group's `:default_topic_config`.
- The built-in default shown above.
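As a sketch of how this precedence plays out, a group could set a `:default_topic_config` and let individual topics override parts of it (topic names here are illustrative):

```elixir
# Hypothetical consumer group options illustrating precedence:
# "topic_a" inherits fetch_max_bytes from :default_topic_config,
# "topic_b" overrides it, and options set on neither level fall
# back to the built-in defaults.
[
  default_topic_config: [fetch_max_bytes: 500_000],
  topics: [
    [name: "topic_a"],
    [name: "topic_b", fetch_max_bytes: 2_000_000]
  ]
]
```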
Tuning for throughput
Effective throughput is the product of how many records each handler call
processes (:handler_max_batch_size) and how often the handler runs. With
the default :handler_max_unacked_commits of 0, each batch must be fully
committed before the next is delivered, so every batch incurs one commit
roundtrip to the coordinator. This means small batches under the default
commit policy degenerate into one network roundtrip per record.
Two ways to keep throughput up:
- Leave `:handler_max_batch_size` as `:dynamic` (the default) or a high value, amortizing the commit roundtrip across many records.
- Raise `:handler_max_unacked_commits` when smaller, fixed-size batches are required for memory or latency reasons. The handler keeps processing while a commit is in flight, and pending higher offsets are coalesced into the next commit request, decoupling handler cadence from commit latency.
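For example, a topic that needs small fixed batches but must not stall on commit latency might combine the two options like this (an illustrative sketch; the topic name is hypothetical):

```elixir
# Hypothetical: fixed 100-record batches, but allow up to 10
# commits in flight so the handler keeps processing while the
# coordinator confirms earlier batches.
[
  name: "metrics.raw",
  handler_max_batch_size: 100,
  handler_max_unacked_commits: 10
]
```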
:max_queue_size caps how many fetched batches the consumer may buffer
in-process before throttling further fetches. Its default is paired with
:handler_max_batch_size: 5 for dynamic batches (each can be large), 20
for fixed-size ones. Override it when you need a tighter memory bound or
deeper prefetch.
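To tighten the memory bound, `:max_queue_size` can be lowered alongside a fixed batch size (an illustrative sketch; the topic name is hypothetical):

```elixir
# Hypothetical: buffer at most 3 batches of up to 1000 records
# each, trading prefetch depth for a tighter memory ceiling.
[
  name: "logs.ingest",
  handler_max_batch_size: 1000,
  max_queue_size: 3
]
```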