A combined GenStage producer-consumer for sketch aggregation pipelines.
SketchStage subscribes to an upstream producer, accumulates events into
a sketch, and emits the current sketch snapshot to downstream consumers
on demand. This enables pipeline compositions where one stage aggregates
and the next stage persists or reports.
Event Contract
Incoming events follow the same convention as
ExDataSketch.GenStage.SketchConsumer:
- Sketch snapshot: when the event is a struct of the configured :sketch_module, it is merged into the stage's accumulated sketch via sketch_module.merge/2.
- Raw item: any other event is passed through :key_fn and inserted via sketch_module.from_enumerable/2.
After processing each incoming batch, the stage emits a single snapshot of its updated accumulated sketch downstream.
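The batch handling described above can be sketched in plain Elixir. This is not SketchStage's actual implementation. ToySketch (an exact, MapSet-backed stand-in for a module like ExDataSketch.HLL) and BatchSketch.process_batch/5 are hypothetical names. The sketch also assumes that merge/2 is order-independent, so snapshots and raw items from one batch can be folded in any order.

```elixir
defmodule ToySketch do
  # Hypothetical exact "sketch" standing in for a real module such as
  # ExDataSketch.HLL; it mimics the new/1, from_enumerable/2, merge/2 shape.
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def from_enumerable(values, _opts \\ []), do: %__MODULE__{items: MapSet.new(values)}

  def merge(%__MODULE__{items: a}, %__MODULE__{items: b}),
    do: %__MODULE__{items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items) * 1.0
end

defmodule BatchSketch do
  # Hypothetical helper applying the event contract to one incoming batch.
  def process_batch(acc, events, mod, key_fn, sketch_opts \\ []) do
    # Structs of the configured sketch module are snapshots; everything else is raw.
    {snapshots, raw} = Enum.split_with(events, &is_struct(&1, mod))

    # Snapshots are merged as-is; key_fn is never applied to them.
    merged = Enum.reduce(snapshots, acc, fn snap, sketch -> mod.merge(sketch, snap) end)

    case raw do
      [] ->
        merged

      _ ->
        # Raw items go through key_fn, become one partial sketch, then get merged.
        partial = mod.from_enumerable(Enum.map(raw, key_fn), sketch_opts)
        mod.merge(merged, partial)
    end
  end
end
```

In a GenStage producer-consumer, emitting "a single snapshot" per batch would correspond to returning the merged sketch as the only outgoing event from handle_events/3, for example {:noreply, [merged], merged}.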
Options
- :sketch_module -- required, the sketch module.
- :sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
- :key_fn -- function (event -> term()) to extract values from raw events (default: fn event -> event end). Not applied to sketch snapshots.
- :subscribe_to -- a list of producers or {producer, opts} tuples to subscribe to.
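These options could be resolved as in the minimal sketch below. StageOptions.resolve/1 is a hypothetical helper, not SketchStage's own code. The [] fallback for :subscribe_to is an assumption, because the options list above documents no default for it.

```elixir
defmodule StageOptions do
  # Hypothetical helper resolving SketchStage-style options with the documented
  # defaults; the real module's internal option handling may differ.
  def resolve(opts) when is_list(opts) do
    # Required: Keyword.fetch!/2 raises KeyError when :sketch_module is missing.
    mod = Keyword.fetch!(opts, :sketch_module)
    sketch_opts = Keyword.get(opts, :sketch_opts, [])
    key_fn = Keyword.get(opts, :key_fn, fn event -> event end)

    %{
      sketch_module: mod,
      key_fn: key_fn,
      sketch_opts: sketch_opts,
      # :sketch_opts is forwarded to new/1 to build the empty accumulated sketch.
      sketch: mod.new(sketch_opts),
      # Assumed default: no subscriptions.
      subscribe_to: Keyword.get(opts, :subscribe_to, [])
    }
  end
end
```

In a GenStage producer-consumer, a map like this would typically become the stage state, with init/1 returning {:producer_consumer, state, subscribe_to: state.subscribe_to}.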
Examples
alias ExDataSketch.GenStage.SketchStage
{:ok, stage} = SketchStage.start_link(
sketch_module: ExDataSketch.HLL,
sketch_opts: [p: 14],
subscribe_to: [{some_producer, max_demand: 100}]
)
# Downstream consumers will receive the current sketch on demand
SketchStage.estimate(stage)
Summary
Functions
estimate/1 -- Returns the current estimate from the stage's accumulated sketch.
get/1 -- Returns the current accumulated sketch.
merge/2 -- Merges a partial sketch into the stage's accumulated sketch.
start_link/1 -- Starts a SketchStage process.
Functions
@spec estimate(GenServer.server()) :: float()
Returns the current estimate from the stage's accumulated sketch.
Examples
iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchStage.estimate(stage)
0.0
@spec get(GenServer.server()) :: struct()
Returns the current accumulated sketch.
Examples
iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> ExDataSketch.GenStage.SketchStage.get(stage) |> ExDataSketch.HLL.estimate()
0.0
@spec merge(GenServer.server(), struct()) :: :ok
Merges a partial sketch into the stage's accumulated sketch.
Examples
iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a"], p: 10)
iex> :ok = ExDataSketch.GenStage.SketchStage.merge(stage, partial)
iex> ExDataSketch.GenStage.SketchStage.estimate(stage) > 0.0
true
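Because merge/2 accepts partial sketches, several workers can each build a partial over their own slice of the data and then combine the results. The minimal sketch below shows that fold. It uses an exact MapSet as a hypothetical stand-in for a mergeable sketch, and PartialMerge and parallel_build/2 are illustrative names that are not part of ExDataSketch. In a real pipeline, each partial would more likely be an ExDataSketch.HLL built with from_enumerable/2 and passed to SketchStage.merge/2.

```elixir
defmodule PartialMerge do
  # Hypothetical stand-ins for sketch_module.from_enumerable/2 and merge/2:
  # an exact MapSet "sketch" whose merge is set union.
  def from_enumerable(values), do: MapSet.new(values)
  def merge(a, b), do: MapSet.union(a, b)

  # Build one partial per chunk concurrently, then fold the partials together.
  # Union is order-independent, so the order in which partials arrive does not matter.
  def parallel_build(items, chunk_size) do
    items
    |> Enum.chunk_every(chunk_size)
    |> Task.async_stream(&from_enumerable/1)
    |> Enum.reduce(from_enumerable([]), fn {:ok, partial}, acc -> merge(acc, partial) end)
  end
end
```

A real sketch such as HLL would follow the same fold, with the merged result approximating the distinct count of the union instead of storing it exactly.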
@spec start_link(keyword()) :: GenServer.on_start()
Starts a SketchStage process.
Examples
iex> {:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], subscribe_to: []
...> )
iex> is_pid(stage)
true