GenStage Integration


ExDataSketch integrates with GenStage for event-driven sketch aggregation. GenStage is not part of OTP: it is distributed as the separate gen_stage Hex package, so make sure it is present in your project's dependency tree.
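If gen_stage does not already appear in your dependency tree (for example, pulled in by ExDataSketch itself), a minimal mix.exs entry could look like the following. The version requirement is an assumption; check Hex for the current release.

```elixir
# mix.exs (excerpt): declare GenStage next to your other dependencies.
# The "~> 1.2" requirement is an assumption; check hex.pm for the latest version.
defp deps do
  [
    {:gen_stage, "~> 1.2"}
  ]
end
```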

Module Overview

Event Contract

The consumer modules (SketchConsumer and SketchStage) accept two kinds of events from upstream:

  1. Sketch snapshots -- structs of the configured :sketch_module, typically emitted by upstream SketchProducer or SketchStage instances. These are merged into the accumulator via sketch_module.merge/2.
  2. Raw items -- any other term. These are passed through :key_fn and inserted via sketch_module.update/2 (or sketch_module.from_enumerable/2 for a batch). This is the contract used by external event sources (Kafka, RabbitMQ, Phoenix events, etc.).

A single batch may mix both shapes: the sketch snapshots are merged together, the raw items are inserted separately, and the two partial results are then folded into the single accumulator.
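The contract can be sketched in plain Elixir. ToySketch and EventFold are hypothetical names invented for this illustration, not ExDataSketch modules. ToySketch keeps exact items in a MapSet so the result is easy to check, and the fold handles events one at a time instead of splitting the two shapes first, which gives the same result when merge is order-insensitive. The point is the dispatch: a struct of the configured sketch module is merged, and any other term goes through key_fn and update/2.

```elixir
defmodule ToySketch do
  # Hypothetical stand-in for a sketch module such as ExDataSketch.HLL. It keeps
  # exact items in a MapSet, so estimate/1 returns an exact count.
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def update(%__MODULE__{items: items} = sketch, item),
    do: %{sketch | items: MapSet.put(items, item)}

  def merge(%__MODULE__{items: a}, %__MODULE__{items: b}),
    do: %__MODULE__{items: MapSet.union(a, b)}

  def estimate(%__MODULE__{items: items}), do: MapSet.size(items)
end

defmodule EventFold do
  # Hypothetical helper applying the event contract to one batch: structs of
  # sketch_module are merged; any other term goes through key_fn and update/2.
  def fold(events, acc, sketch_module, key_fn) do
    Enum.reduce(events, acc, fn
      %{__struct__: ^sketch_module} = snapshot, sketch -> sketch_module.merge(sketch, snapshot)
      raw, sketch -> sketch_module.update(sketch, key_fn.(raw))
    end)
  end
end

# A mixed batch: one upstream snapshot plus two raw events.
upstream = ToySketch.new() |> ToySketch.update("user_1") |> ToySketch.update("user_2")
batch = [upstream, %{user_id: "user_2"}, %{user_id: "user_3"}]

# key_fn is applied only to the raw maps, never to the snapshot.
acc = EventFold.fold(batch, ToySketch.new(), ToySketch, & &1.user_id)
IO.puts(ToySketch.estimate(acc))  # prints 3
```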

SketchConsumer

SketchConsumer subscribes to a producer, ingests events using the configured sketch module, and provides read and flush access to the accumulated sketch.

{:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)

Options

| Option | Description | Default |
|---|---|---|
| :sketch_module | Required. The sketch module | (required) |
| :sketch_opts | Options passed to sketch_module.new/1 | [] |
| :key_fn | Function that extracts values from raw events (not applied to sketch snapshots) | fn e -> e end |
| :flush_interval | Automatic flush interval, in milliseconds | :infinity |
| :flush_callback | Function called with the flushed sketch on each automatic flush | nil |
| :subscribe_to | Producer(s) to subscribe to | [] |

Operations

| Function | Description |
|---|---|
| merge/2 | Merge a partial sketch into the consumer |
| flush/1 | Return the current sketch and reset |
| get/1 | Return the current sketch without resetting |
| estimate/1 | Return the current estimate |
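The difference between get/1 and flush/1 is easiest to see with a stand-in. ToyAccumulator is hypothetical: an Agent holding a MapSet, not the SketchConsumer internals, and its return shapes are chosen for the illustration and may differ from the real functions.

```elixir
defmodule ToyAccumulator do
  # Hypothetical model of the consumer's read/flush semantics. An Agent holds
  # a MapSet that stands in for the accumulated sketch.
  def start_link, do: Agent.start_link(fn -> MapSet.new() end)

  # merge/2: fold a partial "sketch" into the accumulator.
  def merge(pid, partial), do: Agent.update(pid, &MapSet.union(&1, partial))

  # get/1: read the current state and leave it in place.
  def get(pid), do: Agent.get(pid, & &1)

  # flush/1: return the current state and reset to an empty sketch.
  def flush(pid), do: Agent.get_and_update(pid, fn current -> {current, MapSet.new()} end)

  # estimate/1: exact for a MapSet, approximate for a real sketch.
  def estimate(pid), do: Agent.get(pid, &MapSet.size/1)
end

{:ok, acc} = ToyAccumulator.start_link()
:ok = ToyAccumulator.merge(acc, MapSet.new(["user_1", "user_2"]))

IO.inspect(ToyAccumulator.estimate(acc))          # 2
IO.inspect(MapSet.size(ToyAccumulator.get(acc)))  # 2, state unchanged by get/1
flushed = ToyAccumulator.flush(acc)
IO.inspect(MapSet.size(flushed))                  # 2
IO.inspect(ToyAccumulator.estimate(acc))          # 0 after the reset
```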

SketchProducer

SketchProducer holds a sketch that callers update, and emits snapshots of the current sketch struct to downstream consumers as the sketch changes. Demand is tracked cumulatively: the producer emits at most one snapshot per unit of outstanding demand, and only when the sketch has been updated since the previous emission. This avoids flooding downstream consumers with identical snapshots while still respecting GenStage back-pressure.
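The emission rule can be modeled as pure state transitions. SnapshotEmitter is a hypothetical module, not the SketchProducer source; a MapSet stands in for the sketch, and each function returns {events, state}. In an actual GenStage producer this logic would plausibly live in handle_demand/2 and in whichever callback serves update, both of which can return events alongside the new state.

```elixir
defmodule SnapshotEmitter do
  # Hypothetical model of the emission rule, not the SketchProducer source.
  # A MapSet stands in for the sketch.
  defstruct sketch: MapSet.new(), dirty?: false, demand: 0

  def new, do: %__MODULE__{}

  # Downstream asks for n more events: add them to the outstanding demand.
  def handle_demand(state, n), do: maybe_emit(%{state | demand: state.demand + n})

  # A caller updates the sketch: mark it as changed since the last snapshot.
  def update(state, item) do
    maybe_emit(%{state | sketch: MapSet.put(state.sketch, item), dirty?: true})
  end

  # Emit one snapshot only when there is outstanding demand AND the sketch changed.
  defp maybe_emit(%__MODULE__{dirty?: true, demand: demand} = state) when demand > 0 do
    {[state.sketch], %{state | dirty?: false, demand: demand - 1}}
  end

  defp maybe_emit(state), do: {[], state}
end

s0 = SnapshotEmitter.new()
{[], s1} = SnapshotEmitter.update(s0, "user_1")       # no demand yet: nothing emitted
{[snap1], s2} = SnapshotEmitter.handle_demand(s1, 2)  # demand 2, sketch changed: one snapshot
{[], s3} = SnapshotEmitter.handle_demand(s2, 5)       # unchanged: nothing, demand now 6
{[snap2], s4} = SnapshotEmitter.update(s3, "user_2")  # changed: one snapshot, demand now 5
IO.inspect({MapSet.size(snap1), MapSet.size(snap2), s4.demand})  # {1, 2, 5}
```

Leftover demand is kept rather than discarded, so a later update is emitted immediately without waiting for a new demand request.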

{:ok, producer} = ExDataSketch.GenStage.SketchProducer.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14]
)

ExDataSketch.GenStage.SketchProducer.update(producer, "user_1")
ExDataSketch.GenStage.SketchProducer.update(producer, "user_2")

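When a SketchConsumer subscribes to a SketchProducer, every event it receives is a sketch struct, which it merges into its own accumulator. Because HLL merge keeps the register-wise maximum, merging the same data twice changes nothing, so folding the producer's successive cumulative snapshots leaves the consumer with the same registers, and therefore the same cardinality estimate, as the producer's latest snapshot.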

SketchStage

SketchStage is a combined producer-consumer that subscribes upstream, merges whatever it receives (raw items or sketches), and re-emits a snapshot of its accumulated sketch downstream after each batch:

{:ok, stage} = ExDataSketch.GenStage.SketchStage.start_link(
  sketch_module: ExDataSketch.HLL,
  sketch_opts: [p: 14],
  subscribe_to: [{my_producer, max_demand: 1000}]
)

Periodic Flush

Both SketchConsumer and PeriodicAggregator support periodic flush:

# Consumer with periodic flush
alias ExDataSketch.HLL

{:ok, consumer} = ExDataSketch.GenStage.SketchConsumer.start_link(
  sketch_module: HLL,
  sketch_opts: [p: 14],
  flush_interval: 5_000,
  # Called with the flushed sketch every 5 seconds
  flush_callback: fn sketch ->
    :telemetry.execute([:app, :cardinality], %{estimate: HLL.estimate(sketch)})
  end,
  # producer is the SketchProducer started earlier
  subscribe_to: [{producer, max_demand: 1000}]
)
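The mechanism behind :flush_interval and :flush_callback can be pictured as a timer loop. ToyPeriodicFlush is a hypothetical GenServer, not the library's code: it re-arms Process.send_after/3 after each flush, passes the accumulated state to the callback, and then resets it, assuming an automatic flush resets the accumulator the way flush/1 does. Passing :infinity skips scheduling, mirroring the option's default.

```elixir
defmodule ToyPeriodicFlush do
  # Hypothetical periodic-flush loop; a MapSet stands in for the sketch.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def add(pid, item), do: GenServer.cast(pid, {:add, item})

  @impl true
  def init(opts) do
    state = %{
      sketch: MapSet.new(),
      interval: Keyword.fetch!(opts, :flush_interval),
      callback: Keyword.fetch!(opts, :flush_callback)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_cast({:add, item}, state) do
    {:noreply, %{state | sketch: MapSet.put(state.sketch, item)}}
  end

  @impl true
  def handle_info(:flush, state) do
    # Hand the accumulated sketch to the callback, re-arm the timer, then reset.
    state.callback.(state.sketch)
    schedule(state.interval)
    {:noreply, %{state | sketch: MapSet.new()}}
  end

  # :infinity disables automatic flushing; otherwise send :flush after ms milliseconds.
  defp schedule(:infinity), do: :ok
  defp schedule(ms), do: Process.send_after(self(), :flush, ms)
end

caller = self()

{:ok, pid} =
  ToyPeriodicFlush.start_link(
    flush_interval: 200,
    flush_callback: fn sketch -> send(caller, {:flushed, MapSet.size(sketch)}) end
  )

ToyPeriodicFlush.add(pid, "user_1")
ToyPeriodicFlush.add(pid, "user_2")

count =
  receive do
    {:flushed, n} -> n
  after
    2_000 -> raise "no flush within 2s"
  end

IO.puts("flushed #{count} distinct items")
```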

Merge Semantics

All three modules use sketch_module.merge/2 for accumulation. Because merge is associative and commutative for most sketch types, partial results from different workers or time windows can be combined in any order without affecting the final result.
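Associativity and commutativity cover combining disjoint partial results. A consumer fed by SketchProducer or SketchStage, however, receives cumulative snapshots, so the same data can be merged more than once, and the final result is then unaffected only if merge is also idempotent. The stand-ins below make the difference concrete: MapSet.union/2 plays an idempotent merge (as HLL's register-wise maximum is), and integer addition plays an additive one such as summing counters. Neither is an ExDataSketch call.

```elixir
# Stand-ins only: MapSet.union/2 models an idempotent merge (like HLL's
# register-wise max); integer addition models an additive merge (like summing counters).
union = &MapSet.union/2

a = MapSet.new(["u1", "u2"])
b = MapSet.new(["u2", "u3"])
c = MapSet.new(["u4"])

# Associative and commutative: regrouping and reordering give the same set.
true = MapSet.equal?(union.(union.(a, b), c), union.(a, union.(c, b)))

# Cumulative snapshots: the second contains everything in the first.
snap1 = a
snap2 = union.(a, b)

# Idempotent merge: folding both snapshots yields exactly the latest one.
merged = Enum.reduce([snap1, snap2], MapSet.new(), union)
IO.inspect(MapSet.size(merged))  # 3

# Additive merge: cumulative event counts 2 and then 4 sum to 6,
# double-counting the first window (the true total is 4).
summed = Enum.reduce([2, 4], 0, &Kernel.+/2)
IO.inspect(summed)  # 6
```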

See Also