This guide explains how to use ExDataSketch with Elixir streams and the Collectable protocol for idiomatic, memory-efficient sketch construction.

Why Sketches Fit Streaming Architectures

Probabilistic sketches are natural stream consumers:

  • Bounded memory: Sketches use a fixed amount of memory regardless of input size. An HLL with p: 14 uses 16 KiB whether it processes 1,000 or 1,000,000,000 items.
  • Single-pass: Sketches ingest each item exactly once. No buffering or multi-pass scans are needed.
  • Mergeable: Most sketches support merge/2, which is associative and commutative. This means partial results from parallel or partitioned workers can be combined in any order.
  • No random access: Sketches never need to revisit earlier items. This makes them ideal for lazy streams and pipelines.
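To make these properties concrete, the sketch below is a deliberately tiny cardinality estimator in plain Elixir. It uses linear counting over a fixed m-bit bitmap, which is a simpler technique than HLL, and it is not ExDataSketch code. The module name ToyLinearCounter and the use of :erlang.phash2/2 as the hash are illustrative assumptions. Memory is fixed by m, each item is hashed once and never revisited, and two bitmaps merge with bitwise OR.

```elixir
defmodule ToyLinearCounter do
  # Illustrative linear-counting sketch; NOT part of ExDataSketch.
  # State is an m-bit bitmap stored in a single non-negative integer.
  import Bitwise

  defstruct m: 8192, bits: 0

  # Size is fixed up front, so memory does not grow with the input.
  def new(m \\ 8192), do: %__MODULE__{m: m, bits: 0}

  # Single pass: each item sets one bit and is never revisited.
  def update(%__MODULE__{m: m, bits: bits} = sketch, item) do
    %{sketch | bits: bits ||| (1 <<< :erlang.phash2(item, m))}
  end

  def from_enumerable(enum, m \\ 8192) do
    Enum.reduce(enum, new(m), fn item, acc -> update(acc, item) end)
  end

  # Mergeable: bitwise OR is associative and commutative.
  # Both sketches must use the same m (enforced by the pattern match).
  def merge(%__MODULE__{m: m, bits: a}, %__MODULE__{m: m, bits: b}) do
    %__MODULE__{m: m, bits: a ||| b}
  end

  # Linear-counting estimate: n is approximately m * ln(m / zero_bits).
  def estimate(%__MODULE__{m: m, bits: bits}) do
    ones = bits |> Integer.digits(2) |> Enum.sum()
    zeros = m - ones
    if zeros == 0, do: :saturated, else: m * :math.log(m / zeros)
  end
end
```

Merging a counter built over 1..1000 with one built over 501..1500 yields exactly the bitmap of a single counter over 1..1500, because the OR of two bit sets equals the bit set of the union.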

ExDataSketch.Stream

The ExDataSketch.Stream module provides terminal stream consumers that build sketches from lazy enumerables. Each function consumes the stream and returns a completed sketch struct.

Building Sketches from Streams

# HLL cardinality estimation from a lazy stream
cardinality =
  1..100_000
  |> Stream.map(&to_string/1)
  |> ExDataSketch.Stream.hll(p: 14)
  |> ExDataSketch.HLL.estimate()

# CMS frequency estimation (parse_request_path/1 is your own log-parsing function)
freq =
  File.stream!("access.log")
  |> Stream.map(&parse_request_path/1)
  |> ExDataSketch.Stream.cms(width: 2048, depth: 5)
  |> ExDataSketch.CMS.estimate("/api/users")

Available Stream Functions

Function            Sketch
hll/2               HyperLogLog
cms/2               Count-Min Sketch
theta/2             Theta Sketch
kll/2               KLL Quantile Sketch
ddsketch/2          DDSketch
req/2               REQ Sketch
ull/2               UltraLogLog
frequent_items/2    FrequentItems (SpaceSaving)
misra_gries/2       Misra-Gries
bloom/2             Bloom Filter
quotient/2          Quotient Filter
cqf/2               Counting Quotient Filter
iblt/2              Invertible Bloom Lookup Table

Each stream function delegates to the from_enumerable/2 function of the corresponding sketch module, so no ingestion logic is duplicated.

reduce_into/3

reduce_into/3 accepts either a sketch module atom or an existing sketch struct:

# Create a new sketch from a module
sketch = ExDataSketch.Stream.reduce_into(items, ExDataSketch.HLL, p: 14)

# Accumulate into an existing sketch
existing = ExDataSketch.HLL.new(p: 14)
sketch = ExDataSketch.Stream.reduce_into(more_items, existing)
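Accepting either a module or a struct maps naturally onto Elixir pattern matching. The following is one plausible way such a dispatch could look, not ExDataSketch's source: ToyDistinct is a hypothetical stand-in sketch (an exact MapSet), ToyStream is a hypothetical wrapper, and the from_enumerable/2 and update_many/2 argument orders are assumptions.

```elixir
defmodule ToyDistinct do
  # Hypothetical stand-in for a sketch module: an exact set of seen items.
  defstruct items: MapSet.new()

  def new(_opts \\ []), do: %__MODULE__{}

  def update_many(%__MODULE__{items: set} = sketch, enum),
    do: %{sketch | items: Enum.into(enum, set)}

  def from_enumerable(enum, opts \\ []), do: update_many(new(opts), enum)

  def count(%__MODULE__{items: set}), do: MapSet.size(set)
end

defmodule ToyStream do
  # Hypothetical wrapper showing module-vs-struct dispatch.
  def reduce_into(enum, sketch_or_module, opts \\ [])

  # Existing sketch: take the module from the struct and keep accumulating.
  def reduce_into(enum, %mod{} = sketch, _opts), do: mod.update_many(sketch, enum)

  # Module atom: build a fresh sketch using the given options.
  def reduce_into(enum, mod, opts) when is_atom(mod), do: mod.from_enumerable(enum, opts)
end
```

Matching the struct clause first matters: a struct is a map, not an atom, so the is_atom/1 guard only ever sees module names.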

reduce_partitioned/3

For large streams, reduce_partitioned/3 splits work into chunks, builds a sketch per chunk, and merges all partial results:

sketch =
  large_stream
  |> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, partitions: 8, p: 14)

The default partition count is System.schedulers_online(). Partition count does not affect result accuracy for mergeable sketches: merging preserves each sketch's error guarantees, and because merge/2 is associative and commutative, the partial sketches can be combined in any order. Partition count only affects throughput and memory usage during intermediate stages.
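A partitioned reduction of this kind can be approximated with the standard library alone: chunk the input, build one partial per chunk concurrently with Task.async_stream/3, and fold the partials with a merge. This is a sketch of the general pattern under stated assumptions, not ExDataSketch's implementation. The partial "sketch" here is an exact MapSet whose merge (MapSet.union/2) is lossless, and the chunk_size option is hypothetical. Passing ordered: false deliberately lets partials arrive in any order, which is safe only because the merge is commutative.

```elixir
defmodule PartitionedReduce do
  # Build one partial per chunk in parallel, then merge all partials.
  # A "sketch" here is a MapSet and merge is MapSet.union/2 (lossless),
  # so neither chunk size nor concurrency changes the final result.
  def run(enum, opts \\ []) do
    partitions = Keyword.get(opts, :partitions, System.schedulers_online())
    chunk_size = Keyword.get(opts, :chunk_size, 1_000)

    enum
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(&MapSet.new/1, max_concurrency: partitions, ordered: false)
    |> Enum.reduce(MapSet.new(), fn {:ok, partial}, acc -> MapSet.union(acc, partial) end)
  end
end
```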

Collectable Protocol

All sketch types that support merge/2 implement the Collectable protocol, enabling Enum.into/2 and Enum.into/3:

# Build an HLL from a range
sketch = Enum.into(1..1000, ExDataSketch.HLL.new(p: 14))

# Build a CMS from a stream
sketch =
  some_stream
  |> Enum.into(ExDataSketch.CMS.new(width: 2048, depth: 5))

Collectable Semantics

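Collectable.into/1 returns {sketch, collector_fn}. The collector is called as collector_fn.(acc, command), where acc is the sketch built so far and command is one of:

  • {:cont, item} -- inserts the item via the sketch's update/2 or put/2 and returns the updated sketch
  • :done -- returns the completed sketch
  • :halt -- discards the sketch and returns :ok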
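These three commands map directly onto a defimpl. The sketch below implements them for a hypothetical ToyCounter struct that simply counts insertions. It mirrors the semantics listed above but is not ExDataSketch's actual implementation, and ToyCounter.update/2 belongs to the toy, not to the library.

```elixir
defmodule ToyCounter do
  # Hypothetical sketch: counts how many items were inserted.
  defstruct count: 0

  def new, do: %__MODULE__{}
  def update(%__MODULE__{count: c} = sketch, _item), do: %{sketch | count: c + 1}
end

defimpl Collectable, for: ToyCounter do
  def into(sketch) do
    collector = fn
      # Insert one item; the accumulator is the sketch itself.
      acc, {:cont, item} -> ToyCounter.update(acc, item)
      # Collection finished: hand back the completed sketch.
      acc, :done -> acc
      # Collection aborted: discard the partial sketch.
      _acc, :halt -> :ok
    end

    {sketch, collector}
  end
end
```

Because the collector is invoked once per element, this path has no opportunity to batch work, which is the trade-off discussed next.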

Collectable vs from_enumerable

For performance-sensitive code, prefer from_enumerable/2 or update_many/2 because they batch items internally. Collectable processes items one at a time, which is correct but may be slower for very large collections.

Pattern                      Performance                  Use case
from_enumerable/2            Best (batched)               Building from a known collection
update_many/2                Best (batched)               Adding a batch to an existing sketch
Enum.into/2 (Collectable)    Good (one at a time)         Pipeline integration, for comprehensions with into:
ExDataSketch.Stream.hll/2    Same as from_enumerable/2    Lazy stream consumption
reducer/1 + Enum.reduce      Good (one at a time)         Custom reduce chains

Supported Collectable Sketches

Every sketch that supports merge/2 implements Collectable:

  • HLL, CMS, Theta, KLL, DDSketch, REQ, ULL
  • FrequentItems, MisraGries
  • Bloom, Quotient, CQF, IBLT

Skipped sketches:

  • XorFilter -- static construction requires all items up-front; not mergeable.
  • Cuckoo -- bounded capacity means put/2 can return {:error, :full}; Collectable has no error signalling mechanism.

Bloom is not skipped: it implements Collectable and is listed above. Note, however, that Bloom filters have a fixed capacity. put/2 does not return an error, but items added beyond the intended capacity degrade the false-positive rate.
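When an insert can fail, as with Cuckoo's put/2 returning {:error, :full}, an explicit Enum.reduce_while/3 can stop at the first failure and report it, which a Collectable collector cannot do except by raising. In this sketch BoundedSet is a hypothetical fixed-capacity toy, not ExDataSketch's Cuckoo module, and the {:ok, sketch} success shape is an assumption.

```elixir
defmodule BoundedSet do
  # Hypothetical fixed-capacity structure whose put/2 can fail.
  defstruct capacity: 0, items: MapSet.new()

  def new(capacity), do: %__MODULE__{capacity: capacity}

  def put(%__MODULE__{capacity: cap, items: items} = set, item) do
    cond do
      MapSet.member?(items, item) -> {:ok, set}
      MapSet.size(items) >= cap -> {:error, :full}
      true -> {:ok, %{set | items: MapSet.put(items, item)}}
    end
  end
end

defmodule SafeInsert do
  # Stop at the first failed insert and report it with the offending item.
  def put_all(set, enum) do
    Enum.reduce_while(enum, {:ok, set}, fn item, {:ok, acc} ->
      case BoundedSet.put(acc, item) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, reason, item}}
      end
    end)
  end
end
```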

Merge and Partition Awareness

Sketch merge operations are associative and commutative. This means:

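alias ExDataSketch.HLL

# a, b, c are HLL sketches built with the same p; these produce equivalent results
HLL.estimate(HLL.merge(HLL.merge(a, b), c)) == HLL.estimate(HLL.merge(a, HLL.merge(b, c)))  # grouping
HLL.estimate(HLL.merge(a, b)) == HLL.estimate(HLL.merge(b, a))                              # order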

This property is what makes partition-local reduction safe. You can build partial sketches on different workers, partitions, or time windows and merge them later without worrying about order.
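The order-independence claim can be checked mechanically on a toy. ToyRegisters below keeps a fixed tuple of HLL-style registers, each holding the maximum value routed to it, and merges by element-wise max. Its crude rank function stands in for HLL's leading-zero count; this is an illustration, not ExDataSketch's HLL.

```elixir
defmodule ToyRegisters do
  # m registers; each slot keeps the max "rank" routed to it (HLL-style).
  @m 16

  def new, do: Tuple.duplicate(0, @m)

  def update(regs, item) do
    idx = :erlang.phash2(item, @m)
    # Crude stand-in for HLL's leading-zero rank.
    rank = :erlang.phash2({:rank, item}, 32)
    put_elem(regs, idx, max(elem(regs, idx), rank))
  end

  def from_enumerable(enum), do: Enum.reduce(enum, new(), fn item, acc -> update(acc, item) end)

  # Element-wise max: associative, commutative and idempotent.
  def merge(a, b) do
    a
    |> Tuple.to_list()
    |> Enum.zip(Tuple.to_list(b))
    |> Enum.map(fn {x, y} -> max(x, y) end)
    |> List.to_tuple()
  end
end
```

Building partials over three time windows and merging them in any grouping or order reproduces the registers of a single sketch over all items.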

Partition-Aware Reduction

# Build partial HLLs per partition, then merge.
# load_partition/1 is a placeholder for your own data-loading function.
partial_sketches =
  0..3
  |> Enum.map(fn partition ->
    partition
    |> load_partition()
    |> ExDataSketch.HLL.from_enumerable(p: 14)
  end)

final = ExDataSketch.HLL.merge_many(partial_sketches)

This pattern is exactly what reduce_partitioned/3 automates:

final = ExDataSketch.Stream.reduce_partitioned(data, ExDataSketch.HLL, partitions: 4, p: 14)

Elixir Stream Reduction

Elixir's Stream module produces lazy enumerables. When you pipe a stream into ExDataSketch.Stream.hll/2, the stream is consumed once and the sketch accumulates each element. No intermediate list is created.

# Lazy: never holds all items in memory (parse_line/1 and valid?/1 are your own functions)
sketch =
  File.stream!("large_file.csv")
  |> Stream.map(&parse_line/1)
  |> Stream.filter(&valid?/1)
  |> ExDataSketch.Stream.hll(p: 14)

This works because from_enumerable/2 uses update_many/2 internally, which chunks input and processes each chunk without materializing the entire stream.
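The chunking behaviour can be approximated with Stream.chunk_every/2: the enumerable is pulled one chunk at a time and each chunk is folded into the state, so only the current chunk is materialized. This is a hedged sketch of the idea, not ExDataSketch's internals; the toy state (a count plus the maximum value) and the default chunk size of 1_000 are assumptions.

```elixir
defmodule ChunkedIngest do
  # Toy state: number of items seen and the largest value seen.
  def new, do: %{count: 0, max: nil}

  # Batched update: fold one whole chunk (a list) into the state.
  def update_many(state, chunk) when is_list(chunk) do
    Enum.reduce(chunk, state, fn x, %{count: c, max: best} ->
      %{count: c + 1, max: if(best == nil or x > best, do: x, else: best)}
    end)
  end

  # Pull the (possibly lazy) enumerable one chunk at a time; only the
  # current chunk is in memory, never the whole stream.
  def from_enumerable(enum, chunk_size \\ 1_000) do
    enum
    |> Stream.chunk_every(chunk_size)
    |> Enum.reduce(new(), fn chunk, acc -> update_many(acc, chunk) end)
  end
end
```

Because chunks are pulled on demand, the same function works on an unbounded source trimmed with Stream.take/2.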