KafkaEx.Support.Retry (kafka_ex v1.0.0)

Unified retry logic with exponential backoff for KafkaEx operations.

This module provides:

  • Exponential backoff calculation with optional cap
  • Generic retry wrapper function
  • Error classifiers for common Kafka error patterns

Usage

alias KafkaEx.Support.Retry

# Simple retry with defaults (3 retries, 100ms base delay)
Retry.with_retry(fn -> some_operation() end)

# Custom retry configuration
Retry.with_retry(
  fn -> some_operation() end,
  max_retries: 5,
  base_delay_ms: 200,
  max_delay_ms: 5000,
  retryable?: &Retry.transient_error?/1
)

# Just calculate backoff delay
delay = Retry.backoff_delay(attempt, 100, 5000)

Summary

Functions

  • backoff_delay/3 - Calculate exponential backoff delay.
  • commit_fatal_error?/1 - Check if a commit error is fatal and requires rejoining the consumer group.
  • commit_retryable?/1 - Check if error is safe to retry for commit operations.
  • commit_terminal_error?/1 - Check if a commit error is terminal: the consumer must stop, NOT rejoin, NOT retry.
  • consumer_group_retryable?/1 - Check if error is safe to retry for consumer group operations.
  • coordinator_error?/1 - Check if error is a coordinator-related error that may resolve with retry.
  • leadership_error?/1 - Check if error is a leadership-related error requiring metadata refresh.
  • produce_retryable?/1 - Check if error is safe to retry for produce operations.
  • transient_error?/1 - Check if error is a transient error that may resolve with retry.
  • with_retry/2 - Execute a function with retry and exponential backoff.

Types

@type error() :: atom() | term()
@type retry_opts() :: [
  max_retries: non_neg_integer(),
  base_delay_ms: non_neg_integer(),
  max_delay_ms: non_neg_integer() | :infinity,
  retryable?: (error() -> boolean()),
  on_retry: (error(), non_neg_integer(), non_neg_integer() -> :ok) | nil
]
@type retry_result() :: {:ok, term()} | {:error, error()}

Functions

backoff_delay(attempt, base_ms, max_ms \\ :infinity)

@spec backoff_delay(
  non_neg_integer(),
  non_neg_integer(),
  non_neg_integer() | :infinity
) ::
  non_neg_integer()

Calculate exponential backoff delay.

Parameters

  • attempt - Zero-based attempt number (0 = first retry)
  • base_ms - Base delay in milliseconds
  • max_ms - Maximum delay cap (default: :infinity)

Examples

iex> Retry.backoff_delay(0, 100)
100

iex> Retry.backoff_delay(1, 100)
200

iex> Retry.backoff_delay(2, 100)
400

iex> Retry.backoff_delay(10, 100, 5000)
5000
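
The examples above are consistent with a delay of base_ms * 2^attempt, clamped to max_ms. A minimal sketch of that calculation follows. The module name BackoffSketch is hypothetical, and this is an illustration inferred from the documented examples, not the kafka_ex source, which may differ in detail.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper for illustration; not part of kafka_ex.

  # Function head carrying the default for the optional cap.
  def delay(attempt, base_ms, max_ms \\ :infinity)

  # No cap: the delay doubles on every attempt.
  def delay(attempt, base_ms, :infinity) do
    base_ms * Integer.pow(2, attempt)
  end

  # With a cap: the doubled delay is clamped to max_ms.
  def delay(attempt, base_ms, max_ms) do
    min(base_ms * Integer.pow(2, attempt), max_ms)
  end
end
```

With a base of 100 ms and a cap of 5000 ms, the uncapped value first exceeds the cap at attempt 6 (100 * 64 = 6400), so every later attempt waits 5000 ms.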

commit_fatal_error?(arg1)

@spec commit_fatal_error?(error()) :: boolean()

Check if a commit error is fatal and requires rejoining the consumer group.

These errors cannot be fixed by retrying, because the consumer's generation or member_id is no longer valid. Callers must propagate them to the group manager to trigger a rebalance.

This follows the patterns in Java's ConsumerCoordinator.OffsetCommitResponseHandler, brod's stabilize/3, and kafka-python's _handle_offset_commit_response.

Note: :rebalance_in_progress is intentionally NOT in this set. It is retryable; see commit_retryable?/1. :fenced_instance_id and several authorization/size errors are classified by commit_terminal_error?/1 instead.

commit_retryable?(error)

@spec commit_retryable?(error()) :: boolean()

Check if error is safe to retry for commit operations.

Commits are idempotent, so we can safely retry on transient errors.

:rebalance_in_progress is also retryable. Per Java's ConsumerCoordinator.OffsetCommitResponseHandler, this code means the broker has a rebalance in flight. The heartbeat path drives the eventual rebalance, and the retried commit succeeds once the rebalance completes. Rejoining eagerly on this code wastes a round trip and can cause a double rebalance.

:unstable_offset_commit (KIP-447) is retryable as well: the broker is waiting on an in-progress transaction, so retrying is the correct handling.

commit_terminal_error?(arg1)

@spec commit_terminal_error?(error()) :: boolean()

Check if a commit error is terminal: the consumer must stop, NOT rejoin, NOT retry.

  • :fenced_instance_id (KIP-345): another member has taken this group.instance.id. Rejoining would either cause split-brain (two consumers claiming the same static slot) or be fenced again. Java raises FencedInstanceIdException; librdkafka and brod also treat it as non-recoverable.

  • :group_authorization_failed / :topic_authorization_failed: the credentials do not grant access to the group or topic. Java raises GroupAuthorizationException / TopicAuthorizationException out of poll(). Retrying or rejoining will not help.

  • :offset_metadata_too_large / :invalid_commit_offset_size: the commit payload is malformed. Java treats both as a non-retriable KafkaException. Retrying would hot-loop on a caller bug.
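
Taken together, the three commit classifiers describe a three-way decision: stop, rejoin, or retry. The sketch below shows one way a caller might dispatch on it. The module CommitDispatchSketch and its return atoms are hypothetical. The terminal set and the two retryable atoms come from this page; the fatal set (:illegal_generation, :unknown_member_id) is an assumption based on the "generation or member_id is no longer valid" description, and the retryable set omits the transient errors that commit_retryable?/1 is also described as accepting.

```elixir
defmodule CommitDispatchSketch do
  # Hypothetical dispatcher for illustration; not part of kafka_ex.

  # Terminal errors as listed in the commit_terminal_error?/1 docs.
  @terminal [
    :fenced_instance_id,
    :group_authorization_failed,
    :topic_authorization_failed,
    :offset_metadata_too_large,
    :invalid_commit_offset_size
  ]

  # Assumption: atoms for "generation or member_id is no longer valid".
  @fatal [:illegal_generation, :unknown_member_id]

  # Partial list: only the two codes called out in the commit_retryable?/1 docs.
  @retryable [:rebalance_in_progress, :unstable_offset_commit]

  # Terminal is checked first: the consumer must stop, not rejoin or retry.
  def next_action(error) when error in @terminal, do: :stop
  # Fatal: hand the error to the group manager so it can rejoin.
  def next_action(error) when error in @fatal, do: :rejoin
  # Retryable: try the commit again after a backoff delay.
  def next_action(error) when error in @retryable, do: :retry
  # Anything else is returned to the caller unchanged.
  def next_action(_error), do: :propagate
end
```

Checking the terminal set before the others keeps a fenced or unauthorized consumer from ever entering a rejoin or retry loop.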

consumer_group_retryable?(error)

@spec consumer_group_retryable?(error()) :: boolean()

Check if error is safe to retry for consumer group operations.

Following the Java client pattern (KAFKA-6829), this includes coordinator errors, transient errors, and UNKNOWN_TOPIC_OR_PARTITION.
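
The description reads as a union of smaller classifiers. A minimal sketch of that composition is shown here. The module ClassifierSketch is hypothetical, and the atom sets inside it are illustrative guesses rather than the sets kafka_ex actually uses; only the shape of the composition is taken from this page.

```elixir
defmodule ClassifierSketch do
  # Hypothetical stand-ins for illustration; not the kafka_ex classifiers.

  # Assumption: example atoms for coordinator-related errors.
  def coordinator_error?(error) do
    error in [:coordinator_not_available, :not_coordinator]
  end

  # Assumption: example atoms for transient errors. Per this page, the
  # transient set also includes coordinator errors.
  def transient_error?(error) do
    error in [:timeout, :closed] or coordinator_error?(error)
  end

  # Union of the two classifiers plus the unknown-topic code.
  def consumer_group_retryable?(error) do
    coordinator_error?(error) or
      transient_error?(error) or
      error == :unknown_topic_or_partition
  end
end
```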

coordinator_error?(arg1)

@spec coordinator_error?(error()) :: boolean()

Check if error is a coordinator-related error that may resolve with retry.

These errors typically occur during consumer group operations when the coordinator is unavailable or changing.

@spec leadership_error?(error()) :: boolean()

Check if error is a leadership-related error requiring metadata refresh.

These errors indicate that the client has stale partition leadership information. They are safe to retry for ALL request types, including produce.

produce_retryable?(error)

@spec produce_retryable?(error()) :: boolean()

Check if error is safe to retry for produce operations.

Produce retries are safe only for leadership errors, where we know the message was not written. Timeout errors are NOT safe to retry because the message may have been written and only the response lost.

Note: For truly idempotent produces, set enable.idempotence=true on the Kafka producer (requires Kafka 0.11+).
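
To make the distinction concrete, the sketch below retries only on leadership errors and gives up on everything else, including timeouts. The module ProduceRetrySketch, its return tuples, and the two leadership atoms are assumptions made for illustration; the page does not list the atoms that leadership_error?/1 matches.

```elixir
defmodule ProduceRetrySketch do
  # Hypothetical helper for illustration; not part of kafka_ex.

  # Assumption: example atoms for stale-leadership errors.
  @leadership_errors [:not_leader_for_partition, :leader_not_available]

  # Retryable only when the broker rejected the write outright.
  def produce_retryable?(error), do: error in @leadership_errors

  def handle_produce_error(error) do
    if produce_retryable?(error) do
      # The message was not written, so a retry cannot duplicate it.
      {:retry_after_metadata_refresh, error}
    else
      # For example :timeout, where the write may already have happened.
      {:give_up, error}
    end
  end
end
```

Giving up on a timeout surfaces the ambiguity to the caller, who can decide whether a possible duplicate is acceptable.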

@spec transient_error?(error()) :: boolean()

Check if error is a transient error that may resolve with retry.

Includes timeouts, parse errors, connection issues, and coordinator errors.

with_retry(fun, opts \\ [])

@spec with_retry((-> retry_result()), retry_opts()) :: retry_result()

Execute a function with retry and exponential backoff.

Options

  • :max_retries - Maximum number of retry attempts (default: 3)
  • :base_delay_ms - Base delay for exponential backoff (default: 100)
  • :max_delay_ms - Maximum delay cap (default: :infinity)
  • :retryable? - Function to determine if error is retryable (default: always true)
  • :on_retry - Optional callback (error, attempt, delay) -> :ok for logging

Returns

  • {:ok, result} on success
  • {:error, last_error} after all retries exhausted or non-retryable error

Examples

# Retry any error up to 3 times
Retry.with_retry(fn -> fetch_data() end)

# Only retry specific errors
Retry.with_retry(
  fn -> commit_offset() end,
  retryable?: &Retry.coordinator_error?/1
)
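
For readers who want to see how the options fit together, here is a minimal sketch of a retry loop with the same option names and defaults. The module RetrySketch is a hypothetical re-implementation, not the kafka_ex source. In particular, passing a zero-based attempt number to on_retry and calling it before the sleep are assumptions.

```elixir
defmodule RetrySketch do
  # Hypothetical re-implementation for illustration; not the kafka_ex source.

  def with_retry(fun, opts \\ []) do
    cfg = %{
      max_retries: Keyword.get(opts, :max_retries, 3),
      base_delay_ms: Keyword.get(opts, :base_delay_ms, 100),
      max_delay_ms: Keyword.get(opts, :max_delay_ms, :infinity),
      # Default classifier: treat every error as retryable.
      retryable: Keyword.get(opts, :retryable?, fn _error -> true end),
      on_retry: Keyword.get(opts, :on_retry)
    }

    attempt(fun, 0, cfg)
  end

  defp attempt(fun, attempt_no, cfg) do
    case fun.() do
      {:ok, _value} = ok ->
        ok

      {:error, reason} = error ->
        if attempt_no < cfg.max_retries and cfg.retryable.(reason) do
          delay = backoff(attempt_no, cfg.base_delay_ms, cfg.max_delay_ms)
          # Assumption: the callback runs before the sleep and receives
          # a zero-based attempt number.
          if cfg.on_retry, do: cfg.on_retry.(reason, attempt_no, delay)
          Process.sleep(delay)
          attempt(fun, attempt_no + 1, cfg)
        else
          # Retries exhausted or error not retryable: return the last error.
          error
        end
    end
  end

  defp backoff(attempt_no, base_ms, :infinity), do: base_ms * Integer.pow(2, attempt_no)
  defp backoff(attempt_no, base_ms, max_ms), do: min(base_ms * Integer.pow(2, attempt_no), max_ms)
end

# Usage: log each retry; returns {:error, :timeout} after two retries.
RetrySketch.with_retry(
  fn -> {:error, :timeout} end,
  max_retries: 2,
  base_delay_ms: 10,
  on_retry: fn error, attempt, delay ->
    IO.puts("retry #{attempt} after #{inspect(error)}, sleeping #{delay}ms")
  end
)
```

With max_retries: 3 the function is called at most four times: the initial call plus three retries.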