Klife.Record (Klife v1.0.0)
Kafka record representation.
Represents a Kafka record struct used across all Klife.Client APIs for
both producer and consumer.
Producer API
When used as input to a produce call, the following fields are relevant:
- `:value` (required)
- `:topic` (required)
- `:key` (optional)
- `:headers` (optional, defaults to `[]`)
- `:partition` (optional; if omitted, the configured partitioner will assign one)
On success, the returned record is enriched with:
- `:offset`: the offset assigned by the broker
- `:partition`: the partition the record was written to (if not provided as input)
- `:error_code`: set on failure (see Kafka protocol error codes)
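Putting the producer fields together, a produce call can be sketched as follows. This is a sketch only: `MyClient` stands in for a module using `Klife.Client`, as in the examples further down, and the success/error shapes follow the `verify_batch/1` examples on this page.

```elixir
# Required fields: :value and :topic. :key and :headers are optional;
# :partition is left out so the configured partitioner assigns one.
rec = %Klife.Record{
  value: "some_value",
  topic: "my_topic",
  key: "my_key",
  headers: [%{key: "trace_id", value: "abc123"}]
}

case MyClient.produce_batch([rec]) |> Klife.Record.verify_batch() do
  {:ok, [%Klife.Record{offset: offset, partition: partition}]} ->
    # On success the record comes back enriched with :offset and :partition
    IO.puts("written to partition #{partition} at offset #{offset}")

  {:error, [%Klife.Record{error_code: code}]} ->
    # On failure :error_code is set (see Kafka protocol error codes)
    IO.puts("produce failed with error code #{code}")
end
```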
Consumer API
Records returned by the fetch functions or delivered to a consumer group callback are fully populated structs. In addition to the fields listed above, the following are also set:
- `:batch_attributes`: metadata about the record batch (see Kafka protocol record batch)
- `:is_aborted`: `true` if the record belongs to an aborted transaction
- `:consumer_attempts`: number of times this record has been delivered to a consumer callback (starts at `0`, incremented on each retry by the consumer group). This is a best-effort, in-memory counter; it may be reset to `0` after rebalances or unexpected crashes.
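A consumer-side handler might inspect these fields like this. Sketch only: `handle_record/1`, `process/4`, and `retry_processing/1` are hypothetical names for illustration, not part of the API documented here.

```elixir
def handle_record(%Klife.Record{} = rec) do
  cond do
    rec.is_aborted ->
      # Record belongs to an aborted transaction; typically skipped
      :skip

    rec.consumer_attempts > 0 ->
      # Redelivery after an earlier failure. Note this counter is
      # best-effort and may reset to 0 after a rebalance or crash.
      retry_processing(rec)

    true ->
      process(rec.topic, rec.partition, rec.offset, rec.value)
  end
end
```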
Summary
Functions
Filters a list of Klife.Record structs returned by the fetch API.
Utility function to verify whether all records in a produce_batch/3 call were successfully produced.
Same as verify_batch/1, but raises if any record fails instead of returning an ok/error tuple.
Types
@type t() :: %Klife.Record{
        __batch_index: term(),
        __callback: term(),
        __estimated_size: term(),
        batch_attributes: term(),
        consumer_attempts: nil | non_neg_integer(),
        error_code: integer(),
        headers: [%{key: binary(), value: binary()}],
        is_aborted: term(),
        key: binary(),
        offset: non_neg_integer(),
        partition: non_neg_integer(),
        topic: String.t(),
        value: binary()
      }
Functions
Filters a list of Klife.Record structs returned by the fetch API.
Useful when working with the standalone fetch functions to remove records that are not relevant to your application logic.
Options
- `:base_offset` - Drop all records with an offset strictly less than this value. Defaults to `-1` (no records dropped).
- `:exclude_control` - When `true`, removes internal Kafka control records (e.g. transaction markers). Defaults to `false`.
- `:exclude_aborted` - When `true`, removes records that belong to aborted transactions. Defaults to `false`.
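These options combine like this. Sketch only: the extract does not show this function's exact name, so `Klife.Record.filter/2` is assumed here, and `records` stands for a list returned by one of the standalone fetch functions.

```elixir
# `records` is a list of %Klife.Record{} structs from a fetch call.
filtered =
  Klife.Record.filter(records,
    # drop records with offset < 100
    base_offset: 100,
    # drop internal control records such as transaction markers
    exclude_control: true,
    # drop records that belong to aborted transactions
    exclude_aborted: true
  )
```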
Utility function to verify whether all records in a produce_batch/3 call were successfully produced.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> {:ok, [%Klife.Record{value: "my_val_1"}, _r2, _r3]} = MyClient.produce_batch(input) |> Klife.Record.verify_batch()

Partial error example. Notice that records 1 and 3 were successfully produced and only record 2 has an error, so the function returns {:error, [rec1, rec2, rec3]}:
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> {:error, [_rec1, %Klife.Record{error_code: 10}, _rec3]} = MyClient.produce_batch(input) |> Klife.Record.verify_batch()
Same as verify_batch/1, but raises if any record fails instead of returning an ok/error tuple.
Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> [_r1, _r2, _r3] = MyClient.produce_batch(input) |> Klife.Record.verify_batch!()