Ferricstore.Raft.StateMachine (ferricstore v0.4.0)


Replicated state machine for a single FerricStore shard.

Each shard is an independent Raft group. The state machine receives write commands via apply/3, which deterministically applies them to both the Bitcask persistent store (via synchronous NIF) and the ETS hot cache.

Callbacks

  • init/1 -- receives the shard config, returns initial machine state.
  • apply/3 -- deterministic command application (called on every node). Supports :put, :delete, :batch, and the other commands listed under apply/3 below.
  • state_enter/2 -- lifecycle hook for leader/follower transitions.
  • tick/2 -- periodic callback (currently unused; a placeholder for metrics).
  • init_aux/1 -- initializes non-replicated auxiliary state.
  • handle_aux/5 -- handles non-replicated auxiliary commands (new API).
  • overview/1 -- returns a summary map for debugging/monitoring.

Design notes

Per the spec (section 2C.4):

  • apply/3 is deterministic and runs on every node in the Raft group.
  • Cold disk reads inside apply/3 block until they complete, so results stay deterministic, but the file I/O itself is submitted through async NIFs so that normal BEAM schedulers never run blocking pread calls.
  • Effects (send_msg, release_cursor) are returned as the third element of the apply return tuple.
  • In single-node mode, the shard's Raft group has one member (self quorum), so every write commits as soon as the local log append and fsync complete.

HLC piggybacking (spec 2G.6)

HLC timestamps are piggybacked on replicated commands. Submit paths stamp each log entry with the leader's current HLC timestamp before appending it. When apply/3 processes a command carrying an hlc_ts metadata map, it calls HLC.update/1 to merge the leader's clock into the local node's HLC and uses the stamped physical millisecond for TTL and lock expiry decisions.

In single-node mode this merge is a no-op (the node merges its own timestamp). In multi-node clusters, followers use this to stay causally synchronized with the leader's clock while applying the same command timestamp as every other replica.
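The merge that HLC.update/1 performs can be illustrated with the standard hybrid-logical-clock receive rule. This is a minimal sketch, assuming timestamps are {physical_ms, logical} tuples (the shape carried in the hlc_ts metadata) and that the local wall-clock reading is passed in explicitly so the function stays pure; HLCSketch.merge/3 is a hypothetical name, not FerricStore's API, and the real module may differ in detail.

```elixir
defmodule HLCSketch do
  @moduledoc """
  Hypothetical sketch of the hybrid-logical-clock receive rule.
  Clocks are {physical_ms, logical} tuples; now_ms is the local wall clock.
  """

  @doc "Merges a remote (leader) timestamp into the local clock."
  def merge({l_pt, l_c}, {r_pt, r_c}, now_ms) do
    pt = Enum.max([l_pt, r_pt, now_ms])

    logical =
      cond do
        # Both clocks share the winning physical time: advance past both counters.
        pt == l_pt and pt == r_pt -> max(l_c, r_c) + 1
        # Only the local clock holds the winning physical time.
        pt == l_pt -> l_c + 1
        # Only the remote clock holds the winning physical time.
        pt == r_pt -> r_c + 1
        # The wall clock has moved past both: reset the logical counter.
        true -> 0
      end

    {pt, logical}
  end
end
```

With this rule the merged timestamp is always strictly greater than the leader's, so a follower that has applied a command never reports a clock behind that command's stamp.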

Commands may arrive in two forms:

  • Wrapped: {inner_command, %{hlc_ts: {physical_ms, logical}}} -- the metadata map carries the leader's HLC timestamp for merging.
  • Unwrapped: inner_command (legacy / test) -- processed as before without HLC merging.
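A handler has to tell the two forms apart before dispatching. One way to do that, assuming the metadata map is always the second element of a 2-tuple as described above, is a pair of function clauses; CommandEnvelope.unwrap/1 is a hypothetical helper, not part of FerricStore.

```elixir
defmodule CommandEnvelope do
  @moduledoc """
  Hypothetical sketch: split a replicated command into its inner command
  and the optional leader HLC timestamp.
  """

  @doc "Returns {inner_command, hlc_ts_or_nil}."
  def unwrap({inner, %{hlc_ts: {physical_ms, logical}}})
      when is_integer(physical_ms) and is_integer(logical) do
    {inner, {physical_ms, logical}}
  end

  # Legacy/test form: no metadata, so there is nothing to merge.
  def unwrap(inner), do: {inner, nil}
end
```

Assuming keys are binaries, an unwrapped command such as {:delete, key} falls through to the second clause, because its second element is not a map.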

Log compaction (spec 2E.5)

The replicated log grows without bound unless it is compacted. After every :release_cursor_interval applied commands (default: 200_000), apply/3 emits {:checkpoint, ra_index, state} and {:release_cursor, ra_index} effects. Together they tell Ra that every entry up to ra_index is fully reflected in the given state checkpoint, so those entries can be safely truncated once the checkpoint has been materialized.

The interval is stored in the machine state at init time (from the config map or application env) so that apply/3 remains deterministic -- it never reads runtime configuration.
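The cadence can be sketched as a pure function over the machine state. This assumes applied_count counts every applied command (as the shard_state() type below suggests) and that both effects are emitted on the same apply; CompactionSketch.after_apply/2 is hypothetical, and the effect tuples simply mirror the shapes named above.

```elixir
defmodule CompactionSketch do
  @moduledoc """
  Hypothetical sketch of the release-cursor cadence. The interval lives in
  the machine state, so the decision depends only on replicated data.
  """

  @doc "Bumps applied_count and returns {state, effects} for the entry at ra_index."
  def after_apply(%{applied_count: n, release_cursor_interval: interval} = state, ra_index) do
    state = %{state | applied_count: n + 1}

    effects =
      if rem(state.applied_count, interval) == 0 do
        # Checkpoint the state at this index, then let the log drop entries up to it.
        [{:checkpoint, ra_index, state}, {:release_cursor, ra_index}]
      else
        []
      end

    {state, effects}
  end
end
```

Because the counter and interval are both replicated state, every replica emits the effects at the same log index.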

Summary

Functions

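  • apply(meta, cmd, state) -- Applies a replicated command to the shard state.
  • handle_aux(raft_state, arg2, arg3, aux, int_state) -- Handles non-replicated auxiliary commands (5-arity new API).
  • init(config) -- Initializes the state machine for a shard.
  • init_aux(name) -- Initializes non-replicated auxiliary state.
  • overview(state) -- Returns a summary map for debugging and monitoring.
  • state_enter(arg1, state) -- Lifecycle hook called when the Raft node transitions roles.
  • tick(time_ms, state) -- Periodic tick callback. Returns a list of effects (currently empty).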

Types

shard_state()

@type shard_state() :: %{
  shard_index: non_neg_integer(),
  data_dir: binary(),
  data_dir_expanded: binary(),
  shard_data_path: binary(),
  shard_data_path_expanded: binary(),
  active_file_id: non_neg_integer(),
  active_file_path: binary(),
  ets: atom(),
  applied_count: non_neg_integer(),
  release_cursor_interval: pos_integer()
}

Functions

apply(meta, cmd, state)

Applies a replicated command to the shard state.

Supported commands:

  • {:put, key, value, expire_at_ms} -- Write a key-value pair with optional expiry. Writes to Bitcask (sync NIF) and updates ETS.
  • {:put_batch, entries} -- Hot-path write-only SET batch where entries are {key, value, expire_at_ms} tuples. Stages Bitcask records, then publishes ETS after append succeeds. Returns {:ok, results}.
  • {:delete, key} -- Delete a key. Writes a tombstone to Bitcask, removes from ETS.
  • {:delete_batch, keys} -- Hot-path DEL batch. Returns {:ok, results}.
  • {:delete_prefix, prefix} -- Delete all keys matching a raw key prefix.
  • {:batch, commands} -- Apply a mixed list of commands atomically. Use this form when later commands in the same Ra entry must see the pending writes of earlier ones (read-your-own-writes). Returns {:ok, results} where results is a list of individual command results.
  • {:list_op, key, operation} -- Execute a list operation (LPUSH, RPUSH, LPOP, RPOP, etc.) as an atomic read-modify-write. Reads the current value from ETS/Bitcask, delegates to ListOps.execute/4, and persists the result.
  • {:compound_put, compound_key, value, expire_at_ms} -- Write a hash/set/zset field. Inserts {compound_key, value, expire_at_ms} into ETS and Bitcask.
  • {:compound_delete, compound_key} -- Delete a hash/set/zset field. Removes the compound key from ETS and Bitcask.
  • {:compound_delete_prefix, prefix} -- Delete all compound keys matching the given prefix from ETS and Bitcask. Used by DEL on data structures (hashes, sets, sorted sets) to clean up all fields.
  • {:incr_float, key, delta} -- Atomic read-modify-write float increment. Reads the current value, parses as float, adds delta, formats the result, and writes back. Returns {:ok, new_float_string} or {:error, "ERR value is not a valid float"}.
  • {:append, key, suffix} -- Atomic read-modify-write append. Reads the current value (or ""), concatenates suffix, writes back. Returns {:ok, byte_size(new_value)}.
  • {:getset, key, new_value} -- Atomic get-and-set. Reads the old value, writes the new value with no expiry, returns the old value (or nil).
  • {:getdel, key} -- Atomic get-and-delete. Reads the value, deletes the key, returns the value (or nil).
  • {:getex, key, expire_at_ms} -- Atomic get-and-update-expiry. Reads the value, re-writes with the new expire_at_ms, returns the value (or nil).
  • {:setrange, key, offset, value} -- Atomic set-range. Reads the current value, pads with zero bytes if needed, replaces bytes at offset, writes back. Returns {:ok, byte_size(new_value)}.
  • {:cas, key, expected, new_value, ttl_ms} -- Compare-and-swap. Reads the current value; if it matches expected, writes new_value with optional TTL. Returns 1 (swapped), 0 (mismatch), or nil (key missing/expired).
  • {:lock, key, owner, ttl_ms} -- Distributed lock acquire. If the key does not exist, is expired, or is already held by the same owner, sets {owner, ttl}. Returns :ok or {:error, reason}.
  • {:unlock, key, owner} -- Distributed lock release. If the key exists and the owner matches, deletes the key. Returns 1 on success, {:error, reason} on owner mismatch.
  • {:extend, key, owner, ttl_ms} -- Distributed lock TTL extension. If the key exists and the owner matches, updates the TTL. Returns 1 on success, {:error, reason} on owner mismatch or missing key.
  • {:ratelimit_add, key, window_ms, max, count} -- Sliding window rate limiter. Reads counters, rotates windows, computes effective count, and updates. Returns [status, count, remaining, ttl_ms].
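The :lock, :unlock, and :extend rules above reduce to a few pure functions over a lease map. A minimal sketch, assuming each lock is stored as {owner, expire_at_ms} and that now_ms is the command's stamped HLC time (which the HLC section says drives lock expiry). LockSketch, its error atoms, and the 0 returned when unlocking a missing key are illustrative assumptions; the documentation does not specify them.

```elixir
defmodule LockSketch do
  @moduledoc """
  Hypothetical sketch of the :lock / :unlock / :extend rules over a map of
  key => {owner, expire_at_ms}. Error atoms are illustrative only.
  """

  def lock(locks, key, owner, ttl_ms, now_ms) do
    case live(locks, key, now_ms) do
      # Free or expired: take the lease.
      nil -> {Map.put(locks, key, {owner, now_ms + ttl_ms}), :ok}
      # Already held by the same owner: refresh the lease.
      {^owner, _expire_at_ms} -> {Map.put(locks, key, {owner, now_ms + ttl_ms}), :ok}
      {_other, _expire_at_ms} -> {locks, {:error, :locked}}
    end
  end

  def unlock(locks, key, owner, now_ms) do
    case live(locks, key, now_ms) do
      {^owner, _expire_at_ms} -> {Map.delete(locks, key), 1}
      {_other, _expire_at_ms} -> {locks, {:error, :not_owner}}
      # Assumption: releasing a missing or expired lock is a no-op returning 0.
      nil -> {locks, 0}
    end
  end

  def extend(locks, key, owner, ttl_ms, now_ms) do
    case live(locks, key, now_ms) do
      {^owner, _expire_at_ms} -> {Map.put(locks, key, {owner, now_ms + ttl_ms}), 1}
      {_other, _expire_at_ms} -> {locks, {:error, :not_owner}}
      nil -> {locks, {:error, :no_such_lock}}
    end
  end

  # Returns the entry only if it has not expired at now_ms.
  defp live(locks, key, now_ms) do
    case Map.get(locks, key) do
      {_owner, expire_at_ms} = entry when expire_at_ms > now_ms -> entry
      _ -> nil
    end
  end
end
```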

Returns {new_state, result} or {new_state, result, effects}.
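The {new_state, result} contract, including how :batch threads state so that later commands read earlier writes, can be illustrated with a small in-memory model. This is a hypothetical sketch (MiniShard is not part of FerricStore): it replaces Bitcask and ETS with a plain map, passes the stamped time as meta.now_ms, and assumes :put and :delete return :ok and that a nil ttl_ms means no expiry.

```elixir
defmodule MiniShard do
  @moduledoc """
  Hypothetical in-memory sketch of the apply/3 contract. State maps
  key => {value, expire_at_ms} (0 = no expiry). No Bitcask, no ETS.
  """

  # Our apply/3 would otherwise clash with the auto-imported Kernel.apply/3.
  import Kernel, except: [apply: 3]

  def apply(_meta, {:put, key, value, expire_at_ms}, state),
    do: {Map.put(state, key, {value, expire_at_ms}), :ok}

  def apply(_meta, {:delete, key}, state),
    do: {Map.delete(state, key), :ok}

  # Returns 1 (swapped), 0 (mismatch) or nil (missing/expired), as documented.
  def apply(meta, {:cas, key, expected, new_value, ttl_ms}, state) do
    case live_value(state, key, meta.now_ms) do
      nil ->
        {state, nil}

      ^expected ->
        expire_at_ms = if ttl_ms, do: meta.now_ms + ttl_ms, else: 0
        {Map.put(state, key, {new_value, expire_at_ms}), 1}

      _other ->
        {state, 0}
    end
  end

  # State is threaded through the reduce, so later commands see earlier writes.
  def apply(meta, {:batch, commands}, state) do
    {state, results} =
      Enum.reduce(commands, {state, []}, fn cmd, {acc, acc_results} ->
        {acc, result} = apply(meta, cmd, acc)
        {acc, [result | acc_results]}
      end)

    {state, {:ok, Enum.reverse(results)}}
  end

  defp live_value(state, key, now_ms) do
    case Map.get(state, key) do
      {value, 0} -> value
      {value, expire_at_ms} when expire_at_ms > now_ms -> value
      _ -> nil
    end
  end
end
```

In the batch case the :cas sees the value written by the preceding :put in the same entry, which is the read-your-own-writes behavior the :batch form exists for.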

handle_aux(raft_state, arg2, arg3, aux, int_state)

Handles non-replicated auxiliary commands (5-arity new API).

The int_state parameter is ra's internal state and must be passed back unchanged in the return tuple.

Currently supports:

  • {:cast, {:key_written, key}} -- Increments a local hot-key counter.
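A node-local hot-key counter fits naturally in aux state. This is a minimal sketch, assuming the documented {:cast, {:key_written, key}} arrives as a :cast type argument plus a {:key_written, key} command, and assuming Ra's {:no_reply, aux, int_state} return shape for handle_aux/5; HotKeyAux and the %{hot_keys: ...} layout are hypothetical. The module does not declare the Ra behaviour, so it runs standalone.

```elixir
defmodule HotKeyAux do
  @moduledoc """
  Hypothetical sketch of node-local aux state for hot-key counting.
  int_state is passed back untouched, as the contract requires.
  """

  def init_aux(_name), do: %{hot_keys: %{}}

  def handle_aux(_raft_state, :cast, {:key_written, key}, aux, int_state) do
    # Start a key at 1, or increment its existing count.
    hot_keys = Map.update(aux.hot_keys, key, 1, &(&1 + 1))
    {:no_reply, %{aux | hot_keys: hot_keys}, int_state}
  end

  # Ignore anything else without touching either state.
  def handle_aux(_raft_state, _type, _cmd, aux, int_state),
    do: {:no_reply, aux, int_state}
end
```

Since aux state is never replicated, each node's counts reflect only the writes it observed locally.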

init(config)

@spec init(map()) :: shard_state()

Initializes the state machine for a shard.

The config map must include (v2 -- path-based, no NIF store reference):

  • :shard_index -- zero-based shard index
  • :shard_data_path -- absolute path to the shard's Bitcask data directory
  • :active_file_id -- numeric ID of the active log file
  • :active_file_path -- absolute path to the active log file
  • :ets -- ETS table name (already created)

Optional:

  • :release_cursor_interval -- number of applies between release_cursor effects (default: 200_000). Can also be set in the :ferricstore application environment, read via Application.get_env(:ferricstore, :release_cursor_interval).

Returns the initial machine state.
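Resolving the interval once at init time is what keeps apply/3 free of configuration reads. A minimal sketch, assuming the config map takes precedence over the application environment and both fall back to 200_000; InitSketch is hypothetical and returns only a subset of the shard_state() fields (no data_dir or expanded paths).

```elixir
defmodule InitSketch do
  @moduledoc """
  Hypothetical sketch of init/1: required keys are fetched strictly and the
  release-cursor interval is resolved once, so apply/3 never reads config.
  """

  @required [:shard_index, :shard_data_path, :active_file_id, :active_file_path, :ets]

  def init(config) when is_map(config) do
    # Map.fetch!/2 raises KeyError if a required key is missing.
    base = Map.new(@required, fn key -> {key, Map.fetch!(config, key)} end)

    # Assumed precedence: config map, then application env, then the default.
    interval =
      Map.get_lazy(config, :release_cursor_interval, fn ->
        Application.get_env(:ferricstore, :release_cursor_interval, 200_000)
      end)

    Map.merge(base, %{applied_count: 0, release_cursor_interval: interval})
  end
end
```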

init_aux(name)

Initializes non-replicated auxiliary state.

Aux state is local to each node and not replicated via Raft. Used for tracking hot-key statistics and other node-local metadata.

overview(state)

Returns a summary map for debugging and monitoring.

Includes the shard index, ETS keydir size, total applied command count, and the release_cursor interval.
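A sketch of such a summary, assuming the keydir size is read with :ets.info/2 on the shard's table; OverviewSketch and its map keys are hypothetical, not FerricStore's exact output.

```elixir
defmodule OverviewSketch do
  @moduledoc """
  Hypothetical sketch of overview/1 built from shard_state() fields.
  """

  def overview(state) do
    %{
      shard_index: state.shard_index,
      # :ets.info/2 returns :undefined if the table does not exist.
      keydir_size: :ets.info(state.ets, :size),
      applied_count: state.applied_count,
      release_cursor_interval: state.release_cursor_interval
    }
  end
end
```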

state_enter(arg1, state)

Lifecycle hook called when the Raft node transitions roles.

When becoming leader, generates a fresh HLC timestamp via HLC.now/0 to ensure the leader's clock is up to date before it starts stamping commands. This is a side-effect only -- it does not affect the deterministic state machine output.

In single-node mode, the node is always the leader. In multi-node clusters, this hook can be used to start and stop leader-only processes (e.g., the merge scheduler or the active expiry sweeper).

Returns a list of effects (currently empty).

tick(time_ms, state)

Periodic tick callback. Returns a list of effects (currently empty).