ExDataSketch integrates with Flow for parallel
partition-local sketch reduction. This guide explains how to use
ExDataSketch.Flow for distributed cardinality counting, frequency estimation,
and other sketch workloads across multiple partitions.
Dependency
Add {:flow, "~> 1.2"} to your mix.exs dependencies. Flow is an optional
dependency -- if it is not present, calling Flow integration functions will
raise a clear error directing you to add it.
Why Flow?
Flow provides parallel data processing with partition-local reduction. Because sketch merge is associative and commutative, partial results from each partition can be combined in any order to produce the same final result. This makes sketches ideal Flow accumulators.
Partition-Local Reduction
The primary pattern is reduce/3 followed by merge/2:
alias ExDataSketch.Flow
# Parallel cardinality counting
final =
File.stream!("events.csv")
|> Stream.map(&parse_user_id/1)
|> Flow.from_enumerable()
|> Flow.partition()
|> ExDataSketch.Flow.reduce(ExDataSketch.HLL, p: 14)
|> ExDataSketch.Flow.merge(ExDataSketch.HLL)
ExDataSketch.HLL.estimate(final)How It Works
Flow.from_enumerable/1creates a Flow from the stream.Flow.partition/1splits the Flow across schedulers (default:System.schedulers_online()partitions).ExDataSketch.Flow.reduce/3creates one sketch per partition and reduces each element into it using the sketch'sreducer/0.ExDataSketch.Flow.merge/2collects all partition sketches and merges them usingmerge_many/1.
The result is a single merged sketch equivalent to a single-pass
from_enumerable/2, but computed in parallel.
Using Merge Results
After merge/2, you have a single sketch struct. You can query it directly:
# HLL - cardinality estimation
final = items
|> Flow.from_enumerable()
|> Flow.partition()
|> ExDataSketch.Flow.reduce(ExDataSketch.HLL, p: 14)
|> ExDataSketch.Flow.merge(ExDataSketch.HLL)
ExDataSketch.HLL.estimate(final)
# CMS - frequency estimation
final = items
|> Flow.from_enumerable()
|> Flow.partition()
|> ExDataSketch.Flow.reduce(ExDataSketch.CMS, width: 2048, depth: 5)
|> ExDataSketch.Flow.merge(ExDataSketch.CMS)
ExDataSketch.CMS.estimate(final, "popular_item")Single-Partition Collection
For simpler use cases where parallel reduction is not needed, use into/3:
sketch =
1..1000
|> Flow.from_enumerable()
|> ExDataSketch.Flow.into(ExDataSketch.HLL, p: 14)This materializes the entire Flow into a single partition. It does not benefit
from parallel reduction. For workloads requiring parallelism, prefer
reduce/3 followed by merge/2.
Supported Sketches
All mergeable sketch types work with Flow:
| Sketch | Module | Merge? |
|---|---|---|
| HLL | ExDataSketch.HLL | Yes |
| CMS | ExDataSketch.CMS | Yes |
| Theta | ExDataSketch.Theta | Yes |
| KLL | ExDataSketch.KLL | Yes |
| DDSketch | ExDataSketch.DDSketch | Yes |
| REQ | ExDataSketch.REQ | Yes |
| ULL | ExDataSketch.ULL | Yes |
| FrequentItems | ExDataSketch.FrequentItems | Yes |
| MisraGries | ExDataSketch.MisraGries | Yes |
| Bloom | ExDataSketch.Bloom | Yes |
| Quotient | ExDataSketch.Quotient | Yes |
| CQF | ExDataSketch.CQF | Yes |
| IBLT | ExDataSketch.IBLT | Yes |
Comparing Flow with Other Approaches
| Approach | Parallelism | Best for |
|---|---|---|
from_enumerable/2 | None (single-pass) | Simple collections |
Enum.into/2 (Collectable) | None (single-pass) | Pipeline integration |
ExDataSketch.Stream | None (lazy stream) | Lazy stream consumption |
ExDataSketch.Flow.reduce/3 | Partition-local | Large datasets, multi-core |
ExDataSketch.Flow.into/3 | None (single partition) | Simpler use cases |
Configuration
Flow integration can be explicitly enabled or disabled via application config:
config :ex_data_sketch, :integrations, flow: true