Last-write-wins key/value buffer where same-key writes coalesce between processing ticks.
Tidefall.HashMap buffers key-value entries in an ETS :set — a hash
table, hence the name — so that writes to the same key overwrite each
other and only the latest value survives to the next processing tick.
It periodically processes buffered entries using a configurable processor
function. Like Queue, it partitions its storage to reduce lock contention
during high-throughput writes, and it double-buffers each partition so that
new writes keep landing in a fresh table while the previous one is processed.
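Both ideas in the paragraph above can be seen with nothing but `:ets`. The following is a minimal single-process sketch, not Tidefall's implementation: the `LwwSketch` module, table names, and `{key, value}` tuple layout are hypothetical. It shows same-key writes coalescing in a `:set`, and a "tick" that swaps in a fresh table so later writes never touch the batch being drained.

```elixir
defmodule LwwSketch do
  # Hypothetical illustration only; Tidefall's partitions, tasks and
  # internal row layout are not modeled here.
  def run do
    write_table = :ets.new(:lww_sketch, [:set, :public])

    # Two writes to :user_1 coalesce: the :set keeps only the latest value.
    :ets.insert(write_table, {:user_1, "first"})
    :ets.insert(write_table, {:user_1, "second"})
    :ets.insert(write_table, {:user_2, "only"})

    # "Processing tick": hand the filled table off and start a fresh one.
    {to_process, write_table} = {write_table, :ets.new(:lww_sketch, [:set, :public])}

    # Writes arriving now go to the fresh table, not to the batch being drained.
    :ets.insert(write_table, {:user_1, "next tick"})

    batch = to_process |> :ets.tab2list() |> Enum.sort()
    :ets.delete(to_process)

    next_tick = :ets.tab2list(write_table)
    :ets.delete(write_table)

    {batch, next_tick}
  end
end
```

Running `LwwSketch.run()` yields a batch containing only `"second"` for `:user_1`, while `"next tick"` waits in the new table.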
It also supports versioned conditional updates via put_newer/4 and
put_all_newer/3, which use "newer version wins" semantics — an entry is
only written if the key doesn't exist or the new version is greater than
the existing one.
Data Flow
put(buffer, key, value) put_newer(buffer, key, value, opts)
| |
v v
+--------------------+ +--------------------+
| Partition Routing | | Partition Routing |
| phash2(key, N) | | phash2(key, N) |
+--------------------+ +--------------------+
| |
v v
+------------------+ +---------------------------+
| :ets.insert | | 1. :ets.insert_new |
| (last-write-wins)| | Key new? -> inserted |
+------------------+ | 2. :ets.select_replace |
\ | new_ver > old_ver? |
\ | Yes -> updated |
\ | No -> skipped |
\ +---------------------------+
\ /
v v
+-------------------------------------+
| ETS :set |
| (key, raw_key, value, version, |
| updates) |
+-------------------------------------+
|
v
+--------------------------------------+
| processor(batch) |
| batch = [%Entry{key, value, version, |
| updates}, ...] |
+--------------------------------------+
Entries are routed to partitions via phash2(key) and stored in
:set ETS tables. Regular put/4 uses a plain :ets.insert
(last-write-wins). Versioned put_newer/4 uses a two-step
approach in which each step is a single atomic ETS operation:
:ets.insert_new for keys not yet present, then
:ets.select_replace for the conditional "newer version wins"
update.
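The two-step versioned write described above can be sketched on a plain ETS `:set`. This is an illustration under simplifying assumptions, not Tidefall's code: `NewerWinsSketch` is hypothetical, rows are `{key, value, version}` tuples (the real table also carries `raw_key` and `updates`), and keys are assumed to be simple terms such as atoms, because a map in the match head would fall back to subset matching.

```elixir
defmodule NewerWinsSketch do
  # Hypothetical sketch of "newer version wins"; not Tidefall's internals.
  def new_table, do: :ets.new(:newer_wins_sketch, [:set, :public])

  def put_newer(table, key, value, version) do
    cond do
      # Step 1: atomic insert that only succeeds when the key is absent.
      :ets.insert_new(table, {key, value, version}) ->
        :inserted

      # Step 2: atomic conditional replace; 1 row replaced means we won.
      :ets.select_replace(table, newer_than_stored(key, value, version)) == 1 ->
        :updated

      true ->
        :skipped
    end
  end

  defp newer_than_stored(key, value, version) do
    [
      {
        # Match head: the row for `key`, binding its stored version to :"$1".
        {key, :_, :"$1"},
        # Guard: proceed only if the stored version is strictly lower.
        [{:<, :"$1", {:const, version}}],
        # Body: build the replacement row; {:const, term} keeps terms literal.
        [{{{:const, key}, {:const, value}, {:const, version}}}]
      }
    ]
  end
end
```

Equal versions are skipped, matching the "strictly greater" rule stated earlier.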
Complex keys and :key_hasher
ETS match specs (used by put_newer/4 and put_all_newer/3)
cannot equality-compare arbitrary terms in the match head. In
particular, maps anywhere in the key fall back to subset
semantics and may match the wrong row. Plain put/4 and
put_all/3 are unaffected because they do not use match specs, so
:key_hasher is only needed once you use put_newer/4 or
put_all_newer/3 on a complex key. When you do, pass the same
:key_hasher on every operation that touches that key, including
put/4, get/4, and delete/3; otherwise hashed and non-hashed
writes produce two distinct entries for one logical key (see the
per-function docs for the option).
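The "same hasher everywhere" rule above follows from how lookups work once the stored key is a hash. In this sketch, `KeyHasherSketch` is hypothetical, and the `{storage_key, original_key, value}` row loosely mirrors the `(key, raw_key, ...)` layout in the diagram (the exact layout is an assumption). It also passes `:deterministic` to `:erlang.term_to_binary/2` (OTP 24.1+), which the Erlang docs describe as giving the same encoding for the same term within one OTP major release.

```elixir
defmodule KeyHasherSketch do
  # Hypothetical collision-resistant hasher, in the spirit of the
  # function form of :key_hasher. :deterministic needs OTP 24.1+.
  def sha256_key(term) do
    :crypto.hash(:sha256, :erlang.term_to_binary(term, [:deterministic]))
  end

  def demo do
    table = :ets.new(:key_hasher_sketch, [:set])
    key = %{tenant: "acme", id: 42}

    # Store under the hash, keeping the original key alongside it.
    :ets.insert(table, {sha256_key(key), key, "val"})

    # Same hasher on lookup: hit, and the original key is still available.
    hit = :ets.lookup(table, sha256_key(key))
    # No hasher on lookup: the raw map is not the storage key, so it misses.
    miss = :ets.lookup(table, key)

    :ets.delete(table)
    {hit, miss}
  end
end
```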
Start options
:name (atom/0) - Required. The buffer name (used to identify the buffer).
:processor - Required. A callback that processes batches of messages. It is called with a list of accumulated messages and should handle the processing logic (e.g., sending to an external service or persisting to a database). Can be either:
- A function of arity 1: fn batch -> ... end or &MyModule.process/1.
- An MFA tuple {Module, Function, Args}: the batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
:partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.
:processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
:processing_timeout (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before it is forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
:processing_batch_size - Controls how buffered data is passed to the processor callback. Can be either:
- A positive integer (default 10): messages are read from the ETS table in batches of up to this size using :ets.select with continuations. The processor is called once per batch. This keeps memory usage bounded for large tables.
- :table: the ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.
In this mode the processor takes ownership of the table. The buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS will auto-delete the table unless you have already transferred it elsewhere via :ets.give_away/3. If you want to keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.
The default value is 10.
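The :table-mode advice above relies on standard ETS ownership rules. This sketch shows only those OTP mechanics: a short-lived task creates a table, hands it to a longer-lived keeper with `:ets.give_away/3`, and the table outlives the task. In real :table mode the table would come from the buffer; `TableHandoffSketch`, the keeper process, and the `:handoff` gift term are hypothetical.

```elixir
defmodule TableHandoffSketch do
  # Hypothetical illustration of keeping a table beyond a task's lifetime.
  def run do
    keeper = spawn(fn -> keeper_loop() end)

    task =
      Task.async(fn ->
        # Stand-in for the table a :table-mode processor would receive.
        table = :ets.new(:table_mode_sketch, [:set, :public])
        :ets.insert(table, {:k, "v"})
        # Transfer ownership so the table survives this task's exit.
        true = :ets.give_away(table, keeper, :handoff)
        table
      end)

    table = Task.await(task)
    {keeper, table}
  end

  # The new owner receives an 'ETS-TRANSFER' message describing the handoff;
  # the table is deleted when this process stops.
  defp keeper_loop do
    receive do
      {:"ETS-TRANSFER", _table, _from_pid, :handoff} -> keeper_loop()
      :stop -> :ok
    end
  end
end
```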
Runtime options
:partition_key (term/0) - Determines what value is used as the routing key for partitioning messages. Can be one of four values:
- nil (default): the message itself is used as the routing key. Messages with the same content are routed to the same partition.
- A function of arity 1: applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.
- An MFA tuple {Module, Function, Args}: the function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.
- Any static term: used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).
Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).
The default value is nil.
:key_hasher - Optional key hashing for complex keys that don't survive ETS match-spec equality (e.g., keys containing maps). Can be one of:
- Omitted (default): no hashing; the key is stored and looked up as-is.
- true: hash the key with :erlang.phash2/1. Fast, but its output is only 27 bits wide (0..2^27-1) and therefore collision-prone; use it only when collisions are acceptable.
- A function of arity 1: applied to the key to produce the storage/lookup key. Use a cryptographic hash (e.g., &:crypto.hash(:sha256, :erlang.term_to_binary(&1))) if you need collision resistance.
Use consistently
If you write a key with a given :key_hasher, you must pass the same :key_hasher to every subsequent put/4, put_all/3, get/4, delete/3, put_newer/4, and put_all_newer/3 call for that key. Otherwise the lookup will compute a different storage key and miss the entry. Mixing hashed and non-hashed writes for the same logical key produces two distinct entries under the hood; that's a caller error, not a library bug.
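The four :partition_key forms above can be modeled as "resolve a routing key, then hash it into a partition index". This is a sketch assuming the final step is `:erlang.phash2/2`, as the data-flow diagram shows; `RoutingSketch` is hypothetical and is not how Tidefall is structured internally.

```elixir
defmodule RoutingSketch do
  # Hypothetical model: routing key -> partition index in 0..partitions-1.
  def partition_for(message, partition_key, partitions)
      when is_integer(partitions) and partitions > 0 do
    message
    |> routing_key(partition_key)
    |> :erlang.phash2(partitions)
  end

  # nil: the message itself is the routing key.
  defp routing_key(message, nil), do: message
  # Arity-1 function: derive the routing key from the message.
  defp routing_key(message, fun) when is_function(fun, 1), do: fun.(message)
  # MFA: the message is prepended to the extra arguments.
  defp routing_key(message, {mod, fun, args})
       when is_atom(mod) and is_atom(fun) and is_list(args),
       do: apply(mod, fun, [message | args])
  # Anything else is a static routing key shared by every message.
  defp routing_key(_message, static_term), do: static_term
end
```

With `fn {tenant, _id} -> tenant end`, `{:acme, 1}` and `{:acme, 2}` land on the same partition, which is the "keep related messages together" case.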
Additional options accepted by put_newer/4 (alongside the runtime options above):
:version - Version stamp used to resolve conflicts. Accepted types: integer, atom, or binary. Defaults (at call time) to :erlang.unique_integer([:monotonic, :positive]), a positive, strictly monotonic VM-local integer; it's safe to mix with plain put/4 entries (whose version is 0).
Versions are compared with Erlang's > operator, which uses the total term order. Typical choices: integers (sequence numbers, timestamps), binaries (e.g., ULIDs or other lexicographic IDs), or atoms (status-like ordering).
Mixing types
Erlang's total order is well-defined across types (number < atom < ... < bitstring), but mixing types on the same key produces surprising results: for example, 100 < :any_atom and :zzz < "anystring". Pick one version type per key and stick with it. Correctness here is the caller's responsibility.
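Since the comparison is plain `>`, a candidate version scheme can be checked in isolation. `VersionOrderSketch.newer?/2` below is a hypothetical helper expressing the "greater version wins" rule; the calls reproduce the cross-type examples above and add one binary pitfall: unpadded numeric strings compare byte by byte, so `"9"` sorts after `"10"`.

```elixir
defmodule VersionOrderSketch do
  # Hypothetical helper: true when new_version would replace old_version.
  def newer?(new_version, old_version), do: new_version > old_version
end

# Any atom sorts after any number.
VersionOrderSketch.newer?(:any_atom, 100)
# Binaries sort after atoms.
VersionOrderSketch.newer?("anystring", :zzz)
# Byte-wise comparison: "9" beats "10" unless the strings are zero-padded.
VersionOrderSketch.newer?("9", "10")
VersionOrderSketch.newer?("0000000010", "0000000009")
# Mixed types sort numbers, then atoms, then binaries.
Enum.sort([:ok, "b", 10, 5])
```

All four `newer?/2` calls return `true`, and the sort yields `[5, 10, :ok, "b"]`.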
Examples
Standalone Usage
# Start a HashMap buffer with a custom processor
iex> {:ok, _sup_pid} =
...> Tidefall.HashMap.start_link(
...> name: :my_hash_map_buffer,
...> processor: fn batch -> IO.inspect(batch) end
...> )
# Put a single entry
iex> Tidefall.HashMap.put(:my_hash_map_buffer, :key1, "value1")
:ok
# Put multiple entries at once
iex> Tidefall.HashMap.put_all(:my_hash_map_buffer, %{key2: "value2", key3: "value3"})
:ok
# Delete an entry
iex> Tidefall.HashMap.delete(:my_hash_map_buffer, :key1)
:ok
# Versioned put (newer version wins)
iex> Tidefall.HashMap.put_newer(:my_hash_map_buffer, :key4, "v1", version: 100)
:ok
iex> Tidefall.HashMap.put_newer(:my_hash_map_buffer, :key4, "v2", version: 200)
:ok
iex> Tidefall.HashMap.get(:my_hash_map_buffer, :key4)
"v2"
# Check buffer size
iex> Tidefall.HashMap.size(:my_hash_map_buffer)
3
# Stop the buffer gracefully (processes remaining items)
iex> Tidefall.HashMap.stop(:my_hash_map_buffer)
:ok
Adding to a Supervision Tree
children = [
{Tidefall.HashMap,
name: :my_hash_map_buffer,
processor: &MyApp.EventProcessor.process_batch/1}
]
Supervisor.start_link(children, strategy: :one_for_one)
Defining a buffer module
For the recommended module-based pattern (use Tidefall.HashMap,
where the module name becomes the default instance and start options
are layered from compile-time use opts, the application environment,
and explicit opts), see the Module-based buffers section of Tidefall.
Processor
The processor function receives a list of Tidefall.HashMap.Entry.t/0
structs, where :version is the entry version (set via put_newer/4 and
put_all_newer/3; 0 for regular put/4 entries), and :updates is the
number of times an existing key was replaced by a newer version (only
tracked for versioned updates; regular put/4 entries always have
:updates set to 0):
fn batch ->
Enum.each(batch, fn %Tidefall.HashMap.Entry{key: k, value: v} ->
process(k, v)
end)
end
When :key_hasher is used, the entry's :key is the original
(pre-hash) key; the hash is purely an internal ETS lookup detail.
See The processor for when it runs, batching, failure isolation, and shutdown-drain behavior.
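The processor contract above (a list of entries, with the batch prepended when an MFA is used) can be exercised without a running buffer. `EntryStandIn` is a hypothetical struct with the fields documented for Tidefall.HashMap.Entry (use the real struct in application code), and `MySinkSketch.process/2` is a hypothetical processor that returns a summary instead of doing I/O.

```elixir
defmodule EntryStandIn do
  # Hypothetical stand-in mirroring the documented Entry fields.
  defstruct [:key, :value, version: 0, updates: 0]
end

defmodule MySinkSketch do
  # Usable as &MySinkSketch.process/1, or as {MySinkSketch, :process, [tag]},
  # in which case the batch arrives as the first argument.
  def process(batch, tag \\ :default) do
    %{
      tag: tag,
      keys: batch |> Enum.map(& &1.key) |> Enum.sort(),
      # :updates counts newer-version replacements; plain put/4 entries report 0.
      replaced: batch |> Enum.map(& &1.updates) |> Enum.sum()
    }
  end
end
```

For example, `apply(MySinkSketch, :process, [batch, :audit])` is the call an `{MySinkSketch, :process, [:audit]}` processor implies.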
Summary
Types
Proxy type for a buffer
Key-hashing option accepted by every HashMap operation
A versioned key-value entry — the tuple shape accepted by
put_all_newer/3. (Named kv_entry to avoid clashing with the
private entry record.)
Version stamp used to resolve conflicts in put_newer/4 and
put_all_newer/3. Restricted to integers, atoms, and binaries —
see the :version option for semantics and the caveat about
mixing types.
Functions
Returns the HashMap buffer child spec.
Deletes a key from the buffer's current write table.
Gets the value for the given key from the buffer's current write table.
Puts a single key-value entry into the buffer.
Puts multiple key-value entries into the buffer.
Puts multiple versioned key-value entries into the buffer.
Puts a single versioned key-value entry into the buffer.
Returns the HashMap buffer size (total number of entries across all partitions).
Starts a new HashMap buffer.
Stops a HashMap buffer gracefully.
Updates the options for the HashMap buffer.
Types
@type buffer() :: Tidefall.Buffer.buffer()
Proxy type for a buffer
Key-hashing option accepted by every HashMap operation:
- nil (default): no hashing; the user's key is stored and looked up as-is.
- true: hash the key with :erlang.phash2/1 before storage/lookup. Fast, but its output is only 27 bits wide (0..2^27-1) and therefore collision-prone; use it only when collisions are acceptable.
- A function of arity 1: applied to the user's key to produce the storage/lookup key. Use a cryptographic hash (e.g., &:crypto.hash(:sha256, :erlang.term_to_binary(&1))) if you need collision resistance.
Use consistently
If you write a key with a given :key_hasher, you must
pass the same :key_hasher to every subsequent put/4, put_all/3,
get/4, delete/3, put_newer/4, and put_all_newer/3 call for
that key. Otherwise the lookup will compute a different
storage key and miss the entry. Mixing hashed and non-hashed
writes for the same logical key produces two distinct entries
under the hood; that's a user error, not a library bug.
A versioned key-value entry — the tuple shape accepted by
put_all_newer/3. (Named kv_entry to avoid clashing with the
private entry record.)
Version stamp used to resolve conflicts in put_newer/4 and
put_all_newer/3. Restricted to integers, atoms, and binaries —
see the :version option for semantics and the caveat about
mixing types.
Versions are compared with Erlang term ordering, not as wall-clock timestamps. If you use integer timestamps as versions, clock skew across nodes can let a stale write win. When cross-node correctness matters, prefer a source whose order does not depend on node clocks, such as a per-entity counter; note that time-based binaries such as ULIDs embed a millisecond timestamp and are subject to the same skew.
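A per-entity counter of the kind suggested above can be kept in ETS with `:ets.update_counter/4`, which increments atomically and creates the row from a default if it is missing. This sketch is node-local: it only helps across nodes if a single place owns each entity's counter (for example, whichever node is responsible for that entity). `VersionCounterSketch` and its table are hypothetical.

```elixir
defmodule VersionCounterSketch do
  # Hypothetical per-entity version source: one {entity, counter} row each.
  def new, do: :ets.new(:version_counter_sketch, [:set, :public])

  # Atomically increments element 2, inserting {entity, 0} first if absent,
  # so each entity gets 1, 2, 3, ... on this node.
  def next_version(table, entity) do
    :ets.update_counter(table, entity, {2, 1}, {entity, 0})
  end
end
```

The returned integer can then be passed as the `version:` option, with one counter sequence per key.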
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns the HashMap buffer child spec.
Deletes a key from the buffer's current write table.
Note: If the entry has already been handed off for processing (via double-buffering), this delete will not affect the in-flight batch.
Parameters
- buffer - The buffer name (atom).
- key - The key to delete.
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Simple delete
delete(:my_buffer, :key1)
# With custom partition routing
delete(:my_buffer, :key1, partition_key: fn {k, _v} -> k end)
Gets the value for the given key from the buffer's current write table.
Returns default if the key is not found.
Note: This reads from the current write table only. Entries already handed off for processing will not be visible.
Parameters
- buffer - The buffer name (atom).
- key - The key to look up.
- default - The default value if the key is not found (defaults to nil).
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Simple get
get(:my_buffer, :key1)
# With an explicit default and the same partition routing used on write
# (`get/4` is `get(buffer, key, default \\ nil, opts \\ [])`, so the
# default must be passed before the options)
get(:my_buffer, :key1, nil, partition_key: :shard_a)
Puts a single key-value entry into the buffer.
Parameters
- buffer - The buffer name (atom).
- key - The key for the entry.
- value - The value for the entry.
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Simple put
put(:my_buffer, :key1, "val1")
# With custom partition routing
put(:my_buffer, :key1, "val1", partition_key: fn {k, _v} -> k end)
# With key hashing (key is a map — store under a hash; processor
# still receives the original map as the entry's :key)
put(:my_buffer, %{tenant: "acme", id: 42}, "val1", key_hasher: true)
Puts multiple key-value entries into the buffer.
Accepts either a map or a list of {key, value} tuples.
Parameters
- buffer - The buffer name (atom).
- entries - A map or list of {key, value} tuples.
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Using a map
put_all(:my_buffer, %{key1: "val1", key2: "val2"})
# Using a list of tuples
put_all(:my_buffer, [{:key1, "val1"}, {:key2, "val2"}])
# With custom partition routing
put_all(:my_buffer, %{key1: "val1"}, partition_key: fn {k, _v} -> k end)
Puts multiple versioned key-value entries into the buffer.
Uses "newer version wins" semantics for each entry.
Parameters
- buffer - The buffer name (atom).
- entries - A list of {key, value, version} tuples.
- opts - Optional runtime options.
Options
See runtime options.
Examples
entries = [
{:user_1, %{name: "Alice"}, 100},
{:user_2, %{name: "Bob"}, 200}
]
put_all_newer(:my_buffer, entries)
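To reason about what a put_all_newer/3 call leaves behind, the per-entry rule can be modeled with a plain map of `key => {value, version}`. This is a hypothetical model of the documented semantics, not the buffer's implementation. In the model the outcome depends on list order only for equal versions, where the first one seen is kept.

```elixir
defmodule BatchNewerWinsSketch do
  # Hypothetical pure model: apply {key, value, version} tuples to a state
  # map, keeping an existing entry unless the incoming version is greater.
  def apply_all(state, entries) do
    Enum.reduce(entries, state, fn {key, value, version}, acc ->
      case acc do
        %{^key => {_old_value, old_version}} when version <= old_version -> acc
        _ -> Map.put(acc, key, {value, version})
      end
    end)
  end
end
```

Starting from `%{user_1: {%{name: "Old"}, 150}}`, the example entries above would leave `:user_1` untouched (100 is not newer than 150) and add `:user_2`.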
Puts a single versioned key-value entry into the buffer.
Uses "newer version wins" semantics: the entry is only written if:
- The key doesn't exist, or
- The new version is greater than the existing version
Useful for scenarios where you want to ensure only the latest version of data is stored, such as event sourcing or state synchronization.
Parameters
- buffer - The buffer name (atom).
- key - The key for the entry.
- value - The value for the entry.
- opts - Optional runtime options.
Options
See runtime options.
Examples
# Default version (positive monotonic integer)
put_newer(:my_buffer, :user_123, %{name: "Alice"})
# Explicit version — e.g. a sequence number
put_newer(:my_buffer, :counter, 42, version: 5)
# Complex key with hashing (must use :key_hasher on get/delete too)
put_newer(:my_buffer, %{tenant: "acme", id: 42}, "val", key_hasher: true)
@spec size(buffer()) :: non_neg_integer()
Returns the HashMap buffer size (total number of entries across all partitions).
Examples
size(:my_buffer)
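A total "across all partitions" amounts to summing per-table sizes. The sketch assumes one ETS table per partition, as the data-flow section describes; how Tidefall locates its tables is not shown, and `SizeSketch` is hypothetical. Each `:ets.info/2` call reads one table at a different instant, so under concurrent writes the sum is an estimate rather than an atomic snapshot.

```elixir
defmodule SizeSketch do
  # Hypothetical: total entries over a list of per-partition ETS tables.
  def total_size(partition_tables) do
    partition_tables
    |> Enum.map(&:ets.info(&1, :size))
    |> Enum.sum()
  end
end
```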
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new HashMap buffer.
Options
See start options.
Examples
Tidefall.HashMap.start_link(
name: :my_hash_map_buffer,
processor: &MyApp.Sink.process/1
)
Stops a HashMap buffer gracefully.
Examples
Tidefall.HashMap.stop(:my_hash_map_buffer)
Updates the options for the HashMap buffer.
Options
Updatable options: :processing_interval, :processing_timeout,
:processing_batch_size. See start options
for each option's semantics.
Examples
# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval: 100)