UltraLogLog.Concurrent (UltraLogLog v0.2.0)


Lock-free, :atomics-backed concurrent insert path for UltraLogLog.

An UltraLogLog.Concurrent sketch is safe to insert into from any process or scheduler with no GenServer, no lock, and no message passing. The insert path is a CAS loop over a per-register 64-bit :atomics cell. For value semantics (snapshots, serialization round-trips, explicit CRDT merge), use the immutable UltraLogLog instead, or call snapshot/1 to materialize the immutable form from a concurrent sketch.

Quick start

iex> {:ok, c} = UltraLogLog.Concurrent.new(precision: 12)
iex> :ok = UltraLogLog.Concurrent.add(c, "session-abc")
iex> :ok = UltraLogLog.Concurrent.add(c, "session-xyz")
iex> sketch = UltraLogLog.Concurrent.snapshot(c)
iex> {:ok, count} = UltraLogLog.cardinality(sketch)
iex> abs(count - 2.0) < 0.1
true

Concurrent inserts from many processes, all hitting the same shared sketch:

{:ok, c} = UltraLogLog.Concurrent.new(precision: 12)

1..1_000
|> Enum.chunk_every(100)
|> Enum.map(fn chunk ->
  Task.async(fn -> Enum.each(chunk, &UltraLogLog.Concurrent.add(c, &1)) end)
end)
|> Task.await_many(:infinity)

sketch = UltraLogLog.Concurrent.snapshot(c)
{:ok, _count} = UltraLogLog.cardinality(sketch)

add/2 returns :ok rather than an updated struct — it is a side-effecting operation on shared mutable state, not a value transformation. This is the deliberate API difference from UltraLogLog.add/2.

When to use this

Use UltraLogLog.Concurrent when many processes need to feed a single shared sketch — for example, every request handler in a web server inserting into one per-metric sketch. Use the immutable UltraLogLog when inserts are single-threaded or when you need value semantics.

The active concurrent structure costs 8× the immutable form's memory: one 64-bit :atomics cell per register, so 2^14 × 8 bytes = 128 KB at p=14 versus the immutable form's 16 KB. This is deliberate. Concurrent sketches are long-lived shared objects (typically one per metric, not millions), so 128 KB of shared state is negligible. The alternative of packing 8 registers per cell was rejected because it creates logical false sharing: two inserts targeting independent registers would contend just because they share a 64-bit word, amplifying contention 8× and turning the CAS loop into a bit-twiddling exercise. One cell per register keeps the CAS loop clean and provably correct.

The compact 16 KB serialization form remains available via snapshot/1.
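The sizing arithmetic above can be checked with a few lines of plain Elixir. This is only a sketch: SizingSketch and its functions are hypothetical helpers, not part of UltraLogLog, and the use of unsigned cells (signed: false) is an assumption about how such an array might be allocated.

```elixir
defmodule SizingSketch do
  import Bitwise

  # Hypothetical helper: number of registers at precision p.
  def registers(p), do: 1 <<< p

  # One 64-bit (8-byte) :atomics cell per register.
  def active_bytes(p), do: 8 * registers(p)

  # Compact form: one byte per register, as stated for snapshot/1.
  def compact_bytes(p), do: registers(p)
end

# Allocate an array with one cell per register at p = 14.
# `signed: false` is an assumption made for this illustration.
ref = :atomics.new(SizingSketch.registers(14), signed: false)

IO.inspect(:atomics.info(ref).size, label: "cells")
IO.inspect(SizingSketch.active_bytes(14), label: "active bytes")
IO.inspect(SizingSketch.compact_bytes(14), label: "compact bytes")
```

:atomics.info/1 also reports a memory figure, which is the approximate size of the array in bytes as seen by the runtime.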

Correctness under concurrency

Register merge (UltraLogLog.Encoding.merge_registers/2) is commutative, associative, and idempotent — i.e. a CRDT join. The CAS loop in add/2 only ever applies merge_registers to the observed cell value, so the concurrent result is order-independent by construction: a concurrent sketch built by N processes inserting in parallel is identical, byte-for-byte, to an immutable sketch built by inserting the same elements serially, for any interleaving. See test/concurrent_test.exs for the equivalence-under-contention proof — verified by exact register-byte equality at multiple (p, N) cells and reinforced by a StreamData property test.
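The order-independence argument can be illustrated with a minimal CAS loop over :atomics. This is a sketch, not the library's implementation: CasJoinSketch is a hypothetical module, max/2 stands in for UltraLogLog.Encoding.merge_registers/2 (it is likewise commutative, associative, and idempotent), and the cell-selection rule is invented for the example.

```elixir
defmodule CasJoinSketch do
  # Stand-in join. The real merge is UltraLogLog.Encoding.merge_registers/2,
  # which is not reproduced here.
  def join(a, b), do: max(a, b)

  # Hypothetical cell selection; :atomics indices are 1-based.
  def cell_for(value, cells), do: rem(value, cells) + 1

  # Apply the join to whatever value is currently observed in the cell.
  def update(ref, ix, value), do: cas_loop(ref, ix, :atomics.get(ref, ix), value)

  defp cas_loop(ref, ix, observed, value) do
    desired = join(observed, value)

    if desired == observed do
      # Nothing to change: the cell already dominates this value.
      :ok
    else
      case :atomics.compare_exchange(ref, ix, observed, desired) do
        :ok -> :ok
        # Another writer got there first; retry against the value it left.
        actual -> cas_loop(ref, ix, actual, value)
      end
    end
  end

  def read_all(ref, cells), do: for(i <- 1..cells, do: :atomics.get(ref, i))

  # Insert from several tasks at once, then read after all have finished.
  def run_concurrent(values, cells) do
    ref = :atomics.new(cells, signed: false)

    values
    |> Enum.chunk_every(50)
    |> Enum.map(fn chunk ->
      Task.async(fn -> Enum.each(chunk, &update(ref, cell_for(&1, cells), &1)) end)
    end)
    |> Task.await_many(:infinity)

    read_all(ref, cells)
  end

  # Insert the same values from a single process.
  def run_serial(values, cells) do
    ref = :atomics.new(cells, signed: false)
    Enum.each(values, &update(ref, cell_for(&1, cells), &1))
    read_all(ref, cells)
  end
end
```

Because every write is a join applied to the observed value, run_concurrent/2 and run_serial/2 end in the same cell contents for the same input, regardless of scheduling or input order.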

Memory model

:atomics operations are sequentially consistent. Synchronization between writers and a subsequent snapshot/1 is provided by the usual BEAM mechanisms — typically Task.await/2 or Task.await_many/2 after writer tasks complete establishes happens-before between the last CAS and the snapshot.

See snapshot/1 for the (intentional, safe-by-construction) semantics of snapshots taken while writers are still active.

Types

t()

@opaque t()

Functions

add(c, hash)

@spec add(t(), term() | non_neg_integer()) :: :ok

Insert a value into the sketch.

Accepts any term (hashed internally via UltraLogLog.Hash) or a pre-computed 64-bit unsigned hash — same contract as UltraLogLog.add/2. Returns :ok.

Safe to call from any process / scheduler concurrently. The insert path is a lock-free CAS loop on a single :atomics cell.

new(opts \\ [])

@spec new(keyword()) :: {:ok, t()}

Create a new empty concurrent sketch.

Options

  • :precision: 3..26, default 12. State size is 8 · 2^precision bytes for the active structure (one 64-bit :atomics cell per register). The compact snapshot/1 form remains 2^precision bytes.

Returns {:ok, struct}.

snapshot(concurrent)

@spec snapshot(t()) :: UltraLogLog.t()

Convert the concurrent sketch to an immutable %UltraLogLog{} for estimation, serialization, or merge.

Snapshot semantics

snapshot/1 reads each :atomics cell independently. It is therefore not a globally atomic snapshot: if writers are active during the snapshot, different cells may reflect different moments in time.

This is safe by construction. Register merge is monotone in the UltraLogLog partial order, so a "torn" snapshot is always a valid intermediate sketch — never an invalid one — and its cardinality estimate is a legitimate estimate of a state the sketch genuinely passed through.

For a globally consistent snapshot, quiesce writers first (e.g. Task.await_many/2 on all insert tasks, as the test suite does).
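The monotonicity property behind these semantics can be demonstrated with plain :atomics. The following is a sketch under stated assumptions: TornSnapshotSketch is a hypothetical module, and cells hold monotonically increasing integers joined with max/2 rather than real UltraLogLog registers.

```elixir
defmodule TornSnapshotSketch do
  # Read each cell independently, as a non-atomic snapshot would.
  def read_all(ref) do
    for i <- 1..:atomics.info(ref).size, do: :atomics.get(ref, i)
  end

  # Monotone update: a cell only ever moves up (max/2 as a stand-in join).
  def raise_to(ref, ix, value), do: cas(ref, ix, :atomics.get(ref, ix), value)

  defp cas(_ref, _ix, observed, value) when observed >= value, do: :ok

  defp cas(ref, ix, observed, value) do
    case :atomics.compare_exchange(ref, ix, observed, value) do
      :ok -> :ok
      actual -> cas(ref, ix, actual, value)
    end
  end

  # Take one read while a writer is active and one after it has finished.
  def demo(cells, rounds) do
    ref = :atomics.new(cells, signed: false)

    writer =
      Task.async(fn ->
        for r <- 1..rounds, i <- 1..cells, do: raise_to(ref, i, r)
        :ok
      end)

    torn = read_all(ref)          # may mix cells from different moments
    Task.await(writer, :infinity) # quiesce the writer
    final = read_all(ref)         # globally consistent read

    {torn, final}
  end

  # True when every cell of `a` is at or below the same cell of `b`.
  def dominated?(a, b), do: Enum.zip(a, b) |> Enum.all?(fn {x, y} -> x <= y end)
end
```

Each cell in the read taken during writing is at or below the same cell in the quiesced read, since cells never decrease. That is the property that keeps a torn snapshot well-formed.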

Martingale

The returned %UltraLogLog{} has martingale: nil. The martingale estimator requires the per-insert update history (Ertl 2024 §3.7, Algorithm 2), which a concurrent sketch does not maintain. UltraLogLog.cardinality(snap, estimator: :martingale) therefore returns {:error, :invalidated_by_merge} on a snapshot. The FGRA and MLE estimators work normally.