Klife.Record (Klife v1.0.0)

View Source

Kafka record representation.

Represents a Kafka record struct used across all Klife.Client APIs for both producer and consumer.

Producer API

When used as input to a produce call, the following fields are relevant:

  • :value (required)
  • :topic (required)
  • :key (optional)
  • :headers (optional, defaults to [])
  • :partition (optional, if omitted, the configured partitioner will assign one)

On success, the returned record is enriched with:

  • :offset: the offset assigned by the broker
  • :partition: the partition the record was written to (if not provided as input)
  • :error_code: set on failure (see Kafka protocol error codes)

Consumer API

Records returned by the fetch functions or delivered to a consumer group callback are fully populated structs. In addition to the fields listed above, the following are also set:

  • :batch_attributes: metadata about the record batch (see Kafka protocol record batch)
  • :is_aborted: true if the record belongs to an aborted transaction
  • :consumer_attempts: number of times this record has been delivered to a consumer callback (starts at 0, incremented on each retry by the consumer group). This is a best-effort, in-memory counter. It may be reset to 0 after rebalances or unexpected crashes

Summary

Functions

Filters a list of Klife.Record structs returned by the fetch API.

t()

Utility function to verify if all records in a produce_batch/3 were successfully produced.

Same as verify_batch/1 but raises if any record fails and does not return ok/error tuple.

Types

t()

@type t() :: %Klife.Record{
  __batch_index: term(),
  __callback: term(),
  __estimated_size: term(),
  batch_attributes: term(),
  consumer_attempts: nil | non_neg_integer(),
  error_code: integer(),
  headers: [%{key: binary(), value: binary()}],
  is_aborted: term(),
  key: binary(),
  offset: non_neg_integer(),
  partition: non_neg_integer(),
  topic: String.t(),
  value: binary()
}

Functions

filter_records(rec_list, opts \\ [])

Filters a list of Klife.Record structs returned by the fetch API.

Useful when working with the standalone fetch functions to remove records that are not relevant to your application logic.

Options

  • :base_offset - Drop all records with an offset strictly less than this value. Defaults to -1 (no records dropped).
  • :exclude_control - When true, removes internal Kafka control records (e.g. transaction markers). Defaults to false.
  • :exclude_aborted - When true, removes records that belong to aborted transactions. Defaults to false.

t()

verify_batch(produce_resps)

Utility function to verify if all records in a produce_batch/3 were successfully produced.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> {:ok, [%Klife.Record{value: "my_val_1"}, _r2, _r3]} = MyClient.produce_batch(input) |> Klife.Record.verify_batch()

Partial error example. Notice that records 1 and 3 were successfully produced and only record 2 has errors, so the function will return {:error, [rec1, rec2, rec3]}

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> {:error, [_rec1, %Klife.Record{error_code: 10}, _rec3]} = MyClient.produce_batch(input) |> Klife.Record.verify_batch()

verify_batch!(produce_resps)

Same as verify_batch/1 but raises if any record fails and does not return ok/error tuple.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> [_r1, _r2, _r3] = MyClient.produce_batch(input) |> Klife.Record.verify_batch!()