Broadway Integration

ExDataSketch integrates with Broadway for sketch aggregation driven by message queues. This guide explains how to use ExDataSketch.Broadway and ExDataSketch.Broadway.PeriodicAggregator in production pipelines.

Dependency

Add {:broadway, "~> 1.0"} to your mix.exs dependencies. Broadway is an optional dependency of ExDataSketch: if it is not available, calling any of the Broadway integration functions raises an error that tells you to add it.
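For reference, the deps/0 function in mix.exs might then look like this. The ex_data_sketch entry is left as a comment because its version constraint is not given in this guide.

```elixir
# mix.exs (excerpt): only deps/0 is shown
defp deps do
  [
    # ...your existing dependencies, including ex_data_sketch...
    {:broadway, "~> 1.0"}
  ]
end
```

Run mix deps.get afterwards to fetch Broadway.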

Per-Batch Aggregation

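Use ExDataSketch.Broadway.accumulate/3 inside handle_batch/4 to build a new sketch from a batch of messages. Its arguments are the messages, the sketch module, and an options list in which key_fn extracts the value to insert from each message and p: 14 sets the HLL precision, as it does for ExDataSketch.HLL.new/1: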

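defmodule MyPipeline do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    # Messages go to the :default batcher unless another batcher is set.
    message
  end

  @impl true
  def handle_batch(:default, messages, _batch_info, _context) do
    # key_fn receives each Broadway.Message; this assumes message.data was
    # already decoded into a map with a :user_id field.
    sketch =
      ExDataSketch.Broadway.accumulate(messages, ExDataSketch.HLL,
        p: 14,
        key_fn: fn msg -> msg.data.user_id end
      )

    :telemetry.execute(
      [:my_app, :cardinality],
      %{estimate: ExDataSketch.HLL.estimate(sketch)},
      %{batch_size: length(messages)}
    )

    # Return the messages so Broadway can acknowledge them.
    messages
  end
end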
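handle_batch/4 only runs for messages that reach a batcher, so the pipeline has to declare a :default batcher when it starts. A minimal start_link/1 for MyPipeline might look like the following sketch. Broadway.DummyProducer is Broadway's built-in producer for tests and stands in here for your real SQS or Kafka producer; the concurrency, batch_size, and batch_timeout values are arbitrary assumptions.

```elixir
# Hypothetical start_link/1 for MyPipeline (place it inside the module).
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    # Broadway's built-in test producer; replace with your SQS or Kafka producer.
    producer: [module: {Broadway.DummyProducer, []}, concurrency: 1],
    processors: [default: [concurrency: 4]],
    # Without this batcher, handle_batch(:default, ...) is never called.
    batchers: [default: [concurrency: 2, batch_size: 500, batch_timeout: 1_000]]
  )
end
```

With this configuration, the estimate emitted from handle_batch/4 describes at most batch_size messages (500 here). Combining partial sketches across batches requires something that outlives a single handle_batch/4 call.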

accumulate_into/3

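To add a batch to a sketch you already have, rather than starting from an empty one, pass that sketch where accumulate/3 takes the sketch module:

existing = ExDataSketch.HLL.new(p: 14)
# messages is the batch received by handle_batch/4
sketch = ExDataSketch.Broadway.accumulate_into(messages, existing)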
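Conceptually, both functions can be modelled as a left fold over the batch. For each message you call key_fn and add the result to the sketch; accumulate/3 starts from an empty sketch and accumulate_into/3 starts from the one you pass in. ToyAccumulate is a hypothetical model in plain Elixir, not ExDataSketch's implementation. It abstracts the sketch as a value plus an update function so it runs without the library, and any value with such a function works, for example a MapSet as an exact stand-in.

```elixir
defmodule ToyAccumulate do
  # Hypothetical model of per-batch accumulation, not ExDataSketch's code.
  # A "sketch" is any value plus an update function: update.(sketch, key) -> sketch.

  # Fold the batch into an existing sketch, adding key_fn.(message) for each message.
  def accumulate_into(messages, sketch, update, key_fn) do
    Enum.reduce(messages, sketch, fn message, acc -> update.(acc, key_fn.(message)) end)
  end

  # Building a fresh sketch is the same fold, started from an empty sketch.
  def accumulate(messages, new, update, key_fn) do
    accumulate_into(messages, new.(), update, key_fn)
  end
end
```

In this model, existing is left unchanged and the combined sketch is only available as the return value, which is why the snippet above binds it to sketch. For HyperLogLog, folding a batch into existing gives the same registers as building a fresh sketch from the batch and merging the two, because both adding and merging keep register-wise maxima.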

Periodic Aggregation

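For fixed time windows or other periodic-flush semantics, use ExDataSketch.Broadway.PeriodicAggregator, which merges partial sketches from many batches into one sketch and flushes it on a timer: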

{:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  flush_interval: 5_000,
  flush_callback: fn sketch ->
    :telemetry.execute([:my_app, :cardinality], %{
      estimate: ExDataSketch.HLL.estimate(sketch)
    })
  end
)

PeriodicAggregator Operations

| Function   | Description                                  |
|------------|----------------------------------------------|
| merge/2    | Merge a partial sketch into the aggregator   |
| flush/1    | Return the current sketch and reset          |
| get/1      | Return the current sketch without resetting  |
| estimate/1 | Return the current estimate                  |

The aggregator flushes automatically every flush_interval and passes the current sketch to flush_callback. Set flush_interval: :infinity to disable automatic flushing. You can still call flush/1 yourself.
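This flush behaviour can be modelled as a GenServer that re-arms a Process.send_after/3 timer after each flush. ToyPeriodicAggregator is a hypothetical, simplified illustration and not the library's implementation. The sketch operations are injected as :new and :merge functions so it runs without ExDataSketch, and flush_interval is in milliseconds because that is what Process.send_after/3 expects. In a pipeline, each handle_batch/4 call would build a partial sketch and hand it to merge/2.

```elixir
defmodule ToyPeriodicAggregator do
  # Hypothetical illustration of periodic flushing; NOT ExDataSketch.Broadway.PeriodicAggregator.
  # Sketch operations are injected: :new (0-arity) and :merge (2-arity) functions.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Synchronous calls: each caller waits until its partial sketch is applied.
  def merge(agg, partial), do: GenServer.call(agg, {:merge, partial})
  def get(agg), do: GenServer.call(agg, :get)
  def flush(agg), do: GenServer.call(agg, :flush)

  @impl true
  def init(opts) do
    new = Keyword.fetch!(opts, :new)

    state = %{
      new: new,
      merge: Keyword.fetch!(opts, :merge),
      # Milliseconds between automatic flushes, or :infinity to disable them.
      interval: Keyword.get(opts, :flush_interval, :infinity),
      callback: Keyword.get(opts, :flush_callback, fn _sketch -> :ok end),
      sketch: new.()
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_call({:merge, partial}, _from, state) do
    {:reply, :ok, %{state | sketch: state.merge.(state.sketch, partial)}}
  end

  def handle_call(:get, _from, state), do: {:reply, state.sketch, state}

  # Manual flush: return the current sketch and start over from an empty one.
  def handle_call(:flush, _from, state) do
    {:reply, state.sketch, %{state | sketch: state.new.()}}
  end

  @impl true
  def handle_info(:flush_tick, state) do
    # Automatic flush: hand the sketch to the callback, reset, and re-arm the timer.
    state.callback.(state.sketch)
    schedule(state.interval)
    {:noreply, %{state | sketch: state.new.()}}
  end

  defp schedule(:infinity), do: :ok
  defp schedule(ms) when is_integer(ms), do: Process.send_after(self(), :flush_tick, ms)
end
```

This model makes two design choices. First, merge/2 is a call rather than a cast, so callers get some backpressure. Second, flush_callback runs inside the aggregator process, so a slow callback delays incoming merges.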

Configuration

Broadway integration can be enabled or disabled via application config:

config :ex_data_sketch, :integrations, broadway: true
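That line stores the keyword list [broadway: true] under the :integrations key of the :ex_data_sketch application environment. If your own code needs to branch on the same flag, a helper like the one below would read it. MyApp.Integrations is a hypothetical name. This guide does not state the library's default for an unset flag, so the helper takes the fallback as an explicit parameter.

```elixir
defmodule MyApp.Integrations do
  # Hypothetical helper, not part of ExDataSketch. It reads the value stored by
  #   config :ex_data_sketch, :integrations, broadway: true
  # The library's own default for an unset flag is not stated in this guide,
  # so the fallback is a parameter rather than an assumption baked in.
  def broadway_enabled?(default \\ false) do
    :ex_data_sketch
    |> Application.get_env(:integrations, [])
    |> Keyword.get(:broadway, default)
  end
end
```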

Why Sketches Fit Broadway Pipelines

Broadway consumes messages from message queues (SQS, Kafka, etc.) and, when batchers are configured, passes them to handle_batch/4 in groups. Sketches are well suited to Broadway because:

  1. Bounded memory: sketch size depends on its configuration (for example, p for HLL), not on how many distinct values it has seen
  2. Associative merge: partial sketches from different batches can be merged in any order
  3. No random access: each message is inspected once, in arrival order, and never revisited
  4. Streaming-friendly: there is no need to buffer the entire dataset
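The toy HyperLogLog below makes bounded memory and associative merge concrete. ToyHLL is hypothetical, written in plain Elixir, and is not ExDataSketch.HLL. It stores 2^p small registers in a tuple no matter how many items are added. Merging takes the register-wise maximum, which is associative and commutative, so partial sketches from different batches combine to the same result in any grouping or order. It hashes with :erlang.phash2/2 and omits the large-range correction, so treat it as a teaching aid only.

```elixir
defmodule ToyHLL do
  # Toy HyperLogLog for illustration only; NOT ExDataSketch.HLL.
  # Memory is 2^p registers, fixed by p, no matter how many items are added.
  import Bitwise

  defstruct [:p, :registers]

  def new(p) when p in 4..16, do: %__MODULE__{p: p, registers: :erlang.make_tuple(1 <<< p, 0)}

  def add(%__MODULE__{p: p, registers: regs} = sketch, item) do
    width = 32 - p
    # 32-bit hash: the top p bits choose a register, the low bits give the rank.
    hash = :erlang.phash2(item, 1 <<< 32)
    idx = hash >>> width
    r = rank(hash &&& ((1 <<< width) - 1), width)
    %{sketch | registers: put_elem(regs, idx, max(elem(regs, idx), r))}
  end

  # Merge = register-wise max: associative, commutative, and idempotent.
  def merge(%__MODULE__{p: p} = a, %__MODULE__{p: p} = b) do
    regs =
      Enum.zip(Tuple.to_list(a.registers), Tuple.to_list(b.registers))
      |> Enum.map(fn {x, y} -> max(x, y) end)
      |> List.to_tuple()

    %{a | registers: regs}
  end

  def estimate(%__MODULE__{p: p, registers: regs}) do
    m = 1 <<< p
    list = Tuple.to_list(regs)
    sum = Enum.reduce(list, 0.0, fn r, acc -> acc + :math.pow(2, -r) end)
    raw = alpha(m) * m * m / sum
    zeros = Enum.count(list, &(&1 == 0))
    # Small-range (linear counting) correction; large-range correction omitted.
    if raw <= 2.5 * m and zeros > 0, do: m * :math.log(m / zeros), else: raw
  end

  # 1-based position of the first 1-bit within `width` bits; width + 1 if all zero.
  defp rank(0, width), do: width + 1
  defp rank(bits, width), do: width - length(Integer.digits(bits, 2)) + 1

  defp alpha(16), do: 0.673
  defp alpha(32), do: 0.697
  defp alpha(64), do: 0.709
  defp alpha(m), do: 0.7213 / (1 + 1.079 / m)
end
```

Because taking a maximum is also idempotent, merging the same partial HLL twice does not change the result. This property is specific to HyperLogLog-style register sketches and should not be assumed for every sketch type.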

See Also