Broadway integration for sketch aggregation.
This module provides helpers for accumulating sketch data from Broadway
message batches. It composes the existing ExDataSketch.Stream and
per-sketch from_enumerable/2 APIs to build sketches from message
payloads without reimplementing any sketch logic.
Dependency
This module requires the :broadway dependency. If Broadway is not
available, calls to accumulate/3 will raise a clear error directing
the user to add it.
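A minimal sketch of adding the dependency in mix.exs. The version requirement shown is an assumption for illustration only; use whichever Broadway release your project targets, and keep your existing :ex_data_sketch entry as it is.

```elixir
# mix.exs (fragment)
defp deps do
  [
    # ...your existing :ex_data_sketch entry stays here...
    # Assumed version requirement; adjust to the release you actually use.
    {:broadway, "~> 1.0"}
  ]
end
```

After editing the file, running mix deps.get fetches the new dependency.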
Quick Start
    defmodule MyPipeline do
      use Broadway

      def handle_batch(:default, messages, _batch_info, _context) do
        # Extract the value to count from each message payload.
        key_fn = fn msg -> msg.data.user_id end
        sketch = ExDataSketch.Broadway.accumulate(messages, ExDataSketch.HLL, p: 14, key_fn: key_fn)
        # Report the estimated number of distinct users in this batch.
        :telemetry.execute([:my_app, :cardinality], %{estimate: ExDataSketch.HLL.estimate(sketch)})
        messages
      end
    end

Periodic Aggregation
For use cases that require periodic flush semantics (e.g., rolling
cardinality windows), see ExDataSketch.Broadway.PeriodicAggregator.
Configuration
Broadway integration can be explicitly enabled or disabled via application config:
    config :ex_data_sketch, :integrations, broadway: true

When not configured, availability defaults to whether :broadway is
loaded at runtime.
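The lookup order described above (explicit config first, runtime detection as the fallback) could be sketched roughly as follows. AvailabilitySketch and broadway_enabled?/0 are hypothetical names used only for illustration; they are not part of ExDataSketch, and the library's actual check may differ.

```elixir
defmodule AvailabilitySketch do
  # Hypothetical helper, not the library's implementation.
  # An explicit config value wins; otherwise fall back to checking
  # whether the Broadway module can be loaded.
  def broadway_enabled? do
    integrations = Application.get_env(:ex_data_sketch, :integrations, [])

    case Keyword.fetch(integrations, :broadway) do
      {:ok, value} -> value == true
      :error -> Code.ensure_loaded?(Broadway)
    end
  end
end
```

Under this sketch, setting broadway: false in the same config entry disables the integration even when Broadway is installed.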
Functions
accumulate/3
Accumulates sketch data from a list of Broadway messages.
Extracts values from messages using key_fn, then builds a sketch from
those values using the specified sketch module's from_enumerable/2.
Arguments
- messages -- a list of Broadway messages (any struct with a data field, or any value if key_fn extracts the relevant data).
- sketch_module -- the sketch module atom (e.g., ExDataSketch.HLL).
- opts -- keyword list:
  - :key_fn -- function (message -> term) that extracts the value from each message. Defaults to fn msg -> msg.data end.
  - All other options are forwarded to sketch_module.from_enumerable/2.
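The option handling described above can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the library's source: AccumulateSketch and ExactSet are hypothetical modules, and ExactSet uses a MapSet in place of a real probabilistic sketch so the example runs without ExDataSketch installed.

```elixir
defmodule ExactSet do
  # Hypothetical stand-in for a sketch module: exposes from_enumerable/2
  # but stores values exactly in a MapSet.
  def from_enumerable(values, _opts), do: MapSet.new(values)
end

defmodule AccumulateSketch do
  # Illustrative only: split :key_fn from the remaining options,
  # extract one value per message, and hand the values plus the
  # remaining options to the sketch module.
  def accumulate(messages, sketch_module, opts \\ []) do
    {key_fn, sketch_opts} = Keyword.pop(opts, :key_fn, fn msg -> msg.data end)

    messages
    |> Enum.map(key_fn)
    |> sketch_module.from_enumerable(sketch_opts)
  end
end
```

With this stand-in, AccumulateSketch.accumulate([%{data: "a"}, %{data: "b"}, %{data: "a"}], ExactSet) returns a MapSet containing "a" and "b", because the default key_fn reads the data field of each message.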
Examples
    iex> messages = [%{data: "a"}, %{data: "b"}, %{data: "a"}]
    iex> sketch = ExDataSketch.Broadway.accumulate(messages, ExDataSketch.HLL, p: 10)
    iex> ExDataSketch.HLL.estimate(sketch) > 0.0
    true

    iex> messages = [%Broadway.Message{data: "x", acknowledger: nil}, %{data: "y", acknowledger: nil}]
    iex> sketch = ExDataSketch.Broadway.accumulate(messages, ExDataSketch.CMS, width: 64, depth: 3, key_fn: fn msg -> msg.data end)
    iex> ExDataSketch.CMS.estimate(sketch, "x") >= 1
    true
accumulate_into/3
Accumulates sketch data from a list of Broadway messages into an existing sketch.
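Builds a batch sketch from the messages using sketch_module.from_enumerable/2,
then merges it into the provided sketch using sketch_module.merge/2. No module
argument is passed; as the example below shows, sketch_module here refers to
the module of the existing sketch.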
This works with all mergeable sketch types, including those that use put/2
instead of update/2 (e.g., Bloom, Quotient).
Arguments
- messages -- a list of Broadway messages.
- sketch -- an existing sketch struct to merge into.
- opts -- keyword list:
  - :key_fn -- function (message -> term) that extracts the value from each message. Defaults to fn msg -> msg.data end.
  - All other options are forwarded to sketch_module.from_enumerable/2 when building the batch sketch.
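The build-then-merge flow described above might look roughly like this. It is a sketch under assumptions: ExactSetSketch and AccumulateIntoSketch are hypothetical names, the stand-in struct stores values exactly in a MapSet, and reading the module from the struct is an inference from the call shape in this documentation rather than a confirmed implementation detail.

```elixir
defmodule ExactSetSketch do
  # Hypothetical stand-in sketch struct with the two functions the
  # flow relies on: from_enumerable/2 and merge/2.
  defstruct [:values]

  def from_enumerable(values, _opts), do: %__MODULE__{values: MapSet.new(values)}

  def merge(%__MODULE__{values: a}, %__MODULE__{values: b}),
    do: %__MODULE__{values: MapSet.union(a, b)}
end

defmodule AccumulateIntoSketch do
  # Illustrative only: take the module from the existing struct,
  # build a sketch for this batch, then merge it into the existing one.
  def accumulate_into(messages, %sketch_module{} = sketch, opts \\ []) do
    {key_fn, sketch_opts} = Keyword.pop(opts, :key_fn, fn msg -> msg.data end)

    batch =
      messages
      |> Enum.map(key_fn)
      |> sketch_module.from_enumerable(sketch_opts)

    sketch_module.merge(sketch, batch)
  end
end
```

Because only from_enumerable/2 and merge/2 are called, this shape does not care whether a sketch type inserts single items with put/2 or update/2.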
Examples
    iex> existing = ExDataSketch.HLL.new(p: 10) |> ExDataSketch.HLL.update("existing")
    iex> messages = [%{data: "a"}, %{data: "b"}]
    iex> sketch = ExDataSketch.Broadway.accumulate_into(messages, existing, p: 10)
    iex> ExDataSketch.HLL.estimate(sketch) >= 3
    true