ExDataSketch integrates with Broadway for
message-queue-driven sketch aggregation. This guide explains how to use
ExDataSketch.Broadway and ExDataSketch.Broadway.PeriodicAggregator in
production pipelines.
Dependency
Add {:broadway, "~> 1.0"} to your mix.exs dependencies. Broadway is an
optional dependency -- if it is not present, calling Broadway integration
functions will raise a clear error directing you to add it.
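
As an illustration of how an optional-dependency check of this kind can work, the sketch below uses `Code.ensure_loaded?/1` from the Elixir standard library. The module name `OptionalDep`, the function `ensure!/2`, and the error wording are hypothetical and are not taken from ExDataSketch's source.

```elixir
defmodule OptionalDep do
  @moduledoc "Hypothetical helper; not part of ExDataSketch."

  # Returns :ok when `module` can be loaded, otherwise raises with a hint
  # naming the dependency tuple the caller should add to mix.exs.
  def ensure!(module, dep_hint) do
    if Code.ensure_loaded?(module) do
      :ok
    else
      raise RuntimeError,
            "#{inspect(module)} is not available. Add #{dep_hint} to your mix.exs deps."
    end
  end
end
```

A guard like this would typically run at the top of each integration function, so that the failure points at the missing dependency rather than surfacing as an undefined-function error.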
Per-Batch Aggregation
Use ExDataSketch.Broadway.accumulate/3 inside handle_batch/4 to build a
sketch from a batch of messages:

    defmodule MyPipeline do
      use Broadway

      @impl true
      def handle_batch(:default, messages, _batch_info, _context) do
        sketch =
          ExDataSketch.Broadway.accumulate(messages, ExDataSketch.HLL,
            p: 14,
            key_fn: fn msg -> msg.data.user_id end
          )

        :telemetry.execute([:my_app, :cardinality], %{
          estimate: ExDataSketch.HLL.estimate(sketch)
        })

        messages
      end
    end

accumulate_into/3
To merge a batch into an existing sketch:

    existing = ExDataSketch.HLL.new(p: 14)
    sketch = ExDataSketch.Broadway.accumulate_into(messages, existing)

Periodic Aggregation
For rolling windows or periodic flush semantics, use
ExDataSketch.Broadway.PeriodicAggregator:

    {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
      sketch_module: ExDataSketch.HLL,
      sketch_opts: [p: 14],
      flush_interval: 5_000,
      flush_callback: fn sketch ->
        :telemetry.execute([:my_app, :cardinality], %{
          estimate: ExDataSketch.HLL.estimate(sketch)
        })
      end
    )

PeriodicAggregator Operations
| Function | Description |
|---|---|
| merge/2 | Merge a partial sketch into the aggregator |
| flush/1 | Return the current sketch and reset |
| get/1 | Return the current sketch without resetting |
| estimate/1 | Return the current estimate |
The aggregator automatically flushes at the configured interval. Set
flush_interval: :infinity to disable automatic flush.
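
To make the merge, get, flush, and timer semantics concrete, here is a minimal stand-in built on `GenServer` and `Process.send_after/3`. It is a sketch under assumptions, not the library's implementation: `ToyPeriodicAggregator` is a hypothetical module, a `MapSet` stands in for a real sketch, and it assumes that a manual `flush/1` returns and resets the state without invoking the callback.

```elixir
defmodule ToyPeriodicAggregator do
  @moduledoc "Illustrative stand-in; not ExDataSketch.Broadway.PeriodicAggregator."
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def merge(pid, partial), do: GenServer.call(pid, {:merge, partial})
  def get(pid), do: GenServer.call(pid, :get)
  def flush(pid), do: GenServer.call(pid, :flush)

  @impl true
  def init(opts) do
    state = %{
      acc: MapSet.new(),
      interval: Keyword.get(opts, :flush_interval, :infinity),
      callback: Keyword.get(opts, :flush_callback, fn _acc -> :ok end)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_call({:merge, partial}, _from, state),
    do: {:reply, :ok, %{state | acc: MapSet.union(state.acc, partial)}}

  # get/1 leaves the accumulated state untouched.
  def handle_call(:get, _from, state), do: {:reply, state.acc, state}

  # flush/1 returns the accumulated state and starts over from empty.
  def handle_call(:flush, _from, state),
    do: {:reply, state.acc, %{state | acc: MapSet.new()}}

  @impl true
  def handle_info(:tick, state) do
    # Automatic flush: hand the state to the callback, reset, re-arm the timer.
    state.callback.(state.acc)
    schedule(state.interval)
    {:noreply, %{state | acc: MapSet.new()}}
  end

  # :infinity means no timer is ever armed, so only manual flushes happen.
  defp schedule(:infinity), do: :ok
  defp schedule(ms), do: Process.send_after(self(), :tick, ms)
end
```

In this toy version, starting with `flush_interval: :infinity` never arms the timer, which mirrors the documented way to disable automatic flushing.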
Configuration
Broadway integration can be enabled or disabled via application config:

    config :ex_data_sketch, :integrations, broadway: true

Why Sketches Fit Broadway Pipelines
Broadway processes messages in batches from message queues (SQS, Kafka, etc.). Sketches are ideal for Broadway because:
- Bounded memory: Sketch size is independent of input cardinality
- Associative merge: Partial sketches from different batches can be merged in any order
- No random access: Each message is folded into the sketch in a single pass and never needs to be revisited
- Streaming-friendly: No need to buffer the entire dataset
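
The bounded-memory and associative-merge properties can be checked with a toy register sketch. This is an illustration only: `ToySketch` is a hypothetical module with 16 registers and an element-wise `max` merge in the style of HyperLogLog, hashing via `:erlang.phash2/2`. It is not ExDataSketch.HLL and provides no estimator.

```elixir
defmodule ToySketch do
  @moduledoc "Toy HLL-style register sketch; illustration only, not ExDataSketch.HLL."
  import Bitwise

  @registers 16   # fixed size, regardless of how many items are added
  @rest_bits 28   # bits left in a 32-bit hash after 4 index bits

  def new, do: List.duplicate(0, @registers)

  def update(regs, item) do
    hash = :erlang.phash2(item, 1 <<< 32)
    index = hash >>> @rest_bits                # top 4 bits choose a register
    rest = hash &&& ((1 <<< @rest_bits) - 1)   # remaining 28 bits
    rank = @rest_bits - bit_length(rest) + 1   # leading zeros in `rest`, plus one
    List.update_at(regs, index, &max(&1, rank))
  end

  # Element-wise max is associative and commutative, so merge order is irrelevant.
  def merge(a, b), do: Enum.zip_with(a, b, &max/2)

  def from_enum(items), do: Enum.reduce(items, new(), &update(&2, &1))

  defp bit_length(0), do: 0
  defp bit_length(n), do: 1 + bit_length(n >>> 1)
end

# Three "batches" sketched independently, then merged in different orders.
a = ToySketch.from_enum(1..100)
b = ToySketch.from_enum(101..200)
c = ToySketch.from_enum(150..300)

left = ToySketch.merge(ToySketch.merge(a, b), c)
right = ToySketch.merge(a, ToySketch.merge(b, c))
IO.inspect(left == right, label: "merge order irrelevant")
```

Because each register holds only the maximum rank seen, merging partial sketches produces the same registers as sketching the combined input in one pass, which is what lets batches be aggregated independently.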