Distributed Merge Semantics


Overview

One of the most powerful properties of probabilistic sketches is their mergeability: two sketches of the same type and parameters can be combined into a single sketch that estimates the union of both input sets. This property makes sketches ideal for distributed aggregation.

This guide explains how merge semantics work, why they matter for distributed systems, and how to use them effectively with ex_data_sketch.

Merge Fundamentals

Associativity and Commutativity

All mergeable sketches in ex_data_sketch satisfy two algebraic laws:

  1. Associativity: merge(merge(a, b), c) == merge(a, merge(b, c))
  2. Commutativity: merge(a, b) == merge(b, a)

These laws mean that neither the order nor the grouping of merges affects the result. You can merge in any order, in any grouping, at any time, and always get the same answer.

# All three produce the same estimate:
a = HLL.from_enumerable(items_a, p: 14)
b = HLL.from_enumerable(items_b, p: 14)
c = HLL.from_enumerable(items_c, p: 14)

left  = HLL.merge(HLL.merge(a, b), c)
right = HLL.merge(a, HLL.merge(b, c))
flat  = HLL.merge_many([a, b, c])

# All within floating-point tolerance of each other
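
The two laws follow directly from the merge operation. As a minimal sketch, with plain integer lists standing in for HLL registers (the module ToyRegisters is hypothetical and not part of ex_data_sketch), a register-wise maximum is associative and commutative because max/2 is:

```elixir
defmodule ToyRegisters do
  # Register-wise maximum of two equally sized register lists.
  # Plain lists stand in for a sketch's register array.
  def merge(a, b) when length(a) == length(b) do
    Enum.zip_with(a, b, &max/2)
  end
end

a = [3, 0, 5, 1]
b = [1, 4, 2, 1]
c = [0, 2, 7, 0]

left = ToyRegisters.merge(ToyRegisters.merge(a, b), c)
right = ToyRegisters.merge(a, ToyRegisters.merge(b, c))
swapped = ToyRegisters.merge(c, ToyRegisters.merge(b, a))

# Grouping and order make no difference to the merged registers.
IO.inspect(left == right and right == swapped) # prints true
```

Because the merged registers are identical in every case, any estimate computed from them is identical as well.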

Idempotency (Approximate)

Merging an HLL sketch with itself is idempotent: the result has the same registers, so the estimate does not change.

sketch = HLL.from_enumerable(items, p: 14)
merged = HLL.merge(sketch, sketch)

# estimate(merged) == estimate(sketch) -- always true
# because HLL merge takes register-wise maximum

This is exact for HLL and ULL because both merge by register-wise max. For Bloom filters, self-merge is a no-op because bitwise OR is idempotent. CMS (Count-Min Sketch) is the exception: its merge sums counters, so merging a sketch with itself doubles every count. Merge each CMS into the aggregate exactly once.
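
Whether self-merge is idempotent depends only on the merge operation. The following is a toy comparison, not the library's implementation: lists stand in for registers and counters, an integer stands in for a Bloom bit array, and ToyMerge is a hypothetical module. It assumes CMS counters are summed on merge, as described above.

```elixir
defmodule ToyMerge do
  import Bitwise

  # HLL/ULL style: register-wise maximum.
  def max_merge(a, b), do: Enum.zip_with(a, b, &max/2)

  # CMS style, assuming counters are summed on merge.
  def sum_merge(a, b), do: Enum.zip_with(a, b, &+/2)

  # Bloom style: bitwise OR of the bit arrays (an integer used as a bitmap).
  def or_merge(a, b), do: bor(a, b)
end

registers = [3, 0, 5, 1]
counters = [2, 7, 0, 4]
bits = 0b10110010

IO.inspect(ToyMerge.max_merge(registers, registers) == registers) # prints true
IO.inspect(ToyMerge.or_merge(bits, bits) == bits)                 # prints true
IO.inspect(ToyMerge.sum_merge(counters, counters))                # prints [4, 14, 0, 8]
```

Max and OR leave the state unchanged, while the summed counters double.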

Distributed Aggregation Patterns

Pattern 1: Fan-In Aggregation

The simplest pattern: multiple producers each maintain a local sketch, periodically sending it to a central aggregator.

Producer 1 --[sketch]--+
Producer 2 --[sketch]--+--> Aggregator --> Merged Result
Producer 3 --[sketch]--+

defmodule CentralAggregator do
  use GenServer

  def init(_), do: {:ok, %{sketch: HLL.new(p: 14)}}

  def handle_cast({:merge, incoming}, state) do
    merged = HLL.merge(state.sketch, incoming)
    {:noreply, %{state | sketch: merged}}
  end

  def handle_call(:estimate, _, state) do
    {:reply, HLL.estimate(state.sketch), state}
  end
end

Pattern 2: Tree Aggregation

For large clusters, use hierarchical aggregation to avoid overwhelming a single node:

Node 1 --+
Node 2 --+--> Aggregator A --+
Node 3 --+                   |
                             +--> Root
Node 4 --+                   |
Node 5 --+--> Aggregator B --+
Node 6 --+

# Level 1: Each node accumulates locally
local_sketch = HLL.from_enumerable(local_events, p: 14)

# Level 2: Fan into intermediate aggregators
# (a GenServer that handles {:merge, sketch} in handle_cast/2 needs cast, not send)
GenServer.cast(aggregator_a, {:merge, local_sketch})

# Level 3: Root merges all aggregator results
final = HLL.merge_many(aggregator_sketches)

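Tree aggregation spreads the merge work across levels. With fan-in k, N nodes need about log_k(N) levels, and each aggregator merges at most k sketches per round instead of one node merging all N. In a two-level layout like the one above, with about sqrt(N) intermediate aggregators, no node merges more than O(sqrt(N)) sketches.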
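
As a minimal sketch of level-by-level merging, the hypothetical TreeMerge module below reduces a list in groups of fan_in until one item remains and reports how many levels that took. It runs in a single process and uses toy register lists with a max merge; in a real cluster each chunk would be merged on a separate aggregator node, and merge_fun would be HLL.merge/2.

```elixir
defmodule TreeMerge do
  # Merge `sketches` level by level, `fan_in` at a time, until one remains.
  # Returns {merged, levels}. `merge_fun` must be an associative and
  # commutative two-argument merge.
  def reduce(sketches, fan_in, merge_fun) when sketches != [] and fan_in >= 2 do
    do_reduce(sketches, fan_in, merge_fun, 0)
  end

  defp do_reduce([single], _fan_in, _merge_fun, levels), do: {single, levels}

  defp do_reduce(sketches, fan_in, merge_fun, levels) do
    sketches
    |> Enum.chunk_every(fan_in)
    |> Enum.map(fn chunk -> Enum.reduce(chunk, merge_fun) end)
    |> do_reduce(fan_in, merge_fun, levels + 1)
  end
end

# 16 toy "sketches": register lists merged by register-wise max.
sketches = for i <- 1..16, do: [rem(i, 5), rem(i, 3), i]
max_merge = fn a, b -> Enum.zip_with(a, b, &max/2) end

{merged, levels} = TreeMerge.reduce(sketches, 4, max_merge)
IO.inspect({merged, levels}) # prints {[4, 2, 16], 2}
```

With 16 inputs and a fan-in of 4, two levels suffice, and the result equals a flat left-to-right merge of all 16 inputs.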

Pattern 3: Partition-Local with Periodic Merge

Each Broadway or GenStage partition accumulates independently. A separate process periodically merges all partition sketches:

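defmodule MyBroadway do
  use Broadway

  # Broadway's context is read-only and handle_message/3 must return the
  # message, so each processor keeps its sketch in a public ETS table,
  # keyed by its own pid. The table :partition_sketches is an assumption:
  # create it (named, public) before the pipeline starts.
  @impl true
  def handle_message(_processor, message, _context) do
    sketch =
      case :ets.lookup(:partition_sketches, self()) do
        [{_pid, existing}] -> existing
        [] -> HLL.new(p: 14)
      end

    :ets.insert(:partition_sketches, {self(), HLL.update(sketch, message.data.user_id)})
    message
  end
end

# Periodic aggregation
defmodule PeriodicAggregator do
  use GenServer

  def handle_info(:flush, state) do
    # Collect one sketch per processor and merge them
    sketches = for {_pid, sketch} <- :ets.tab2list(:partition_sketches), do: sketch
    merged = HLL.merge_many(sketches)
    # Publish merged estimate
    {:noreply, Map.put(state, :estimate, HLL.estimate(merged))}
  end
end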

Pattern 4: ETS-Sharded Aggregation

Use ETS tables as a shared sketch store, enabling any process to contribute to a shared sketch without message passing:

# Create a named ETS table
table = :ets.new(:my_sketches, [:set, :public, :named_table])

# Any process can save
ExDataSketch.Storage.ETS.save(local_sketch, table, "daily:active_users")

# Any process can merge
ExDataSketch.Storage.ETS.merge(another_sketch, table, "daily:active_users")

# Any process can load and estimate
{:ok, merged} = ExDataSketch.Storage.ETS.load(ExDataSketch.HLL, table, "daily:active_users")
ExDataSketch.HLL.estimate(merged)
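
How ExDataSketch.Storage.ETS performs its merge is not shown in this guide. The following is only a sketch of a read-merge-write cycle using plain :ets calls, with toy register lists and a hypothetical ToyEtsStore module. It illustrates one point to keep in mind with any shared store: a lookup followed by an insert is not atomic.

```elixir
defmodule ToyEtsStore do
  # Merge `registers` into the value stored under `key`, or store them if
  # the key is new. Lookup followed by insert is NOT atomic: concurrent
  # callers can overwrite each other's result unless merges for a key are
  # serialized, for example through a single owner process.
  def merge(table, key, registers) do
    merged =
      case :ets.lookup(table, key) do
        [{^key, existing}] -> Enum.zip_with(existing, registers, &max/2)
        [] -> registers
      end

    :ets.insert(table, {key, merged})
    merged
  end
end

table = :ets.new(:toy_sketches, [:set, :public])
ToyEtsStore.merge(table, "daily:active_users", [3, 0, 5, 1])
ToyEtsStore.merge(table, "daily:active_users", [1, 4, 2, 1])

IO.inspect(:ets.lookup(table, "daily:active_users"))
# prints [{"daily:active_users", [3, 4, 5, 1]}]
```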

Merge Correctness Guarantees

Same Parameters Required

All sketches being merged must have the same parameters:

# This raises IncompatibleSketchesError:
a = HLL.new(p: 10)
b = HLL.new(p: 14)
HLL.merge(a, b)  # ** (IncompatibleSketchesError) HLL precision mismatch: 10 vs 14

Sketch Type Must Match

You cannot merge an HLL with a ULL or a CMS:

# Different sketch types cannot be merged
hll = HLL.new(p: 14)
cms = CMS.new(width: 128, depth: 5)
HLL.merge(hll, cms)  # FunctionClauseError -- wrong type

Hash Strategy Compatibility

When merging sketches that used different hash strategies (e.g., one with :phash2 and another with :xxhash3), the merge will raise an error. All merged sketches must use the same hash strategy:

a = HLL.new(p: 14, hash_strategy: :phash2)
b = HLL.new(p: 14, hash_strategy: :xxhash3)
HLL.merge(a, b)  # ** (IncompatibleSketchesError) hash strategy mismatch

Distributed Consistency

Eventual Consistency

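Sketch merging provides eventual consistency for cardinality estimates. If partitions accumulate data independently and merge periodically, the merged sketch converges to the sketch a single node would have built over all events, so its estimate approaches the true cardinality to within the sketch's normal error bound. How quickly the merged result catches up, and how accurate it is once it does, depends on:

  1. Merge frequency: More frequent merges shorten the lag between an event arriving and its appearing in the merged estimate
  2. Partition count: More partitions mean more sketches to collect per merge round, or more levels under tree aggregation
  3. Sketch precision: Higher p lowers the estimation error of the merged result; it does not change how quickly merges propagate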
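
For a sense of scale on precision, the relative standard error of HyperLogLog is commonly approximated as 1.04 / sqrt(2^p) (Flajolet et al., listed under Further Reading). The hypothetical helper below evaluates that approximation; it is a rough guide and may not match the exact error behavior of the estimator ex_data_sketch uses.

```elixir
defmodule HllError do
  # Approximate relative standard error of HyperLogLog with 2^p registers.
  def relative_standard_error(p) when is_integer(p) and p > 0 do
    1.04 / :math.sqrt(:math.pow(2, p))
  end
end

for p <- [10, 12, 14] do
  pct = Float.round(HllError.relative_standard_error(p) * 100, 2)
  IO.puts("p=#{p}: about #{pct}% relative standard error")
end
```

At p = 14 the approximation gives roughly 0.8%, and each increase of p by 2 halves the error while quadrupling the register count.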

Network Partition Tolerance

During network partitions, each partition continues to accumulate data independently. When connectivity is restored, merging is straightforward: just merge all partition sketches. No coordination protocol is needed because merge is associative and commutative.

# During partition: nodes A and B accumulate independently
sketch_a = HLL.from_enumerable(events_a, p: 14)
sketch_b = HLL.from_enumerable(events_b, p: 14)

# After partition heals: simple merge
merged = HLL.merge(sketch_a, sketch_b)
# merged estimate reflects the union of both event sets

Time-Windowed Merging

For sliding-window analytics (e.g., "unique users in the last hour"), use separate sketch instances per time window:

defmodule WindowedSketch do
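  # One sketch per 5-minute window, keyed by wall-clock timestamp in
  # milliseconds. Monotonic time is node-local, so it cannot be compared
  # with timestamps recorded on other nodes.
  def merge_windows(sketches, window_ms) do
    now = System.system_time(:millisecond)
    cutoff = now - window_ms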

    sketches
    |> Enum.filter(fn {timestamp, _sketch} -> timestamp > cutoff end)
    |> Enum.map(fn {_ts, sketch} -> sketch end)
    |> HLL.merge_many()
  end
end
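
Maintaining the per-window sketches can be sketched as follows. ToyWindows is a hypothetical module, and a MapSet stands in for each window's sketch so the example runs without the library. Timestamps are assumed to be milliseconds from a clock shared by all writers.

```elixir
defmodule ToyWindows do
  @bucket_ms 5 * 60 * 1000

  # Start of the 5-minute bucket that contains `timestamp_ms`.
  def bucket_start(timestamp_ms) do
    Integer.floor_div(timestamp_ms, @bucket_ms) * @bucket_ms
  end

  # Add `item` to the bucket for `timestamp_ms`. `buckets` maps
  # bucket start => MapSet (an exact stand-in for a per-window sketch).
  def add(buckets, timestamp_ms, item) do
    Map.update(buckets, bucket_start(timestamp_ms), MapSet.new([item]), &MapSet.put(&1, item))
  end

  # Drop buckets that ended at or before `cutoff_ms`.
  def prune(buckets, cutoff_ms) do
    buckets
    |> Enum.filter(fn {start, _set} -> start + @bucket_ms > cutoff_ms end)
    |> Map.new()
  end
end

buckets =
  %{}
  |> ToyWindows.add(10_000, "alice")
  |> ToyWindows.add(20_000, "bob")
  |> ToyWindows.add(310_000, "alice")

IO.inspect(buckets |> Map.keys() |> Enum.sort())
# prints [0, 300000]
IO.inspect(buckets |> ToyWindows.prune(300_000) |> Map.keys())
# prints [300000]
```

Pruning by bucket end rather than bucket start keeps a window that is only partly inside the query range, so the merged result can cover slightly more than the requested interval.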

Performance Characteristics

Merge Cost

Sketch Type      Merge Cost           Memory per Merge
HLL p=14         O(2^p) = 16 KB       16 KB
ULL p=14         O(2^p) + 8           ~16 KB
CMS 128x5        O(w*d) = 640 B       640 bytes
Bloom 10K cap    O(n/8) = 1.2 KB      1.2 KB

Merge is a single pass over the register/counter array. For HLL and ULL, this is a register-wise max operation. For CMS, it's a counter-wise addition. For Bloom, it's a bitwise OR.

Parallel Merge Scalability

merge_many/1 uses Enum.reduce/3, which merges sequentially. For very large sketch counts, consider parallel reduction:

# Sequential (default)
HLL.merge_many(sketches)

# Parallel (for > 100 sketches): merge chunks of 10 concurrently,
# then merge the partial results
sketches
|> Enum.chunk_every(10)
|> Task.async_stream(&HLL.merge_many/1)
|> Enum.map(fn {:ok, result} -> result end)
|> HLL.merge_many()

Anti-Patterns

Don't Merge Too Frequently

Merging on every update defeats the purpose of local accumulation:

# BAD: Merge on every update (wasteful)
def handle_info({:event, user_id}, %{sketch: sketch, aggregator: aggregator} = state) do
  new_sketch = HLL.update(sketch, user_id)
  GenServer.cast(aggregator, {:merge, new_sketch})  # 16KB message per event!
  {:noreply, %{state | sketch: new_sketch}}
end

# GOOD: Accumulate locally, merge periodically
def handle_info({:event, user_id}, %{sketch: sketch} = state) do
  {:noreply, %{state | sketch: HLL.update(sketch, user_id)}}
end

def handle_info(:flush, %{sketch: sketch, aggregator: aggregator} = state) do
  # The aggregator handles {:merge, sketch} in handle_cast/2, so use cast
  GenServer.cast(aggregator, {:merge, sketch})  # 16KB message per flush interval
  {:noreply, %{state | sketch: HLL.new(p: 14)}}
end

Don't Use Different Precision Values

# BAD: Different p values across producers
a = HLL.new(p: 10)  # Producer 1 uses p=10
b = HLL.new(p: 14)  # Producer 2 uses p=14
HLL.merge(a, b)      # IncompatibleSketchesError!

Don't Forget to Reset After Flushing

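If you flush a sketch to an aggregator, create a new local sketch for the next accumulation period. For sketches whose merge adds counters, such as CMS, re-sending the same sketch double-counts events from the previous period. For HLL, re-merging an old sketch does not change the estimate, but resetting keeps each flush scoped to a single period, which time-windowed analysis relies on.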

Further Reading

  • Flajolet et al., "HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm" (2007) -- HLL register-wise max merge semilattice
  • Cormode and Muthukrishnan, "An Improved Data Stream Summary: The Count-Min Sketch and Its Applications" (2005) -- CMS point-wise sum merge
  • Heule, Nunkesser, and Hall, "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm" (2013) -- HyperLogLog++ sparse/dense representation, bias correction, and linear counting
  • guides/streaming_sketches.md -- Stream API for local accumulation
  • guides/broadway_integration.md -- Broadway pipeline integration
  • guides/genstage_integration.md -- GenStage consumer integration
  • guides/persistence.md -- Persistence for sketch state
  • guides/aggregation_wall.md -- Why sketches break through scaling limits