ExDataSketch.GenStage.SketchStage (ExDataSketch v0.9.0)


A combined GenStage producer-consumer for sketch aggregation pipelines.

SketchStage subscribes to an upstream producer, accumulates events into a sketch, and emits the current sketch snapshot to downstream consumers on demand. This enables pipeline compositions where one stage aggregates and the next stage persists or reports.

Event Contract

Incoming events follow the same convention as ExDataSketch.GenStage.SketchConsumer:

  1. Sketch snapshot: when the event is a struct of the configured :sketch_module, it is merged into the stage's accumulated sketch via sketch_module.merge/2.
  2. Raw item: any other event is passed through :key_fn and inserted via sketch_module.from_enumerable/2.

After processing each incoming batch, the stage emits a single snapshot of its updated accumulated sketch downstream.
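
The dispatch rule above can be sketched in plain Elixir. This is only an illustration, not the library's implementation: ToySketch is a hypothetical stand-in (an exact set rather than a probabilistic sketch) that mirrors just the function names mentioned above, and EventContractDemo.handle_batch/5 is an invented helper. It also assumes that the sketch built by from_enumerable/2 is folded into the accumulator with merge/2, which the text above does not state.

```elixir
# Hypothetical stand-in for a real sketch module: an exact distinct-value set.
# It is NOT part of ExDataSketch; it only mirrors new/1, merge/2,
# from_enumerable/2 and estimate/1 by name.
defmodule ToySketch do
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def from_enumerable(enumerable, _opts \\ []) do
    %__MODULE__{items: MapSet.new(enumerable)}
  end

  def merge(%__MODULE__{items: a}, %__MODULE__{items: b}) do
    %__MODULE__{items: MapSet.union(a, b)}
  end

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items) * 1.0
end

# Hypothetical helper showing how one batch could be folded into the
# accumulated sketch under the event contract.
defmodule EventContractDemo do
  def handle_batch(events, acc, sketch_module, sketch_opts, key_fn) do
    # Structs of the configured sketch module are snapshots; the rest are raw items.
    {snapshots, raw} = Enum.split_with(events, &is_struct(&1, sketch_module))

    # Rule 1: merge every incoming snapshot into the accumulator.
    acc = Enum.reduce(snapshots, acc, fn snap, acc -> sketch_module.merge(acc, snap) end)

    # Rule 2: apply key_fn to raw items, build a partial sketch, then merge it
    # (the final merge step is an assumption of this sketch).
    case raw do
      [] ->
        acc

      _ ->
        partial = sketch_module.from_enumerable(Enum.map(raw, key_fn), sketch_opts)
        sketch_module.merge(acc, partial)
    end
  end
end

# A mixed batch: two raw events and one pre-aggregated snapshot.
events = [%{user: "a"}, ToySketch.from_enumerable(["b", "c"]), %{user: "a"}]
acc = EventContractDemo.handle_batch(events, ToySketch.new(), ToySketch, [], & &1.user)
IO.inspect(ToySketch.estimate(acc))
# => 3.0
```

Note that :key_fn is applied only to the raw maps here; the snapshot bypasses it and goes straight to merge/2, matching the contract.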

Options

  • :sketch_module -- required, the sketch module.
  • :sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
  • :key_fn -- function (event -> term()) to extract values from raw events (default: fn event -> event end). Not applied to sketch snapshots.
  • :subscribe_to -- a list of producers or {producer, opts} tuples to subscribe to (may be empty, as in the examples below).

Examples

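alias ExDataSketch.GenStage.SketchStage

# some_producer is a placeholder for an already-running GenStage producer.
{:ok, stage} = SketchStage.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{some_producer, max_demand: 100}]
)

# Downstream consumers receive sketch snapshots as they demand events.
# The accumulated sketch can also be queried directly:
SketchStage.estimate(stage)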

Summary

Functions

estimate(server)
Returns the current estimate from the stage's accumulated sketch.

get(server)
Returns the current accumulated sketch.

merge(server, partial_sketch)
Merges a partial sketch into the stage's accumulated sketch.

start_link(opts)
Starts a SketchStage process.

Types

state()

@type state() :: %{
  sketch_module: module(),
  sketch_opts: keyword(),
  key_fn: (term() -> term()),
  current: struct()
}

Functions

estimate(server)

@spec estimate(GenServer.server()) :: float()

Returns the current estimate from the stage's accumulated sketch.

Examples

iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchStage.estimate(stage)
0.0

get(server)

@spec get(GenServer.server()) :: struct()

Returns the current accumulated sketch.

Examples

iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchStage.get(stage) |> ExDataSketch.HLL.estimate()
0.0

merge(server, partial_sketch)

@spec merge(
  GenServer.server(),
  struct()
) :: :ok

Merges a partial sketch into the stage's accumulated sketch.

Examples

iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchStage.merge(stage, partial)
iex> ExDataSketch.GenStage.SketchStage.estimate(stage) > 0.0
true

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a SketchStage process.

Examples

iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> is_pid(stage)
true