A GenStage consumer that accumulates events into a sketch.
SketchConsumer subscribes to a producer, ingests events using the
configured sketch module, and provides read and flush access to the
accumulated sketch.
Event Contract
Each incoming event is interpreted in one of two ways:
Sketch snapshot: when the event is a struct of the configured :sketch_module, the consumer merges it into its accumulated sketch via sketch_module.merge/2. This is the contract used by ExDataSketch.GenStage.SketchProducer and ExDataSketch.GenStage.SketchStage, which emit snapshots of their internal sketch on demand.
Raw item: any other event is passed through :key_fn and inserted into the accumulated sketch via sketch_module.update/2 (or sketch_module.from_enumerable/2 for a batch). This is the contract used by raw event sources (Kafka offsets, Phoenix events, etc.).
A single batch may mix both shapes; sketches are merged and raw items are updated independently, then folded into a single accumulator.
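To make the mixed-batch rule concrete, here is a minimal sketch of how one batch could be split and folded. It is not SketchConsumer's source. ToySketch is a hypothetical stand-in for a module such as ExDataSketch.HLL: it counts distinct values exactly with a MapSet but exposes the same new/update/merge/from_enumerable shape. EventFold.fold_batch/5 is also hypothetical.

```elixir
defmodule ToySketch do
  # Hypothetical stand-in for a real sketch module (e.g. ExDataSketch.HLL).
  # Counts distinct items exactly with a MapSet; illustration only.
  defstruct set: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}
  def update(%__MODULE__{set: s} = sk, item), do: %{sk | set: MapSet.put(s, item)}
  def merge(%__MODULE__{set: a}, %__MODULE__{set: b}), do: %__MODULE__{set: MapSet.union(a, b)}
  def from_enumerable(items, opts \\ []), do: Enum.reduce(items, new(opts), &update(&2, &1))
  def estimate(%__MODULE__{set: s}), do: MapSet.size(s) * 1.0
end

defmodule EventFold do
  # Hypothetical helper: folds one batch of events into `acc` following the
  # event contract (snapshots are merged, raw items go through key_fn).
  def fold_batch(acc, events, sketch_module, key_fn, sketch_opts \\ []) do
    # Structs of the configured sketch module are snapshots; everything else is raw.
    {snapshots, raw} = Enum.split_with(events, &is_struct(&1, sketch_module))

    # Raw items are keyed and built into one partial sketch for the batch.
    raw_sketch = raw |> Enum.map(key_fn) |> sketch_module.from_enumerable(sketch_opts)

    # Snapshots are merged as-is; key_fn is never applied to them.
    Enum.reduce([raw_sketch | snapshots], acc, fn sk, a -> sketch_module.merge(a, sk) end)
  end
end
```

For example, folding a snapshot of ["a", "b"] together with raw events %{user: "b"} and %{user: "c"} (using & &1.user as the key function) gives a toy sketch whose estimate is 3.0, because "b" is counted once across both shapes.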
Options
:sketch_module -- required, the sketch module (e.g., ExDataSketch.HLL).
:sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
:key_fn -- function (event -> term) that extracts the value from each raw event (default: fn event -> event end). Not applied to sketch snapshots.
:flush_interval -- milliseconds between automatic flushes (default: :infinity, no automatic flush). When set, the consumer passes the accumulated sketch to :flush_callback and then resets to a new, empty sketch.
:flush_callback -- function (sketch -> term()) called on each automatic flush (default: nil).
:subscribe_to -- a list of producers or {producer, opts} tuples to subscribe to (the examples below pass [] to start without subscriptions).
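As a reading aid for the defaults above, this sketch resolves an options keyword list into a map. ConsumerOpts is a hypothetical module; it assumes that a missing :sketch_module raises KeyError (via Keyword.fetch!/2), and the [] default for :subscribe_to is an assumption because the option list does not state one.

```elixir
defmodule ConsumerOpts do
  # Hypothetical: resolves the documented options and defaults.
  # The real SketchConsumer may validate differently.
  def resolve(opts) do
    %{
      # Required: raises KeyError when absent (assumed behaviour).
      sketch_module: Keyword.fetch!(opts, :sketch_module),
      sketch_opts: Keyword.get(opts, :sketch_opts, []),
      # Identity by default, as documented.
      key_fn: Keyword.get(opts, :key_fn, fn event -> event end),
      # :infinity means no automatic flush.
      flush_interval: Keyword.get(opts, :flush_interval, :infinity),
      flush_callback: Keyword.get(opts, :flush_callback, nil),
      # Default not documented; [] assumed here.
      subscribe_to: Keyword.get(opts, :subscribe_to, [])
    }
  end
end
```

For example, ConsumerOpts.resolve(sketch_module: ExDataSketch.HLL) yields flush_interval: :infinity, sketch_opts: [] and an identity key_fn. The module name is only an atom here, so the library does not need to be loaded.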
Examples
alias ExDataSketch.GenStage.SketchConsumer
# my_producer: the pid or registered name of a running GenStage producer
{:ok, consumer} = SketchConsumer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)
# After events are consumed
SketchConsumer.estimate(consumer)
# Flush to get the accumulated sketch and reset
sketch = SketchConsumer.flush(consumer)
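The :flush_interval and :flush_callback options describe a timer-driven flush. The sketch below shows one common way to build that in a plain GenServer with Process.send_after/3. FlushTimerSketch is hypothetical, a MapSet stands in for the sketch, and nothing here is taken from SketchConsumer's source.

```elixir
defmodule FlushTimerSketch do
  # Hypothetical GenServer illustrating periodic flush-and-reset.
  # A MapSet stands in for the accumulated sketch.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def add(pid, item), do: GenServer.call(pid, {:add, item})

  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :flush_interval, :infinity)
    callback = Keyword.get(opts, :flush_callback)
    schedule(interval)
    {:ok, %{sketch: MapSet.new(), interval: interval, callback: callback}}
  end

  @impl true
  def handle_call({:add, item}, _from, state) do
    {:reply, :ok, %{state | sketch: MapSet.put(state.sketch, item)}}
  end

  @impl true
  def handle_info(:flush, state) do
    # Hand the accumulated sketch to the callback (if any), then start over empty.
    if state.callback, do: state.callback.(state.sketch)
    schedule(state.interval)
    {:noreply, %{state | sketch: MapSet.new()}}
  end

  # :infinity disables automatic flushing entirely.
  defp schedule(:infinity), do: :ok
  defp schedule(ms), do: Process.send_after(self(), :flush, ms)
end
```

The callback runs inside the server process, so a caller that wants the flushed value should capture its own pid before starting the server and send the sketch to it from the callback. Rescheduling after each flush, rather than using a fixed-rate timer, means a slow callback delays the next flush instead of piling up :flush messages.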
Summary
Functions
estimate/1 -- Returns the current estimate from the accumulated sketch.
flush/1 -- Flushes the accumulated sketch and resets to a new one.
get/1 -- Returns the current accumulated sketch without resetting.
merge/2 -- Merges a partial sketch into the consumer's accumulated sketch.
start_link/1 -- Starts a SketchConsumer process.
Functions
@spec estimate(GenServer.server()) :: float()
Returns the current estimate from the accumulated sketch.
Examples
iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchConsumer.estimate(consumer)
0.0
@spec flush(GenServer.server()) :: struct()
Flushes the accumulated sketch and resets to a new one.
Returns the flushed sketch.
Examples
iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchConsumer.merge(consumer, partial)
iex> flushed = ExDataSketch.GenStage.SketchConsumer.flush(consumer)
iex> ExDataSketch.HLL.estimate(flushed) > 0.0
true
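The difference between get/1 and flush/1 comes down to whether the read also resets the state. This sketch uses an Agent to show why a flush should be a single read-and-reset step: Agent.get_and_update/2 returns the old value and installs the new one in one operation, so nothing can be merged between the read and the reset. AccumulatorSketch is hypothetical and uses a MapSet in place of a sketch. A GenServer-based consumer gets the same guarantee because its callbacks run one at a time.

```elixir
defmodule AccumulatorSketch do
  # Hypothetical: an Agent holding an accumulated "sketch" (a MapSet here).
  def start_link, do: Agent.start_link(fn -> MapSet.new() end)

  # Merge a partial result into the accumulator.
  def merge(pid, partial), do: Agent.update(pid, &MapSet.union(&1, partial))

  # Read-only: the state is left untouched.
  def get(pid), do: Agent.get(pid, & &1)

  # Read-and-reset in one step: return the current value, store a fresh one.
  def flush(pid), do: Agent.get_and_update(pid, fn current -> {current, MapSet.new()} end)
end
```

Merging {"a", "b"} and then calling get/1 twice returns the same two-element set both times. flush/1 returns that set once, and a later get/1 sees an empty one.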
@spec get(GenServer.server()) :: struct()
Returns the current accumulated sketch without resetting.
Examples
iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> current = ExDataSketch.GenStage.SketchConsumer.get(consumer)
iex> ExDataSketch.HLL.estimate(current)
0.0
@spec merge(GenServer.server(), struct()) :: :ok
Merges a partial sketch into the consumer's accumulated sketch.
Examples
iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchConsumer.merge(consumer, partial)
iex> ExDataSketch.GenStage.SketchConsumer.estimate(consumer) > 0.0
true
@spec start_link(keyword()) :: GenServer.on_start()
Starts a SketchConsumer process.
Examples
iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...> sketch_module: ExDataSketch.HLL,
...> sketch_opts: [p: 10],
...> subscribe_to: []
...> )
iex> is_pid(consumer)
true