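ExDataSketch integrates with GenStage for event-driven sketch aggregation. GenStage is not part of OTP. It ships as the separate :gen_stage Hex package, so make sure it is listed in your mix.exs dependencies if ExDataSketch does not already bring it in for your project.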
Module Overview
- ExDataSketch.GenStage.SketchConsumer -- A consumer that accumulates events into a sketch with periodic flush support.
- ExDataSketch.GenStage.SketchProducer -- A producer that emits accumulated sketches on demand.
- ExDataSketch.GenStage.SketchStage -- A combined producer-consumer that accumulates events and emits merged sketches downstream.
Event Contract
The consumer modules (SketchConsumer and SketchStage) accept two
kinds of events from upstream:
- Sketch snapshots -- structs of the configured :sketch_module, typically emitted by upstream SketchProducer or SketchStage instances. These are merged into the accumulator via sketch_module.merge/2.
- Raw items -- any other term. These are passed through :key_fn and inserted via sketch_module.update/2 (or sketch_module.from_enumerable/2 for a batch). This is the contract used by external event sources (Kafka, RabbitMQ, Phoenix events, etc.).
A single batch may mix both shapes; sketches are merged and raw items are updated independently, then folded into a single accumulator.
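A minimal sketch of how one mixed batch could be folded under this contract. ToySet (an exact set exposing new/1, update/2, from_enumerable/2, merge/2 and estimate/1) and BatchIngest.ingest/5 are hypothetical stand-ins, not ExDataSketch modules, and the real consumers may order the steps differently.

```elixir
defmodule ToySet do
  # Hypothetical stand-in for a sketch module: an exact set, NOT a real sketch.
  # It exposes the functions the event contract relies on.
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def update(%__MODULE__{items: items} = sketch, item),
    do: %{sketch | items: MapSet.put(items, item)}

  def from_enumerable(enum, opts \\ []),
    do: Enum.reduce(enum, new(opts), &update(&2, &1))

  def merge(%__MODULE__{items: a}, %__MODULE__{items: b}),
    do: %__MODULE__{items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items)
end

defmodule BatchIngest do
  # Hypothetical helper (not an ExDataSketch function) that folds one batch
  # into an accumulator following the event contract.
  def ingest(events, acc, sketch_module, sketch_opts \\ [], key_fn \\ &Function.identity/1) do
    # Structs of the configured sketch module are snapshots; anything else is raw.
    {snapshots, raw} = Enum.split_with(events, &is_struct(&1, sketch_module))

    # Snapshots are merged directly; key_fn is never applied to them.
    acc = Enum.reduce(snapshots, acc, &sketch_module.merge(&2, &1))

    case raw do
      [] ->
        acc

      _ ->
        # Raw items go through key_fn, become one sketch, and are folded in.
        raw
        |> Enum.map(key_fn)
        |> sketch_module.from_enumerable(sketch_opts)
        |> then(&sketch_module.merge(acc, &1))
    end
  end
end
```

For example, with an upstream snapshot holding "a" and "b" plus the raw events %{user: "b"} and %{user: "c"}, calling BatchIngest.ingest(batch, ToySet.new(), ToySet, [], & &1.user) merges the snapshot as-is, extracts "b" and "c" from the raw maps, and yields an estimate of 3.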
SketchConsumer
SketchConsumer subscribes to a producer, ingests events using the configured
sketch module, and provides read and flush access to the accumulated sketch.
```elixir
{:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)
```

Options
| Option | Description | Default |
|---|---|---|
| :sketch_module | Required. The sketch module (e.g. ExDataSketch.HLL) | -- |
| :sketch_opts | Options passed to sketch_module.new/1 | [] |
| :key_fn | Function that extracts the value to insert from each raw event (not applied to sketch snapshots) | fn e -> e end |
| :flush_interval | Auto-flush interval in milliseconds, or :infinity to disable | :infinity |
| :flush_callback | Function called with the sketch on each automatic flush | nil |
| :subscribe_to | Producer(s) to subscribe to, as GenStage subscription entries | [] |
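The :flush_interval and :flush_callback options describe a timer-driven flush. Below is a minimal sketch of that pattern using a plain GenServer and Process.send_after/3, assuming the callback receives the accumulated value and the accumulator is then reset. PeriodicFlush is a hypothetical module, not the library's implementation, and a MapSet stands in for the sketch.

```elixir
defmodule PeriodicFlush do
  # Hypothetical model of :flush_interval / :flush_callback, not the library's
  # code. A MapSet stands in for the sketch.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def add(pid, item), do: GenServer.cast(pid, {:add, item})

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :flush_interval, :infinity)
    callback = Keyword.get(opts, :flush_callback)
    schedule(interval)
    {:ok, %{acc: MapSet.new(), interval: interval, callback: callback}}
  end

  @impl true
  def handle_cast({:add, item}, state),
    do: {:noreply, %{state | acc: MapSet.put(state.acc, item)}}

  @impl true
  def handle_info(:flush, state) do
    # Hand the current accumulator to the callback, if one was given.
    if state.callback, do: state.callback.(state.acc)
    # Re-arm the timer for the next tick.
    schedule(state.interval)
    # Assumption: the accumulator is reset after each automatic flush.
    {:noreply, %{state | acc: MapSet.new()}}
  end

  # :infinity (the documented default) means no timer is ever armed.
  defp schedule(:infinity), do: :ok
  defp schedule(ms) when is_integer(ms) and ms > 0, do: Process.send_after(self(), :flush, ms)
end
```

Re-arming the timer inside handle_info/2, rather than using a fixed-rate timer, means a slow callback delays the next tick instead of letting flush messages pile up in the mailbox.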
Operations
| Function | Description |
|---|---|
| merge/2 | Merge a partial sketch into the consumer |
| flush/1 | Return the current sketch and reset |
| get/1 | Return the current sketch without resetting |
| estimate/1 | Return the current estimate |
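The difference between get/1 and flush/1 comes down to whether the read also resets the state. A hypothetical model makes this concrete: ToyAccumulator uses an Agent holding a MapSet in place of a sketch. The real consumer is a GenStage process, not an Agent.

```elixir
defmodule ToyAccumulator do
  # Hypothetical model of the get/1 vs flush/1 distinction, not the library's
  # code: an Agent holds a MapSet that stands in for the accumulated sketch.
  def start_link, do: Agent.start_link(fn -> MapSet.new() end)

  def add(pid, item), do: Agent.update(pid, &MapSet.put(&1, item))

  # get: read the accumulator and leave it untouched.
  def get(pid), do: Agent.get(pid, & &1)

  # flush: return the accumulator and swap in an empty one in a single call,
  # so no item can slip in between the read and the reset.
  def flush(pid), do: Agent.get_and_update(pid, fn acc -> {acc, MapSet.new()} end)

  # estimate: computed inside the process, so the accumulator is not copied out.
  def estimate(pid), do: Agent.get(pid, &MapSet.size/1)
end
```

Because Agent.get_and_update handles the read and the reset in one message, no update can land between them. A consumer that performs both steps inside a single callback would get the same guarantee.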
SketchProducer
SketchProducer holds a sketch that can be updated, and emits snapshots
of the current sketch struct to downstream consumers as its state changes.
Demand is tracked cumulatively: the producer emits at most one snapshot
per unit of demand and only when the sketch has been updated since the
previous emission. This avoids flooding downstream consumers with
duplicate copies while still respecting GenStage back-pressure.
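The emission rule above can be modelled as a small pure state machine. DemandGate is hypothetical and does not reflect SketchProducer's actual state. It only shows that banked demand alone never triggers a duplicate snapshot, and that each change is emitted at most once.

```elixir
defmodule DemandGate do
  # Hypothetical pure model of the emission rule (not the library's code).
  # Demand accumulates; a snapshot goes out only when there is outstanding
  # demand AND the sketch changed since the last emission.
  defstruct demand: 0, dirty: false, sketch: nil

  def new, do: %__MODULE__{}

  # Downstream asked for n more events (cf. GenStage's handle_demand/2).
  def on_demand(%__MODULE__{} = state, n) when is_integer(n) and n > 0,
    do: maybe_emit(%{state | demand: state.demand + n})

  # The sketch changed (e.g. after SketchProducer.update/2).
  def on_update(%__MODULE__{} = state, sketch),
    do: maybe_emit(%{state | sketch: sketch, dirty: true})

  # Emit one snapshot, consume one unit of demand, clear the dirty flag.
  defp maybe_emit(%__MODULE__{demand: d, dirty: true} = state) when d > 0,
    do: {[state.sketch], %{state | demand: d - 1, dirty: false}}

  # Either no demand or nothing new: emit nothing, keep the demand banked.
  defp maybe_emit(state), do: {[], state}
end
```

Each call returns {events_to_emit, new_state}, which mirrors the way GenStage callbacks return the events to dispatch alongside the new state.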
```elixir
{:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14]
)

# Insert raw items into the producer's sketch.
ExDataSketch.GenStage.SketchProducer.update(producer, "user_1")
ExDataSketch.GenStage.SketchProducer.update(producer, "user_2")
```

When a SketchConsumer subscribes to a SketchProducer, every event the
consumer receives is a sketch struct, which it merges into its own
accumulator, so the cardinality tracked by the producer carries over to the consumer.
SketchStage
SketchStage is a combined producer-consumer that subscribes upstream,
folds whatever it receives into its accumulator (inserting raw items,
merging sketches), and re-emits a snapshot of the accumulated sketch
downstream after each batch:
```elixir
{:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)
```

Periodic Flush
Both SketchConsumer and PeriodicAggregator support periodic flush:
```elixir
alias ExDataSketch.HLL

# Consumer with periodic flush: every 5_000 ms the callback receives the sketch.
{:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
  sketch_module: HLL,
  sketch_opts: [p: 14],
  flush_interval: 5_000,
  flush_callback: fn sketch ->
    :telemetry.execute([:app, :cardinality], %{estimate: HLL.estimate(sketch)})
  end,
  subscribe_to: [{producer, max_demand: 1000}]
)
```

Merge Semantics
All three modules use sketch_module.merge/2 for accumulation. Because merge
is associative and commutative for most sketch types (an HLL merge, for
instance, takes the register-wise maximum), partial results from different
workers or time windows can be combined in any order without affecting the
final result.
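To see why the order does not matter, here is a toy HLL-style register array that assumes element-wise max as its merge. RegisterMerge, its hashing, and its rank function are hypothetical and far simpler than ExDataSketch.HLL (there is no estimator at all). The example only demonstrates the merge property.

```elixir
defmodule RegisterMerge do
  # Toy HLL-style registers (hypothetical; not ExDataSketch's internal layout).
  # Each item updates one register with a rank; merge is the element-wise max,
  # which is associative, commutative and idempotent.
  import Bitwise

  @p 4
  @m 16 # 2^@p registers

  def new, do: List.duplicate(0, @m)

  def update(regs, item) do
    h = :erlang.phash2(item, 1 <<< 32)
    # The low @p bits pick the register; the remaining bits determine the rank.
    idx = h &&& (@m - 1)
    List.update_at(regs, idx, &max(&1, rank(h >>> @p, 1)))
  end

  def merge(a, b), do: Enum.zip_with(a, b, &max/2)

  # 1 + number of trailing zero bits (an all-zero word gets rank 1 here).
  defp rank(w, r) when w == 0 or (w &&& 1) == 1, do: r
  defp rank(w, r), do: rank(w >>> 1, r + 1)
end
```

Splitting 1..300 into three partial sketches and merging them forwards, backwards, or with a different grouping gives exactly the same registers as inserting all 300 items into a single sketch.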