ExDataSketch.Broadway.PeriodicAggregator (ExDataSketch v0.9.0)


A GenServer that periodically accumulates and flushes sketch data.

This module provides a process-based aggregator for use in Broadway pipelines and other streaming contexts where sketches should be accumulated over time and periodically flushed for downstream consumption (e.g., telemetry, metrics, persistence).

The aggregator holds a single sketch and supports two core operations:

  • merge/2 -- merge a partial sketch into the aggregator
  • flush/1 -- return the current aggregate sketch and reset to a new one

Usage

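# Aliases used by the snippets that follow
alias ExDataSketch.Broadway.PeriodicAggregator
alias ExDataSketch.HLL

# Start an aggregator for HLL cardinality tracking
{:ok, agg} = PeriodicAggregator.start_link(
  sketch_module: HLL,
  sketch_opts: [p: 14],
  flush_interval: 5_000,
  flush_callback: fn sketch ->
    :telemetry.execute([:my_app, :cardinality], %{estimate: HLL.estimate(sketch)})
  end
)

# Merge partial sketches (built with the same precision as the aggregate)
partial_sketch = HLL.from_enumerable(["user-1", "user-2"], p: 14)
:ok = PeriodicAggregator.merge(agg, partial_sketch)

# Manually flush: returns the current aggregate and resets it
sketch = PeriodicAggregator.flush(agg)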

Flush Semantics

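When :flush_interval is a number of milliseconds (the default is 5000), the aggregator automatically calls the :flush_callback with the current sketch and resets the sketch at that interval. If no :flush_callback is provided, the automatic flush simply resets the sketch without side effects. Set :flush_interval to :infinity to disable automatic flushing.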

Calling flush/1 manually returns the current aggregate sketch and resets it to a new empty sketch.
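The flush behavior described above can be modeled as a process that re-arms a timer after every automatic flush. The following is a minimal, hypothetical sketch: ToyAggregator and ToySketch are invented names, an exact MapSet counter stands in for a real HLL, and the Process.send_after/3 loop is an assumption that may differ from how PeriodicAggregator actually schedules flushes.

```elixir
defmodule ToySketch do
  # Hypothetical stand-in for a sketch module: an exact distinct counter
  # backed by MapSet, exposing new/1, merge/2, and estimate/1.
  defstruct [:items]

  def new(_opts \\ []), do: %__MODULE__{items: MapSet.new()}
  def from_enumerable(enum), do: %__MODULE__{items: MapSet.new(enum)}

  def merge(%__MODULE__{items: a}, %__MODULE__{items: b}),
    do: %__MODULE__{items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items) * 1.0
end

defmodule ToyAggregator do
  # Hypothetical aggregator, not ExDataSketch's implementation.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def merge(server, partial), do: GenServer.call(server, {:merge, partial})
  def flush(server), do: GenServer.call(server, :flush)

  @impl true
  def init(opts) do
    mod = Keyword.fetch!(opts, :sketch_module)
    sketch_opts = Keyword.get(opts, :sketch_opts, [])
    interval = Keyword.get(opts, :flush_interval, 5_000)

    state = %{
      sketch_module: mod,
      sketch_opts: sketch_opts,
      current: mod.new(sketch_opts),
      flush_callback: Keyword.get(opts, :flush_callback),
      flush_interval: interval
    }

    schedule_flush(interval)
    {:ok, state}
  end

  @impl true
  def handle_call({:merge, partial}, _from, state) do
    {:reply, :ok, %{state | current: state.sketch_module.merge(state.current, partial)}}
  end

  # Manual flush: hand back the aggregate and start over with an empty sketch.
  def handle_call(:flush, _from, state), do: {:reply, state.current, reset(state)}

  # Automatic flush: run the callback (if any), reset, and re-arm the timer.
  @impl true
  def handle_info(:auto_flush, state) do
    if state.flush_callback, do: state.flush_callback.(state.current)
    schedule_flush(state.flush_interval)
    {:noreply, reset(state)}
  end

  defp reset(state), do: %{state | current: state.sketch_module.new(state.sketch_opts)}

  # :infinity disables the timer; otherwise send ourselves :auto_flush later.
  defp schedule_flush(:infinity), do: :ok
  defp schedule_flush(ms), do: Process.send_after(self(), :auto_flush, ms)
end

# Usage: timer disabled, flush by hand.
{:ok, agg} = ToyAggregator.start_link(sketch_module: ToySketch, flush_interval: :infinity)
:ok = ToyAggregator.merge(agg, ToySketch.from_enumerable(["a", "b", "b"]))
ToySketch.estimate(ToyAggregator.flush(agg))
# => 2.0
```

Re-arming the timer inside handle_info/2, rather than using a fixed :timer.send_interval/2, keeps at most one pending :auto_flush message even when a callback is slow.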

Dependency

This module depends on :broadway being available. Call ExDataSketch.Integration.require_broadway!/0 before use if Broadway might not be present.

Summary

Functions

  • child_spec(init_arg) -- Returns a specification to start this module under a supervisor.
  • estimate(server) -- Returns the current estimate from the aggregate sketch.
  • flush(server) -- Flushes the aggregator, returning the current sketch and resetting to a new one.
  • get(server) -- Returns the current aggregate sketch without resetting it.
  • merge(server, partial_sketch) -- Merges a partial sketch into the aggregator.
  • start_link(opts) -- Starts a periodic aggregator process.

Types

state()

@type state() :: %{
  sketch_module: module(),
  sketch_opts: keyword(),
  current: struct(),
  flush_callback: (struct() -> term()) | nil,
  flush_interval: non_neg_integer() | :infinity,
  last_flush_time: integer()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
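The wording of child_spec/1 appears to be the default that `use GenServer` generates. That default starts the module by calling start_link/1 with the given argument. The minimal sketch below uses a hypothetical ToyWorker module to show the mechanism and how a :name option ends up registering the process.

```elixir
defmodule ToyWorker do
  # Hypothetical GenServer standing in for PeriodicAggregator. `use GenServer`
  # injects a default child_spec/1 that calls start_link/1 with its argument.
  use GenServer

  def start_link(opts) do
    # Pull out :name (nil means unregistered) and pass the rest to init/1.
    {name, opts} = Keyword.pop(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts), do: {:ok, opts}
end

ToyWorker.child_spec(name: ToyWorker)
# => %{id: ToyWorker, start: {ToyWorker, :start_link, [[name: ToyWorker]]}}

# A supervisor accepts {module, arg} tuples and calls module.child_spec(arg).
{:ok, _sup} = Supervisor.start_link([{ToyWorker, name: ToyWorker}], strategy: :one_for_one)
is_pid(Process.whereis(ToyWorker))
# => true
```

If PeriodicAggregator keeps this default, a children list could presumably contain {ExDataSketch.Broadway.PeriodicAggregator, sketch_module: ExDataSketch.HLL, name: MyApp.CardinalityAgg}. MyApp.CardinalityAgg is a hypothetical name.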

estimate(server)

@spec estimate(GenServer.server()) :: float()

Returns the current estimate from the aggregate sketch.

Convenience function that calls sketch_module.estimate/1 on the current aggregate.

Examples

iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> ExDataSketch.Broadway.PeriodicAggregator.estimate(agg)
0.0

flush(server)

@spec flush(GenServer.server()) :: struct()

Flushes the aggregator, returning the current sketch and resetting to a new one.

Examples

iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.Broadway.PeriodicAggregator.merge(agg, partial)
iex> flushed = ExDataSketch.Broadway.PeriodicAggregator.flush(agg)
iex> ExDataSketch.HLL.estimate(flushed) > 0.0
true

get(server)

@spec get(GenServer.server()) :: struct()

Returns the current aggregate sketch without resetting it.

Examples

iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> current = ExDataSketch.Broadway.PeriodicAggregator.get(agg)
iex> ExDataSketch.HLL.estimate(current)
0.0

merge(server, partial_sketch)

@spec merge(
  GenServer.server(),
  struct()
) :: :ok

Merges a partial sketch into the aggregator.

The partial sketch is merged with the current aggregate using sketch_module.merge/2.
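In a pipeline, partial sketches from different batches may reach merge/2 in any order. The aggregate is only well defined if sketch_module.merge/2 is order-independent, which is an assumption about the sketch module and is not documented here. The hypothetical illustration below uses an exact MapSet as a stand-in for a sketch. Set union is commutative and associative, and so is the register-wise maximum used by the standard HLL merge.

```elixir
defmodule MergeDemo do
  # Hypothetical helper: folds partial "sketches" (here exact MapSets) into
  # one aggregate, starting from an empty one, like repeated merge/2 calls.
  def merge_all(partials), do: Enum.reduce(partials, MapSet.new(), &MapSet.union/2)
end

partials = [MapSet.new(["a", "b"]), MapSet.new(["b", "c"]), MapSet.new(["d"])]

# Same aggregate regardless of arrival order.
MergeDemo.merge_all(partials) == MergeDemo.merge_all(Enum.reverse(partials))
# => true
MapSet.size(MergeDemo.merge_all(partials))
# => 4
```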

Examples

iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...>   sketch_module: ExDataSketch.HLL, sketch_opts: [p: 10], flush_interval: :infinity
...> )
iex> partial = ExDataSketch.HLL.from_enumerable(["a", "b"], p: 10)
iex> :ok = ExDataSketch.Broadway.PeriodicAggregator.merge(agg, partial)
iex> ExDataSketch.HLL.estimate(ExDataSketch.Broadway.PeriodicAggregator.get(agg)) > 0.0
true

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a periodic aggregator process.

Options

  • :sketch_module -- required, the sketch module (e.g., ExDataSketch.HLL).
  • :sketch_opts -- options forwarded to sketch_module.new/1 (default: []).
  • :flush_interval -- milliseconds between automatic flushes (default: 5000). Set to :infinity to disable automatic flush.
  • :flush_callback -- function (sketch -> term) called on each flush (default: nil, no side effect).
  • :name -- GenServer name registration (default: nil).

Examples

iex> {:ok, agg} = ExDataSketch.Broadway.PeriodicAggregator.start_link(
...>   sketch_module: ExDataSketch.HLL,
...>   sketch_opts: [p: 10],
...>   flush_interval: :infinity
...> )
iex> is_pid(agg)
true