Stream-native sketch construction and reduction.
This module provides terminal stream consumers that build sketches from
lazy enumerables without buffering the entire input in memory. Each
function consumes an Enumerable and returns a completed sketch struct.
Ingestion is delegated to each sketch module's from_enumerable/2 and
merge_many/1 functions, so no ingestion logic is duplicated.
Stream Pipeline Example
1..100_000
|> Stream.map(&to_string/1)
|> ExDataSketch.Stream.hll(p: 14)
|> ExDataSketch.HLL.estimate()
Partitioned Processing
For large streams that benefit from intermediate aggregation, use
reduce_partitioned/3 to chunk the input, build partial sketches, and
merge them:
1..1_000_000
|> Stream.map(&to_string/1)
|> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, p: 14)
Collectable
All mergeable sketches implement the Collectable protocol, enabling
Enum.into/2 usage:
sketch = Enum.into(1..1000, ExDataSketch.HLL.new(p: 14))
See Collectable documentation for each sketch module.
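To illustrate what a Collectable implementation involves, here is a minimal sketch built on a toy struct. ToySketch and its functions are hypothetical stand-ins backed by MapSet, not part of ExDataSketch, and the library's own implementations may differ.

```elixir
defmodule ToySketch do
  # Hypothetical stand-in for a sketch: tracks distinct items exactly.
  defstruct items: MapSet.new()

  def new, do: %__MODULE__{}

  def update(%__MODULE__{items: items} = sketch, item) do
    %{sketch | items: MapSet.put(items, item)}
  end

  def count(%__MODULE__{items: items}), do: MapSet.size(items)
end

defimpl Collectable, for: ToySketch do
  # into/1 returns the initial accumulator and a collector function.
  def into(sketch) do
    collector = fn
      acc, {:cont, item} -> ToySketch.update(acc, item)
      acc, :done -> acc
      _acc, :halt -> :ok
    end

    {sketch, collector}
  end
end

# Enum.into/2 drives the collector, so a lazy stream is consumed item by item.
toy = 1..1000 |> Stream.map(&rem(&1, 10)) |> Enum.into(ToySketch.new())
distinct = ToySketch.count(toy)
```

Because the collector receives one item at a time, the input never has to be materialized as a list before it reaches the sketch.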
Functions
@spec bloom( Enumerable.t(), keyword() ) :: ExDataSketch.Bloom.t()
Builds a Bloom filter from a stream.
Delegates to ExDataSketch.Bloom.from_enumerable/2.
Examples
iex> items = 1..100 |> Stream.map(&to_string/1)
iex> bloom = ExDataSketch.Stream.bloom(items, capacity: 200)
iex> ExDataSketch.Bloom.member?(bloom, "1")
true
@spec cms( Enumerable.t(), keyword() ) :: ExDataSketch.CMS.t()
Builds a CMS sketch from a stream.
Delegates to ExDataSketch.CMS.from_enumerable/2.
Examples
iex> sketch = ["a", "b", "c", "a"] |> ExDataSketch.Stream.cms(width: 64, depth: 3)
iex> ExDataSketch.CMS.estimate(sketch, "a") >= 2
true
@spec cqf( Enumerable.t(), keyword() ) :: ExDataSketch.CQF.t()
Builds a CQF (Counting Quotient Filter) from a stream.
Delegates to ExDataSketch.CQF.from_enumerable/2.
Examples
iex> items = 1..50 |> Stream.map(&to_string/1)
iex> cqf = ExDataSketch.Stream.cqf(items, capacity: 100)
iex> ExDataSketch.CQF.member?(cqf, "1")
true
@spec ddsketch( Enumerable.t(), keyword() ) :: ExDataSketch.DDSketch.t()
Builds a DDSketch from a stream.
Delegates to ExDataSketch.DDSketch.from_enumerable/2.
Examples
iex> sketch = 1..100 |> ExDataSketch.Stream.ddsketch(alpha: 0.01)
iex> is_float(ExDataSketch.DDSketch.quantile(sketch, 0.5))
true
@spec frequent_items( Enumerable.t(), keyword() ) :: ExDataSketch.FrequentItems.t()
Builds a FrequentItems sketch from a stream.
Delegates to ExDataSketch.FrequentItems.from_enumerable/2.
Examples
iex> items = Stream.map(1..200, fn i -> "item_" <> Integer.to_string(rem(i, 20)) end)
iex> sketch = ExDataSketch.Stream.frequent_items(items, k: 10)
iex> length(ExDataSketch.FrequentItems.top_k(sketch, 5)) <= 10
true
@spec hll( Enumerable.t(), keyword() ) :: ExDataSketch.HLL.t()
Builds an HLL sketch from a stream.
Delegates to ExDataSketch.HLL.from_enumerable/2.
Examples
iex> sketch = 1..100 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.hll(p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true
@spec iblt( Enumerable.t(), keyword() ) :: ExDataSketch.IBLT.t()
Builds an IBLT from a stream.
Delegates to ExDataSketch.IBLT.from_enumerable/2.
Examples
iex> items = 1..20 |> Stream.map(&to_string/1)
iex> iblt = ExDataSketch.Stream.iblt(items, m: 40, num_hashes: 3)
iex> ExDataSketch.IBLT.member?(iblt, "1")
true
@spec kll( Enumerable.t(), keyword() ) :: ExDataSketch.KLL.t()
Builds a KLL sketch from a stream.
Delegates to ExDataSketch.KLL.from_enumerable/2.
Examples
iex> sketch = 1..100 |> ExDataSketch.Stream.kll(k: 200)
iex> is_float(ExDataSketch.KLL.quantile(sketch, 0.5))
true
@spec misra_gries( Enumerable.t(), keyword() ) :: ExDataSketch.MisraGries.t()
Builds a MisraGries sketch from a stream.
Delegates to ExDataSketch.MisraGries.from_enumerable/2.
Examples
iex> items = Stream.map(1..200, fn i -> "item_" <> Integer.to_string(rem(i, 20)) end)
iex> sketch = ExDataSketch.Stream.misra_gries(items, k: 10)
iex> ExDataSketch.MisraGries.count(sketch) == 200
true
@spec quotient( Enumerable.t(), keyword() ) :: ExDataSketch.Quotient.t()
Builds a Quotient filter from a stream.
Delegates to ExDataSketch.Quotient.from_enumerable/2.
Examples
iex> items = 1..50 |> Stream.map(&to_string/1)
iex> qf = ExDataSketch.Stream.quotient(items, capacity: 100)
iex> ExDataSketch.Quotient.member?(qf, "1")
true
@spec reduce_into(Enumerable.t(), module() | struct(), keyword()) :: struct()
Reduces a stream into an existing or new sketch.
When sketch_or_module is a sketch struct, items are reduced into it
using the appropriate update function. When it is a module atom
(e.g., ExDataSketch.HLL), a new sketch is created with the given
options and items are reduced into it.
This is a convenience function that wraps Enum.reduce/3 with the
sketch's reducer/0 function.
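The dispatch described above can be sketched roughly as follows. ToyCounter and ReduceIntoDemo are hypothetical names used only for illustration, and the argument order of the function returned by reducer/0 (item first, accumulator second) is an assumption chosen to match Enum.reduce/3.

```elixir
defmodule ToyCounter do
  # Hypothetical sketch-like module exposing new/1 and reducer/0.
  defstruct count: 0

  def new(opts \\ []), do: %__MODULE__{count: Keyword.get(opts, :start, 0)}

  # Returns a 2-arity function suitable for Enum.reduce/3: (item, acc) -> acc.
  def reducer do
    fn _item, %__MODULE__{count: count} = acc -> %{acc | count: count + 1} end
  end
end

defmodule ReduceIntoDemo do
  def reduce_into(enumerable, sketch_or_module, opts \\ [])

  # Module atom: create a fresh sketch from the options, then reduce into it.
  def reduce_into(enumerable, module, opts) when is_atom(module) do
    reduce_into(enumerable, module.new(opts), [])
  end

  # Existing struct: look up its module and reduce with that module's reducer.
  def reduce_into(enumerable, %module{} = sketch, _opts) do
    Enum.reduce(enumerable, sketch, module.reducer())
  end
end

from_module = ReduceIntoDemo.reduce_into(["a", "b", "c"], ToyCounter)
from_struct = ReduceIntoDemo.reduce_into(1..2, ToyCounter.new(start: 10))
```

Matching on `%module{}` recovers the struct's module at runtime, which is one way a single function can serve every sketch type without a per-type clause.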
Examples
# Using a module atom
iex> sketch = 1..100 |> ExDataSketch.Stream.reduce_into(ExDataSketch.HLL, p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true
# Using an existing sketch
iex> existing = ExDataSketch.HLL.new(p: 10)
iex> sketch = ["a", "b"] |> ExDataSketch.Stream.reduce_into(existing)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true
@spec reduce_partitioned(Enumerable.t(), module(), keyword()) :: struct()
Reduces a stream into a sketch using partitioned processing.
Splits the enumerable into chunks, builds a partial sketch per chunk,
and merges all partial sketches into a final result. This leverages
merge associativity to produce results identical to a single-pass
from_enumerable/2.
Processing is sequential within this function. For parallel processing,
use ExDataSketch.Flow with Flow.partition/2.
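The chunk, build, and merge steps can be illustrated with a toy example. PartitionedDemo is a hypothetical module that uses MapSet as an exact stand-in for a mergeable sketch; it is not the library's implementation, only a demonstration of why an associative merge yields the same result as a single pass.

```elixir
defmodule PartitionedDemo do
  # Toy stand-in for a mergeable sketch: a MapSet of distinct items.
  def build(chunk), do: MapSet.new(chunk)

  # Merging partial results is a fold over set union, which is associative.
  def merge_many(partials) do
    Enum.reduce(partials, MapSet.new(), &MapSet.union/2)
  end

  # Chunk the input lazily, build one partial per chunk, then merge them all.
  def reduce_partitioned(enumerable, chunk_size) do
    enumerable
    |> Stream.chunk_every(chunk_size)
    |> Stream.map(&build/1)
    |> merge_many()
  end
end

items = Stream.map(1..1_000, &rem(&1, 50))
partitioned = PartitionedDemo.reduce_partitioned(items, 128)
single_pass = PartitionedDemo.build(items)
same? = MapSet.equal?(partitioned, single_pass)
```

Here the chunks are processed one after another, mirroring the sequential behavior noted above.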
Options
- :partitions - number of chunks to split the input into (default: System.schedulers_online()). Must be a positive integer.
- :update_many_chunk_size - chunk size for from_enumerable/2 calls within each partition. Defaults to max(1, div(10_000, partitions)).
- All other options are forwarded to the sketch module's new/1 and from_enumerable/2.
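As a worked example of the default chunk size formula, the snippet below evaluates max(1, div(10_000, partitions)) for a few partition counts. The helper name default_chunk_size is hypothetical and exists only for this illustration.

```elixir
# Hypothetical helper mirroring the documented default formula.
default_chunk_size = fn partitions when is_integer(partitions) and partitions > 0 ->
  max(1, div(10_000, partitions))
end

four = default_chunk_size.(4)
eight = default_chunk_size.(8)
# With more partitions than 10_000, integer division gives 0 and max/2 clamps to 1.
many = default_chunk_size.(20_000)
```

With 4 partitions the default is 2500, with 8 it is 1250, and very large partition counts bottom out at 1.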
Examples
iex> sketch = 1..1000 |> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, partitions: 4, p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true
@spec req( Enumerable.t(), keyword() ) :: ExDataSketch.REQ.t()
Builds an REQ sketch from a stream.
Delegates to ExDataSketch.REQ.from_enumerable/2.
Examples
iex> sketch = 1..100 |> ExDataSketch.Stream.req(k: 200)
iex> is_float(ExDataSketch.REQ.quantile(sketch, 0.5)) or is_nil(ExDataSketch.REQ.quantile(sketch, 0.5))
true
@spec theta( Enumerable.t(), keyword() ) :: ExDataSketch.Theta.t()
Builds a Theta sketch from a stream.
Delegates to ExDataSketch.Theta.from_enumerable/2.
Examples
iex> sketch = 1..50 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.theta(k: 100)
iex> ExDataSketch.Theta.estimate(sketch) > 0.0
true
@spec ull( Enumerable.t(), keyword() ) :: ExDataSketch.ULL.t()
Builds a ULL sketch from a stream.
Delegates to ExDataSketch.ULL.from_enumerable/2.
Examples
iex> sketch = 1..100 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.ull(p: 10)
iex> ExDataSketch.ULL.estimate(sketch) > 0.0
true