ExDataSketch.GenStage.SketchConsumer (ExDataSketch v0.9.0)


A GenStage consumer that accumulates events into a sketch.

SketchConsumer subscribes to a producer, ingests events using the configured sketch module, and provides read and flush access to the accumulated sketch.

Event Contract

Each incoming event is interpreted in one of two ways:

  1. Sketch snapshot: when the event is a struct of the configured :sketch_module, the consumer merges it into its accumulated sketch via sketch_module.merge/2. This is the contract used by ExDataSketch.GenStage.SketchProducer and ExDataSketch.GenStage.SketchStage, which emit snapshots of their internal sketch on demand.

  2. Raw item: any other event is passed through :key_fn and inserted into the accumulated sketch via sketch_module.update/2 (or sketch_module.from_enumerable/2 for a batch). This is the contract used by raw event sources (Kafka offsets, Phoenix events, etc.).

A single batch may mix both shapes. Sketch snapshots are merged and raw items are inserted independently of each other, and both results are then folded into a single accumulator.
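The dispatch described above can be illustrated with a small, self-contained sketch. It does not reproduce the library's internals: ToySketch is a hypothetical stand-in (an exact set rather than a probabilistic sketch) that only mirrors the new/1, update/2, and merge/2 function names from the text, and EventContractDemo.ingest/4 is an illustrative helper, not part of ExDataSketch.

```elixir
defmodule ToySketch do
  # Hypothetical stand-in for a sketch module: an exact set, used only to
  # make the example runnable without any dependency.
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def update(%__MODULE__{items: items} = sketch, item),
    do: %{sketch | items: MapSet.put(items, item)}

  def merge(%__MODULE__{items: a} = sketch, %__MODULE__{items: b}),
    do: %{sketch | items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items) * 1.0
end

defmodule EventContractDemo do
  # Splits a batch by event shape, then folds both groups into the accumulator.
  def ingest(events, acc, sketch_module, key_fn) do
    # Structs of the configured module are snapshots; everything else is raw.
    {snapshots, raw} = Enum.split_with(events, &is_struct(&1, sketch_module))

    # Snapshots are merged as-is; key_fn is not applied to them.
    acc = Enum.reduce(snapshots, acc, &sketch_module.merge(&2, &1))

    # Raw items go through key_fn before being inserted.
    Enum.reduce(raw, acc, &sketch_module.update(&2, key_fn.(&1)))
  end
end

# A mixed batch: three raw maps (one duplicate key) and one snapshot.
snapshot = ToySketch.new() |> ToySketch.update("u3")
batch = [%{user_id: "u1"}, snapshot, %{user_id: "u2"}, %{user_id: "u1"}]

acc = EventContractDemo.ingest(batch, ToySketch.new(), ToySketch, & &1.user_id)
ToySketch.estimate(acc)
# => 3.0 (distinct keys: "u1", "u2", "u3")
```

Splitting first and folding afterwards keeps the two code paths separate, which matches the "independently, then folded" wording; a single reduce with two clauses would give the same result for a set-like sketch.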

Options

  • :sketch_module -- required, the sketch module (e.g., ExDataSketch.HLL).
  • :sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
  • :key_fn -- function (event -> term) that extracts the value from each raw event (default: fn event -> event end). Not applied to sketch snapshots.
  • :flush_interval -- milliseconds between automatic flushes (default: :infinity, no automatic flush). When set, the consumer passes the accumulated sketch to :flush_callback at each interval and then resets to a new sketch.
  • :flush_callback -- function (sketch -> term()) called on each automatic flush (default: nil).
  • :subscribe_to -- a list of producers or {producer, opts} tuples to subscribe to (may be empty, as in the examples below).
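As an illustration of how these options combine, the following sketch configures a custom :key_fn together with an automatic flush. It assumes ExDataSketch is available as a dependency; the event shape (maps with a :user_id key), the 60-second interval, and the choice to forward flushed sketches to the calling process are all hypothetical.

```elixir
alias ExDataSketch.GenStage.SketchConsumer

# Capture the caller so the callback can report back to it.
parent = self()

{:ok, consumer} =
  SketchConsumer.start_link(
    sketch_module: ExDataSketch.HLL,
    sketch_opts: [p: 14],
    # Assumed event shape: maps carrying a :user_id key.
    key_fn: fn event -> event.user_id end,
    # Every 60 seconds the accumulated sketch is handed to the callback.
    flush_interval: 60_000,
    flush_callback: fn sketch -> send(parent, {:flushed, sketch}) end,
    # No producers yet, mirroring the doctests on this page.
    subscribe_to: []
  )
```

Sending the sketch to another process is only one possible callback; anything matching the (sketch -> term()) shape, such as persisting the sketch, fits the same slot.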

Examples

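alias ExDataSketch.GenStage.SketchConsumer

# my_producer is the pid or registered name of an already running GenStage producer
{:ok, consumer} = SketchConsumer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)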

# After events are consumed
SketchConsumer.estimate(consumer)

# Flush to get the accumulated sketch and reset
sketch = SketchConsumer.flush(consumer)

Summary

Functions

  • estimate(server) -- Returns the current estimate from the accumulated sketch.
  • flush(server) -- Flushes the accumulated sketch and resets to a new one.
  • get(server) -- Returns the current accumulated sketch without resetting.
  • merge(server, partial_sketch) -- Merges a partial sketch into the consumer's accumulated sketch.
  • start_link(opts) -- Starts a SketchConsumer process.

Types

state()

@type state() :: %{
  sketch_module: module(),
  sketch_opts: keyword(),
  key_fn: (term() -> term()),
  current: struct(),
  flush_callback: (struct() -> term()) | nil,
  flush_interval: non_neg_integer() | :infinity,
  last_flush_time: integer()
}

Functions

estimate(server)

@spec estimate(GenServer.server()) :: float()

Returns the current estimate from the accumulated sketch.

Examples

iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchConsumer.estimate(consumer)
0.0

flush(server)

@spec flush(GenServer.server()) :: struct()

Flushes the accumulated sketch and resets to a new one.

Returns the flushed sketch.

Examples

iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchConsumer.merge(consumer, partial)
iex> flushed = ExDataSketch.GenStage.SketchConsumer.flush(consumer)
iex> ExDataSketch.HLL.estimate(flushed) > 0.0
true

get(server)

@spec get(GenServer.server()) :: struct()

Returns the current accumulated sketch without resetting.

Examples

iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> current = ExDataSketch.GenStage.SketchConsumer.get(consumer)
iex> ExDataSketch.HLL.estimate(current)
0.0
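The difference between get/1 and flush/1 is easiest to see side by side. The following sketch uses only the functions documented on this page and assumes ExDataSketch is available as a dependency; the aliases are added for brevity.

```elixir
alias ExDataSketch.GenStage.SketchConsumer
alias ExDataSketch.HLL

{:ok, consumer} =
  SketchConsumer.start_link(sketch_module: HLL, sketch_opts: [p: 10], subscribe_to: [])

:ok = SketchConsumer.merge(consumer, HLL.from_enumerable(["a", "b"], p: 10))

# get/1 returns the sketch and leaves the consumer's accumulator in place.
snapshot = SketchConsumer.get(consumer)
true = HLL.estimate(snapshot) > 0.0
true = SketchConsumer.estimate(consumer) > 0.0

# flush/1 returns the accumulated sketch, after which the consumer starts
# over with a new, empty one.
flushed = SketchConsumer.flush(consumer)
true = HLL.estimate(flushed) > 0.0
true = SketchConsumer.estimate(consumer) == 0.0
```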

merge(server, partial_sketch)

@spec merge(
  GenServer.server(),
  struct()
) :: :ok

Merges a partial sketch into the consumer's accumulated sketch.

Examples

iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchConsumer.merge(consumer, partial)
iex> ExDataSketch.GenStage.SketchConsumer.estimate(consumer) > 0.0
true
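One possible use of merge/2 is to build partial sketches elsewhere and combine them in the consumer. The sketch below does this with Task.async_stream/2 from the Elixir standard library; the chunked input is hypothetical, ExDataSketch is assumed to be available as a dependency, and the partials use the same p: 10 as the consumer, as in the example above.

```elixir
alias ExDataSketch.GenStage.SketchConsumer
alias ExDataSketch.HLL

{:ok, consumer} =
  SketchConsumer.start_link(sketch_module: HLL, sketch_opts: [p: 10], subscribe_to: [])

# Hypothetical input: three chunks of string keys with some overlap.
chunks = [["a", "b"], ["b", "c"], ["c", "d"]]

chunks
# Each chunk is turned into its own partial sketch in a separate task.
|> Task.async_stream(fn chunk -> HLL.from_enumerable(chunk, p: 10) end)
# Task.async_stream/2 wraps each result in an {:ok, value} tuple.
|> Enum.each(fn {:ok, partial} -> :ok = SketchConsumer.merge(consumer, partial) end)

# The consumer now holds the union of all partial sketches.
SketchConsumer.estimate(consumer)
```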

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a SketchConsumer process.

Examples

iex> {:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
...>   sketch_module: ExDataSketch.HLL,
...>   sketch_opts: [p: 10],
...>   subscribe_to: []
...> )
iex> is_pid(consumer)
true