A GenStage producer that emits accumulated sketch snapshots on demand.
SketchProducer maintains an internal sketch that can be updated via
update/2 and merge/2. Each unit of downstream demand permits the
emission of one snapshot of the current sketch. Further snapshots are
emitted only after the producer's state changes (via update/2 or
merge/2).
This is useful for downstream consumers that need successive snapshots of an evolving sketch (e.g., for persistence, cross-node distribution, or metrics reporting).
Emission Semantics
A snapshot is the current sketch struct. Downstream consumers
(typically ExDataSketch.GenStage.SketchConsumer or
ExDataSketch.GenStage.SketchStage) merge each received snapshot into
their own accumulated sketch via sketch_module.merge/2.
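To illustrate the consumer side, here is a minimal, self-contained sketch of folding received snapshots into an accumulated sketch with merge/2. ToySetSketch and ConsumerFold are hypothetical stand-ins, not part of ExDataSketch: the toy "sketch" is an exact set whose merge/2 is set union, chosen only because it mimics the new/1, update/2, merge/2, estimate/1 shape.

```elixir
# Hypothetical toy "sketch" (NOT part of ExDataSketch): an exact set of items
# that mimics the new/1, update/2, merge/2, estimate/1 shape of a sketch module.
defmodule ToySetSketch do
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{items: MapSet.new()}

  def update(%__MODULE__{items: items} = sketch, item),
    do: %{sketch | items: MapSet.put(items, item)}

  # Set union is idempotent, so merging the same snapshot twice is harmless.
  def merge(%__MODULE__{items: a} = sketch, %__MODULE__{items: b}),
    do: %{sketch | items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items) * 1.0
end

defmodule ConsumerFold do
  # Hypothetical helper: merge a batch of received snapshots into the
  # consumer's accumulated sketch, as a consumer's handle_events/3 might.
  def absorb(acc, snapshots, sketch_module) do
    Enum.reduce(snapshots, acc, fn snapshot, merged ->
      sketch_module.merge(merged, snapshot)
    end)
  end
end

# Two successive snapshots of the same evolving producer sketch.
snap1 = ToySetSketch.update(ToySetSketch.new(), "user_1")
snap2 = ToySetSketch.update(snap1, "user_2")
merged = ConsumerFold.absorb(ToySetSketch.new(), [snap1, snap2], ToySetSketch)
ToySetSketch.estimate(merged)
# => 2.0
```

Because each snapshot is the whole accumulated sketch rather than a delta, snap2 already contains everything in snap1. In this toy, receiving both does not double-count "user_1", because set union is idempotent.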
Demand is tracked cumulatively. The producer emits at most one snapshot per unit of demand and only when the underlying sketch has been updated since the previous emission (or on the first emission after demand arrives). This avoids flooding downstream consumers with duplicate snapshots while still respecting GenStage back-pressure.
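The emission rule above can be modeled as a small pure state machine. This is a hypothetical sketch, not the producer's actual implementation. EmissionRule, init/1, add_demand/2 and change/2 are illustrative names, and the state uses only a subset of the documented state() fields (current, pending_demand, dirty). Starting with dirty: true, so that the first demand yields a snapshot, is an assumption about how "the first emission after demand arrives" is handled. Each function returns {events, state}, loosely mirroring a GenStage {:noreply, events, state} reply.

```elixir
# Hypothetical model of the documented emission rule (not the library's code).
defmodule EmissionRule do
  # Assumption: start "dirty" so the first demand produces one snapshot.
  def init(sketch), do: %{current: sketch, pending_demand: 0, dirty: true}

  # Demand is cumulative: new demand is added to what is still outstanding.
  def add_demand(state, n) when is_integer(n) and n > 0,
    do: maybe_emit(%{state | pending_demand: state.pending_demand + n})

  # A state change (update/2 or merge/2 in the real producer) marks the sketch dirty.
  def change(state, fun),
    do: maybe_emit(%{state | current: fun.(state.current), dirty: true})

  # Emit one snapshot only when there is outstanding demand AND the sketch
  # changed since the last emission; otherwise emit nothing.
  defp maybe_emit(%{pending_demand: d, dirty: true} = state) when d > 0,
    do: {[state.current], %{state | pending_demand: d - 1, dirty: false}}

  defp maybe_emit(state), do: {[], state}
end

s0 = EmissionRule.init(MapSet.new())
{[_first], s1} = EmissionRule.add_demand(s0, 2)            # first demand: one snapshot
{[], s2} = EmissionRule.add_demand(s1, 1)                  # unchanged sketch: no duplicate
{[snap], _s3} = EmissionRule.change(s2, &MapSet.put(&1, "a"))
MapSet.to_list(snap)
# => ["a"]
```

Note that outstanding demand is kept while the sketch is unchanged, so a later update/2 or merge/2 can be emitted immediately without waiting for new demand.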
Options
:sketch_module - required; the sketch module.
:sketch_opts - options forwarded to sketch_module.new/1 (default: []).
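The option rules (one required key, one key with a default) can be sketched with Keyword.fetch!/2 and Keyword.get/3. OptionsSketch and EchoSketch are hypothetical; this is not the library's start_link/1 code, which may validate options differently.

```elixir
# Hypothetical stand-in sketch module whose new/1 just records its options.
defmodule EchoSketch do
  def new(opts), do: %{opts: opts}
end

# Hypothetical sketch of option handling; not the library's implementation.
defmodule OptionsSketch do
  def build_initial_sketch(opts) do
    # :sketch_module is required: Keyword.fetch!/2 raises KeyError if missing.
    sketch_module = Keyword.fetch!(opts, :sketch_module)
    # :sketch_opts is optional and defaults to [].
    sketch_opts = Keyword.get(opts, :sketch_opts, [])
    sketch_module.new(sketch_opts)
  end
end

OptionsSketch.build_initial_sketch(sketch_module: EchoSketch, sketch_opts: [p: 14])
# => %{opts: [p: 14]}
```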
Examples
alias ExDataSketch.GenStage.SketchProducer
{:ok, producer} = SketchProducer.start_link(
sketch_module: ExDataSketch.HLL,
sketch_opts: [p: 14]
)
# Update with items
SketchProducer.update(producer, "user_1")
SketchProducer.update(producer, "user_2")
# Consumers that subscribe receive snapshots of the current sketch.
Summary
Functions
estimate/1 - Returns the current estimate from the producer's sketch.
get/1 - Returns the current accumulated sketch.
merge/2 - Merges a partial sketch into the producer's accumulated sketch.
start_link/1 - Starts a SketchProducer process.
update/2 - Updates the producer's sketch with a single item.
Types
@type state() :: %{ sketch_module: module(), sketch_opts: keyword(), current: struct(), pending_demand: non_neg_integer(), dirty: boolean() }
Functions
@spec estimate(GenServer.server()) :: float()
Returns the current estimate from the producer's sketch.
Examples
iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer)
0.0
@spec get(GenServer.server()) :: struct()
Returns the current accumulated sketch.
Examples
iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> sketch = ExDataSketch.GenStage.SketchProducer.get(producer)
iex> ExDataSketch.HLL.estimate(sketch)
0.0
@spec merge(GenServer.server(), struct()) :: :ok
Merges a partial sketch into the producer's accumulated sketch.
Examples
iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchProducer.merge(producer, partial)
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer) > 0.0
true
@spec start_link(keyword()) :: GenServer.on_start()
Starts a SketchProducer process.
Examples
iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> is_pid(producer)
true
@spec update(GenServer.server(), term()) :: :ok
Updates the producer's sketch with a single item.
Examples
iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> :ok = ExDataSketch.GenStage.SketchProducer.update(producer, "item")
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer) > 0.0
true