Klife.Consumer.ConsumerGroup.TopicConfig (Klife v1.1.0)
Defines a topic configuration for a Klife.Consumer.ConsumerGroup.
Each entry of the consumer group's :topics option is a TopicConfig keyword
list, controlling how a single topic is consumed inside the group.
Configuration options
- :name (String.t/0) - Required. Name of the topic the consumer group will consume records from.

- :fetch_strategy ({:exclusive, fetcher_options} or {:shared, fetcher_name}) - Overrides the fetch strategy defined at the consumer group level for this topic.

  The override semantics differ slightly from the group-level option:

  - {:exclusive, fetcher_options}: starts a new fetcher dedicated to this single topic (not shared with other topics in the group).
  - {:shared, fetcher_name}: uses the given pre-existing fetcher only for this topic.

  If not set, the topic uses the fetcher selected by the consumer group's :fetch_strategy.

- :isolation_level - May override the isolation level defined on the consumer group. The default value is :read_committed.

- :offset_reset_policy - Defines the offset from which the consumer starts processing records when no previously committed offset is found. The default value is :latest.

- :fetch_max_bytes (non_neg_integer/0) - The maximum number of bytes to fetch in a single request. Must be lower than the fetcher config max_bytes_per_request. The default value is 1000000.

- :max_queue_size (non_neg_integer/0) - The maximum number of record batches that the consumer can keep on its internal queue. Defaults to 5 if :handler_max_batch_size is :dynamic, otherwise defaults to 20.

- :fetch_interval_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before trying to fetch new data from the broker after it runs out of records to process.

  The consumer always tries to minimize fetch wait times by issuing requests before its internal queue is empty (the current threshold is 1 batch). Therefore this option only sets the wait time after a fetch request returns empty.

  The default value is 1000.

- :handler_cooldown_ms (non_neg_integer/0) - Time in milliseconds that the consumer will wait before handling new records. Can be overridden for one cycle by the handler return value.

  Ignored when :mode is :manual: pacing between pulls is the external driver's responsibility.

  The default value is 0.

- :handler_max_unacked_commits (non_neg_integer/0) - Controls how many records can be waiting for commit confirmation before the consumer stops processing new records.

  When this limit is reached, processing pauses until confirmations are received.

  Set it to 0 to process records one batch at a time, so that each processing cycle must be fully committed before the next one starts.

  Ignored when :mode is :manual: the external driver controls how many uncommitted batches it pulls, so the consumer never throttles pulls on pending commits.

  The default value is 0.

- :handler_max_batch_size - The maximum number of records that will be delivered to the handler in each processing cycle. If :dynamic, all records retrieved in the fetch request will be delivered to the handler as one single batch. If a positive integer, retrieved records will be chunked into batches of the provided size.

  In :manual mode this still applies: it controls the size of each batch returned by pull/5 (chunking happens at fetch time, before the queue).

  The default value is :dynamic.

- :mode - Controls how records are delivered to user code.

  - :auto (default): the consumer drives Klife.Consumer.ConsumerGroup.handle_record_batch/4 and routes its return value into commits and retries.
  - :manual: the consumer fetches and buffers records but does not invoke any callback. An external driver pulls batches via Klife.Consumer.ConsumerGroup.pull/5 and triggers commits via Klife.Consumer.ConsumerGroup.commit/6. The :handler_cooldown_ms and :handler_max_unacked_commits options are ignored in this mode; :handler_max_batch_size still applies and controls the size of each batch returned by pull/5.

  Manual mode exists so external pipelines (e.g. a Broadway producer) can reuse Klife's fetch, buffering, membership and rebalance machinery while driving processing themselves.

  The default value is :auto.
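As an illustration of the options above, the following sketch builds two entries that could be passed in a consumer group's :topics option. The topic names and every value are hypothetical, chosen only to show the shape of a TopicConfig keyword list; :fetch_strategy is left out because its argument types are not detailed here.

```elixir
# Two hypothetical entries for a consumer group's :topics option.
# Topic names and values are illustrative only.
topics = [
  # Relies on the inherited or built-in value for every option except :name.
  [name: "orders"],

  # Overrides several options for a single topic.
  [
    name: "payments",
    isolation_level: :read_committed,
    offset_reset_policy: :latest,
    fetch_max_bytes: 500_000,
    max_queue_size: 10,
    fetch_interval_ms: 1_000,
    handler_cooldown_ms: 0,
    handler_max_unacked_commits: 0,
    handler_max_batch_size: 100,
    mode: :auto
  ]
]
```

Any option omitted from an entry falls back to the consumer group's :default_topic_config and then to the built-in default.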
Configuration inheritance
Each option resolves in the following order, from highest to lowest precedence:
- The value set directly on the topic entry inside :topics.
- The value set on the consumer group's :default_topic_config.
- The built-in default shown above.
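The precedence order above behaves like a layered keyword merge. The sketch below is a minimal illustration of that idea, not Klife's actual implementation: the TopicConfigResolution module and its resolve/3 function are hypothetical, and only a handful of built-in defaults are included. It also ignores that the :max_queue_size default depends on :handler_max_batch_size.

```elixir
defmodule TopicConfigResolution do
  # Hypothetical helper, not part of Klife. Later merges win, so the
  # built-in defaults are applied first and the topic entry last.
  def resolve(topic_entry, default_topic_config, built_in_defaults) do
    built_in_defaults
    |> Keyword.merge(default_topic_config)
    |> Keyword.merge(topic_entry)
  end
end

# A subset of the built-in defaults listed above.
built_in = [fetch_max_bytes: 1_000_000, fetch_interval_ms: 1_000, mode: :auto]

# Illustrative values for the group's :default_topic_config and one topic entry.
group_defaults = [fetch_max_bytes: 500_000, fetch_interval_ms: 250]
topic_entry = [name: "orders", fetch_max_bytes: 100_000]

resolved = TopicConfigResolution.resolve(topic_entry, group_defaults, built_in)
```

Here :fetch_max_bytes comes from the topic entry, :fetch_interval_ms from the group defaults, and :mode from the built-in default.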
Tuning for throughput
Effective throughput is the product of how many records each handler call
processes (:handler_max_batch_size) and how often the handler runs. With
the default :handler_max_unacked_commits of 0, each batch must be fully
committed before the next is delivered, so every batch incurs one commit
roundtrip to the coordinator. With small batches this roundtrip dominates each
cycle; at a batch size of 1 it degenerates into one network roundtrip per record.
Two ways to keep throughput up:
- Leave :handler_max_batch_size as :dynamic (the default) or a high value, amortizing the commit roundtrip across many records.
- Raise :handler_max_unacked_commits when smaller, fixed-size batches are required for memory or latency reasons. The handler keeps processing while a commit is in flight, and pending higher offsets are coalesced into the next commit request, decoupling handler cadence from commit latency.
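To make the amortization argument concrete, here is a rough back-of-the-envelope model. It assumes that with :handler_max_unacked_commits at 0 each cycle costs the handler time plus one commit roundtrip; this is a simplification for illustration, not a description of Klife's internal scheduling, and the ThroughputEstimate module and all timings are hypothetical.

```elixir
defmodule ThroughputEstimate do
  # Simplified model (an assumption, not Klife's actual scheduling):
  # each cycle costs the handler time plus one commit roundtrip before
  # the next batch is delivered.
  def records_per_second(batch_size, handler_ms, commit_rtt_ms) do
    batch_size * 1000 / (handler_ms + commit_rtt_ms)
  end
end

# 1 record per batch, 1 ms of handling, 5 ms commit roundtrip.
small = ThroughputEstimate.records_per_second(1, 1, 5)

# 500 records per batch, 1 ms of handling per record, same commit roundtrip.
large = ThroughputEstimate.records_per_second(500, 500, 5)
```

Under these assumed timings the single-record case spends most of each cycle waiting on the commit, while the 500-record case is limited almost entirely by handler time.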
:max_queue_size caps how many fetched batches the consumer may buffer
in-process before throttling further fetches. Its default is paired with
:handler_max_batch_size: 5 for dynamic batches (each can be large), 20
for fixed-size ones. Override it when you need a tighter memory bound or
deeper prefetch.
Tuning for memory
A separate consumer process is started for every topic/partition assigned to this node, and each one buffers records independently. The total memory footprint of a consumer group on a node is therefore roughly:

assigned_partitions * per_consumer_buffer

where per_consumer_buffer is dominated by the in-process records queue.
When sizing, remember that partition assignments shift as members join and
leave the group: a node may transiently hold many more partitions than its
steady-state share, so budget for the worst case rather than the average.
The per-consumer upper bound depends on :handler_max_batch_size:
- :dynamic (default): each fetch produces a single batch enqueued as-is. The queue can hold up to :max_queue_size such batches, each up to :fetch_max_bytes in size, so the worst-case buffer is max_queue_size * fetch_max_bytes bytes per consumer.
- Fixed integer: records from a fetch are chunked into batches of that size and pushed onto the queue. Bytes still come from fetches, but the queue cap is enforced in chunks, so the worst-case buffer is roughly max_queue_size * handler_max_batch_size * avg_record_size bytes (and is additionally bounded by what a single fetch can deliver).
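The two bounds above, together with the per-node formula, can be turned into a quick sizing calculation. The sketch below is only an estimate under stated assumptions: the ConsumerMemoryEstimate module is hypothetical, the average record size and partition count are made-up inputs, and the fixed-size case ignores the extra bound imposed by a single fetch.

```elixir
defmodule ConsumerMemoryEstimate do
  # Worst-case bytes buffered by one consumer with :dynamic batches.
  def per_consumer_bytes(:dynamic, max_queue_size, fetch_max_bytes, _avg_record_size) do
    max_queue_size * fetch_max_bytes
  end

  # Rough worst case with a fixed :handler_max_batch_size.
  def per_consumer_bytes(batch_size, max_queue_size, _fetch_max_bytes, avg_record_size)
      when is_integer(batch_size) and batch_size > 0 do
    max_queue_size * batch_size * avg_record_size
  end

  # Total for a node: assigned_partitions * per_consumer_buffer.
  def node_bytes(assigned_partitions, per_consumer_bytes) do
    assigned_partitions * per_consumer_bytes
  end
end

# Defaults: :dynamic batches, queue of 5, 1_000_000 bytes per fetch.
dynamic = ConsumerMemoryEstimate.per_consumer_bytes(:dynamic, 5, 1_000_000, 1_000)

# Fixed batches of 100 records, queue of 20, assumed 1_000-byte records.
fixed = ConsumerMemoryEstimate.per_consumer_bytes(100, 20, 1_000_000, 1_000)

# Worst-case budget for a node that may transiently hold 48 partitions.
node_total = ConsumerMemoryEstimate.node_bytes(48, dynamic)
```

With these inputs the default configuration budgets about 5 MB per consumer and about 240 MB for the node, so the partition count used should be the transient worst case rather than the steady-state share.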
Levers to reduce memory pressure, in order of impact:
- Lower :fetch_max_bytes to shrink each fetch payload. This is the most direct knob since it caps the bytes pulled per request.
- Lower :max_queue_size to reduce how many batches sit in-process at once. Setting it too low can make consumers idle between fetch requests.
- With a fixed :handler_max_batch_size, smaller batches reduce per-chunk memory but increase commit overhead unless :handler_max_unacked_commits is also raised (see "Tuning for throughput").
Note that :handler_max_unacked_commits does not add buffered records on
top of the queue: records being processed have already been dequeued, so
raising it trades commit-roundtrip latency for throughput without changing
the buffer ceiling.
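Putting the levers together, a memory-conscious topic entry might look like the sketch below. The topic name and all values are hypothetical and are not recommendations; they only show which options move in which direction relative to the defaults.

```elixir
# Hypothetical memory-lean topic entry; values are illustrative only.
memory_lean_topic = [
  name: "events",
  # Smaller fetch payloads than the 1_000_000 default.
  fetch_max_bytes: 250_000,
  # Fewer batches buffered in-process than the default of 20 for fixed sizes.
  max_queue_size: 4,
  # Fixed-size batches instead of :dynamic.
  handler_max_batch_size: 200,
  # Raised from the default of 0 to offset the extra commit overhead.
  handler_max_unacked_commits: 5
]
```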