ExDataSketch.GenStage.SketchProducer (ExDataSketch v0.9.0)


A GenStage producer that emits accumulated sketch snapshots on demand.

SketchProducer maintains an internal sketch that can be updated via update/2 and merge/2. Downstream demand is answered with snapshots of that sketch: each unit of demand permits one snapshot, and later snapshots are emitted as the sketch changes through update/2 or merge/2.

This is useful for downstream consumers that need periodic snapshots of an evolving sketch (e.g., for persistence, cross-node distribution, or metrics reporting).

Emission Semantics

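A snapshot is the sketch struct as it stands at the moment of emission. Downstream consumers (typically ExDataSketch.GenStage.SketchConsumer or ExDataSketch.GenStage.SketchStage) merge each received snapshot into their own accumulated sketch via sketch_module.merge/2. Because snapshots are cumulative, successive snapshots overlap. For HLL this is harmless: merging takes the register-wise maximum, so re-merging data that has already been seen leaves the result unchanged.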
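To see why merging overlapping cumulative snapshots is harmless for HLL-style sketches, here is a toy model in plain Elixir. The ToyRegisters module and its list-of-registers representation are hypothetical stand-ins, not ExDataSketch internals; the only property borrowed from HLL is that merge is a register-wise maximum.

```elixir
# Toy model only: a "sketch" is a list of HLL-style registers and merge
# takes the register-wise maximum. This is NOT ExDataSketch's internal
# representation; it only illustrates idempotent merging.
defmodule ToyRegisters do
  @doc "Register-wise maximum of two equally sized register lists."
  def merge(a, b) when length(a) == length(b) do
    Enum.zip_with(a, b, &max/2)
  end
end

# The producer's cumulative sketch at two successive emissions.
snapshot_1 = [0, 2, 1, 0]
snapshot_2 = [3, 2, 1, 0]   # snapshot_1 plus one more update

# A consumer merges every snapshot it receives into its own accumulator.
consumer = [0, 0, 0, 0]
consumer = ToyRegisters.merge(consumer, snapshot_1)
consumer = ToyRegisters.merge(consumer, snapshot_2)

# Merging both snapshots gives the same registers as merging only the latest.
IO.inspect(consumer == ToyRegisters.merge([0, 0, 0, 0], snapshot_2))
```

A sketch whose merge is additive rather than idempotent would not have this property, since overlapping data would be counted again on every re-merge.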

Demand is tracked cumulatively. The producer emits at most one snapshot per unit of demand, and only when the sketch has changed since the previous emission; the exception is the first emission, which is sent as soon as demand arrives. This avoids flooding downstream consumers with duplicate snapshots while still respecting GenStage back-pressure.
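The emission rule above can be modeled as a few pure functions. The sketch below is a hypothetical, dependency-free illustration: EmissionModel and its functions are invented for this example, it only mirrors the pending_demand and dirty fields of state(), and it assumes dirty starts as true so that the first demand yields a snapshot. Each function returns {events, state}, loosely echoing the shape of a GenStage callback reply.

```elixir
# Hypothetical model of the emission rule; not SketchProducer's actual code.
# Assumption: dirty starts as true so the first demand yields a snapshot.
defmodule EmissionModel do
  def new(sketch), do: %{current: sketch, pending_demand: 0, dirty: true}

  # Downstream asks for `n` more events; demand accumulates.
  def demand(state, n) when is_integer(n) and n > 0 do
    maybe_emit(%{state | pending_demand: state.pending_demand + n})
  end

  # Stand-in for update/2 or merge/2: replace the sketch and mark it dirty.
  def change(state, new_sketch) do
    maybe_emit(%{state | current: new_sketch, dirty: true})
  end

  # Emit one snapshot only if there is outstanding demand AND the sketch
  # changed since the last emission; otherwise emit nothing.
  defp maybe_emit(%{pending_demand: d, dirty: true} = state) when d > 0 do
    {[state.current], %{state | pending_demand: d - 1, dirty: false}}
  end

  defp maybe_emit(state), do: {[], state}
end

s0 = EmissionModel.new(:empty)
{[:empty], s1} = EmissionModel.demand(s0, 2)  # first demand: one snapshot
{[:v1], s2} = EmissionModel.change(s1, :v1)   # change + leftover demand
{[], s3} = EmissionModel.change(s2, :v2)      # no demand left: hold it
{[:v2], _s4} = EmissionModel.demand(s3, 1)    # new demand: emit latest
```

In this model the change to :v2 is held back until new demand arrives, and any number of changes made while no demand is pending collapse into a single snapshot of the latest sketch.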

Options

  • :sketch_module -- required; the sketch module to use, such as ExDataSketch.HLL.
  • :sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
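One way the two options could be turned into the state() map is sketched below. This is a hypothetical illustration, not SketchProducer's init code: OptionsSketch is invented for the example, ToySketch stands in for a real sketch module such as ExDataSketch.HLL, and the initial dirty value is an assumption.

```elixir
# Stand-in for a real sketch module; only new/1 is needed here.
defmodule ToySketch do
  defstruct opts: []
  def new(opts \\ []), do: %__MODULE__{opts: opts}
end

# Hypothetical: builds a map with the same fields as state().
defmodule OptionsSketch do
  def initial_state(opts) do
    # :sketch_module is required: Keyword.fetch!/2 raises KeyError if absent.
    module = Keyword.fetch!(opts, :sketch_module)
    # :sketch_opts defaults to [] and is forwarded to module.new/1.
    sketch_opts = Keyword.get(opts, :sketch_opts, [])

    %{
      sketch_module: module,
      sketch_opts: sketch_opts,
      current: module.new(sketch_opts),
      pending_demand: 0,
      # Assumed initial value: lets the first demand produce a snapshot.
      dirty: true
    }
  end
end

state = OptionsSketch.initial_state(sketch_module: ToySketch, sketch_opts: [p: 14])
%ToySketch{opts: [p: 14]} = state.current
```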

Examples

alias ExDataSketch.GenStage.SketchProducer

{:ok, producer} = SketchProducer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14]
)

# Update with items
SketchProducer.update(producer, "user_1")
SketchProducer.update(producer, "user_2")

# Consumers subscribed to this producer receive snapshots of the current sketch as they issue demand.

Summary

Functions

  • estimate(server) -- Returns the current estimate from the producer's sketch.
  • get(server) -- Returns the current accumulated sketch.
  • merge(server, partial_sketch) -- Merges a partial sketch into the producer's accumulated sketch.
  • start_link(opts) -- Starts a SketchProducer process.
  • update(server, item) -- Updates the producer's sketch with a single item.

Types

state()

@type state() :: %{
  sketch_module: module(),
  sketch_opts: keyword(),
  current: struct(),
  pending_demand: non_neg_integer(),
  dirty: boolean()
}

Functions

estimate(server)

@spec estimate(GenServer.server()) :: float()

Returns the current estimate from the producer's sketch.

Examples

iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer)
0.0

get(server)

@spec get(GenServer.server()) :: struct()

Returns the current accumulated sketch.

Examples

iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> sketch = ExDataSketch.GenStage.SketchProducer.get(producer)
iex> ExDataSketch.HLL.estimate(sketch)
0.0

merge(server, partial_sketch)

@spec merge(
  GenServer.server(),
  struct()
) :: :ok

Merges a partial sketch into the producer's accumulated sketch.

Examples

iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchProducer.merge(producer, partial)
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer) > 0.0
true

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a SketchProducer process.

Examples

iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> is_pid(producer)
true

update(server, item)

@spec update(GenServer.server(), term()) :: :ok

Updates the producer's sketch with a single item.

Examples

iex> {:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10]
...> )
iex> :ok = ExDataSketch.GenStage.SketchProducer.update(producer, "item")
iex> ExDataSketch.GenStage.SketchProducer.estimate(producer) > 0.0
true