ExDataSketch.Stream (ExDataSketch v0.9.0)


Stream-native sketch construction and reduction.

This module provides terminal stream consumers that build sketches from lazy enumerables without buffering the entire input into memory. Each function consumes an Enumerable and returns a completed sketch struct.

All ingestion is delegated to each sketch module's from_enumerable/2 and merge_many/1 functions, so no ingestion logic is duplicated here.
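Below is a minimal sketch of that delegation pattern. It uses a hypothetical ToySketch module (not part of ExDataSketch) that only counts items. The stream-facing function simply forwards to from_enumerable/2. Because Enum.reduce/3 pulls items from a lazy stream one at a time, the input is never materialized as a list. This does not show how the real sketch modules implement from_enumerable/2.

```elixir
defmodule ToySketch do
  # Hypothetical sketch module exposing the two functions the stream layer relies on.
  defstruct count: 0

  def new(_opts \\ []), do: %__MODULE__{}

  # Single-pass ingestion: Enum.reduce/3 consumes a lazy stream item by item,
  # so memory use does not grow with the length of the input.
  def from_enumerable(enumerable, opts \\ []) do
    Enum.reduce(enumerable, new(opts), fn _item, %__MODULE__{count: c} = s ->
      %{s | count: c + 1}
    end)
  end

  # Combines partial sketches; for a counter, merging is addition.
  def merge_many(sketches) do
    %__MODULE__{count: sketches |> Enum.map(& &1.count) |> Enum.sum()}
  end
end

defmodule ToyStream do
  # Hypothetical stream entry point: it adds no ingestion logic of its own.
  def toy(enumerable, opts \\ []), do: ToySketch.from_enumerable(enumerable, opts)
end
```

For example, 1..100_000 |> Stream.map(&Integer.to_string/1) |> ToyStream.toy() builds the toy sketch without ever creating the full list of strings.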

Stream Pipeline Example

1..100_000
|> Stream.map(&to_string/1)
|> ExDataSketch.Stream.hll(p: 14)
|> ExDataSketch.HLL.estimate()

Partitioned Processing

For large streams that benefit from intermediate aggregation, use reduce_partitioned/3 to chunk the input, build partial sketches, and merge them:

1..1_000_000
|> Stream.map(&to_string/1)
|> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, p: 14)

Collectable

All mergeable sketches implement the Collectable protocol, so they can be used as the target of Enum.into/2:

sketch = Enum.into(1..1000, ExDataSketch.HLL.new(p: 14))

See the Collectable notes in each sketch module's documentation.
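The example below shows one way a sketch struct can implement Collectable. It uses a hypothetical ToyDistinct module, a toy linear-counting bitmap that is not an ExDataSketch module. Collectable.into/1 returns the initial accumulator together with a collector function that handles {:cont, item}, :done, and :halt. The real sketch modules' implementations may differ.

```elixir
defmodule ToyDistinct do
  # Hypothetical toy sketch: each item sets one bit in an m-bit bitmap.
  import Bitwise
  defstruct m: 1024, bits: 0

  def new(m \\ 1024), do: %__MODULE__{m: m}

  # :erlang.phash2/2 maps any term to 0..m-1; duplicates set the same bit.
  def update(%__MODULE__{m: m, bits: bits} = sketch, item),
    do: %{sketch | bits: bits ||| (1 <<< :erlang.phash2(item, m))}
end

defimpl Collectable, for: ToyDistinct do
  def into(sketch) do
    collector = fn
      acc, {:cont, item} -> ToyDistinct.update(acc, item)
      acc, :done -> acc
      _acc, :halt -> :ok
    end

    {sketch, collector}
  end
end
```

With this impl, Enum.into(1..1000, ToyDistinct.new(4096)) produces the same struct as reducing ToyDistinct.update/2 over the range by hand.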

Summary

Functions

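bloom(enumerable, opts \\ [])
Builds a Bloom filter from a stream.

cms(enumerable, opts \\ [])
Builds a CMS sketch from a stream.

cqf(enumerable, opts \\ [])
Builds a CQF (Counting Quotient Filter) from a stream.

ddsketch(enumerable, opts \\ [])
Builds a DDSketch from a stream.

frequent_items(enumerable, opts \\ [])
Builds a FrequentItems sketch from a stream.

hll(enumerable, opts \\ [])
Builds an HLL sketch from a stream.

iblt(enumerable, opts \\ [])
Builds an IBLT from a stream.

kll(enumerable, opts \\ [])
Builds a KLL sketch from a stream.

misra_gries(enumerable, opts \\ [])
Builds a MisraGries sketch from a stream.

quotient(enumerable, opts \\ [])
Builds a Quotient filter from a stream.

reduce_into(enumerable, sketch_or_module, opts \\ [])
Reduces a stream into an existing or new sketch.

reduce_partitioned(enumerable, module, opts \\ [])
Reduces a stream into a sketch using partitioned processing.

req(enumerable, opts \\ [])
Builds an REQ sketch from a stream.

theta(enumerable, opts \\ [])
Builds a Theta sketch from a stream.

ull(enumerable, opts \\ [])
Builds a ULL sketch from a stream.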

Functions

bloom(enumerable, opts \\ [])

@spec bloom(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.Bloom.t()

Builds a Bloom filter from a stream.

Delegates to ExDataSketch.Bloom.from_enumerable/2.

Examples

iex> items = 1..100 |> Stream.map(&to_string/1)
iex> bloom = ExDataSketch.Stream.bloom(items, capacity: 200)
iex> ExDataSketch.Bloom.member?(bloom, "1")
true

cms(enumerable, opts \\ [])

@spec cms(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.CMS.t()

Builds a Count-Min Sketch (CMS) from a stream.

Delegates to ExDataSketch.CMS.from_enumerable/2.

Examples

iex> sketch = ["a", "b", "c", "a"] |> ExDataSketch.Stream.cms(width: 64, depth: 3)
iex> ExDataSketch.CMS.estimate(sketch, "a") >= 2
true

cqf(enumerable, opts \\ [])

@spec cqf(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.CQF.t()

Builds a CQF (Counting Quotient Filter) from a stream.

Delegates to ExDataSketch.CQF.from_enumerable/2.

Examples

iex> items = 1..50 |> Stream.map(&to_string/1)
iex> cqf = ExDataSketch.Stream.cqf(items, capacity: 100)
iex> ExDataSketch.CQF.member?(cqf, "1")
true

ddsketch(enumerable, opts \\ [])

@spec ddsketch(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.DDSketch.t()

Builds a DDSketch from a stream.

Delegates to ExDataSketch.DDSketch.from_enumerable/2.

Examples

iex> sketch = 1..100 |> ExDataSketch.Stream.ddsketch(alpha: 0.01)
iex> is_float(ExDataSketch.DDSketch.quantile(sketch, 0.5))
true

frequent_items(enumerable, opts \\ [])

@spec frequent_items(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.FrequentItems.t()

Builds a FrequentItems sketch from a stream.

Delegates to ExDataSketch.FrequentItems.from_enumerable/2.

Examples

iex> items = Stream.map(1..200, fn i -> "item_" <> Integer.to_string(rem(i, 20)) end)
iex> sketch = ExDataSketch.Stream.frequent_items(items, k: 10)
iex> length(ExDataSketch.FrequentItems.top_k(sketch, 5)) <= 5
true

hll(enumerable, opts \\ [])

@spec hll(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.HLL.t()

Builds a HyperLogLog (HLL) sketch from a stream.

Delegates to ExDataSketch.HLL.from_enumerable/2.

Examples

iex> sketch = 1..100 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.hll(p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true

iblt(enumerable, opts \\ [])

@spec iblt(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.IBLT.t()

Builds an Invertible Bloom Lookup Table (IBLT) from a stream.

Delegates to ExDataSketch.IBLT.from_enumerable/2.

Examples

iex> items = 1..20 |> Stream.map(&to_string/1)
iex> iblt = ExDataSketch.Stream.iblt(items, m: 40, num_hashes: 3)
iex> ExDataSketch.IBLT.member?(iblt, "1")
true

kll(enumerable, opts \\ [])

@spec kll(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.KLL.t()

Builds a KLL sketch from a stream.

Delegates to ExDataSketch.KLL.from_enumerable/2.

Examples

iex> sketch = 1..100 |> ExDataSketch.Stream.kll(k: 200)
iex> is_float(ExDataSketch.KLL.quantile(sketch, 0.5))
true

misra_gries(enumerable, opts \\ [])

@spec misra_gries(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.MisraGries.t()

Builds a MisraGries sketch from a stream.

Delegates to ExDataSketch.MisraGries.from_enumerable/2.

Examples

iex> items = Stream.map(1..200, fn i -> "item_" <> Integer.to_string(rem(i, 20)) end)
iex> sketch = ExDataSketch.Stream.misra_gries(items, k: 10)
iex> ExDataSketch.MisraGries.count(sketch) == 200
true

quotient(enumerable, opts \\ [])

@spec quotient(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.Quotient.t()

Builds a Quotient filter from a stream.

Delegates to ExDataSketch.Quotient.from_enumerable/2.

Examples

iex> items = 1..50 |> Stream.map(&to_string/1)
iex> qf = ExDataSketch.Stream.quotient(items, capacity: 100)
iex> ExDataSketch.Quotient.member?(qf, "1")
true

reduce_into(enumerable, sketch_or_module, opts \\ [])

@spec reduce_into(Enumerable.t(), module() | struct(), keyword()) :: struct()

Reduces a stream into an existing or new sketch.

When sketch_or_module is a sketch struct, items are reduced into it using the appropriate update function. When it is a module atom (e.g., ExDataSketch.HLL), a new sketch is created with the given options and items are reduced into it.

This is a convenience function that wraps Enum.reduce/3 with the sketch's reducer/0 function.
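Here is a minimal sketch of that dispatch. It assumes, as the sentence above implies, that reducer/0 returns a two-arity function compatible with Enum.reduce/3 and called as fun.(item, sketch). ToyCount, ToyReduce, and the :start option are hypothetical names used only for illustration.

```elixir
defmodule ToyCount do
  # Hypothetical toy summary that only counts items.
  defstruct n: 0

  # Hypothetical :start option, used to show opts reaching new/1.
  def new(opts \\ []), do: %__MODULE__{n: Keyword.get(opts, :start, 0)}

  # Enum.reduce/3 calls this as fun.(item, acc).
  def reducer, do: fn _item, %__MODULE__{n: n} = sketch -> %{sketch | n: n + 1} end
end

defmodule ToyReduce do
  def reduce_into(enumerable, sketch_or_module, opts \\ [])

  # An existing sketch struct: reduce straight into it with its module's reducer/0.
  def reduce_into(enumerable, %module{} = sketch, _opts),
    do: Enum.reduce(enumerable, sketch, module.reducer())

  # A module atom: build a fresh sketch with new/1, then reduce into it.
  def reduce_into(enumerable, module, opts) when is_atom(module),
    do: reduce_into(enumerable, module.new(opts), opts)
end
```

The struct clause comes first because a struct is a map rather than an atom, so the two clauses never overlap.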

Examples

# Using a module atom
iex> sketch = 1..100 |> ExDataSketch.Stream.reduce_into(ExDataSketch.HLL, p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true

# Using an existing sketch
iex> existing = ExDataSketch.HLL.new(p: 10)
iex> sketch = ["a", "b"] |> ExDataSketch.Stream.reduce_into(existing)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true

reduce_partitioned(enumerable, module, opts \\ [])

@spec reduce_partitioned(Enumerable.t(), module(), keyword()) :: struct()

Reduces a stream into a sketch using partitioned processing.

Splits the enumerable into chunks, builds a partial sketch per chunk, and merges the partial sketches into the final result. Because merging is associative, the result is identical to that of a single-pass from_enumerable/2 call.
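The identical-result property can be checked on a toy mergeable structure. The sketch below uses a hypothetical ToyBitmap that hashes items into an m-bit bitmap with :erlang.phash2/2 and merges with bitwise OR. It splits the input with Stream.chunk_every/2 using a fixed chunk size. This document does not specify how reduce_partitioned/3 actually derives chunk boundaries from :partitions, so that part is an assumption. Because OR is associative and commutative, every chunking produces the same bitmap as a single pass.

```elixir
defmodule ToyBitmap do
  # Hypothetical mergeable toy sketch (not part of ExDataSketch).
  import Bitwise
  defstruct m: 256, bits: 0

  def new(opts \\ []), do: %__MODULE__{m: Keyword.get(opts, :m, 256)}

  def from_enumerable(enumerable, opts \\ []) do
    Enum.reduce(enumerable, new(opts), fn item, %__MODULE__{m: m, bits: bits} = sketch ->
      %{sketch | bits: bits ||| (1 <<< :erlang.phash2(item, m))}
    end)
  end

  # Assumes all sketches share the same m; OR-ing bitmaps is order-insensitive.
  def merge_many([first | rest]),
    do: Enum.reduce(rest, first, fn s, acc -> %{acc | bits: acc.bits ||| s.bits} end)
end

defmodule ToyPartitioned do
  # Hypothetical: one partial sketch per fixed-size chunk, then a single merge.
  def reduce_partitioned(enumerable, chunk_size, opts \\ []) do
    partials =
      enumerable
      |> Stream.chunk_every(chunk_size)
      |> Enum.map(&ToyBitmap.from_enumerable(&1, opts))

    case partials do
      [] -> ToyBitmap.new(opts)
      _ -> ToyBitmap.merge_many(partials)
    end
  end
end
```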

Processing is sequential within this function. For parallel processing, use ExDataSketch.Flow with Flow.partition/2.

Options

  • :partitions - number of chunks to split the input into (default: System.schedulers_online()). Must be a positive integer.
  • :update_many_chunk_size - chunk size for from_enumerable/2 calls within each partition. Defaults to max(1, div(10_000, partitions)).
  • All other options are forwarded to the sketch module's new/1 and from_enumerable/2.
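The two documented defaults work out as follows. With partitions: 4, each partition uses a chunk size of div(10_000, 4) = 2500. Once partitions exceeds 10_000, the max/2 floor keeps the chunk size at 1. The hypothetical ToyOpts.resolve/1 below resolves options only as the list above describes. It is an illustration, not ExDataSketch's implementation.

```elixir
defmodule ToyOpts do
  # Hypothetical option resolution following the documented defaults.
  def resolve(opts) do
    partitions = Keyword.get(opts, :partitions, System.schedulers_online())

    if not (is_integer(partitions) and partitions > 0) do
      raise ArgumentError, ":partitions must be a positive integer, got: #{inspect(partitions)}"
    end

    chunk_size = Keyword.get(opts, :update_many_chunk_size, max(1, div(10_000, partitions)))

    # Remaining options; per the list above, these go to new/1 and from_enumerable/2.
    sketch_opts = Keyword.drop(opts, [:partitions, :update_many_chunk_size])
    {partitions, chunk_size, sketch_opts}
  end
end
```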

Examples

iex> sketch = 1..1000 |> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, partitions: 4, p: 10)
iex> ExDataSketch.HLL.estimate(sketch) > 0.0
true

req(enumerable, opts \\ [])

@spec req(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.REQ.t()

Builds an REQ (Relative Error Quantiles) sketch from a stream.

Delegates to ExDataSketch.REQ.from_enumerable/2.

Examples

iex> sketch = 1..100 |> ExDataSketch.Stream.req(k: 200)
iex> median = ExDataSketch.REQ.quantile(sketch, 0.5)
iex> is_float(median) or is_nil(median)
true

theta(enumerable, opts \\ [])

@spec theta(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.Theta.t()

Builds a Theta sketch from a stream.

Delegates to ExDataSketch.Theta.from_enumerable/2.

Examples

iex> sketch = 1..50 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.theta(k: 100)
iex> ExDataSketch.Theta.estimate(sketch) > 0.0
true

ull(enumerable, opts \\ [])

@spec ull(
  Enumerable.t(),
  keyword()
) :: ExDataSketch.ULL.t()

Builds a ULL (UltraLogLog) sketch from a stream.

Delegates to ExDataSketch.ULL.from_enumerable/2.

Examples

iex> sketch = 1..100 |> Stream.map(&to_string/1) |> ExDataSketch.Stream.ull(p: 10)
iex> ExDataSketch.ULL.estimate(sketch) > 0.0
true