KafkaEx.Support.Retry (kafka_ex v1.0.0)
Unified retry logic with exponential backoff for KafkaEx operations.
This module provides:
- Exponential backoff calculation with optional cap
- Generic retry wrapper function
- Error classifiers for common Kafka error patterns
Usage
alias KafkaEx.Support.Retry
# Simple retry with defaults (3 retries, 100ms base delay)
Retry.with_retry(fn -> some_operation() end)
# Custom retry configuration
Retry.with_retry(
fn -> some_operation() end,
max_retries: 5,
base_delay_ms: 200,
max_delay_ms: 5000,
retryable?: &Retry.transient_error?/1
)
# Just calculate backoff delay
delay = Retry.backoff_delay(attempt, 100, 5000)
Summary
Functions
Calculate exponential backoff delay.
Check if a commit error is fatal and requires rejoining the consumer group.
Check if error is safe to retry for commit operations.
Check if a commit error is terminal: the consumer must stop, NOT rejoin, NOT retry.
Check if error is safe to retry for consumer group operations.
Check if error is a coordinator-related error that may resolve with retry.
Check if error is a leadership-related error requiring metadata refresh.
Check if error is safe to retry for produce operations.
Check if error is a transient error that may resolve with retry.
Execute a function with retry and exponential backoff.
Types
@type retry_opts() :: [ max_retries: non_neg_integer(), base_delay_ms: non_neg_integer(), max_delay_ms: non_neg_integer() | :infinity, retryable?: (error() -> boolean()), on_retry: (error(), non_neg_integer(), non_neg_integer() -> :ok) | nil ]
Functions
@spec backoff_delay( non_neg_integer(), non_neg_integer(), non_neg_integer() | :infinity ) :: non_neg_integer()
Calculate exponential backoff delay.
Parameters
- attempt - Zero-based attempt number (0 = first retry)
- base_ms - Base delay in milliseconds
- max_ms - Maximum delay cap (default: :infinity)
Examples
iex> Retry.backoff_delay(0, 100)
100
iex> Retry.backoff_delay(1, 100)
200
iex> Retry.backoff_delay(2, 100)
400
iex> Retry.backoff_delay(10, 100, 5000)
5000
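The examples above are consistent with the delay doubling on each attempt and then being clamped to max_ms, i.e. min(base_ms * 2^attempt, max_ms). The following is a minimal sketch under that assumption; BackoffSketch is a hypothetical module name and this is not the library's actual source.

```elixir
defmodule BackoffSketch do
  # Hypothetical re-implementation, assuming delay = base_ms * 2^attempt, capped at max_ms.
  @spec backoff_delay(non_neg_integer(), non_neg_integer(), non_neg_integer() | :infinity) ::
          non_neg_integer()
  def backoff_delay(attempt, base_ms, max_ms \\ :infinity)
      when is_integer(attempt) and attempt >= 0 and is_integer(base_ms) and base_ms >= 0 do
    # attempt 0 -> base_ms, attempt 1 -> 2 * base_ms, attempt 2 -> 4 * base_ms, ...
    raw = base_ms * Integer.pow(2, attempt)

    case max_ms do
      # No cap: the delay keeps doubling.
      :infinity -> raw
      # Cap: never wait longer than max_ms.
      cap when is_integer(cap) -> min(raw, cap)
    end
  end
end
```

Without a cap, attempt 10 with a 100 ms base would wait 102,400 ms, which is why a finite max_ms is usually worth setting.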
Check if a commit error is fatal and requires rejoining the consumer group.
These errors cannot be fixed by retrying: the consumer's generation or member_id is no longer valid. Callers must propagate them to the group manager to trigger a rebalance.
This follows the patterns of Java's ConsumerCoordinator.OffsetCommitResponseHandler, brod's stabilize/3, and kafka-python's _handle_offset_commit_response.
Note: :rebalance_in_progress is intentionally NOT in this set; it is retryable (see commit_retryable?/1). :fenced_instance_id and several authorization and size errors are classified by commit_terminal_error?/1 instead.
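A minimal sketch of a fatal-error check and how a commit caller might route its result. It assumes the fatal set is :illegal_generation and :unknown_member_id (snake_case forms of the Kafka protocol errors for an invalid generation or member id); the module name, commit_fatal?/1, handle_commit_result/1, and the {:rejoin, reason} shape are all hypothetical, not the library's API.

```elixir
defmodule CommitFatalSketch do
  # Assumed fatal set: the generation or member id is no longer valid.
  @fatal [:illegal_generation, :unknown_member_id]

  # Hypothetical predicate name.
  def commit_fatal?(error), do: error in @fatal

  # Hypothetical routing: fatal errors are handed to the group manager as a
  # rejoin request; anything else is passed through for other checks.
  def handle_commit_result(:ok), do: :ok

  def handle_commit_result({:error, error}) do
    if commit_fatal?(error), do: {:rejoin, error}, else: {:error, error}
  end
end
```

Note that :rebalance_in_progress falls through to {:error, _} here, matching the note that it is retried rather than treated as fatal.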
Check if error is safe to retry for commit operations.
Commits are idempotent, so they can safely be retried on transient errors.
:rebalance_in_progress is also retryable. Per Java's ConsumerCoordinator.OffsetCommitResponseHandler, this code means the broker has a rebalance in flight: the heartbeat path drives the eventual rebalance, and the commit is simply retried until it succeeds once the rebalance completes. Rejoining eagerly on this code wastes a round trip and can trigger a second rebalance.
:unstable_offset_commit (KIP-447) is retryable as well: the broker is waiting on an in-progress transaction, so retrying is the correct handling.
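A sketch of the commit retry predicate as described: transient errors plus :rebalance_in_progress and :unstable_offset_commit. The transient atoms listed here are assumptions for illustration; the library's actual set may be larger or named differently.

```elixir
defmodule CommitRetrySketch do
  # Assumed transient/coordinator errors; not taken from the library source.
  @transient [
    :request_timed_out,
    :coordinator_not_available,
    :not_coordinator,
    :coordinator_load_in_progress
  ]

  # Named in the documentation as retryable for commits.
  @commit_specific [:rebalance_in_progress, :unstable_offset_commit]

  def commit_retryable?(error), do: error in @transient or error in @commit_specific
end
```

Fatal errors (:illegal_generation, :unknown_member_id, assumed) and terminal ones such as :fenced_instance_id return false, so they are never retried in a loop.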
Check if a commit error is terminal: the consumer must stop, NOT rejoin, NOT retry.
- :fenced_instance_id (KIP-345): another member has taken this group.instance.id. Rejoining would either cause a split-brain (two consumers claiming the same static slot) or be fenced again. Java raises FencedInstanceIdException; librdkafka and brod also treat it as non-recoverable.
- :group_authorization_failed / :topic_authorization_failed: the credentials don't grant access to the group or topic. Java raises GroupAuthorizationException / TopicAuthorizationException out of poll(). Retrying or rejoining will not help.
- :offset_metadata_too_large / :invalid_commit_offset_size: the commit payload is malformed. Java treats both as a non-retriable KafkaException. Retrying would hot-loop on a caller bug.
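The terminal set below uses exactly the atoms listed above. The classify/1 helper is a hypothetical illustration of how a caller might combine the three commit checks into one decision (:stop, :rejoin, :retry, or :fail); the fatal and retryable atoms in it are assumptions.

```elixir
defmodule CommitClassifySketch do
  # Atoms named in the commit_terminal_error?/1 documentation.
  @terminal [
    :fenced_instance_id,
    :group_authorization_failed,
    :topic_authorization_failed,
    :offset_metadata_too_large,
    :invalid_commit_offset_size
  ]

  # Assumed for illustration only.
  @fatal [:illegal_generation, :unknown_member_id]
  @retryable [:request_timed_out, :not_coordinator, :rebalance_in_progress, :unstable_offset_commit]

  def commit_terminal_error?(error), do: error in @terminal

  # Terminal is checked first, so a stop decision can never be downgraded
  # to a rejoin or a retry even if the sets were later made to overlap.
  @spec classify(term()) :: :stop | :rejoin | :retry | :fail
  def classify(error) do
    cond do
      commit_terminal_error?(error) -> :stop
      error in @fatal -> :rejoin
      error in @retryable -> :retry
      true -> :fail
    end
  end
end
```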
Check if error is safe to retry for consumer group operations.
Following the Java client pattern (KAFKA-6829), this includes coordinator errors, transient errors, and UNKNOWN_TOPIC_OR_PARTITION.
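A sketch of that composition: transient errors (which include coordinator errors) plus unknown topic or partition. The function name consumer_group_retryable?/1 and every atom below are hypothetical stand-ins, not the library's definitions.

```elixir
defmodule GroupRetrySketch do
  # Assumed atoms; snake_case forms of Kafka protocol error names.
  @coordinator_errors [:coordinator_not_available, :not_coordinator, :coordinator_load_in_progress]
  @other_transient [:timeout, :request_timed_out, :closed]

  # Hypothetical name. Unknown topic/partition is treated as retryable because
  # it may only mean metadata has not propagated to every broker yet.
  def consumer_group_retryable?(error) do
    error in @coordinator_errors or error in @other_transient or
      error == :unknown_topic_or_partition
  end
end
```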
Check if error is a coordinator-related error that may resolve with retry.
These errors typically occur during consumer group operations when the coordinator is unavailable or changing.
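A minimal sketch of coordinator_error?/1, assuming the atoms are snake_case forms of the three Kafka protocol coordinator errors (COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, COORDINATOR_LOAD_IN_PROGRESS). The library may match more, or differently named, values.

```elixir
defmodule CoordinatorSketch do
  # Assumed atom names for the coordinator error codes.
  @coordinator_errors [
    # No coordinator has been elected for the group yet.
    :coordinator_not_available,
    # The broker contacted is not (or is no longer) the group coordinator.
    :not_coordinator,
    # The coordinator is still loading group state.
    :coordinator_load_in_progress
  ]

  def coordinator_error?(error), do: error in @coordinator_errors
end
```

Passing &CoordinatorSketch.coordinator_error?/1 as the retryable? option is the same pattern as the coordinator example under with_retry/2.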
Check if error is a leadership-related error requiring metadata refresh.
These errors indicate that the client has stale partition leadership information. They are safe to retry for ALL request types, including produce.
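A sketch of a leadership check paired with a metadata refresh before the next attempt. The predicate name leadership_error?/1, the atoms (snake_case forms of LEADER_NOT_AVAILABLE and NOT_LEADER_OR_FOLLOWER, formerly NOT_LEADER_FOR_PARTITION), and the refresh callback are all assumptions for illustration.

```elixir
defmodule LeadershipSketch do
  # Assumed atom names; both the old and new Kafka name for code 6 are listed.
  @leadership_errors [:leader_not_available, :not_leader_for_partition, :not_leader_or_follower]

  def leadership_error?(error), do: error in @leadership_errors

  # Hypothetical helper: refresh metadata only when leadership is stale,
  # so the retry is sent to the new leader instead of the old one.
  def maybe_refresh(error, refresh_fun) when is_function(refresh_fun, 0) do
    if leadership_error?(error), do: refresh_fun.(), else: :noop
  end
end
```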
Check if error is safe to retry for produce operations.
Produce retries are only safe for leadership errors, where we know the message wasn't written. Timeout errors are NOT safe because the message may have been written but the response lost.
Note: for truly idempotent produces, set enable.idempotence=true on the Kafka producer (requires Kafka 0.11+).
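A sketch of the produce predicate's asymmetry: unlike commits, a timeout returns false because the write may already have landed. The function name produce_retryable?/1 and the atoms are assumptions, not the library's definitions.

```elixir
defmodule ProduceRetrySketch do
  # Assumed leadership atoms: the broker rejected the write, so nothing was appended.
  @leadership_errors [:leader_not_available, :not_leader_for_partition, :not_leader_or_follower]

  # Hypothetical name. Timeouts fall through to false: retrying them could
  # append the same message twice on a non-idempotent producer.
  def produce_retryable?(error), do: error in @leadership_errors
end
```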
Check if error is a transient error that may resolve with retry.
Includes timeouts, parse errors, connection issues, and coordinator errors.
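A sketch of transient_error?/1 using one function clause per category. The concrete shapes are assumptions: :timeout and :request_timed_out for timeouts, a {:parse_error, reason} tuple for parse failures, and :closed / :econnrefused / :econnreset for connection issues. The library's actual error terms may differ.

```elixir
defmodule TransientSketch do
  # Assumed atoms for each category named in the documentation.
  @coordinator_errors [:coordinator_not_available, :not_coordinator, :coordinator_load_in_progress]
  @connection_errors [:closed, :econnrefused, :econnreset]

  # Timeouts
  def transient_error?(:timeout), do: true
  def transient_error?(:request_timed_out), do: true
  # Parse errors (assumed tuple shape)
  def transient_error?({:parse_error, _reason}), do: true
  # Connection issues
  def transient_error?(error) when error in @connection_errors, do: true
  # Coordinator errors
  def transient_error?(error) when error in @coordinator_errors, do: true
  # Everything else is treated as non-transient.
  def transient_error?(_other), do: false
end
```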
@spec with_retry((-> retry_result()), retry_opts()) :: retry_result()
Execute a function with retry and exponential backoff.
Options
- :max_retries - Maximum number of retry attempts (default: 3)
- :base_delay_ms - Base delay for exponential backoff (default: 100)
- :max_delay_ms - Maximum delay cap (default: :infinity)
- :retryable? - Function that decides whether an error is retryable (default: always true)
- :on_retry - Optional callback (error, attempt, delay) -> :ok, for logging
Returns
- {:ok, result} on success
- {:error, last_error} after all retries are exhausted, or on a non-retryable error
Examples
# Retry any error up to 3 times
Retry.with_retry(fn -> fetch_data() end)
# Only retry specific errors
Retry.with_retry(
fn -> commit_offset() end,
retryable?: &Retry.coordinator_error?/1
)
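The documented options and return values can be illustrated with a minimal, self-contained sketch. WithRetrySketch is a hypothetical module, not the library's source; it assumes the wrapped function returns {:ok, _} or {:error, _}, that the attempt passed to on_retry is zero-based, and that the delay follows the same doubling-with-cap rule as backoff_delay/3.

```elixir
defmodule WithRetrySketch do
  # Hypothetical re-implementation of with_retry/2 for illustration.
  def with_retry(fun, opts \\ []) when is_function(fun, 0) do
    config = %{
      max_retries: Keyword.get(opts, :max_retries, 3),
      base_delay_ms: Keyword.get(opts, :base_delay_ms, 100),
      max_delay_ms: Keyword.get(opts, :max_delay_ms, :infinity),
      # Default: every error is retryable.
      retryable: Keyword.get(opts, :retryable?, fn _error -> true end),
      on_retry: Keyword.get(opts, :on_retry)
    }

    run(fun, 0, config)
  end

  defp run(fun, attempt, config) do
    case fun.() do
      {:ok, _result} = ok ->
        ok

      {:error, error} = failure ->
        if attempt < config.max_retries and config.retryable.(error) do
          delay = backoff_delay(attempt, config.base_delay_ms, config.max_delay_ms)
          # Optional logging hook: (error, attempt, delay) -> :ok
          if config.on_retry, do: config.on_retry.(error, attempt, delay)
          Process.sleep(delay)
          run(fun, attempt + 1, config)
        else
          # Retries exhausted or error not retryable: return the last error.
          failure
        end
    end
  end

  defp backoff_delay(attempt, base, :infinity), do: base * Integer.pow(2, attempt)
  defp backoff_delay(attempt, base, cap), do: min(base * Integer.pow(2, attempt), cap)
end
```

With max_retries: 2 the function runs at most three times (one initial call plus two retries), and a non-retryable error returns after the first call without sleeping.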