OffBroadwayKlife.Producer (Broadway Klife v0.1.0)


A Broadway producer that consumes from Kafka through Klife's consumer group manual mode.

Klife runs the full consumer-group machinery (heartbeats, rebalances, fetching and buffering), while this producer drives delivery: it pulls buffered batches on demand, turns each Klife.Record into a Broadway.Message, and commits offsets back to Kafka as Broadway acknowledges the messages.

Usage

Define a Klife.Client (see Klife's docs for client configuration) and make sure it is started in your supervision tree before the pipeline:

defmodule MyApp.KafkaClient do
  use Klife.Client, otp_app: :my_app
end

Then start Broadway with this producer, pointing it at the client:

defmodule MyApp.Pipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {OffBroadwayKlife.Producer,
           client: MyApp.KafkaClient,
           group_name: "my-broadway-group",
           topics: [[name: "orders"], [name: "events"]],
           receive_interval: 500},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data)
    message
  end
end

The producer starts a built-in consumer group on the client and supervises it as part of the pipeline, so there is no consumer group to define or add to your supervision tree. The built-in group is bound to the client the first time it starts, so every pipeline on a node that uses the :client option must point at the same Klife client.

Using a consumer group module

To run Klife's consumer lifecycle callbacks (handle_consumer_start/3 and handle_consumer_stop/4) alongside Broadway, or to consume through more than one Klife client, define a consumer group module and pass it as :consumer_group instead of :client (the two options are mutually exclusive):

defmodule MyApp.KafkaConsumerGroup do
  use Klife.Consumer.ConsumerGroup, client: MyApp.KafkaClient

  @impl true
  def handle_consumer_start(_topic, _partition, _group_name) do
    # e.g. emit telemetry
    :ok
  end
end

producer: [
  module:
    {OffBroadwayKlife.Producer,
     consumer_group: MyApp.KafkaConsumerGroup,
     group_name: "my-broadway-group",
     topics: [[name: "orders"]]}
]

Do not implement handle_record_batch/4: Broadway drives fetching and committing through Klife's manual mode, so that callback never runs. The producer starts the group in either case, so do not add it to your supervision tree.

Options

Message format

:message_format controls the shape of each Broadway.Message:

  • :klife (default) - message.data is the full Klife.Record struct, the same type used across Klife's produce and fetch APIs. It carries everything (value, key, headers as maps, timestamp, consumer_attempts, batch_attributes, ...), and message.metadata is empty. Prefer this for new pipelines: it is lossless and consistent with the rest of Klife.

    def handle_message(_processor, %{data: %Klife.Record{value: value}} = message, _context) do
      IO.inspect(value)
      message
    end
  • :broadway_kafka - message.data is the raw value and message.metadata mirrors broadway_kafka: %{topic, partition, offset, key, ts, headers} with headers as {key, value} tuples. Use this to drop OffBroadwayKlife.Producer into an existing broadway_kafka pipeline without changing handle_message/3.

Either way, routing, batching, acknowledgement and offset commits are identical; only the user-facing message shape changes.
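To make the difference between the two shapes concrete, here is a hypothetical sketch of how one record maps onto each format. It is not the connector's conversion code: the record is a plain map, and its field names (:value, :key, :headers, :topic, :partition, :offset, :timestamp) as well as the "list of %{key, value} maps" header layout are assumptions about Klife.Record.

```elixir
defmodule MessageShapeSketch do
  # Hypothetical illustration only. The record is a plain map whose field
  # names are assumptions about Klife.Record, not its documented layout.

  # :klife format: the whole record becomes data, metadata stays empty.
  def to_klife_shape(record), do: %{data: record, metadata: %{}}

  # :broadway_kafka format: data is the raw value, metadata mirrors broadway_kafka.
  def to_broadway_kafka_shape(record) do
    %{
      data: record.value,
      metadata: %{
        topic: record.topic,
        partition: record.partition,
        offset: record.offset,
        key: record.key,
        ts: record.timestamp,
        # Assumed header maps become {key, value} tuples.
        headers: Enum.map(record.headers, fn %{key: k, value: v} -> {k, v} end)
      }
    }
  end
end

record = %{
  topic: "orders",
  partition: 3,
  offset: 42,
  key: "order-1",
  value: ~s({"id":1}),
  headers: [%{key: "trace-id", value: "abc"}],
  timestamp: 1_700_000_000_000
}

MessageShapeSketch.to_broadway_kafka_shape(record)
```

The :klife shape keeps every field, which is why it is the lossless choice; the :broadway_kafka shape trades fields such as consumer_attempts for drop-in compatibility.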

Producer concurrency

The Klife consumer group is started once and keeps a single group membership no matter how many producers run. You may set producer concurrency above 1: each assigned {topic, partition} is claimed by exactly one producer via a stable hash, so producers share the pull/commit load with no overlap, no extra group members and no coordinator process overhead.

Raise concurrency up to the number of partitions you expect to be assigned to this member of the group; producers beyond that number stay idle.
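A minimal sketch of the "claimed by exactly one producer via a stable hash" idea follows. The docs do not say which hash the connector uses; :erlang.phash2/2 here is an assumption, and PartitionClaimSketch is a hypothetical module, not part of the library.

```elixir
defmodule PartitionClaimSketch do
  # Hypothetical sketch: :erlang.phash2/2 stands in for whatever stable
  # hash the connector actually uses to assign partitions to producers.

  # Index (0..producer_count - 1) of the producer that owns a partition.
  def owner({_topic, _partition} = tp, producer_count) when producer_count > 0 do
    :erlang.phash2(tp, producer_count)
  end

  # Partitions one producer pulls from, out of this member's assignment.
  def claimed(assigned, producer_index, producer_count) do
    Enum.filter(assigned, &(owner(&1, producer_count) == producer_index))
  end
end

assigned = for p <- 0..5, do: {"orders", p}
producers = 4

claims =
  for i <- 0..(producers - 1), do: PartitionClaimSketch.claimed(assigned, i, producers)

# Every assigned partition shows up in exactly one producer's claim list.
Enum.sort(Enum.concat(claims)) == Enum.sort(assigned)
# => true
```

Because each partition hashes to a single index, the claim lists never overlap. With more producers than partitions, the extra producers simply get empty lists. Note that a plain hash does not promise a perfectly even split, so in this sketch some producers may idle even below that limit.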

Ordering

Kafka guarantees ordering per topic-partition. The connector preserves it end to end: each partition is pulled by exactly one producer, which emits its records in offset order, and it sets Broadway's :partition_by so that every record of a given {topic, partition} is always routed to the same processor (and batcher) stage. Records of different partitions still process concurrently.
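Broadway's :partition_by option takes a function that receives a message and returns a non-negative integer, which Broadway maps onto one of the stages. The hypothetical sketch below mimics that routing with plain lists to show why per-partition order survives. Records are plain maps, and the :erlang.phash2/1 key plus the rem/2 mapping are assumptions, not the connector's code.

```elixir
defmodule OrderingSketch do
  # Hypothetical sketch, not the connector's code. Records are plain maps
  # with :topic, :partition and :offset keys (an assumption).

  # Shaped like a :partition_by function: record -> non-negative integer.
  def partition_by(%{topic: topic, partition: partition}),
    do: :erlang.phash2({topic, partition})

  # Route records into `stages` buckets. Enum.group_by/2 keeps input order
  # inside each bucket, like a stage receiving events in arrival order.
  def route(records, stages) do
    Enum.group_by(records, &rem(partition_by(&1), stages))
  end
end

# Interleave offsets 0..2 of partitions 0..3, as concurrent producers might.
records =
  for offset <- 0..2, partition <- 0..3,
      do: %{topic: "orders", partition: partition, offset: offset}

OrderingSketch.route(records, 2)
```

All records of one {topic, partition} share a key and therefore a bucket, and within that bucket they keep offset order, while different partitions may land in different buckets and proceed concurrently.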

When you use a batcher, each batch also holds a single partition's records: the connector defaults :batch_key to {topic, partition}, so a handle_batch/4 call maps to one partition's contiguous offset range.

You may override :batch_key (or :batcher) in handle_message/3:

  • Coarsening it (e.g. to the topic, or setting it back to :default) packs records from several partitions into one handle_batch/4 call, giving fuller batches and fewer round-trips when a node owns many low-volume partitions. Per-partition ordering is still preserved; the batch just interleaves partitions, so group by partition inside handle_batch/4 if your logic needs to.
  • Refining it (a sub-partition key) or routing to multiple :batchers re-splits a partition across independently-flushing batches, which gives up strict per-partition ordering. Use it only when per-key (not per-partition) ordering is enough, or to fan message types out to different sinks (e.g. a dead-letter batcher).
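The effect of coarsening can be sketched without Broadway. In a real pipeline the override would be a call such as Broadway.Message.put_batch_key/2 inside handle_message/3. The hypothetical module below instead uses plain maps and Enum.group_by/2 in place of batchers, to show that a topic-level batch still keeps each partition's records in order and can be split back per partition.

```elixir
defmodule BatchKeySketch do
  # Hypothetical sketch: messages are plain maps, not Broadway.Message
  # structs, and batches are built with Enum.group_by/2, not batchers.

  # Connector default: one batch per {topic, partition}.
  def default_key(%{topic: topic, partition: partition}), do: {topic, partition}

  # Coarsened key: the topic, so one batch mixes several partitions.
  def topic_key(%{topic: topic}), do: topic

  def batches(messages, key_fun), do: Enum.group_by(messages, key_fun)

  # Inside a coarsened batch, regroup by partition if the logic needs it;
  # each partition's messages keep their relative (offset) order.
  def by_partition(batch), do: Enum.group_by(batch, &{&1.topic, &1.partition})
end

messages =
  for offset <- 0..1, partition <- 0..2,
      do: %{topic: "events", partition: partition, offset: offset}

default_batches = BatchKeySketch.batches(messages, &BatchKeySketch.default_key/1)
coarse_batches = BatchKeySketch.batches(messages, &BatchKeySketch.topic_key/1)

{map_size(default_batches), map_size(coarse_batches)}
# => {3, 1}
```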

Offset commits stay correct in every case regardless of how records are batched.

Because the connector manages :partition_by, you must not set it yourself; doing so raises an error. To scale out across partitions, raise processor and batcher concurrency.

Delivery semantics

Klife provides at-least-once delivery and this producer preserves it: an offset is only committed once it, and every lower delivered offset on the same partition, has been acknowledged by Broadway.

Because Kafka tracks a single committed offset per partition, there is no way to leave a failed message uncommitted while committing the offsets after it. Successful and failed messages therefore both advance the committed offset: handle failures explicitly via Broadway.handle_failed/2 (for example, by producing them to a dead-letter topic) rather than expecting them to block the partition.
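The commit rule can be illustrated with a small, hypothetical tracker for one partition. It is not the connector's implementation. An acknowledgement counts whether the message succeeded or failed, and the result is the highest offset whose predecessors are all acknowledged.

```elixir
defmodule CommitTrackerSketch do
  # Hypothetical sketch of the commit rule, not the connector's code.
  # `delivered` is the ascending list of offsets handed to Broadway for one
  # partition; `acked` holds offsets Broadway has acknowledged, whether the
  # message succeeded or failed.

  # Highest offset safe to commit, or nil: an offset qualifies only when it
  # and every lower delivered offset have been acknowledged.
  def committable(delivered, acked) do
    delivered
    |> Enum.take_while(&MapSet.member?(acked, &1))
    |> List.last()
  end
end

delivered = [10, 11, 12, 13]

# 12 is still in flight, so 13 cannot be committed yet.
CommitTrackerSketch.committable(delivered, MapSet.new([10, 11, 13]))
# => 11
```

Walking the delivered offsets, rather than counting upward by one, keeps the rule correct when offsets have gaps, as on compacted topics. Kafka's committed offset conventionally points at the next record to consume (one past the last processed record); the sketch returns the last processed offset and leaves that conversion out.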