Klife.Client behaviour (Klife v1.0.0)

Defines a Kafka client.

Using it takes three steps:

  • Use it in a module
  • Configure the module in your config file
  • Start the module in your supervision tree

Using it in a module

When used, it expects an :otp_app option naming the OTP application that holds the client configuration.

defmodule MyApp.MyClient do
  use Klife.Client, otp_app: :my_app
end

use Klife.Client

When you use Klife.Client, it will extend your module in two ways:

  • Define it as a proxy to a subset of the functions in the Klife module, using its module name as the client_name parameter. For example, MyApp.MyClient.produce/2 forwards both arguments to the function of the same name in Klife and injects MyApp.MyClient as an argument.

  • Define it as a supervisor by calling use Supervisor and implementing related functions such as start_link/1 and init/1, so it can be started under your app's supervision tree.
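
As a rough illustration of the proxy mechanism described above, the sketch below shows how a __using__ macro can define a function that forwards its arguments and injects the calling module's name. ToyBackend, ToyClient and ToyApp.MyClient are hypothetical stand-ins, and the position of the injected argument is an assumption. This is not Klife's actual implementation.

```elixir
defmodule ToyBackend do
  # Hypothetical stand-in for the Klife module: it receives the client name
  # as an extra (last) argument. The argument position is an assumption.
  def produce(record, opts, client_name) do
    {:ok, %{record: record, opts: opts, client: client_name}}
  end
end

defmodule ToyClient do
  # `use ToyClient, otp_app: :some_app` expands this quoted block
  # inside the calling module.
  defmacro __using__(opts) do
    otp_app = Keyword.fetch!(opts, :otp_app)

    quote do
      # Remember which OTP application holds the configuration.
      def otp_app, do: unquote(otp_app)

      # Proxy: forward both arguments and inject the caller's module name.
      def produce(record, opts \\ []) do
        ToyBackend.produce(record, opts, __MODULE__)
      end
    end
  end
end

defmodule ToyApp.MyClient do
  use ToyClient, otp_app: :toy_app
end
```

Calling ToyApp.MyClient.produce("rec") reaches ToyBackend.produce/3 with ToyApp.MyClient as the last argument, which is the same shape of indirection the real client relies on.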

Configuration

The client has several configuration options, described below. A typical configuration looks something like this:

config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:19092", "localhost:29092"],
    ssl: false
  ],
  producers: [
    [
      name: :my_custom_producer,
      linger_ms: 5,
      max_in_flight_requests: 10
    ]
  ],
  topics: [
    [name: "my_topic_0", producer: :my_custom_producer]
  ]

You can see more configuration examples in the "Client configuration examples" section, or a working application example in the example folder of the project's repository.

Configuration options:

  • :connection (non-empty keyword/0) - Required.

    • :bootstrap_servers (list of String.t/0) - Required. List of servers to establish the initial connection. (eg: ["localhost:9092", "localhost:9093"])

    • :ssl (boolean/0) - Specify the underlying socket module. Use :ssl if true and :gen_tcp if false. The default value is false.

    • :connection_count (pos_integer/0) - How many TCP connections the client will maintain for each broker. The default value is 1.

    • :connect_opts (list of term/0) - Options used to configure the socket connection, which are forwarded to the connect/3 function of the underlying socket module (see the :ssl option above). The default value is [inet_backend: :socket, active: false].

    • :socket_opts (list of term/0) - Options used to configure the open socket, which are forwarded to the setopts/2 function of the corresponding module (:inet for :gen_tcp, :ssl for :ssl; see the :ssl option above). The default value is [keepalive: true].

    • :sasl_opts (list of term/0) - Options to configure SASL authentication, see SASL section for supported mechanisms and examples. The default value is [].

    • :default_request_timeout_ms (pos_integer/0) - The default request timeout, in milliseconds, used on all requests to brokers. Some other options may override this value in specific scenarios. The default value is 30000.

  • :default_producer (atom/0) - Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided a default producer will be started automatically. The default value is :klife_default_producer.

  • :producers (List of Klife.Producer configurations) - List of configurations, each starting a new producer for use with the produce API. The default value is [].

  • :default_partitioner (atom/0) - Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option. The default value is Klife.Producer.DefaultPartitioner.

  • :default_txn_pool (atom/0) - Name of the txn pool to be used on transactions when a :pool_name is not provided as an option. If not provided a default txn pool will be started automatically. The default value is :klife_default_txn_pool.

  • :txn_pools (List of Klife.TxnProducerPool configurations) - List of configurations, each starting a pool of transactional producers for use with the transactional API. The default value is [].

  • :topics (List of Klife.Topic configurations) - List of topics that may have special configurations. The default value is [].

  • :default_fetcher (atom/0) - Name of the default fetcher to be used on the consumer API when a specific fetcher is not provided via configuration or option. If not provided a default fetcher will be started automatically. The default value is :klife_default_fetcher.

  • :fetchers (List of Klife.Consumer.Fetcher configurations) - List of configurations, each starting a new fetcher to be used with the consumer API. The default value is [].

  • :disabled_features (list of atoms) - List of features to disable. :producer disables the producer feature; :txn_producer disables transactions. The default value is [].

  • :enable_unkown_topics (boolean/0) - Define if the client will be able to work with non configured topics. When true the client will collect metadata for all known topics of the cluster, this may have performance impacts on large clusters. When set to false the client will collect metadata only for topics defined on the topics options. The default value is true.
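
To make a few of the options above concrete, here is a hedged configuration sketch for a client that only needs a single known topic. It uses only option names listed above. The values are illustrative, and it assumes that a topic entry carrying only :name is accepted.

```elixir
import Config

config :my_app, MyApp.MyClient,
  connection: [
    bootstrap_servers: ["localhost:9092"],
    # Two TCP connections per broker instead of the default 1.
    connection_count: 2,
    # Lower the default 30000 ms request timeout (illustrative value).
    default_request_timeout_ms: 15_000
  ],
  # Collect metadata only for the topics listed under :topics.
  enable_unkown_topics: false,
  topics: [
    # Assumption: a topic entry with only :name is valid.
    [name: "my_topic_0"]
  ]
```

Restricting metadata collection this way trades flexibility for less work on large clusters, as noted in the :enable_unkown_topics description.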

Starting it

Finally, it must be started on your application. It will look something like this:

defmodule MyApp.Application do
  def start(_type, _args) do
    children = [
      # some other modules...,
      MyApp.MyClient
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Producer API overview

To interact with the producer API you work with the Klife.Record module as your main input and output data structure.

Usually you give a record to a producer API function and it returns an enriched record with new attributes based on what happened.

So in summary the interaction goes like this:

  • Build one or more Klife.Record
  • Pass it to some producer API function
  • Receive an enriched version of the provided records

rec = %Klife.Record{value: "some_val", topic: "my_topic_1"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)
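
The example above matches only the success case. A minimal sketch of handling both outcomes could look like the following. ProduceResult is a hypothetical helper, and it matches on plain map patterns (which also match structs such as Klife.Record) so that it runs without a broker. The :error_code field is taken from the partial error example in the produce_batch/2 docs.

```elixir
defmodule ProduceResult do
  # Works on any {:ok | :error, record} tuple. Map patterns also match structs,
  # so a %Klife.Record{} can be passed where plain maps are used here.
  def describe({:ok, %{topic: topic, partition: partition, offset: offset}}) do
    "stored in #{topic}/#{partition} at offset #{offset}"
  end

  def describe({:error, %{topic: topic, error_code: code}}) do
    "produce to #{topic} failed with error code #{code}"
  end
end
```

With a real client this would be used as rec |> MyClient.produce() |> ProduceResult.describe().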

Consumer API overview

For continuous consumption of records, the recommended approach is to use Klife.Consumer.ConsumerGroup. It handles partition assignment, offset tracking, rebalancing, and fault tolerance automatically. See the Klife.Consumer.ConsumerGroup docs for details.

The client also exposes lower-level fetch functions fetch/4, fetch/2, and fetch_async/4, for cases where a full consumer group is not needed. These are intended for standalone use: one-off reads, backfilling, debugging, or any workflow where you manage offsets yourself.

The fetch functions return Klife.Record structs, the same data structure used throughout the producer API. Each returned record is fully enriched with all the relevant data available.

# Fetch a batch of records starting at a given offset
{:ok, records} = MyClient.fetch("my_topic_1", 0, 42)

# Fetch from multiple topic/partition/offset combinations in one request
tpo_list = [{"my_topic_1", 0, 42}, {"my_topic_2", 3, 0}]
%{{"my_topic_1", 0, 42} => {:ok, _}} = MyClient.fetch(tpo_list)

# Async fetch result delivered as a message to the calling process
{:ok, _pid} = MyClient.fetch_async("my_topic_1", 0, 42)

Summary

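Consumer API

fetch(tpo_list, opts)

Fetch records from multiple topic/partition/offset combinations in a single request.

fetch(topic, partition, offset, opts)

Fetch records from a single topic/partition starting at the given offset.

fetch_async(topic, partition, offset, opts)

Sends an asynchronous fetch request for a single topic/partition/offset.

Producer API

produce(record, opts)

Produce a single record.

produce_async(record, opts)

Produce a single record asynchronously.

produce_batch(list_of_records, opts)

Produce a batch of records.

produce_batch_async(list, opts)

Produce a batch of records asynchronously.

Transaction API

produce_batch_txn(list_of_records, opts)

Transactionally produce a batch of records.

transaction(fun, opts)

Runs the given function inside a transaction.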

Consumer API

fetch(tpo_list, opts)

@callback fetch(
  tpo_list :: [
    {topic :: String.t(), partition :: non_neg_integer(),
     offset :: non_neg_integer()}
  ],
  opts :: Keyword.t()
) :: %{
  required({String.t(), non_neg_integer(), non_neg_integer()}) =>
    {:ok, list_of_records()} | {:error, any()}
}

Fetch records from multiple topic/partition/offset combinations in a single request.

Accepts a list of {topic, partition, offset} tuples and returns a map where each key is the input {topic, partition, offset} tuple and the value is an {:ok, list_of_records} or {:error, reason} tuple.

This is more efficient than calling fetch/4 multiple times in a loop because requests targeting the same broker are automatically batched into a single TCP request.

Options

  • :fetcher (atom/0) - Fetcher's name that will override the default_fetcher configuration.

  • :isolation_level - Controls whether the fetch response includes uncommitted transactional records. Defaults to :read_committed.

  • :max_bytes (non_neg_integer/0) - Maximum number of bytes to return per fetch request. Defaults to 100_000 for fetch/4 and 500_000 for fetch_async/4.

Examples

iex> rec1 = %Klife.Record{value: "val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "val_2", topic: "my_topic_2"}
iex> [{:ok, %Klife.Record{offset: o1, partition: p1}}, {:ok, %Klife.Record{offset: o2, partition: p2}}] = MyClient.produce_batch([rec1, rec2])
iex> tpo_list = [{"my_topic_1", p1, o1}, {"my_topic_2", p2, o2}]
iex> %{{"my_topic_1", ^p1, ^o1} => {:ok, resp1}, {"my_topic_2", ^p2, ^o2} => {:ok, resp2}} = MyClient.fetch(tpo_list)
iex> [%Klife.Record{value: "val_1"}] = resp1
iex> [%Klife.Record{value: "val_2"}] = resp2
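
Because each key of the result map carries its own ok/error tuple, callers usually need to separate the two. The following is one possible sketch. FetchResults is a hypothetical helper, not part of Klife, and it only relies on the return shape described above.

```elixir
defmodule FetchResults do
  # Splits the map returned by fetch/2 into two maps:
  # {tpo => records} for successes and {tpo => reason} for failures.
  def split(result_map) do
    Enum.reduce(result_map, {%{}, %{}}, fn
      {tpo, {:ok, records}}, {oks, errs} -> {Map.put(oks, tpo, records), errs}
      {tpo, {:error, reason}}, {oks, errs} -> {oks, Map.put(errs, tpo, reason)}
    end)
  end
end
```

A caller could then retry only the keys of the second map, for instance by passing them back to MyClient.fetch/2.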

fetch(topic, partition, offset, opts)

@callback fetch(
  topic :: String.t(),
  partition :: non_neg_integer(),
  offset :: non_neg_integer(),
  opts :: Keyword.t()
) :: {:ok, list_of_records()} | {:error, any()}

Fetch records from a single topic/partition starting at the given offset.

Returns an {:ok, list_of_records} tuple where the list contains all records available from offset up to max_bytes worth of data. Returns {:error, reason} on failure.

Options

  • :fetcher (atom/0) - Fetcher's name that will override the default_fetcher configuration.

  • :isolation_level - Controls whether the fetch response includes uncommitted transactional records. Defaults to :read_committed.

  • :max_bytes (non_neg_integer/0) - Maximum number of bytes to return per fetch request. Defaults to 100_000 for fetch/4 and 500_000 for fetch_async/4.

Examples

iex> rec1 = %Klife.Record{value: "val_1", topic: "my_topic_1", partition: 0}
iex> rec2 = %Klife.Record{value: "val_2", topic: "my_topic_1", partition: 0}
iex> [{:ok, %Klife.Record{offset: o1}}, {:ok, _}] = MyClient.produce_batch([rec1, rec2])
iex> {:ok, records} = MyClient.fetch("my_topic_1", 0, o1)
iex> [%Klife.Record{value: "val_1"}, %Klife.Record{value: "val_2"}] = records
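
Since each call returns at most max_bytes worth of data, reading further means calling again from the offset after the last record received. The sketch below shows that loop under two assumptions that are not confirmed by this page: that a fetch at an offset with no available records returns {:ok, []}, and that every returned record has its :offset set. FetchLoop is a hypothetical helper, and the fetch function is injected so the sketch runs without a broker.

```elixir
defmodule FetchLoop do
  # fetch_fun is a 1-arity function taking an offset, for example
  # fn offset -> MyClient.fetch("my_topic_1", 0, offset) end
  # Returns {:ok, records_in_order, next_offset} or {:error, reason, offset}.
  def read_until_empty(fetch_fun, offset, acc \\ []) do
    case fetch_fun.(offset) do
      # Assumption: an empty list means there is nothing more to read for now.
      {:ok, []} ->
        {:ok, Enum.reverse(acc), offset}

      {:ok, records} ->
        # Continue right after the last record of this batch.
        next_offset = List.last(records).offset + 1
        read_until_empty(fetch_fun, next_offset, Enum.reverse(records, acc))

      {:error, reason} ->
        {:error, reason, offset}
    end
  end
end
```

The returned next offset is what a standalone reader would persist in order to resume later.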

fetch_async(topic, partition, offset, opts)

@callback fetch_async(
  topic :: String.t(),
  partition :: non_neg_integer(),
  offset :: non_neg_integer(),
  opts :: Keyword.t()
) :: {:ok, pid()} | {:error, any()}

Sends an asynchronous fetch request for a single topic/partition/offset.

Returns {:ok, pid} where pid is the batcher process handling the request, or {:error, reason} if the request could not be enqueued.

The result is delivered as a message to the calling process in the form:

{:klife_fetch_response, {topic, partition, offset}, {:ok, records} | {:error, reason}}

This is useful for building standalone consumers or any process that needs non-blocking fetch semantics. It is the same primitive that Klife.Consumer.ConsumerGroup uses internally.

Options

  • :fetcher (atom/0) - Fetcher's name that will override the default_fetcher configuration.

  • :isolation_level - Controls whether the fetch response includes uncommitted transactional records. Defaults to :read_committed.

  • :max_bytes (non_neg_integer/0) - Maximum number of bytes to return per fetch request. Defaults to 100_000 for fetch/4 and 500_000 for fetch_async/4.

Examples

iex> {:ok, %Klife.Record{partition: p, offset: o}} = MyClient.produce(%Klife.Record{value: "async_val", topic: "my_topic_1"})
iex> {:ok, _pid} = MyClient.fetch_async("my_topic_1", p, o)
iex> receive do
...>   {:klife_fetch_response, {"my_topic_1", ^p, ^o}, {:ok, records}} ->
...>     [%Klife.Record{value: "async_val"} | _] = records
...> after
...>   5_000 -> raise "timeout"
...> end
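
When several async fetches are in flight, matching on the {topic, partition, offset} tuple keeps responses from being mixed up. A small sketch of such a selective receive follows. AsyncFetch is a hypothetical helper and the {:error, :timeout} value is its own convention, not something Klife returns. The message shape is the one documented above.

```elixir
defmodule AsyncFetch do
  # Waits for the response of one specific fetch_async/4 request.
  # Messages for other topic/partition/offset tuples stay in the mailbox.
  def await({_topic, _partition, _offset} = tpo, timeout \\ 5_000) do
    receive do
      {:klife_fetch_response, ^tpo, result} -> result
    after
      # Hypothetical convention of this helper, not a Klife return value.
      timeout -> {:error, :timeout}
    end
  end
end
```

In a GenServer the same pattern would normally live in handle_info/2 instead of a blocking receive.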

Producer API

produce(record, opts)

@callback produce(record(), opts :: Keyword.t()) :: {:ok, record()} | {:error, record()}

Produce a single record.

It expects a Klife.Record struct containing at least :value and :topic and returns an ok/error tuple with the enriched version of the input record, as described in "Producer API overview".

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

Examples

iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)

produce_async(record, opts)

@callback produce_async(record(), opts :: Keyword.t()) :: :ok

Produce a single record asynchronously.

The same as produce/2 but returns immediately. Accepts a callback option to execute arbitrary code after response is obtained.

Semantics and guarantees

This function executes the callback using Task.start/1, so there are no guarantees about record delivery or callback execution.

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

  • :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions.

Examples

Anonymous Function:

iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> callback = fn resp ->
...>    {:ok, enriched_rec} = resp
...>    true = is_number(enriched_rec.offset)
...>    true = is_number(enriched_rec.partition)
...> end
iex> :ok = MyClient.produce_async(rec, callback: callback)

Using MFA:

iex> defmodule CB do
...>    def exec(resp, my_arg1, my_arg2) do
...>      "my_arg1" = my_arg1
...>      "my_arg2" = my_arg2
...>      {:ok, enriched_rec} = resp
...>      true = is_number(enriched_rec.offset)
...>      true = is_number(enriched_rec.partition)
...>    end
...> end
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> :ok = MyClient.produce_async(rec, callback: {CB, :exec, ["my_arg1", "my_arg2"]})
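
To clarify what "injected as the first argument" means for the two callback forms, here is a sketch of how such a dispatch can be written. CallbackSketch is hypothetical and is not Klife's implementation. It only mirrors the documented calling convention.

```elixir
defmodule CallbackSketch do
  # MFA form: the result is prepended to the configured argument list,
  # so {CB, :exec, ["a", "b"]} ends up calling CB.exec(result, "a", "b").
  def run(result, {mod, fun, args}) when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [result | args])
  end

  # Anonymous function form: the result is the only argument.
  def run(result, fun) when is_function(fun, 1) do
    fun.(result)
  end
end
```

This is why the MFA callback in the example above has arity 3 even though only two arguments are listed in the tuple.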

produce_batch(list_of_records, opts)

@callback produce_batch(list_of_records(), opts :: Keyword.t()) :: [
  {:ok | :error, record()}
]

Produce a batch of records.

It expects a list of Klife.Record structs containing at least :value and :topic and returns a list of ok/error tuples with the enriched versions of the input records, as described in "Producer API overview".

The order of the response tuples in the returned list is the same as in the input list: the first response tuple corresponds to the first input record, and so on.

Semantics and guarantees

This function is semantically equivalent to calling produce/2 multiple times and waiting for all responses, which means that two records sent in the same batch may succeed or fail independently.

In other words, there are no atomicity guarantees. If you need them, see produce_batch_txn/2.

The input list may contain records for any topic/partition. Records of the same topic/partition are guaranteed to keep their input order relative to each other; no order is guaranteed between records of different topic/partitions.

See the partial error example below for more context.

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])

Partial error example:

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: :rand.bytes(2_000_000), topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:error, %Klife.Record{error_code: 10}}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])

In order to facilitate the response handling you can use Klife.Record.verify_batch/1 or Klife.Record.verify_batch!/1 functions.
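
If you prefer to handle partial failures by hand, the response list can be split with the standard library alone. The sketch below assumes only the documented return shape, a list of {:ok | :error, record} tuples. BatchResults is a hypothetical helper.

```elixir
defmodule BatchResults do
  # Returns {succeeded_records, failed_records}, each in input order.
  def split(results) do
    {oks, errors} = Enum.split_with(results, &match?({:ok, _}, &1))
    {Enum.map(oks, &elem(&1, 1)), Enum.map(errors, &elem(&1, 1))}
  end
end
```

The failed records keep their enriched fields, so they can be inspected or passed to another produce_batch/2 call.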

produce_batch_async(list, opts)

@callback produce_batch_async([record()], opts :: Keyword.t()) :: :ok

Produce a batch of records asynchronously.

The same as produce_batch/2 but returns immediately. Accepts a callback option to execute arbitrary code after the response is obtained.

Semantics and guarantees

When a callback is provided, this function is implemented as a Task.start/1 call that runs produce_batch/2 and executes the callback right after. There are therefore no guarantees about record delivery or callback execution.

Beware of process limits

Because this function spawns a new process for every call that defines a callback, calling it inside loops may spawn a large number of processes.

To avoid this, increase the size of the batches you pass to it, or raise the VM's process limit with the +P emulator flag.
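
One way to increase the batch size is to chunk the records before calling the function, so that one process is spawned per chunk rather than per record. A minimal sketch follows. ChunkedProduce is a hypothetical helper, and the produce function is injected so the sketch runs without a broker.

```elixir
defmodule ChunkedProduce do
  # produce_fun is a 1-arity function receiving one chunk (a list), for example
  # fn chunk -> MyClient.produce_batch_async(chunk, callback: my_callback) end
  # Returns the list of produce_fun results, one per chunk.
  def run(records, chunk_size, produce_fun) when chunk_size > 0 do
    records
    |> Enum.chunk_every(chunk_size)
    |> Enum.map(produce_fun)
  end
end
```

With 10 records and a chunk size of 4, produce_fun is called three times instead of ten.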

Options

  • :producer (atom/0) - Producer's name that will override the default_producer configuration. Ignored inside transactions.

  • :partitioner (atom/0) - Module that will override default_partitioner configuration.

  • :callback (term/0) - MFA or function/1 that will be called with the produce result. The result is injected as the first argument on MFA and is the only argument for anonymous functions.

Examples

Anonymous Function:

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: fn resp ->
...>  [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...> end)

Using MFA:

iex> defmodule CB2 do
...>    def exec(resp, my_arg1, my_arg2) do
...>      "arg1" = my_arg1
...>      "arg2" = my_arg2
...>       [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = resp
...>    end
...> end
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> :ok = MyClient.produce_batch_async(input, callback: {CB2, :exec, ["arg1", "arg2"]})

Transaction API

produce_batch_txn(list_of_records, opts)

@callback produce_batch_txn(list_of_records(), opts :: Keyword.t()) ::
  {:ok, list_of_records()} | {:error, list_of_records()}

Transactionally produce a batch of records.

It expects a list of Klife.Record structs containing at least :value and :topic and returns an ok/error tuple wrapping the enriched versions of the input records, as described in "Producer API overview".

The order of the records in the returned list is the same as in the input list: the first returned record corresponds to the first input record, and so on.

Beware of performance costs

Each produce_batch_txn/2 call requires two more network round trips to the broker than a non-transactional produce_batch/2: one to add the topic/partitions to the transaction and another to commit or abort it.

Options

  • :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.

Examples

iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch_txn([rec1, rec2, rec3])

transaction(fun, opts)

@callback transaction(fun :: function(), opts :: Keyword.t()) :: any()

Runs the given function inside a transaction.

Every produce API call made inside the given function is part of a transaction that commits only if fun returns :ok or {:ok, _any}. Any other return value aborts the transaction, discarding all records produced inside the function.

Beware of performance costs

Each produce call inside the given function may need one more network round trip to the broker than a non-transactional call.

At the end of the transaction, another round trip is needed to commit or abort it.

Produce semantics inside transaction

All produce API calls keep the same semantics they have outside a transaction. This means that records produced using produce_batch/2 may still succeed or fail independently, and a produce/2 call may still fail. It is therefore the user's responsibility to verify the results and abort the transaction if needed.

Options

  • :pool_name (atom/0) - Txn pool's name that will override the default_txn_pool configuration.

Examples

iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.transaction(fn ->
...>  rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
...>  {:ok, resp1} = MyClient.produce(rec1)
...>  rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
...>  rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
...>  [resp2, resp3] = MyClient.produce_batch([rec2, rec3])
...>  {:ok, [resp1, resp2, resp3]}
...> end)
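
The example above would raise on a failed produce/2 and does not check the batch results at all. Since any return value other than :ok or {:ok, _any} aborts the transaction, one hedged way to verify a batch is to turn its results into the function's return value. TxnGuard is a hypothetical helper, not part of Klife.

```elixir
defmodule TxnGuard do
  # Takes the list returned by produce_batch/2 and returns
  # {:ok, records} when every entry succeeded (the transaction commits),
  # or {:error, results} otherwise (the transaction aborts).
  def all_ok(results) do
    if Enum.all?(results, &match?({:ok, _}, &1)) do
      {:ok, Enum.map(results, fn {:ok, record} -> record end)}
    else
      {:error, results}
    end
  end
end

# Intended use, as the last expression of the transaction function:
#
#   MyClient.transaction(fn ->
#     [rec1, rec2] |> MyClient.produce_batch() |> TxnGuard.all_ok()
#   end)
```

This keeps the commit decision in one place instead of relying on a pattern match failure to stop the function.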

Types

list_of_records()

@type list_of_records() :: [record()]

record()

@type record() :: Klife.Record.t()