Ferricstore.Store.Router (ferricstore v0.4.0)


Routes keys to shard GenServers using the shared Ferricstore.Store.SlotMap hashing implementation.

This is a pure module with no process state. It provides two categories of functions:

  1. Routing helpers -- shard_for/2 maps a key to the index of its owning shard, and shard_name/2 maps a shard index to that shard's registered process name. Routing supports Redis hash tags: keys containing {tag} are hashed on the tag content, allowing related keys to co-locate on the same shard.

  2. Convenience accessors -- get/2, put/4, delete/2, exists?/2, keys/1, and dbsize/1 dispatch to the correct shard GenServer transparently.

All public functions take a ctx (FerricStore.Instance.t()) as the first argument, replacing all persistent_term lookups with instance-local state.

Summary

Functions

Atomically appends suffix to the value of key.

Batch GET variant for TCP large-value streaming.

Batch PUT API with :ok | {:error, _} result shape.

Batch quorum PUT for pipelined SET commands.

Returns the count of all live keys across every shard.

Deletes key. Returns :ok whether or not the key existed.

Returns true if key exists and is not expired.

Fast ETS-direct existence check for a key.

Returns the expiry timestamp for a live plain key without reading its value.

Extracts the hash tag from a key, following Redis hash tag semantics.

Retrieves the value for key, or nil if the key does not exist or is expired.

Returns the on-disk file reference for a key's value, or nil.

Returns the keydir disk location for a key, or :miss.

Returns {value, expire_at_ms} for a live key, or nil if the key does not exist or is expired.

Returns the current write version of the shard that owns key.

Unified GET that returns everything from a single ETS lookup.

Atomically gets and deletes key.

Atomically gets the value and updates the expiry of key.

Returns a byte range for a live plain key without reading the full cold value.

Atomically gets the old value and sets a new value for key.

Atomically increments the integer value of hash field field in key by delta. Returns the new integer value, or {:error, reason}. Shares ordering with the parent hash's shard (routes by the hash's redis_key).

Atomically increments the float value of hash field field in key by delta. Returns the new value as a string, or {:error, reason}.

Atomically increments the integer value of key by delta.

Atomically increments the float value of key by delta.

Returns the keydir ETS table ref for the shard at index.

Returns all live (non-expired, non-deleted) keys across every shard.

Returns the maximum allowed key size in bytes.

Returns the maximum allowed value size in bytes.

Routes a probabilistic data structure write command through Raft.

Stores key with value. expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 for no expiry.

Submits a server command through Raft for replication to all nodes.

Atomically applies Redis SET options in Raft order.

Atomically sets the bit at offset to bit_val (0 or 1). Returns the previous bit value (0 or 1). Extends the bitmap with zero bytes if necessary. Goes through Raft so concurrent SETBITs on the same key never lose updates — the state machine is the sole mutator.

Atomically overwrites part of the string at key starting at offset.

Returns the shard index (0-based) that owns key.

Returns the registered process name for the shard at index.

Returns the slot (0-1023) for a key, respecting hash tags.

Returns the live plain key value size without reading a cold value.

Returns a lightweight WATCH token for key.

Atomically increments the score of member in the sorted set at key by increment. Returns the new score as a string.

Functions

append(ctx, key, suffix)

@spec append(FerricStore.Instance.t(), binary(), binary()) :: {:ok, non_neg_integer()}

Atomically appends suffix to the value of key.

If the key does not exist, it is created with value suffix. Returns {:ok, new_byte_length}.

batch_get(ctx, keys)

@spec batch_get(FerricStore.Instance.t(), [binary()]) :: [binary() | nil]

batch_get_with_file_refs(ctx, keys, min_file_ref_size)

@spec batch_get_with_file_refs(
  FerricStore.Instance.t(),
  [binary()],
  non_neg_integer()
) :: [
  binary() | nil | {:file_ref, binary(), non_neg_integer(), non_neg_integer()}
]

Batch GET variant for TCP large-value streaming.

It performs the same single ETS pass as batch_get/2, but cold entries whose value size is at least min_file_ref_size are returned as validated {:file_ref, path, value_offset, size} tuples instead of being materialized into BEAM binaries. Stale or invalid refs fall back to the normal batched cold pread path.

batch_put(ctx, kv_pairs)

@spec batch_put(FerricStore.Instance.t(), [{binary(), binary()}]) ::
  :ok | {:error, binary()}

Batch PUT API with :ok | {:error, _} result shape.

The default application instance submits through quorum. Embedded/custom instances write locally because the Raft batchers are owned by the default application instance.

batch_quorum_put(ctx, kv_pairs)

@spec batch_quorum_put(FerricStore.Instance.t(), [{binary(), binary()}]) :: [
  :ok | {:error, binary() | {:timeout, :unknown_outcome}}
]

Batch quorum PUT for pipelined SET commands.

Groups commands by shard, submits each group as a single batch to its Batcher with ONE reply ref per shard, then waits for all shard replies. Returns a list of results in input order.

Uses one ref per shard (not per command), so the connection process's selective receive scans at most shard_count refs instead of one ref per pipelined command. This is critical under high concurrency, where TCP messages flood the mailbox.
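The grouping and input-order guarantee described above can be sketched in plain Elixir. This is a minimal illustration, not the library's implementation: BatchGroupSketch, shard_fun, and submit_fun are hypothetical names, the per-shard submissions run sequentially instead of concurrently, and the reply refs are left out entirely.

```elixir
defmodule BatchGroupSketch do
  # shard_fun and submit_fun are hypothetical stand-ins for the router's
  # shard lookup and the per-shard batch submission.
  def batch_put(kv_pairs, shard_fun, submit_fun) do
    kv_pairs
    # Remember each pair's position so results can be re-ordered later.
    |> Enum.with_index()
    |> Enum.group_by(fn {{key, _value}, _idx} -> shard_fun.(key) end)
    |> Enum.flat_map(fn {shard, indexed} ->
      {pairs, idxs} = Enum.unzip(indexed)
      # One submission per shard; it must return one result per pair, in order.
      results = submit_fun.(shard, pairs)
      Enum.zip(idxs, results)
    end)
    # Restore the caller's input order.
    |> Enum.sort_by(fn {idx, _result} -> idx end)
    |> Enum.map(fn {_idx, result} -> result end)
  end
end
```

With N commands spread over S shards, this shape makes S submissions rather than N, which is the property the one-ref-per-shard design relies on.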

cas(ctx, key, expected, new_value, ttl_ms)

@spec cas(
  FerricStore.Instance.t(),
  binary(),
  binary(),
  binary(),
  non_neg_integer() | nil
) ::
  1 | 0 | nil

compound_batch_delete(ctx, redis_key, compound_keys)

@spec compound_batch_delete(FerricStore.Instance.t(), binary(), [binary()]) ::
  :ok | {:error, term()}

compound_batch_get(ctx, redis_key, compound_keys)

@spec compound_batch_get(FerricStore.Instance.t(), binary(), [binary()]) :: [
  binary() | nil
]

compound_batch_get_meta(ctx, redis_key, compound_keys)

@spec compound_batch_get_meta(FerricStore.Instance.t(), binary(), [binary()]) :: [
  {binary(), non_neg_integer()} | nil
]

compound_batch_put(ctx, redis_key, entries)

@spec compound_batch_put(
  FerricStore.Instance.t(),
  binary(),
  [{binary(), binary(), non_neg_integer()}]
) :: :ok | {:error, term()}

compound_count(ctx, redis_key, prefix)

@spec compound_count(FerricStore.Instance.t(), binary(), binary()) ::
  non_neg_integer()

compound_delete(ctx, redis_key, compound_key)

@spec compound_delete(FerricStore.Instance.t(), binary(), binary()) ::
  :ok | {:error, term()}

compound_delete_prefix(ctx, redis_key, prefix)

@spec compound_delete_prefix(FerricStore.Instance.t(), binary(), binary()) :: :ok

compound_fields(ctx, redis_key, prefix)

@spec compound_fields(FerricStore.Instance.t(), binary(), binary()) :: [binary()]

compound_get(ctx, redis_key, compound_key)

@spec compound_get(FerricStore.Instance.t(), binary(), binary()) :: binary() | nil

compound_get_meta(ctx, redis_key, compound_key)

@spec compound_get_meta(FerricStore.Instance.t(), binary(), binary()) ::
  {binary(), non_neg_integer()} | nil

compound_put(ctx, redis_key, compound_key, value, expire_at_ms)

@spec compound_put(
  FerricStore.Instance.t(),
  binary(),
  binary(),
  binary(),
  non_neg_integer()
) ::
  :ok | {:error, term()}

compound_scan(ctx, redis_key, prefix)

@spec compound_scan(FerricStore.Instance.t(), binary(), binary()) :: [
  {binary(), binary()}
]

dbsize(ctx)

Returns the count of all live keys across every shard.

delete(ctx, key)

@spec delete(FerricStore.Instance.t(), binary()) :: :ok

Deletes key. Returns :ok whether or not the key existed.

exists?(ctx, key)

@spec exists?(FerricStore.Instance.t(), binary()) :: boolean()

Returns true if key exists and is not expired.

Uses direct ETS lookup (no GenServer roundtrip) for hot and cold keys. A key is considered existing if it is in the keydir and not expired, regardless of whether its value is hot (in ETS) or cold (on disk only).
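As a rough model of this check, the sketch below looks a key up in an ETS table and applies the expiry rule. The row layout {key, value_or_nil, expire_at_ms}, the module name KeydirSketch, and the strict "greater than now" comparison are assumptions made for illustration; the real keydir rows carry more fields.

```elixir
defmodule KeydirSketch do
  # Assumed row layout: {key, value_or_nil, expire_at_ms}.
  # value == nil models a cold key; expire_at_ms == 0 means no expiry.
  def exists?(table, key, now_ms) do
    case :ets.lookup(table, key) do
      # No expiry set: the key exists whether it is hot or cold.
      [{^key, _value, 0}] -> true
      # Expiry set: the key exists only while the deadline is in the future.
      [{^key, _value, expire_at_ms}] -> expire_at_ms > now_ms
      # Not in the keydir at all.
      [] -> false
    end
  end
end
```

The value field is never inspected, which is why a cold key (value on disk only) is reported as existing without any disk read.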

exists_fast?(ctx, key)

@spec exists_fast?(FerricStore.Instance.t(), binary()) :: boolean()

Fast ETS-direct existence check for a key.

Returns true if the key exists in ETS and is not expired, false otherwise. This bypasses the GenServer entirely, saving roughly 1-3 µs per call. It is used in the hot write path (check_keydir_full/2), where only a boolean answer is needed. Cold keys (value = nil but still in the keydir) are still correctly detected as existing.

expire_at_ms(ctx, key)

@spec expire_at_ms(FerricStore.Instance.t(), binary()) :: non_neg_integer() | nil

Returns the expiry timestamp for a live plain key without reading its value.

This is used by expiry-time commands so cold large values do not pay a Bitcask pread just to report TTL metadata.

extend(ctx, key, owner, ttl_ms)

@spec extend(FerricStore.Instance.t(), binary(), binary(), pos_integer()) ::
  1 | {:error, binary()}

extract_hash_tag(key)

@spec extract_hash_tag(binary()) :: binary() | nil

Extracts the hash tag from a key, following Redis hash tag semantics.

If the key contains a substring enclosed in {...} where the content between the first { and the next } is non-empty, that substring is used for hashing instead of the full key. This allows related keys to be routed to the same shard.

Examples

iex> Ferricstore.Store.Router.extract_hash_tag("{user:42}:session")
"user:42"

iex> Ferricstore.Store.Router.extract_hash_tag("no_tag")
nil

iex> Ferricstore.Store.Router.extract_hash_tag("{}empty")
nil
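The rule above (first {, next }, non-empty content) can be written in a few lines. The module below is an illustrative re-implementation under the hypothetical name HashTagSketch, not the library's source.

```elixir
defmodule HashTagSketch do
  @spec extract(binary()) :: binary() | nil
  def extract(key) when is_binary(key) do
    # Split at the first "{"; a key without "{" has no tag.
    case :binary.split(key, "{") do
      [_before, rest] ->
        # Split the remainder at the next "}"; an empty tag does not count.
        case :binary.split(rest, "}") do
          [tag, _after] when tag != "" -> tag
          _ -> nil
        end

      _ ->
        nil
    end
  end
end
```

Because only the first { and the next } are considered, a key such as "{}{x}" yields nil in this sketch: the first pair of braces is empty, so the whole key would be hashed.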

flow_index_count_all(ctx, key)

@spec flow_index_count_all(FerricStore.Instance.t(), binary()) ::
  {:ok, non_neg_integer()} | :unavailable

flow_index_count_all_many(ctx, keys)

@spec flow_index_count_all_many(FerricStore.Instance.t(), [binary()]) ::
  {:ok, [non_neg_integer()]} | :unavailable

flow_index_rank_range(ctx, key, start_idx, stop_idx, reverse?)

@spec flow_index_rank_range(
  FerricStore.Instance.t(),
  binary(),
  non_neg_integer(),
  non_neg_integer(),
  boolean()
) :: {:ok, [{binary(), float()}]} | :unavailable

flow_index_rank_range_many(ctx, requests)

@spec flow_index_rank_range_many(
  FerricStore.Instance.t(),
  [{binary(), non_neg_integer(), non_neg_integer(), boolean()}]
) :: {:ok, [[{binary(), float()}]]} | :unavailable

flow_index_score_range_slice(ctx, key, min_bound, max_bound, reverse?, offset, count)

@spec flow_index_score_range_slice(
  FerricStore.Instance.t(),
  binary(),
  term(),
  term(),
  boolean(),
  non_neg_integer(),
  non_neg_integer() | :all
) :: {:ok, [{binary(), float()}]} | :unavailable

get(ctx, key)

@spec get(FerricStore.Instance.t(), binary()) :: binary() | nil

Retrieves the value for key, or nil if the key does not exist or is expired.

Hot path: reads directly from ETS (no GenServer roundtrip for cached keys). Falls back to a GenServer call for cache misses or when the ETS table is temporarily unavailable (e.g. during a shard restart).

Each successful read is recorded as either hot (ETS hit) or cold (Bitcask fallback) in Ferricstore.Stats for the FERRICSTORE.HOTNESS command and the INFO stats hot/cold fields.

get_file_ref(ctx, key)

@spec get_file_ref(FerricStore.Instance.t(), binary()) ::
  {binary(), non_neg_integer(), non_neg_integer()} | nil

Returns the on-disk file reference for a key's value, or nil.

Used by the sendfile optimisation in standalone TCP mode. Returns {file_path, value_byte_offset, value_size} for cold (on-disk) keys. Returns nil for hot keys (ETS), expired keys, or missing keys -- the caller should fall back to the normal read path.

Only cold keys benefit from sendfile: hot keys are already in BEAM memory, so they are served by a normal get followed by transport.send.

get_keydir_file_ref(ctx, key)

@spec get_keydir_file_ref(FerricStore.Instance.t(), binary()) ::
  {:ok, {term(), non_neg_integer(), non_neg_integer()}} | :miss

Returns the keydir disk location for a key, or :miss.

Reads the {file_id, offset, value_size} fields directly from the keydir ETS table without a GenServer roundtrip. Returns {:ok, {fid, off, vsize}} for live keys, or :miss if the key is not in the keydir or is expired.

Used by sendfile zero-copy and STRLEN on cold keys.

get_meta(ctx, key)

@spec get_meta(FerricStore.Instance.t(), binary()) ::
  {binary(), non_neg_integer()} | nil

Returns {value, expire_at_ms} for a live key, or nil if the key does not exist or is expired.

Hot path: reads directly from ETS for cached keys. Each read is recorded as hot or cold in Ferricstore.Stats.

get_version(ctx, key)

@spec get_version(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the current write version of the shard that owns key.

Used by the WATCH/EXEC transaction mechanism to detect concurrent modifications.
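A version-based optimistic check of this kind can be sketched as follows. This is a toy model, not the library's transaction code: WatchSketch and the %{version: n, data: %{}} map standing in for a shard's state are assumptions.

```elixir
defmodule WatchSketch do
  # Every write bumps the shard-wide version.
  def write(shard, key, value) do
    %{shard | version: shard.version + 1, data: Map.put(shard.data, key, value)}
  end

  # WATCH: remember the version seen at watch time.
  def watch(shard), do: shard.version

  # EXEC: run the transaction only if nothing was written since WATCH.
  def exec(shard, watched_version, fun) do
    if shard.version == watched_version do
      {:ok, fun.(shard)}
    else
      {:aborted, shard}
    end
  end
end
```

In this model the version belongs to the shard rather than to the key, so a write to any key on the same shard would abort the transaction. That is a conservative check: it can produce spurious aborts but never misses a concurrent modification.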

get_with_file_ref(ctx, key)

@spec get_with_file_ref(FerricStore.Instance.t(), binary()) ::
  {:hot, binary()}
  | {:cold_ref, binary(), non_neg_integer(), non_neg_integer()}
  | {:cold_value, binary()}
  | {:error, binary()}
  | :miss

Unified GET that returns everything from a single ETS lookup.

Returns:

  • {:hot, value} — value is in ETS, ready to return
  • {:cold_ref, path, offset, size} — value is on disk, file ref for sendfile
  • {:cold_value, value} — value was on disk, GenServer fetched it
  • :miss — key doesn't exist
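A caller would typically branch on these shapes with pattern matching. The sketch below maps each result, including the {:error, reason} variant listed in the @spec, to a made-up transport action; ReplySketch and the action tuples are hypothetical and only illustrate the dispatch.

```elixir
defmodule ReplySketch do
  # Value already in memory: send it as a normal binary reply.
  def plan({:hot, value}), do: {:send_binary, value}
  # Value was fetched from disk into memory: same reply path.
  def plan({:cold_value, value}), do: {:send_binary, value}
  # Value still on disk: hand the file region to sendfile.
  def plan({:cold_ref, path, offset, size}), do: {:sendfile, path, offset, size}
  # Lookup failed: surface the reason to the client.
  def plan({:error, reason}), do: {:send_error, reason}
  # Key does not exist.
  def plan(:miss), do: :send_nil
end
```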

getdel(ctx, key)

@spec getdel(FerricStore.Instance.t(), binary()) :: binary() | nil

Atomically gets and deletes key.

Returns the value, or nil if the key did not exist.

getex(ctx, key, expire_at_ms)

@spec getex(FerricStore.Instance.t(), binary(), non_neg_integer()) :: binary() | nil

Atomically gets the value and updates the expiry of key.

expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 to persist (remove expiry). Returns the value, or nil if the key did not exist.

getrange(ctx, key, start_idx, end_idx)

@spec getrange(FerricStore.Instance.t(), binary(), integer(), integer()) ::
  binary() | nil

Returns a byte range for a live plain key without reading the full cold value.

Hot entries slice the in-memory value. Cold entries validate the Bitcask location once, then read only the requested value bytes from the data file. Missing or expired keys return nil, matching get/2.

getset(ctx, key, value)

@spec getset(FerricStore.Instance.t(), binary(), binary()) :: binary() | nil

Atomically gets the old value and sets a new value for key.

Returns the old value, or nil if the key did not exist.

hincrby(ctx, key, field, delta)

@spec hincrby(FerricStore.Instance.t(), binary(), binary(), integer()) ::
  integer() | {:error, binary()}

Atomically increments the integer value of hash field field in key by delta. Returns the new integer value, or {:error, reason}, as the @spec above indicates. Shares ordering with the parent hash's shard (routes by the hash's redis_key).

hincrbyfloat(ctx, key, field, delta)

@spec hincrbyfloat(FerricStore.Instance.t(), binary(), binary(), float()) ::
  binary() | {:error, binary()}

Atomically increments the float value of hash field field in key by delta. Returns the new value as a string, or {:error, reason}.

incr(ctx, key, delta)

@spec incr(FerricStore.Instance.t(), binary(), integer()) ::
  {:ok, integer()} | {:error, binary()}

Atomically increments the integer value of key by delta.

If the key does not exist, it is set to delta. Returns {:ok, new_integer} on success or {:error, reason} if the value is not a valid integer.
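The semantics above can be modelled on a plain map. This sketch assumes values are stored as decimal strings (as in Redis); IncrSketch and the error text are placeholders, and the real function additionally serializes the update so it is atomic.

```elixir
defmodule IncrSketch do
  # Returns {new_store, result} so the caller can see both the effect and the reply.
  def incr(store, key, delta) when is_map(store) and is_integer(delta) do
    case Map.get(store, key) do
      nil ->
        # Missing key: it is set to delta.
        {Map.put(store, key, Integer.to_string(delta)), {:ok, delta}}

      current ->
        case Integer.parse(current) do
          # Only a fully numeric string counts as a valid integer.
          {int, ""} ->
            new = int + delta
            {Map.put(store, key, Integer.to_string(new)), {:ok, new}}

          _ ->
            # Placeholder message, not the library's error text.
            {store, {:error, "value is not an integer"}}
        end
    end
  end
end
```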

incr_float(ctx, key, delta)

@spec incr_float(FerricStore.Instance.t(), binary(), float()) ::
  {:ok, binary()} | {:error, binary()}

Atomically increments the float value of key by delta.

If the key does not exist, it is set to delta. Returns {:ok, new_float_string} on success or {:error, reason} if the value is not a valid float.

keydir_name(ctx, index)

@spec keydir_name(FerricStore.Instance.t(), non_neg_integer()) :: atom() | reference()

Returns the keydir ETS table ref for the shard at index.

Uses the pre-computed tuple from the instance context for O(1) lookup.

keys(ctx)

@spec keys(FerricStore.Instance.t()) :: [binary()]

Returns all live (non-expired, non-deleted) keys across every shard.

list_op(ctx, key, operation)

@spec list_op(FerricStore.Instance.t(), binary(), term()) :: term()

lock(ctx, key, owner, ttl_ms)

@spec lock(FerricStore.Instance.t(), binary(), binary(), pos_integer()) ::
  :ok | {:error, binary()}

max_key_size()

@spec max_key_size() :: pos_integer()

Returns the maximum allowed key size in bytes.

max_value_size()

@spec max_value_size() :: pos_integer()

Returns the maximum allowed value size in bytes.

prob_write(ctx, command)

@spec prob_write(FerricStore.Instance.t(), tuple()) :: term()

Routes a probabilistic data structure write command through Raft.

put(ctx, key, value, expire_at_ms \\ 0)

@spec put(FerricStore.Instance.t(), binary(), binary(), non_neg_integer()) ::
  :ok | {:error, binary()}

Stores key with value. expire_at_ms is an absolute Unix-epoch timestamp in milliseconds; pass 0 for no expiry.
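Because expire_at_ms is absolute, a caller holding a relative TTL has to convert it first. The helper below is a small sketch of that conversion; ExpirySketch is a hypothetical module, and using nil to mean "no TTL" is an assumption of this example.

```elixir
defmodule ExpirySketch do
  # nil means "no expiry", which put/4 expects as 0.
  def expire_at_ms(nil, _now_ms), do: 0

  # A positive TTL becomes an absolute Unix-epoch deadline in milliseconds.
  def expire_at_ms(ttl_ms, now_ms) when is_integer(ttl_ms) and ttl_ms > 0 do
    now_ms + ttl_ms
  end

  # Convenience arity that reads the wall clock.
  def expire_at_ms(ttl_ms), do: expire_at_ms(ttl_ms, System.os_time(:millisecond))
end
```

The result would then be passed as the fourth argument of put/4, for example ExpirySketch.expire_at_ms(60_000) for a one-minute lifetime.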

ratelimit_add(ctx, key, window_ms, max, count)

@spec ratelimit_add(
  FerricStore.Instance.t(),
  binary(),
  pos_integer(),
  pos_integer(),
  pos_integer()
) :: [term()]

server_command(ctx, command)

@spec server_command(FerricStore.Instance.t(), term()) :: term()

Submits a server command through Raft for replication to all nodes.

Server commands are opaque to the library — the state machine dispatches them via the raft_apply_hook callback on the Instance struct. Routed through shard 0 for consistent ordering.

set(ctx, key, value, opts)

@spec set(FerricStore.Instance.t(), binary(), binary(), map()) :: term()

Atomically applies Redis SET options in Raft order.

Unlike put/4, this keeps NX/XX/GET/KEEPTTL checks inside the state machine so concurrent conditional SETs serialize correctly.

setbit(ctx, key, offset, bit_val)

@spec setbit(FerricStore.Instance.t(), binary(), non_neg_integer(), 0 | 1) :: 0 | 1

Atomically sets the bit at offset to bit_val (0 or 1). Returns the previous bit value (0 or 1). Extends the bitmap with zero bytes if necessary. Goes through Raft so concurrent SETBITs on the same key never lose updates — the state machine is the sole mutator.
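The bit manipulation itself can be sketched on a plain binary. This illustration assumes Redis-style numbering, where offset 0 is the most significant bit of the first byte; SetBitSketch is a hypothetical module and does not involve Raft or the state machine.

```elixir
defmodule SetBitSketch do
  # Returns {new_bitmap, previous_bit}.
  def setbit(bitmap, offset, bit_val)
      when is_binary(bitmap) and is_integer(offset) and offset >= 0 and bit_val in [0, 1] do
    # Number of bytes required so that the bit at offset exists.
    needed = div(offset, 8) + 1

    # Extend with zero bytes if the bitmap is too short.
    padded =
      if byte_size(bitmap) < needed do
        bitmap <> :binary.copy(<<0>>, needed - byte_size(bitmap))
      else
        bitmap
      end

    # Bitstring matching is MSB-first, which matches the assumed numbering.
    <<prefix::bitstring-size(offset), old::1, rest::bitstring>> = padded
    {<<prefix::bitstring, bit_val::1, rest::bitstring>>, old}
  end
end
```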

setrange(ctx, key, offset, value)

@spec setrange(FerricStore.Instance.t(), binary(), non_neg_integer(), binary()) ::
  {:ok, non_neg_integer()}

Atomically overwrites part of the string at key starting at offset.

Zero-pads if the key doesn't exist or the string is shorter than offset. Returns {:ok, new_byte_length}.
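The zero-padding and overwrite rule can be modelled on plain binaries. SetRangeSketch is a hypothetical module in which nil stands for a missing key; it returns the new string alongside the documented {:ok, new_byte_length} reply and leaves out the atomicity the real function provides.

```elixir
defmodule SetRangeSketch do
  # A missing key behaves like an empty string.
  def setrange(nil, offset, value), do: setrange("", offset, value)

  def setrange(current, offset, value)
      when is_binary(current) and is_integer(offset) and offset >= 0 and is_binary(value) do
    # Zero-pad up to offset when the current string is shorter than offset.
    padded =
      if byte_size(current) < offset do
        current <> :binary.copy(<<0>>, offset - byte_size(current))
      else
        current
      end

    prefix = binary_part(padded, 0, offset)
    tail_start = offset + byte_size(value)

    # Keep whatever lies beyond the overwritten region, if anything.
    suffix =
      if tail_start < byte_size(padded) do
        binary_part(padded, tail_start, byte_size(padded) - tail_start)
      else
        ""
      end

    new = prefix <> value <> suffix
    {new, {:ok, byte_size(new)}}
  end
end
```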

shard_for(ctx, key)

@spec shard_for(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the shard index (0-based) that owns key.

Routes through the 1,024-slot indirection layer: key -> SlotMap.slot_for_key/1 -> slot_map[slot] -> shard_index

Supports Redis hash tags: if the key contains {tag} (non-empty content between the first { and the next }), the tag is used for hashing instead of the full key.
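The key -> slot -> shard chain can be sketched end to end. Several pieces here are assumptions: :erlang.phash2/2 stands in for whatever hash SlotMap actually uses, the slot map is modelled as a 1,024-element tuple, and RoutingSketch with its even_slot_map/1 helper is hypothetical.

```elixir
defmodule RoutingSketch do
  @slots 1024

  # Placeholder hash: the real SlotMap hash function may differ.
  def slot_for(key), do: :erlang.phash2(hash_input(key), @slots)

  # The slot map indirection: slot -> shard index.
  def shard_for(slot_map, key), do: elem(slot_map, slot_for(key))

  # Assigns contiguous slot ranges evenly to shard_count shards.
  def even_slot_map(shard_count) when shard_count > 0 do
    0..(@slots - 1)
    |> Enum.map(fn slot -> div(slot * shard_count, @slots) end)
    |> List.to_tuple()
  end

  # Hash the tag when a non-empty {tag} is present, otherwise the full key.
  defp hash_input(key) do
    with [_before, rest] <- :binary.split(key, "{"),
         [tag, _after] when tag != "" <- :binary.split(rest, "}") do
      tag
    else
      _ -> key
    end
  end
end
```

The indirection is what would let slots move between shards: reassigning a slot changes one tuple element, while the key-to-slot mapping stays fixed. Keys such as "{user:42}:session" and "{user:42}:cart" hash on the same tag and therefore land in the same slot.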

shard_name(ctx, index)

@spec shard_name(FerricStore.Instance.t(), non_neg_integer()) :: atom()

Returns the registered process name for the shard at index.

Uses the pre-computed tuple from the instance context for O(1) lookup.
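The pre-computed tuple idea can be shown in isolation. In this sketch the tuple is built once and indexed with elem/2; ShardNamesSketch and the sketch_shard_N naming scheme are made up for illustration and say nothing about the names the library registers.

```elixir
defmodule ShardNamesSketch do
  # Built once, for example when the instance starts.
  def build(shard_count) when is_integer(shard_count) and shard_count > 0 do
    0..(shard_count - 1)
    |> Enum.map(fn i -> String.to_atom("sketch_shard_#{i}") end)
    |> List.to_tuple()
  end

  # elem/2 on a tuple is constant time, unlike Enum.at/2 on a list.
  def name(names, index), do: elem(names, index)
end
```

Building the atoms up front also keeps atom creation out of the per-request path.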

slot_for(ctx, key)

@spec slot_for(FerricStore.Instance.t(), binary()) :: non_neg_integer()

Returns the slot (0-1023) for a key, respecting hash tags.

unlock(ctx, key, owner)

@spec unlock(FerricStore.Instance.t(), binary(), binary()) :: 1 | {:error, binary()}

value_size(ctx, key)

@spec value_size(FerricStore.Instance.t(), binary()) :: non_neg_integer() | nil

Returns the live plain key value size without reading a cold value.

Hot entries use the in-memory value size; cold entries use the keydir value_size field populated by Bitcask append/recovery.

watch_token(ctx, key)

@spec watch_token(FerricStore.Instance.t(), binary()) :: term()

Returns a lightweight WATCH token for key.

Hot keys use the value hash plus their live Bitcask location. Cold keys use their live keydir location and expiry, avoiding a large Bitcask read just to snapshot WATCH state. Pending entries fall back to the shard write version.

zincrby(ctx, key, increment, member)

@spec zincrby(FerricStore.Instance.t(), binary(), number(), binary()) ::
  binary() | {:error, binary()}

Atomically increments the score of member in the sorted set at key by increment. Returns the new score as a string.

zset_member_rank(ctx, redis_key, member, reverse?)

@spec zset_member_rank(FerricStore.Instance.t(), binary(), binary(), boolean()) ::
  {:ok, non_neg_integer() | nil} | :unavailable

zset_rank_range(ctx, redis_key, start_idx, stop_idx, reverse?)

@spec zset_rank_range(
  FerricStore.Instance.t(),
  binary(),
  non_neg_integer(),
  non_neg_integer(),
  boolean()
) :: {:ok, [{binary(), float()}]} | :unavailable

zset_score_count(ctx, redis_key, min_bound, max_bound)

@spec zset_score_count(FerricStore.Instance.t(), binary(), term(), term()) ::
  {:ok, non_neg_integer()} | :unavailable

zset_score_count_all_many_no_build(ctx, keys)

@spec zset_score_count_all_many_no_build(FerricStore.Instance.t(), [binary()]) ::
  {:ok, [non_neg_integer()]} | :unavailable

zset_score_count_many(ctx, queries)

@spec zset_score_count_many(FerricStore.Instance.t(), [{binary(), term(), term()}]) ::
  {:ok, [non_neg_integer()]} | :unavailable

zset_score_range(ctx, redis_key, min_bound, max_bound, reverse?)

@spec zset_score_range(FerricStore.Instance.t(), binary(), term(), term(), boolean()) ::
  {:ok, [{binary(), float()}]} | :unavailable

zset_score_range_slice(ctx, redis_key, min_bound, max_bound, reverse?, offset, count)

@spec zset_score_range_slice(
  FerricStore.Instance.t(),
  binary(),
  term(),
  term(),
  boolean(),
  non_neg_integer(),
  non_neg_integer() | :all
) :: {:ok, [{binary(), float()}]} | :unavailable