Tidefall.HashMap (Tidefall v1.0.0-rc.0)


Last-write-wins key/value buffer where same-key writes coalesce between processing ticks.

Tidefall.HashMap buffers key-value entries in an ETS :set — a hash table, hence the name — so that writes to the same key overwrite each other and only the latest value survives to the next processing tick. It periodically processes buffered entries using a configurable processor function. Like Queue, it implements partitioning to reduce lock contention during high-throughput writes, and uses double-buffering to ensure zero-downtime processing.

It also supports versioned conditional updates via put_newer/4 and put_all_newer/3, which use "newer version wins" semantics — an entry is only written if the key doesn't exist or the new version is greater than the existing one.

Data Flow

put(buffer, key, value)          put_newer(buffer, key, value, opts)
       |                                    |
       v                                    v
+--------------------+            +--------------------+
| Partition Routing  |            | Partition Routing  |
| phash2(key, N)     |            | phash2(key, N)     |
+--------------------+            +--------------------+
       |                                    |
       v                                    v
+------------------+              +---------------------------+
| :ets.insert      |              | 1. :ets.insert_new        |
| (last-write-wins)|              |    Key new? -> inserted   |
+------------------+              | 2. :ets.select_replace    |
                  \              |    new_ver > old_ver?     |
                   \             |    Yes -> updated         |
                    \            |    No  -> skipped         |
                     \           +---------------------------+
                      \            /
                       v           v
            +-------------------------------------+
            | ETS :set                            |
            | (key, raw_key, value, version,      |
            |  updates)                           |
            +-------------------------------------+
                          |
                          v
            +--------------------------------------+
            | processor(batch)                     |
            | batch = [%Entry{key, value, version, |
            |                 updates}, ...]       |
            +--------------------------------------+

Entries are routed to partitions via :erlang.phash2(key, N), where N is the number of partitions, and stored in :set ETS tables. Regular put/4 uses a plain :ets.insert/2 (last-write-wins). Versioned put_newer/4 uses two steps, each atomic on its own: :ets.insert_new/2 for new keys, then :ets.select_replace/2 for the conditional "newer version wins" update.
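The two-step versioned write described above can be sketched against a bare ETS table. This is a minimal illustration, not the library's implementation: the module name NewerWinsSketch, the {key, value, version} row layout, and the return atoms are assumptions made for the example, and it only handles simple keys (atoms, integers) that are safe to place in a match head.

```elixir
defmodule NewerWinsSketch do
  # Rows are {key, value, version}. This layout is an assumption for the
  # sketch, not the library's internal record.
  def put_newer(table, key, value, version) do
    # Step 1: atomically insert when the key is absent.
    if :ets.insert_new(table, {key, value, version}) do
      :inserted
    else
      # Step 2: atomically replace the row only if the stored version is
      # older. The body rebuilds the row and keeps the original key via
      # {:element, 1, :"$_"}, which select_replace requires.
      match_spec = [
        {
          {key, :_, :"$1"},
          [{:<, :"$1", {:const, version}}],
          [{{{:element, 1, :"$_"}, {:const, value}, {:const, version}}}]
        }
      ]

      case :ets.select_replace(table, match_spec) do
        1 -> :updated
        # Stored version is not older (or the row vanished in between).
        0 -> :skipped
      end
    end
  end
end

table = :ets.new(:newer_wins_sketch, [:set, :public])

results = [
  NewerWinsSketch.put_newer(table, :key4, "v1", 100),
  NewerWinsSketch.put_newer(table, :key4, "v2", 200),
  NewerWinsSketch.put_newer(table, :key4, "stale", 150)
]

final_row = :ets.lookup(table, :key4)
```

In this sketch the third write is skipped because 150 is not greater than the stored version 200, so the row written with version 200 remains.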

Complex keys and :key_hasher

ETS match specs (used by put_newer/4/put_all_newer/3) cannot equality-compare arbitrary terms in the match head — in particular, maps anywhere in the key fall back to subset semantics and may match the wrong row. Plain put/4 and put_all/3 are unaffected (they do not use match specs); :key_hasher is only needed once you use put_newer/4 or put_all_newer/3 on a complex key. When you do, pass the same :key_hasher on every operation that touches that key — including put/4, get/4, and delete/3 — otherwise hashed and non-hashed writes produce two distinct entries for one logical key (see the per-function docs for the option).
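The subset behavior mentioned above can be reproduced with plain ETS. The sketch below is illustrative only: the table names and the {hash, raw_key, value} layout of the second table are assumptions, chosen to mirror the idea of storing a complex key under a hash.

```elixir
full_key = %{tenant: "acme", id: 42}
partial_key = %{tenant: "acme"}

# Table keyed directly by the map.
raw = :ets.new(:raw_keys_sketch, [:set])
:ets.insert(raw, {full_key, "row"})

# The match head uses a different, smaller map, yet the row still matches:
# a map in a match head only requires the listed pairs to be present.
subset_hit = :ets.select(raw, [{{partial_key, :"$1"}, [], [:"$1"]}])

# Storing the row under a hash turns the key into a plain integer, which
# the match head compares exactly. The original key is kept alongside it.
hashed = :ets.new(:hashed_keys_sketch, [:set])
:ets.insert(hashed, {:erlang.phash2(full_key), full_key, "row"})

exact_hit =
  :ets.select(hashed, [{{:erlang.phash2(full_key), :_, :"$1"}, [], [:"$1"]}])
```

With a hashed key, a lookup for a different map only matches if the two maps happen to hash to the same integer, which is the collision risk discussed under :key_hasher.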

Start options

  • :name (atom/0) - Required. The buffer name (used to identify the buffer).

  • :processor - Required. A callback that processes batches of messages. Called with a list of accumulated messages and should handle the processing logic (e.g., send to external service, persist to database, etc.).

    Can be either:

    • A function of arity 1: fn batch -> ... end or &MyModule.process/1.
    • An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
  • :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.

  • :processing_interval (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.

  • :processing_timeout (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.

  • :processing_batch_size - Controls how buffered data is passed to the processor callback.

    Can be either:

    • A positive integer (default 10): Entries are read from the ETS table in batches of up to this size using :ets.select/3 with continuations. The processor is called once per batch. This keeps memory usage bounded for large tables.

    • :table: The ETS table identifier (a tid) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.

      In this mode the processor takes ownership of the table. The buffer creates a fresh table for incoming writes on the next swap and does NOT delete the table you received. When the processing task exits (after the processor returns), ETS will auto-delete the table unless you have already transferred it elsewhere via :ets.give_away/3. If you want to keep the table beyond the task's lifetime, call :ets.give_away/3 to hand it off to another process before returning.

    The default value is 10.
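    The ownership rule for :table mode relies on standard ETS behavior: a table lives as long as its owner, unless it is given away first. The sketch below shows that hand-off with plain processes. The keeper process, the :handoff gift term, and the table contents are hypothetical; in a real processor the table would be the tid passed to the callback.

```elixir
parent = self()

# Hypothetical long-lived process that takes over the table.
keeper =
  spawn(fn ->
    receive do
      {:"ETS-TRANSFER", tid, _from_pid, :handoff} ->
        send(parent, {:kept, tid})

        # Stay alive (and keep the table alive) until told to stop.
        receive do
          :stop -> :ok
        end
    end
  end)

# Short-lived process standing in for the processing task.
{worker, ref} =
  spawn_monitor(fn ->
    tid = :ets.new(:handed_off_sketch, [:set, :public])
    :ets.insert(tid, {:key1, "value1"})
    # Transfer ownership before exiting so ETS does not delete the table.
    :ets.give_away(tid, keeper, :handoff)
  end)

# Wait until the short-lived process has exited.
receive do
  {:DOWN, ^ref, :process, ^worker, _reason} -> :ok
end

kept_tid =
  receive do
    {:kept, tid} -> tid
  end

# The table is still readable because the keeper now owns it.
survived = :ets.lookup(kept_tid, :key1)

send(keeper, :stop)
```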

Runtime options

  • :partition_key (term/0) - Determines what value is used as the routing key for partitioning messages.

    Can be one of four values:

    • nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.

    • A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.

    • An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.

    • Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).

    Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).

    The default value is nil.

  • :key_hasher - Optional key hashing for complex keys that don't survive ETS match-spec equality (e.g. keys containing maps).

    Can be one of:

    • Omitted (default): no hashing; the key is stored and looked up as-is.
    • true: hash the key with :erlang.phash2/1. Fast, but the result is only 27 bits wide (range 0..2^27-1) and so collision-prone; use only when collisions are acceptable.
    • A function of arity 1: applied to the key to produce the storage/lookup key. Use a cryptographic hash (e.g. &:crypto.hash(:sha256, :erlang.term_to_binary(&1))) if you need collision resistance.

    Use consistently

    If you write a key with a given :key_hasher, you must pass the same :key_hasher to every subsequent call for that key: put/4, put_all/3, get/4, delete/3, put_newer/4, and put_all_newer/3. Otherwise the call computes a different storage key and misses the entry. Mixing hashed and non-hashed writes for the same logical key produces two distinct entries under the hood; that is a caller error, not a library bug.
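    The four :partition_key forms listed above can be read as one small resolution step followed by a hash. The sketch below is an assumption about how such routing could work, not the library's code: RoutingSketch, DemoRouter, and the {key, value} message shape are hypothetical.

```elixir
defmodule RoutingSketch do
  # nil: the message itself is the routing key.
  def routing_key(message, nil), do: message
  # Function of arity 1: applied to the message.
  def routing_key(message, fun) when is_function(fun, 1), do: fun.(message)
  # MFA tuple: the message is prepended to the argument list.
  def routing_key(message, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args),
      do: apply(mod, fun, [message | args])
  # Any other term is used as a static routing key.
  def routing_key(_message, static), do: static

  # Map the routing key onto one of `partitions` partitions (0-based).
  def partition(message, partition_key, partitions),
    do: :erlang.phash2(routing_key(message, partition_key), partitions)
end

defmodule DemoRouter do
  # Hypothetical module function used through the MFA form.
  def by_key({key, _value}, prefix), do: {prefix, key}
end

message = {:user_1, %{name: "Alice"}}

by_default = RoutingSketch.partition(message, nil, 8)
by_fun = RoutingSketch.partition(message, fn {k, _v} -> k end, 8)
by_mfa = RoutingSketch.partition(message, {DemoRouter, :by_key, [:users]}, 8)

# A static term sends unrelated messages to the same partition.
static_a = RoutingSketch.partition(message, :logs, 8)
static_b = RoutingSketch.partition({:user_2, %{}}, :logs, 8)
```

Reads and deletes only find an entry if they resolve to the same partition as the write, which is why the same routing option has to be passed on both sides.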

Additional options accepted by put_newer/4:

  • :version - Version stamp used to resolve conflicts. Accepted types: integer, atom, or binary. Defaults (at call time) to :erlang.unique_integer([:monotonic, :positive]) — a positive, strictly-monotonic VM-local integer; it's safe to mix with plain put/4 entries (whose version is 0).

    Versions are compared with Erlang's > operator, which uses the total term order. Typical choices: integers (sequence numbers, timestamps), binaries (e.g. ULIDs, lexicographic IDs), or atoms (status-like ordering).

    Mixing types

    Erlang's total order is well-defined across types (number < atom < ... < bitstring), but mixing types on the same key produces surprising results — for example, 100 < :any_atom and :zzz < "anystring". Pick one version type per key and stick with it. Correctness here is the caller's responsibility.
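    The ordering rules above can be checked directly in plain Elixir. In this sketch VersionOrderSketch.newer?/2 is a hypothetical helper that mirrors the "new version is greater than the existing one" test; the ULID-like strings are sample values only.

```elixir
defmodule VersionOrderSketch do
  # Same comparison the docs describe: Erlang term order via >.
  def newer?(new_version, old_version), do: new_version > old_version
end

# Term order across types: numbers sort before atoms, atoms before binaries.
sorted = Enum.sort(["anystring", :zzz, 100, :any_atom])

# Default-style versions: positive and strictly increasing within one VM.
auto1 = :erlang.unique_integer([:monotonic, :positive])
auto2 = :erlang.unique_integer([:monotonic, :positive])

checks = [
  # Any default version beats a plain put/4 entry (version 0).
  VersionOrderSketch.newer?(auto1, 0),
  VersionOrderSketch.newer?(auto2, auto1),
  # Binaries compare lexicographically, so sortable IDs work as versions.
  VersionOrderSketch.newer?("01BX5ZZKBKACTAV9WEVGEMMVRZ", "01ARZ3NDEKTSV4RRFFQ69G5FAV"),
  # Mixed types: any atom is "newer" than any integer, however large.
  VersionOrderSketch.newer?(:active, 1_000_000),
  # Equal versions are not newer, so the write would be skipped.
  VersionOrderSketch.newer?(5, 5)
]
```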

Examples

Standalone Usage

# Start a HashMap buffer with a custom processor
iex> {:ok, _sup_pid} =
...>   Tidefall.HashMap.start_link(
...>     name: :my_hash_map_buffer,
...>     processor: fn batch -> IO.inspect(batch) end
...>   )

# Put a single entry
iex> Tidefall.HashMap.put(:my_hash_map_buffer, :key1, "value1")
:ok

# Put multiple entries at once
iex> Tidefall.HashMap.put_all(:my_hash_map_buffer, %{key2: "value2", key3: "value3"})
:ok

# Delete an entry
iex> Tidefall.HashMap.delete(:my_hash_map_buffer, :key1)
:ok

# Versioned put (newer version wins)
iex> Tidefall.HashMap.put_newer(:my_hash_map_buffer, :key4, "v1", version: 100)
:ok
iex> Tidefall.HashMap.put_newer(:my_hash_map_buffer, :key4, "v2", version: 200)
:ok
iex> Tidefall.HashMap.get(:my_hash_map_buffer, :key4)
"v2"

# Check buffer size
iex> Tidefall.HashMap.size(:my_hash_map_buffer)
3

# Stop the buffer gracefully (processes remaining items)
iex> Tidefall.HashMap.stop(:my_hash_map_buffer)
:ok

Adding to a Supervision Tree

children = [
  {Tidefall.HashMap,
   name: :my_hash_map_buffer,
   processor: &MyApp.EventProcessor.process_batch/1}
]

Supervisor.start_link(children, strategy: :one_for_one)

Defining a buffer module

For the recommended module-based pattern — use Tidefall.HashMap, where the module name becomes the default instance and start options layer across compile-time use opts, the application environment, and explicit opts — see the Module-based buffers section of Tidefall.

Processor

The processor function receives a list of Tidefall.HashMap.Entry.t/0 structs, where :version is the entry version (set via put_newer/4 and put_all_newer/3; 0 for regular put/4 entries), and :updates is the number of times an existing key was replaced by a newer version (only tracked for versioned updates; regular put/4 entries always have :updates set to 0):

fn batch ->
  Enum.each(batch, fn %Tidefall.HashMap.Entry{key: k, value: v} ->
    process(k, v)
  end)
end

When :key_hasher is used, the entry's :key is the original (pre-hash) key — the hash is purely an internal ETS lookup detail.

See The processor for when it runs, batching, failure isolation, and shutdown-drain behavior.
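The two callback forms accepted by the :processor start option (a function of arity 1, or an MFA tuple with the batch prepended) can be sketched as a single dispatch function. ProcessorSketch and DemoSink are hypothetical names, and plain maps stand in for Tidefall.HashMap.Entry structs so the example runs on its own.

```elixir
defmodule ProcessorSketch do
  # Function form: called with the batch.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)
  # MFA form: the batch is prepended to the configured arguments.
  def invoke({mod, fun, args}, batch), do: apply(mod, fun, [batch | args])
end

defmodule DemoSink do
  # Hypothetical sink that tags each entry with the extra argument.
  def process(batch, tag) do
    Enum.map(batch, fn %{key: k, value: v} -> {tag, k, v} end)
  end
end

# Stand-in for a batch of entries (same fields as the Entry struct).
batch = [%{key: :key1, value: "value1", version: 0, updates: 0}]

via_fun = ProcessorSketch.invoke(fn b -> length(b) end, batch)
via_mfa = ProcessorSketch.invoke({DemoSink, :process, [:audit]}, batch)
```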

Summary

Types

buffer()

Proxy type for a buffer

key_hasher()

Key-hashing option accepted by every HashMap operation

kv_entry()

A versioned key-value entry: the tuple shape accepted by put_all_newer/3. (Named kv_entry to avoid clashing with the private entry record.)

version()

Version stamp used to resolve conflicts in put_newer/4 and put_all_newer/3. Restricted to integers, atoms, and binaries; see the :version option for semantics and the caveat about mixing types.

Functions

child_spec(opts)

Returns the HashMap buffer child spec.

delete(buffer, key, opts \\ [])

Deletes a key from the buffer's current write table.

get(buffer, key, default \\ nil, opts \\ [])

Gets the value for the given key from the buffer's current write table.

put(buffer, key, value, opts \\ [])

Puts a single key-value entry into the buffer.

put_all(buffer, entries, opts \\ [])

Puts multiple key-value entries into the buffer.

put_all_newer(buffer, entries, opts \\ [])

Puts multiple versioned key-value entries into the buffer.

put_newer(buffer, key, value, opts \\ [])

Puts a single versioned key-value entry into the buffer.

size(buffer)

Returns the HashMap buffer size (total number of entries across all partitions).

start_link(opts \\ [])

Starts a new HashMap buffer.

stop(buffer, reason \\ :normal, timeout \\ :infinity)

Stops a HashMap buffer gracefully.

update_options(buffer, opts)

Updates the options for the HashMap buffer.

Types

buffer()

@type buffer() :: Tidefall.Buffer.buffer()

Proxy type for a buffer

key_hasher()

@type key_hasher() :: true | (any() -> any())

Key-hashing option accepted by every HashMap operation:

  • Omitted (default): no hashing; the user's key is stored and looked up as-is.
  • true: hash the key with :erlang.phash2/1 before storage/lookup. Fast, but the result is only 27 bits wide (range 0..2^27-1) and so collision-prone; use only when collisions are acceptable.
  • A function of arity 1: applied to the user's key to produce the storage/lookup key. Use a cryptographic hash (e.g. &:crypto.hash(:sha256, :erlang.term_to_binary(&1))) if you need collision resistance.

Use consistently

If you write a key with a given :key_hasher, you must pass the same :key_hasher to every subsequent call for that key: put/4, put_all/3, get/4, delete/3, put_newer/4, and put_all_newer/3. Otherwise the call computes a different storage key and misses the entry. Mixing hashed and non-hashed writes for the same logical key produces two distinct entries under the hood; that is a user error, not a library bug.
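The two hashing choices can be compared side by side in plain Elixir. This sketch only exercises the hash functions themselves; the variable names are illustrative, and it assumes the :crypto application is available in the runtime.

```elixir
# Custom hasher, as suggested above for collision resistance.
sha256_hasher = &:crypto.hash(:sha256, :erlang.term_to_binary(&1))

key_a = %{tenant: "acme", id: 42}
# The same map written with its fields in a different order.
key_b = %{id: 42, tenant: "acme"}

# Equal keys must produce equal storage keys, or lookups would miss.
storage_a = sha256_hasher.(key_a)
storage_b = sha256_hasher.(key_b)

# key_hasher: true corresponds to :erlang.phash2/1, whose default range
# is 0..2^27-1 (134_217_727), hence the collision warning.
small_hash = :erlang.phash2(key_a)
in_default_range = small_hash in 0..134_217_727
```

The SHA-256 storage key is a 32-byte binary, which costs more to compute and store than the phash2 integer but makes accidental collisions practically irrelevant.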

kv_entry()

@type kv_entry() :: {key :: any(), value :: any(), version :: version()}

A versioned key-value entry — the tuple shape accepted by put_all_newer/3. (Named kv_entry to avoid clashing with the private entry record.)

version()

@type version() :: integer() | atom() | binary()

Version stamp used to resolve conflicts in put_newer/4 and put_all_newer/3. Restricted to integers, atoms, and binaries — see the :version option for semantics and the caveat about mixing types.

Versions are compared with Erlang term ordering, not as wall-clock timestamps. If you use integer timestamps as versions, clock skew across nodes can let a stale write win; prefer a monotonic source (a per-entity counter, or a lexicographically-ordered binary such as a ULID) when cross-node correctness matters.
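One way to obtain a per-entity counter is an ETS counter table, sketched below. VersionCounterSketch is a hypothetical helper, and the counter is local to one VM: for cross-node correctness the counter would still need a single authoritative owner per entity, which this sketch does not provide.

```elixir
defmodule VersionCounterSketch do
  # One row per entity: {entity_id, last_version}.
  def new, do: :ets.new(:version_counters_sketch, [:set, :public])

  # Atomically bump and return the next version for an entity.
  # The default row {entity_id, 0} is inserted on first use.
  def next(table, entity_id) do
    :ets.update_counter(table, entity_id, 1, {entity_id, 0})
  end
end

counters = VersionCounterSketch.new()

user_1_versions = [
  VersionCounterSketch.next(counters, :user_1),
  VersionCounterSketch.next(counters, :user_1),
  VersionCounterSketch.next(counters, :user_1)
]

# Each entity has its own independent sequence.
user_2_first = VersionCounterSketch.next(counters, :user_2)
```

A value produced this way could then be passed as the :version option, so ordering depends on the sequence rather than on wall-clock time.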

Functions

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns the HashMap buffer child spec.

delete(buffer, key, opts \\ [])

@spec delete(buffer(), any(), keyword()) :: :ok

Deletes a key from the buffer's current write table.

Note: If the entry has already been handed off for processing (via double-buffering), this delete will not affect the in-flight batch.

Parameters

  • buffer - The buffer name (atom).
  • key - The key to delete.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Simple delete
delete(:my_buffer, :key1)

# With explicit partition routing (must match the routing used on write)
delete(:my_buffer, :key1, partition_key: :shard_a)

get(buffer, key, default \\ nil, opts \\ [])

@spec get(buffer(), any(), any(), keyword()) :: any()

Gets the value for the given key from the buffer's current write table.

Returns default if the key is not found.

Note: This reads from the current write table only. Entries already handed off for processing will not be visible.
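The visibility rule in the note follows from double-buffering: once a table is swapped out for processing, reads go to the fresh write table. The sketch below models that with two plain ETS tables. DoubleBufferSketch and its functions are hypothetical and leave out partitioning, timers, and the processor itself.

```elixir
defmodule DoubleBufferSketch do
  # State holds only the current write table.
  def new, do: %{write: new_table()}

  def put(%{write: table}, key, value) do
    :ets.insert(table, {key, value})
    :ok
  end

  # Reads consult the current write table only.
  def get(%{write: table}, key, default \\ nil) do
    case :ets.lookup(table, key) do
      [{^key, value}] -> value
      [] -> default
    end
  end

  # Hand the filled table to processing and start a fresh write table.
  def swap(%{write: old} = state), do: {old, %{state | write: new_table()}}

  defp new_table, do: :ets.new(:buffer_sketch, [:set, :public])
end

buffer = DoubleBufferSketch.new()
:ok = DoubleBufferSketch.put(buffer, :key1, "value1")
before_swap = DoubleBufferSketch.get(buffer, :key1)

{in_flight, buffer} = DoubleBufferSketch.swap(buffer)

# The entry now lives only in the in-flight table.
after_swap = DoubleBufferSketch.get(buffer, :key1, :not_found)
in_flight_rows = :ets.lookup(in_flight, :key1)
```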

Parameters

  • buffer - The buffer name (atom).
  • key - The key to look up.
  • default - The default value if key is not found (defaults to nil).
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Simple get
get(:my_buffer, :key1)

# With an explicit default and the same partition routing used on write
# (`get/4` is `get(buffer, key, default \\ nil, opts \\ [])`, so the
# default must be passed before the options)
get(:my_buffer, :key1, nil, partition_key: :shard_a)

put(buffer, key, value, opts \\ [])

@spec put(buffer(), any(), any(), keyword()) :: :ok

Puts a single key-value entry into the buffer.

Parameters

  • buffer - The buffer name (atom).
  • key - The key for the entry.
  • value - The value for the entry.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Simple put
put(:my_buffer, :key1, "val1")

# With custom partition routing
put(:my_buffer, :key1, "val1", partition_key: fn {k, _v} -> k end)

# With key hashing (key is a map — store under a hash; processor
# still receives the original map as the entry's :key)
put(:my_buffer, %{tenant: "acme", id: 42}, "val1", key_hasher: true)

put_all(buffer, entries, opts \\ [])

@spec put_all(buffer(), map() | [{any(), any()}], keyword()) :: :ok

Puts multiple key-value entries into the buffer.

Accepts either a map or a list of {key, value} tuples.

Parameters

  • buffer - The buffer name (atom).
  • entries - A map or list of {key, value} tuples.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Using a map
put_all(:my_buffer, %{key1: "val1", key2: "val2"})

# Using a list of tuples
put_all(:my_buffer, [{:key1, "val1"}, {:key2, "val2"}])

# With custom partition routing
put_all(:my_buffer, %{key1: "val1"}, partition_key: fn {k, _v} -> k end)

put_all_newer(buffer, entries, opts \\ [])

@spec put_all_newer(buffer(), [kv_entry()], keyword()) :: :ok

Puts multiple versioned key-value entries into the buffer.

Uses "newer version wins" semantics for each entry.

Parameters

  • buffer - The buffer name (atom).
  • entries - A list of {key, value, version} tuples.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

entries = [
  {:user_1, %{name: "Alice"}, 100},
  {:user_2, %{name: "Bob"}, 200}
]
put_all_newer(:my_buffer, entries)

put_newer(buffer, key, value, opts \\ [])

@spec put_newer(buffer(), any(), any(), keyword()) :: :ok

Puts a single versioned key-value entry into the buffer.

Uses "newer version wins" semantics: the entry is only written if:

  • The key doesn't exist, or
  • The new version is greater than the existing version

Useful for scenarios where you want to ensure only the latest version of data is stored, such as event sourcing or state synchronization.

Parameters

  • buffer - The buffer name (atom).
  • key - The key for the entry.
  • value - The value for the entry.
  • opts - Optional runtime options.

Options

See runtime options.

Examples

# Default version (positive monotonic integer)
put_newer(:my_buffer, :user_123, %{name: "Alice"})

# Explicit version — e.g. a sequence number
put_newer(:my_buffer, :counter, 42, version: 5)

# Complex key with hashing (must use :key_hasher on get/delete too)
put_newer(:my_buffer, %{tenant: "acme", id: 42}, "val", key_hasher: true)

size(buffer)

@spec size(buffer()) :: non_neg_integer()

Returns the HashMap buffer size (total number of entries across all partitions).

Examples

size(:my_buffer)

start_link(opts \\ [])

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a new HashMap buffer.

Options

See start options.

Examples

Tidefall.HashMap.start_link(
  name: :my_hash_map_buffer,
  processor: &MyApp.Sink.process/1
)

stop(buffer, reason \\ :normal, timeout \\ :infinity)

@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok

Stops a HashMap buffer gracefully.

Examples

Tidefall.HashMap.stop(:my_hash_map_buffer)

update_options(buffer, opts)

@spec update_options(
  buffer(),
  keyword()
) :: :ok

Updates the options for the HashMap buffer.

Options

Updatable options: :processing_interval, :processing_timeout, :processing_batch_size. See start options for each option's semantics.

Examples

# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval: 100)