Ferricstore.Store.Router (ferricstore v0.3.4)


Routes keys to shard GenServers by hashing each key with :erlang.phash2 into one of 1,024 slots and mapping each slot to a shard.

This is a pure module with no process state. It provides two main categories of functions:

  1. Routing helpers -- shard_for/2 maps a key to the index of its owning shard, and shard_name/2 maps a shard index to that shard's registered process name. Routing supports Redis hash tags: for keys containing {tag}, only the tag content is hashed, so related keys can be co-located on the same shard.

  2. Convenience accessors -- get/2, put/4, delete/2, exists?/2, keys/1, and dbsize/1 dispatch to the correct shard GenServer transparently.

Every public function except extract_hash_tag/1 and max_value_size/0 takes a ctx (FerricStore.Instance.t()) as its first argument. Routing state is read from this instance-local context instead of from global persistent_term lookups.

Summary

Functions

append(ctx, key, suffix)
Atomically appends suffix to the value of key.

batch_async_put(ctx, kv_pairs)
Batch async PUT for pipelined SET commands without options.

batch_quorum_put(ctx, kv_pairs)
Batch quorum PUT for pipelined SET commands.

bitmap_op(ctx, cmd, args)
Runs a Bitmap RMW command (BITOP / BITFIELD) atomically via Raft. SETBIT has its own setbit/4 path. GETBIT/BITCOUNT/BITPOS are read-only and do not go through this path.

dbsize(ctx)
Returns the count of all live keys across every shard.

delete(ctx, key)
Deletes key. Returns :ok whether or not the key existed.

durability_for_key_public(ctx, key)
Public wrapper for durability_for_key, used by the batch SET fast path.

execute_list_op_inline(ctx, idx, cmd)
Executes a list_op inline under a held latch. Called from Router.async_list_op (fast path) and RmwCoordinator.handle_call (contended path). The latch guarantees exclusive access to the list's compound keys.

execute_rmw_inline(ctx, idx, cmd)
Executes an RMW command inline against local ETS + BitcaskWriter and submits the delta to Raft via Batcher.async_submit.

exists?(ctx, key)
Returns true if key exists and is not expired.

exists_fast?(ctx, key)
Fast ETS-direct existence check for a key.

extract_hash_tag(key)
Extracts the hash tag from a key, following Redis hash tag semantics.

geo_op(ctx, cmd, args)
Runs a Geo RMW command (GEOADD, GEOSEARCHSTORE) atomically via Raft. GEOSEARCHSTORE routes by the destination key; GEOADD routes by the key. Read-only ops (GEOPOS, GEODIST, GEOHASH, GEOSEARCH) don't go through here.

get(ctx, key)
Retrieves the value for key, or nil if the key does not exist or is expired.

get_file_ref(ctx, key)
Returns the on-disk file reference for a key's value, or nil.

get_keydir_file_ref(ctx, key)
Returns the keydir disk location for a key, or :miss.

get_meta(ctx, key)
Returns {value, expire_at_ms} for a live key, or nil if the key does not exist or is expired.

get_version(ctx, key)
Returns the current write version of the shard that owns key.

get_with_file_ref(ctx, key)
Unified GET that returns everything from a single ETS lookup.

getdel(ctx, key)
Atomically gets and deletes key.

getex(ctx, key, expire_at_ms)
Atomically gets the value and updates the expiry of key.

getset(ctx, key, value)
Atomically gets the old value and sets a new value for key.

hincrby(ctx, key, field, delta)
Atomically increments the integer value of hash field field in key by delta. Returns {:ok, new_int} or {:error, reason}. Shares ordering with the parent hash's shard (routes by the hash's redis_key).

hincrbyfloat(ctx, key, field, delta)
Atomically increments the float value of hash field field in key by delta. Returns the new value as a string, or {:error, reason}.

hll_op(ctx, cmd, args)
Runs a HyperLogLog RMW command (PFADD / PFMERGE) atomically via Raft. PFCOUNT is read-only and should not go through this path.

incr(ctx, key, delta)
Atomically increments the integer value of key by delta.

incr_float(ctx, key, delta)
Atomically increments the float value of key by delta.

json_op(ctx, cmd, args)
Runs a JSON RMW command atomically via Raft. cmd is the Redis command name (e.g. "JSON.SET"), args is the argument list starting with the key. The state machine dispatches to Ferricstore.Commands.Json.handle/3 with a state-machine-scoped store, so concurrent callers serialize through the Raft log and no updates are lost.

keydir_name(ctx, index)
Returns the keydir ETS table ref for the shard at index.

keys(ctx)
Returns all live (non-expired, non-deleted) keys across every shard.

max_value_size()
Returns the maximum allowed value size in bytes.

prob_write(ctx, command)
Routes a probabilistic data structure write command through Raft.

put(ctx, key, value, expire_at_ms \\ 0)
Stores key with value. expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 for no expiry.

server_command(ctx, command)
Submits a server command through Raft for replication to all nodes.

setbit(ctx, key, offset, bit_val)
Atomically sets the bit at offset to bit_val (0 or 1). Returns the previous bit value (0 or 1). Extends the bitmap with zero bytes if necessary. Goes through Raft so concurrent SETBITs on the same key never lose updates: the state machine is the sole mutator.

setrange(ctx, key, offset, value)
Atomically overwrites part of the string at key starting at offset.

shard_for(ctx, key)
Returns the shard index (0-based) that owns key.

shard_name(ctx, index)
Returns the registered process name for the shard at index.

slot_for(ctx, key)
Returns the slot (0-1023) for a key, respecting hash tags.

tdigest_op(ctx, cmd, args)
Runs a TDigest RMW command (TDIGEST.ADD / RESET / MERGE / CREATE) atomically via Raft. Read-only ops (QUANTILE, CDF, INFO, RANK, etc.) stay in the caller process.

zincrby(ctx, key, increment, member)
Atomically increments the score of member in the sorted set at key by increment. Returns the new score as a string, or {:error, reason}.

Functions

append(ctx, key, suffix)

@spec append(FerricStore.Instance.t(), binary(), binary()) :: {:ok, non_neg_integer()}

Atomically appends suffix to the value of key.

If the key does not exist, it is created with value suffix. Returns {:ok, new_byte_length}.

batch_async_put(ctx, kv_pairs)

@spec batch_async_put(FerricStore.Instance.t(), [{binary(), binary()}]) ::
  :ok | {:error, binary()}

Batch async PUT for pipelined SET commands without options.

Takes a list of {key, value} tuples, all of which must target async-durability namespaces. Groups the pairs by shard and performs one batched ETS insert per shard, then fires BitcaskWriter casts and Raft submissions individually (both batch internally). Returns :ok or {:error, reason}.

The caller must validate key and value sizes and check pressure flags beforehand: this function skips all per-key validation for speed.
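A minimal sketch of the per-shard grouping, assuming each shard's ETS data can be modelled as one public ETS set per shard held in a tuple. AsyncBatchSketch, new_tables/1, and the shard_fun argument are hypothetical, and the BitcaskWriter and Raft steps are left out:

```elixir
defmodule AsyncBatchSketch do
  # Hypothetical layout: one public ETS set per shard, stored in a tuple
  # so that elem/2 gives O(1) access by shard index.
  def new_tables(shard_count) do
    List.to_tuple(for _ <- 1..shard_count, do: :ets.new(:shard_sketch, [:set, :public]))
  end

  # kv_pairs: [{key, value}]; shard_fun: key -> shard index.
  # No size or pressure checks: the caller is assumed to have validated the batch.
  def batch_put(kv_pairs, tables, shard_fun) do
    kv_pairs
    |> Enum.group_by(fn {key, _value} -> shard_fun.(key) end)
    |> Enum.each(fn {shard, pairs} ->
      # Map.new/1 keeps the last value for a repeated key, so a later SET
      # in the pipeline wins, as it would if the commands ran one by one.
      rows = pairs |> Map.new() |> Map.to_list()
      # One :ets.insert/2 call per shard instead of one per key.
      true = :ets.insert(elem(tables, shard), rows)
    end)

    :ok
  end
end
```

Grouping first means each shard pays for one :ets.insert/2 call per pipeline rather than one per key.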

batch_get(ctx, keys)

@spec batch_get(FerricStore.Instance.t(), [binary()]) :: [binary() | nil]

batch_quorum_put(ctx, kv_pairs)

@spec batch_quorum_put(FerricStore.Instance.t(), [{binary(), binary()}]) :: [
  :ok | {:error, binary()}
]

Batch quorum PUT for pipelined SET commands.

Groups commands by shard, submits each group as a single batch to its Batcher with ONE reply ref per shard, then waits for all shard replies. Returns a list of results in input order.

Using one ref per shard rather than one per command means the connection process's selective receive scans at most shard_count refs instead of one ref per pipelined command. This is critical under high concurrency, when incoming TCP messages flood the mailbox.
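The one-ref-per-shard pattern can be sketched with plain processes standing in for the per-shard Batchers. Everything here is hypothetical (QuorumBatchSketch, the {:batch, from, ref, items} message shape, and the fake batcher's rule of rejecting empty values); it only illustrates grouping by shard, one reply ref per shard, and restoring input order:

```elixir
defmodule QuorumBatchSketch do
  # Hypothetical stand-in for a shard's Batcher: it answers each batch with
  # a single {ref, results} message, one result per item, in item order.
  def start_fake_batcher, do: spawn(fn -> loop() end)

  defp loop do
    receive do
      {:batch, from, ref, items} ->
        results =
          Enum.map(items, fn {_index, {_key, value}} ->
            if value == "", do: {:error, "empty value"}, else: :ok
          end)

        send(from, {ref, results})
        loop()
    end
  end

  # batchers: tuple of batcher pids indexed by shard; shard_fun: key -> index.
  def batch_put(kv_pairs, batchers, shard_fun) do
    pending =
      kv_pairs
      |> Enum.with_index()
      |> Enum.group_by(fn {{key, _v}, _i} -> shard_fun.(key) end, fn {kv, i} -> {i, kv} end)
      |> Enum.map(fn {shard, items} ->
        # One reply ref per shard, not one per command.
        ref = make_ref()
        send(elem(batchers, shard), {:batch, self(), ref, items})
        {ref, Enum.map(items, fn {i, _kv} -> i end)}
      end)

    pending
    |> Enum.flat_map(fn {ref, indices} ->
      # Selective receive only has to match shard_count distinct refs.
      receive do
        {^ref, results} -> Enum.zip(indices, results)
      after
        5_000 -> raise "shard batch timed out"
      end
    end)
    # Restore the caller's input order.
    |> Enum.sort_by(fn {i, _result} -> i end)
    |> Enum.map(fn {_i, result} -> result end)
  end
end
```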

bitmap_op(ctx, cmd, args)

@spec bitmap_op(FerricStore.Instance.t(), binary(), [term()]) :: term()

Runs a Bitmap RMW command (BITOP / BITFIELD) atomically via Raft. SETBIT has its own setbit/4 path. GETBIT/BITCOUNT/BITPOS are read-only and do not go through this path.

cas(ctx, key, expected, new_value, ttl_ms)

@spec cas(
  FerricStore.Instance.t(),
  binary(),
  binary(),
  binary(),
  non_neg_integer() | nil
) ::
  1 | 0 | nil

compound_count(ctx, redis_key, prefix)

@spec compound_count(FerricStore.Instance.t(), binary(), binary()) ::
  non_neg_integer()

compound_delete(ctx, redis_key, compound_key)

@spec compound_delete(FerricStore.Instance.t(), binary(), binary()) ::
  :ok | {:error, term()}

compound_delete_prefix(ctx, redis_key, prefix)

@spec compound_delete_prefix(FerricStore.Instance.t(), binary(), binary()) :: :ok

compound_get(ctx, redis_key, compound_key)

@spec compound_get(FerricStore.Instance.t(), binary(), binary()) :: binary() | nil

compound_get_meta(ctx, redis_key, compound_key)

@spec compound_get_meta(FerricStore.Instance.t(), binary(), binary()) ::
  {binary(), non_neg_integer()} | nil

compound_put(ctx, redis_key, compound_key, value, expire_at_ms)

@spec compound_put(
  FerricStore.Instance.t(),
  binary(),
  binary(),
  binary(),
  non_neg_integer()
) ::
  :ok | {:error, term()}

compound_scan(ctx, redis_key, prefix)

@spec compound_scan(FerricStore.Instance.t(), binary(), binary()) :: [
  {binary(), binary()}
]

dbsize(ctx)

Returns the count of all live keys across every shard.

delete(ctx, key)

@spec delete(FerricStore.Instance.t(), binary()) :: :ok

Deletes key. Returns :ok whether or not the key existed.

durability_for_key_public(ctx, key)

@spec durability_for_key_public(FerricStore.Instance.t(), binary()) ::
  :quorum | :async

Public wrapper for durability_for_key, used by the batch SET fast path.

execute_list_op_inline(ctx, idx, cmd)

@spec execute_list_op_inline(FerricStore.Instance.t(), non_neg_integer(), tuple()) ::
  term()

Executes a list_op inline under a held latch. Called from Router.async_list_op (fast path) and RmwCoordinator.handle_call (contended path). The latch guarantees exclusive access to the list's compound keys.

Uses ListOps.execute/3 with an origin-local compound store that writes ETS + casts BitcaskWriter for each mutation. Then submits the original {:list_op, key, op} command to Raft via Batcher.async_submit so replicas re-execute against their own state in Raft log order (deterministic convergence).

execute_rmw_inline(ctx, idx, cmd)

@spec execute_rmw_inline(FerricStore.Instance.t(), non_neg_integer(), tuple()) ::
  term()

Executes an RMW command inline against local ETS + BitcaskWriter and submits the delta to Raft via Batcher.async_submit.

Called with the per-key latch held. The latch guarantees exclusive access to key's ETS row among RMW paths. Router.async_rmw/4 (latch path) and Ferricstore.Store.RmwCoordinator (worker path) both call this after winning the latch.

Returns the command's natural result shape (e.g. {:ok, new_int} for INCR, old_value_or_nil for GETSET/GETDEL).

exists?(ctx, key)

@spec exists?(FerricStore.Instance.t(), binary()) :: boolean()

Returns true if key exists and is not expired.

Uses direct ETS lookup (no GenServer roundtrip) for hot and cold keys. A key is considered existing if it is in the keydir and not expired, regardless of whether its value is hot (in ETS) or cold (on disk only).

exists_fast?(ctx, key)

@spec exists_fast?(FerricStore.Instance.t(), binary()) :: boolean()

Fast ETS-direct existence check for a key.

Returns true if the key exists in ETS and is not expired, false otherwise. It bypasses the GenServer entirely, saving roughly 1-3 µs per call. Used in the hot write path (check_keydir_full/2), which only needs a boolean; cold keys (value nil but still in the keydir) are still reported as existing.
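A minimal sketch of an ETS-direct check that treats cold keys as existing. The keydir row layout {key, value_or_nil, expire_at_ms} and the ExistsSketch module are assumptions for illustration, not FerricStore's actual schema:

```elixir
defmodule ExistsSketch do
  # Hypothetical keydir row: {key, value_or_nil, expire_at_ms}. A nil value
  # marks a cold key whose bytes live only on disk; 0 means no expiry.
  def exists_fast?(table, key, now_ms \\ System.system_time(:millisecond)) do
    case :ets.lookup(table, key) do
      [{^key, _value_or_nil, 0}] -> true
      [{^key, _value_or_nil, expire_at_ms}] -> expire_at_ms > now_ms
      [] -> false
    end
  end
end
```

The value field is never inspected, which is why a cold key answers true just like a hot one.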

extend(ctx, key, owner, ttl_ms)

@spec extend(FerricStore.Instance.t(), binary(), binary(), pos_integer()) ::
  1 | {:error, binary()}

extract_hash_tag(key)

@spec extract_hash_tag(binary()) :: binary() | nil

Extracts the hash tag from a key, following Redis hash tag semantics.

If the key contains a { followed by a }, and the content between the first { and the next } after it is non-empty, that content (without the braces) is hashed instead of the full key; otherwise the full key is hashed. This allows related keys to be routed to the same shard.

Examples

iex> Ferricstore.Store.Router.extract_hash_tag("{user:42}:session")
"user:42"

iex> Ferricstore.Store.Router.extract_hash_tag("no_tag")
nil

iex> Ferricstore.Store.Router.extract_hash_tag("{}empty")
nil
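The rule fits in a few lines of Elixir. This is a minimal sketch that reproduces the examples above plus two standard Redis edge cases; HashTagSketch is a hypothetical module, not FerricStore's implementation:

```elixir
defmodule HashTagSketch do
  # Redis hash-tag rule: find the first "{", then the first "}" after it.
  # A non-empty substring between them is the tag; otherwise there is none.
  def extract_hash_tag(key) when is_binary(key) do
    with {open, 1} <- :binary.match(key, "{"),
         rest = binary_part(key, open + 1, byte_size(key) - open - 1),
         {close, 1} when close > 0 <- :binary.match(rest, "}") do
      binary_part(rest, 0, close)
    else
      # No "{", no closing "}", or an empty tag such as "{}".
      _ -> nil
    end
  end
end
```

Only the first {...} pair counts, so "foo{bar}{zap}" yields "bar", and "foo{{bar}}zap" yields "{bar".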

geo_op(ctx, cmd, args)

@spec geo_op(FerricStore.Instance.t(), binary(), [term()]) :: term()

Runs a Geo RMW command (GEOADD, GEOSEARCHSTORE) atomically via Raft. GEOSEARCHSTORE routes by the destination key; GEOADD routes by the key. Read-only ops (GEOPOS, GEODIST, GEOHASH, GEOSEARCH) don't go through here.

get(ctx, key)

@spec get(FerricStore.Instance.t(), binary()) :: binary() | nil

Retrieves the value for key, or nil if the key does not exist or is expired.

Hot path: reads directly from ETS (no GenServer roundtrip for cached keys). Falls back to a GenServer call for cache misses or when the ETS table is temporarily unavailable (e.g. during a shard restart).

Each successful read is recorded as either hot (ETS hit) or cold (Bitcask fallback) in Ferricstore.Stats for the FERRICSTORE.HOTNESS command and the INFO stats hot/cold fields.

get_file_ref(ctx, key)

@spec get_file_ref(FerricStore.Instance.t(), binary()) ::
  {binary(), non_neg_integer(), non_neg_integer()} | nil

Returns the on-disk file reference for a key's value, or nil.

Used by the sendfile optimisation in standalone TCP mode. Returns {file_path, value_byte_offset, value_size} for cold (on-disk) keys. Returns nil for hot keys (ETS), expired keys, or missing keys -- the caller should fall back to the normal read path.

Only cold keys benefit from sendfile: hot keys are already in BEAM memory and would need a normal get + transport.send.

get_keydir_file_ref(ctx, key)

@spec get_keydir_file_ref(FerricStore.Instance.t(), binary()) ::
  {:ok, {non_neg_integer(), non_neg_integer(), non_neg_integer()}} | :miss

Returns the keydir disk location for a key, or :miss.

Reads the {file_id, offset, value_size} fields directly from the keydir ETS table without a GenServer roundtrip. Returns {:ok, {fid, off, vsize}} for live keys, or :miss if the key is not in the keydir or is expired.

Used by sendfile zero-copy and STRLEN on cold keys.

get_meta(ctx, key)

@spec get_meta(FerricStore.Instance.t(), binary()) ::
  {binary(), non_neg_integer()} | nil

Returns {value, expire_at_ms} for a live key, or nil if the key does not exist or is expired.

Hot path: reads directly from ETS for cached keys. Each read is recorded as hot or cold in Ferricstore.Stats.

get_version(ctx, key)

@spec get_version(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the current write version of the shard that owns key.

Used by the WATCH/EXEC transaction mechanism to detect concurrent modifications.
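A sketch of how a version like this supports optimistic WATCH/EXEC checks. WatchSketch is hypothetical and accepts any key-to-version function, standing in for get_version/2 so the example runs without a store:

```elixir
defmodule WatchSketch do
  # version_fun: key -> current write version of the key's shard.
  # At WATCH time, remember the version seen for each watched key.
  def watch(keys, version_fun) do
    Map.new(keys, fn key -> {key, version_fun.(key)} end)
  end

  # At EXEC time, proceed only if no watched shard has moved on since WATCH.
  def exec_allowed?(watched, version_fun) do
    Enum.all?(watched, fn {key, version} -> version_fun.(key) == version end)
  end
end
```

Assuming every write to a shard bumps its version, a per-shard version errs on the safe side: a write to an unrelated key on the same shard also aborts EXEC, but a write to a watched key is never missed.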

get_with_file_ref(ctx, key)

@spec get_with_file_ref(FerricStore.Instance.t(), binary()) ::
  {:hot, binary()}
  | {:cold_ref, binary(), non_neg_integer(), non_neg_integer()}
  | {:cold_value, binary()}
  | :miss

Unified GET that returns everything from a single ETS lookup.

Returns:

  • {:hot, value} — value is in ETS, ready to return
  • {:cold_ref, path, offset, size} — value is on disk, file ref for sendfile
  • {:cold_value, value} — value was on disk, GenServer fetched it
  • :miss — key doesn't exist
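A sketch of how a caller might dispatch on these four shapes. GetReplySketch and its reply tuples are hypothetical; for {:cold_ref, ...}, a TCP handler could pass the path, offset, and size to :file.sendfile/5:

```elixir
defmodule GetReplySketch do
  # Maps each get_with_file_ref/2 result shape to a hypothetical reply action.
  def plan({:hot, value}), do: {:send, value}
  def plan({:cold_value, value}), do: {:send, value}
  # Zero-copy path: the value never enters BEAM memory.
  def plan({:cold_ref, path, offset, size}), do: {:sendfile, path, offset, size}
  def plan(:miss), do: :send_nil
end
```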

getdel(ctx, key)

@spec getdel(FerricStore.Instance.t(), binary()) :: binary() | nil

Atomically gets and deletes key.

Returns the value, or nil if the key did not exist.

getex(ctx, key, expire_at_ms)

@spec getex(FerricStore.Instance.t(), binary(), non_neg_integer()) :: binary() | nil

Atomically gets the value and updates the expiry of key.

expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 to persist (remove expiry). Returns the value, or nil if the key did not exist.

getset(ctx, key, value)

@spec getset(FerricStore.Instance.t(), binary(), binary()) :: binary() | nil

Atomically gets the old value and sets a new value for key.

Returns the old value, or nil if the key did not exist.

hincrby(ctx, key, field, delta)

@spec hincrby(FerricStore.Instance.t(), binary(), binary(), integer()) ::
  integer() | {:error, binary()}

Atomically increments the integer value of hash field field in key by delta. Returns {:ok, new_int} or {:error, reason}. Shares ordering with the parent hash's shard (routes by the hash's redis_key).

hincrbyfloat(ctx, key, field, delta)

@spec hincrbyfloat(FerricStore.Instance.t(), binary(), binary(), float()) ::
  binary() | {:error, binary()}

Atomically increments the float value of hash field field in key by delta. Returns the new value as a string, or {:error, reason}.

hll_op(ctx, cmd, args)

@spec hll_op(FerricStore.Instance.t(), binary(), [term()]) :: term()

Runs a HyperLogLog RMW command (PFADD / PFMERGE) atomically via Raft. PFCOUNT is read-only and should not go through this path.

incr(ctx, key, delta)

@spec incr(FerricStore.Instance.t(), binary(), integer()) ::
  {:ok, integer()} | {:error, binary()}

Atomically increments the integer value of key by delta.

If the key does not exist, it is set to delta. Returns {:ok, new_integer} on success or {:error, reason} if the value is not a valid integer.
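The value transformation that incr/3 performs atomically can be sketched as a pure function. This assumes integers are stored in their decimal string form, as in Redis; IncrSketch and its error text are placeholders, and the atomicity itself (latch plus Raft) is not shown:

```elixir
defmodule IncrSketch do
  # current: nil for a missing key, otherwise the stored decimal string.
  # Returns {:ok, new_int}; the caller would store Integer.to_string(new_int).
  # Unlike Redis, this sketch does not enforce the signed 64-bit range.
  def apply_incr(nil, delta) when is_integer(delta), do: {:ok, delta}

  def apply_incr(current, delta) when is_binary(current) and is_integer(delta) do
    case Integer.parse(current) do
      {int, ""} -> {:ok, int + delta}
      # Trailing garbage such as "1.5" or "abc" is rejected.
      _ -> {:error, "ERR value is not an integer or out of range"}
    end
  end
end
```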

incr_float(ctx, key, delta)

@spec incr_float(FerricStore.Instance.t(), binary(), float()) ::
  {:ok, binary()} | {:error, binary()}

Atomically increments the float value of key by delta.

If the key does not exist, it is set to delta. Returns {:ok, new_float_string} on success or {:error, reason} if the value is not a valid float.

json_op(ctx, cmd, args)

@spec json_op(FerricStore.Instance.t(), binary(), [term()]) :: term()

Runs a JSON RMW command atomically via Raft. cmd is the Redis command name (e.g. "JSON.SET"), args is the argument list starting with the key. The state machine dispatches to Ferricstore.Commands.Json.handle/3 with a state-machine-scoped store, so concurrent callers serialize through the Raft log — no lost updates.

keydir_name(ctx, index)

@spec keydir_name(FerricStore.Instance.t(), non_neg_integer()) :: atom() | reference()

Returns the keydir ETS table ref for the shard at index.

Uses the pre-computed tuple from the instance context for O(1) lookup.

keys(ctx)

@spec keys(FerricStore.Instance.t()) :: [binary()]

Returns all live (non-expired, non-deleted) keys across every shard.

list_op(ctx, key, operation)

@spec list_op(FerricStore.Instance.t(), binary(), term()) :: term()

lock(ctx, key, owner, ttl_ms)

@spec lock(FerricStore.Instance.t(), binary(), binary(), pos_integer()) ::
  :ok | {:error, binary()}

max_value_size()

@spec max_value_size() :: pos_integer()

Returns the maximum allowed value size in bytes.

prob_write(ctx, command)

@spec prob_write(FerricStore.Instance.t(), tuple()) :: term()

Routes a probabilistic data structure write command through Raft.

put(ctx, key, value, expire_at_ms \\ 0)

@spec put(FerricStore.Instance.t(), binary(), binary(), non_neg_integer()) ::
  :ok | {:error, binary()}

Stores key with value. expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 for no expiry.
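Callers that think in relative TTLs need to convert to this absolute form. A minimal sketch, assuming expiry is compared against wall-clock Unix time as returned by System.system_time/1; ExpirySketch is hypothetical:

```elixir
defmodule ExpirySketch do
  # nil means "no expiry", which put/4 encodes as 0.
  def expire_at_ms(nil), do: 0

  # Relative TTL in milliseconds -> absolute Unix-epoch milliseconds.
  def expire_at_ms(ttl_ms) when is_integer(ttl_ms) and ttl_ms > 0 do
    System.system_time(:millisecond) + ttl_ms
  end
end
```

For example, Ferricstore.Store.Router.put(ctx, "session:1", "data", ExpirySketch.expire_at_ms(30_000)) would store a key that expires in about 30 seconds.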

ratelimit_add(ctx, key, window_ms, max, count)

@spec ratelimit_add(
  FerricStore.Instance.t(),
  binary(),
  pos_integer(),
  pos_integer(),
  pos_integer()
) ::
  [term()]

server_command(ctx, command)

@spec server_command(FerricStore.Instance.t(), term()) :: term()

Submits a server command through Raft for replication to all nodes.

Server commands are opaque to the library — the state machine dispatches them via the raft_apply_hook callback on the Instance struct. Routed through shard 0 for consistent ordering.

setbit(ctx, key, offset, bit_val)

@spec setbit(FerricStore.Instance.t(), binary(), non_neg_integer(), 0 | 1) :: 0 | 1

Atomically sets the bit at offset to bit_val (0 or 1). Returns the previous bit value (0 or 1). Extends the bitmap with zero bytes if necessary. Goes through Raft so concurrent SETBITs on the same key never lose updates — the state machine is the sole mutator.
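The value transformation behind SETBIT can be sketched as a pure function. This assumes Redis bit ordering (offset 0 is the most significant bit of the first byte); SetbitSketch is hypothetical, and the Raft round-trip that makes the real call atomic is not shown:

```elixir
defmodule SetbitSketch do
  import Bitwise

  # current: nil for a missing key, otherwise the bitmap binary.
  # Returns {previous_bit, new_bitmap}.
  def apply_setbit(current, offset, bit_val) when bit_val in [0, 1] do
    current = current || ""
    byte_index = div(offset, 8)
    # Offset 0 maps to bit 7 (MSB) of byte 0.
    shift = 7 - rem(offset, 8)

    # Extend with zero bytes so that byte_index exists.
    padded =
      if byte_size(current) > byte_index do
        current
      else
        current <> :binary.copy(<<0>>, byte_index + 1 - byte_size(current))
      end

    <<before::binary-size(byte_index), byte, rest::binary>> = padded
    previous = band(bsr(byte, shift), 1)
    mask = bsl(1, shift)
    new_byte = if bit_val == 1, do: bor(byte, mask), else: band(byte, bnot(mask))
    {previous, <<before::binary, new_byte, rest::binary>>}
  end
end
```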

setrange(ctx, key, offset, value)

@spec setrange(FerricStore.Instance.t(), binary(), non_neg_integer(), binary()) ::
  {:ok, non_neg_integer()}

Atomically overwrites part of the string at key starting at offset.

Zero-pads if the key doesn't exist or the string is shorter than offset. Returns {:ok, new_byte_length}.
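The overwrite-and-pad step can be sketched as a pure function. SetrangeSketch is hypothetical and returns the new value alongside its length; the atomic read-modify-write around it is not shown:

```elixir
defmodule SetrangeSketch do
  # current: nil for a missing key, otherwise the stored string.
  # Returns {new_value, new_byte_length}.
  # Redis leaves a missing key untouched when value is empty; that edge
  # case is not handled here.
  def apply_setrange(current, offset, value) do
    current = current || ""

    # Zero-pad when the string is shorter than offset.
    padded =
      if byte_size(current) < offset do
        current <> :binary.copy(<<0>>, offset - byte_size(current))
      else
        current
      end

    prefix = binary_part(padded, 0, offset)
    tail_start = offset + byte_size(value)

    # Keep any bytes that extend past the overwritten range.
    suffix =
      if byte_size(padded) > tail_start do
        binary_part(padded, tail_start, byte_size(padded) - tail_start)
      else
        ""
      end

    new_value = prefix <> value <> suffix
    {new_value, byte_size(new_value)}
  end
end
```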

shard_for(ctx, key)

@spec shard_for(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the shard index (0-based) that owns key.

Routes through the 1,024-slot indirection layer: slot = phash2(key) & 0x3FF, then shard_index = slot_map[slot].

Supports Redis hash tags: if the key contains {tag} (non-empty content between the first { and the next }), the tag is used for hashing instead of the full key.
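The routing chain can be sketched in Elixir. The slot computation follows the formula above; the round-robin slot map, and the SlotSketch module and function names, are hypothetical stand-ins for slot_map and the real routing functions:

```elixir
defmodule SlotSketch do
  import Bitwise

  @slots 1024

  # Hypothetical slot map: slot i is owned by shard rem(i, shard_count).
  def build_slot_map(shard_count) do
    List.to_tuple(for slot <- 0..(@slots - 1), do: rem(slot, shard_count))
  end

  def slot_for(key) do
    # Leftmost "{...}" match = first "{" and the next "}" after it.
    # An empty tag falls back to hashing the full key.
    hash_input =
      case Regex.run(~r/\{([^}]*)\}/, key, capture: :all_but_first) do
        [tag] when tag != "" -> tag
        _ -> key
      end

    # 0x3FF keeps the low 10 bits: a slot in 0..1023.
    band(:erlang.phash2(hash_input), @slots - 1)
  end

  # O(1) tuple lookup from slot to shard index.
  def shard_for(slot_map, key), do: elem(slot_map, slot_for(key))
end
```

Because slots rather than keys are assigned to shards, rebalancing only has to rewrite slot_map entries; the key-to-slot hash never changes.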

shard_name(ctx, index)

@spec shard_name(FerricStore.Instance.t(), non_neg_integer()) :: atom()

Returns the registered process name for the shard at index.

Uses the pre-computed tuple from the instance context for O(1) lookup.

slot_for(ctx, key)

@spec slot_for(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the slot (0-1023) for a key, respecting hash tags.

tdigest_op(ctx, cmd, args)

@spec tdigest_op(FerricStore.Instance.t(), binary(), [term()]) :: term()

Runs a TDigest RMW command (TDIGEST.ADD / RESET / MERGE / CREATE) atomically via Raft. Read-only ops (QUANTILE, CDF, INFO, RANK, etc.) stay in the caller process.

unlock(ctx, key, owner)

@spec unlock(FerricStore.Instance.t(), binary(), binary()) :: 1 | {:error, binary()}

zincrby(ctx, key, increment, member)

@spec zincrby(FerricStore.Instance.t(), binary(), number(), binary()) ::
  binary() | {:error, binary()}

Atomically increments the score of member in the sorted set at key by increment. Returns the new score as a string, or {:error, reason}.