A GenServer that accumulates sketch data and periodically flushes it.
This module provides a process-based aggregator for use in Broadway pipelines and other streaming contexts where sketches should be accumulated over time and periodically flushed for downstream consumption (e.g., telemetry, metrics, persistence).
The aggregator holds a single sketch and supports two core operations:
merge/2 -- merge a partial sketch into the aggregator
flush/1 -- return the current aggregate sketch and reset to a new one
Usage
alias ExDataSketch.Broadway.PeriodicAggregator
alias ExDataSketch.HLL

# Start an aggregator for HLL cardinality tracking
{:ok, agg} = PeriodicAggregator.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  flush_interval: 5_000,
  flush_callback: fn sketch ->
    :telemetry.execute([:my_app, :cardinality], %{estimate: HLL.estimate(sketch)})
  end
)

# Merge partial sketches
PeriodicAggregator.merge(agg, partial_sketch)

# Manually flush
sketch = PeriodicAggregator.flush(agg)

Flush Semantics
When :flush_interval is set, the aggregator automatically calls the
:flush_callback and resets the sketch at the given interval. If no
:flush_callback is provided, the aggregator simply resets the sketch
without side effects.
Calling flush/1 manually returns the current aggregate sketch and
resets it to a new empty sketch.
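The flush behaviour described above can be illustrated with a minimal stand-in process. This is a sketch under stated assumptions, not the library's implementation: `FlushSemanticsSketch` and the `:flush_tick` message are hypothetical names, a `MapSet` stands in for a real sketch, and the choice that a manual flush does not invoke the callback is an assumption the source does not confirm.

```elixir
defmodule FlushSemanticsSketch do
  # Hypothetical stand-in for illustration only; a MapSet plays the role of the sketch.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def merge(server, partial), do: GenServer.call(server, {:merge, partial})
  def flush(server), do: GenServer.call(server, :flush)

  @impl true
  def init(opts) do
    state = %{
      sketch: MapSet.new(),
      interval: Keyword.get(opts, :flush_interval, 5_000),
      callback: Keyword.get(opts, :flush_callback)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_call({:merge, partial}, _from, state) do
    # Merging is a set union here; a real sketch module would supply merge/2.
    {:reply, :ok, %{state | sketch: MapSet.union(state.sketch, partial)}}
  end

  def handle_call(:flush, _from, state) do
    # Manual flush: return the aggregate and reset to an empty one.
    # Assumption: the callback is not invoked on manual flushes.
    {:reply, state.sketch, %{state | sketch: MapSet.new()}}
  end

  @impl true
  def handle_info(:flush_tick, state) do
    # Automatic flush: run the callback if one was given, then reset.
    if state.callback, do: state.callback.(state.sketch)
    schedule(state.interval)
    {:noreply, %{state | sketch: MapSet.new()}}
  end

  # :infinity disables the timer entirely.
  defp schedule(:infinity), do: :ok
  defp schedule(ms), do: Process.send_after(self(), :flush_tick, ms)
end
```

With `flush_interval: :infinity` the process only resets on explicit `flush/1` calls, which is why the doctests on this page use that setting to stay deterministic.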
Dependency
This module depends on :broadway being available. Call
ExDataSketch.Integration.require_broadway!/0 before use if Broadway
might not be present.
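As a rough illustration of what an optional-dependency guard can look like, the sketch below checks whether the `Broadway` module can be loaded before any aggregator is started. `BroadwayGuardSketch` is a hypothetical helper; how `require_broadway!/0` is implemented inside the library is not stated here, so treat this only as one plausible shape.

```elixir
defmodule BroadwayGuardSketch do
  # Hypothetical helper for illustration; the library's own check is
  # ExDataSketch.Integration.require_broadway!/0.

  # True when the Broadway module is compiled and loadable in this runtime.
  def broadway_available?, do: Code.ensure_loaded?(Broadway)

  # Raises with a readable message when Broadway is missing.
  def require_broadway! do
    if broadway_available?() do
      :ok
    else
      raise "Broadway is not available; add :broadway to your deps in mix.exs"
    end
  end
end
```

Calling such a guard once at application start keeps the failure close to its cause instead of surfacing later as an undefined-function error.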
Summary
Functions
child_spec/1 -- Returns a specification to start this module under a supervisor.
estimate/1 -- Returns the current estimate from the aggregate sketch.
flush/1 -- Flushes the aggregator, returning the current sketch and resetting to a new one.
get/1 -- Returns the current aggregate sketch without resetting it.
merge/2 -- Merges a partial sketch into the aggregator.
start_link/1 -- Starts a periodic aggregator process.
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor for details.
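A child list for a supervision tree might look like the following sketch. It assumes `child_spec/1` forwards its argument to `start_link/1`, which is the default for modules that `use GenServer` but is not confirmed here; `MyApp.SketchSupervisionSketch` and the registered name `MyApp.CardinalityAggregator` are hypothetical.

```elixir
defmodule MyApp.SketchSupervisionSketch do
  # Hypothetical module for illustration. Returns the children a supervisor
  # would start; nothing is started by calling this function.
  def children do
    [
      {ExDataSketch.Broadway.PeriodicAggregator,
       sketch_module: ExDataSketch.HLL,
       sketch_opts: [p: 14],
       flush_interval: 5_000,
       # Assumed name so other processes can reach the aggregator without a pid.
       name: MyApp.CardinalityAggregator}
    ]
  end
end
```

The list can then be passed to `Supervisor.start_link(children, strategy: :one_for_one)`, after which callers would address the aggregator by its registered name, for example `PeriodicAggregator.merge(MyApp.CardinalityAggregator, partial)`.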
@spec estimate(GenServer.server()) :: float()
Returns the current estimate from the aggregate sketch.
Convenience function that calls sketch_module.estimate/1 on the
current aggregate.
Examples
iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> ExDataSketch.Broadway.PeriodicAggregator.estimate(agg)
0.0
@spec flush(GenServer.server()) :: struct()
Flushes the aggregator, returning the current sketch and resetting to a new one.
Examples
iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.Broadway.PeriodicAggregator.merge(agg, partial)
iex> flushed = ExDataSketch.Broadway.PeriodicAggregator.flush(agg)
iex> ExDataSketch.HLL.estimate(flushed) > 0.0
true
@spec get(GenServer.server()) :: struct()
Returns the current aggregate sketch without resetting it.
Examples
iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> current = ExDataSketch.Broadway.PeriodicAggregator.get(agg)
iex> ExDataSketch.HLL.estimate(current)
0.0
@spec merge(GenServer.server(), struct()) :: :ok
Merges a partial sketch into the aggregator.
The partial sketch is merged with the current aggregate using
sketch_module.merge/2.
Examples
iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...> sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.Broadway.PeriodicAggregator.merge(agg, partial)
iex> ExDataSketch.HLL.estimate(ExDataSketch.Broadway.PeriodicAggregator.get(agg)) > 0.0
true
@spec start_link(keyword()) :: GenServer.on_start()
Starts a periodic aggregator process.
Options
:sketch_module -- required, the sketch module (e.g., ExDataSketch.HLL).
:sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
:flush_interval -- milliseconds between automatic flushes (default: 5000). Set to :infinity to disable automatic flush.
:flush_callback -- function (sketch -> term) called on each flush (default: nil, no side effect).
:name -- GenServer name registration (default: nil).
Examples
iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...> sketch_module: ExDataSketch.HLL,
...> sketch_opts: [p: 10],
...> flush_interval: :infinity
...> )
iex> is_pid(agg)
true