Ferricstore.Store.BitcaskWriter (ferricstore v0.4.0)


Background Bitcask writer for deferred persistence of small values.

Each shard has one BitcaskWriter process. When the StateMachine applies a write for a small value (< hot_cache_max_value_size), it inserts the value into ETS immediately, so reads see it at once, and sends a cast to this process with the key, the value, and the active file path at the time of the write. The BitcaskWriter accumulates writes, flushes them to Bitcask in batches, and then updates each ETS entry's file_id, offset, and value_size via :ets.update_element/3.

This decouples the ~50 µs synchronous NIF write from replicated apply, allowing the apply process to move to the next command immediately. The ETS entry is tagged with file_id = :pending until the background write completes.

Batching strategy

Writes are flushed when either:

  • The pending batch reaches 100 entries, OR
  • 1 ms has elapsed since the first pending entry was queued
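
The two flush triggers above can be sketched as a small GenServer. This is a minimal illustration, not the actual BitcaskWriter: the module name BatchSketch, its enqueue/2 and flush/1 functions, and the flush_fun callback are all hypothetical, and the real writer's state and error handling are not shown in this page.

```elixir
defmodule BatchSketch do
  use GenServer

  # Thresholds taken from the documented batching strategy.
  @max_batch 100
  @flush_after_ms 1

  # flush_fun is a hypothetical callback that receives the batch in queue order.
  def start_link(flush_fun), do: GenServer.start_link(__MODULE__, flush_fun)
  def enqueue(pid, entry), do: GenServer.cast(pid, {:enqueue, entry})
  def flush(pid), do: GenServer.call(pid, :flush)

  @impl true
  def init(flush_fun) do
    {:ok, %{pending: [], count: 0, timer: nil, flush_fun: flush_fun}}
  end

  @impl true
  def handle_cast({:enqueue, entry}, state) do
    state = %{state | pending: [entry | state.pending], count: state.count + 1}

    cond do
      # Trigger 1: the batch is full.
      state.count >= @max_batch ->
        {:noreply, do_flush(state)}

      # Trigger 2: arm the timer when the first entry of a batch arrives.
      state.timer == nil ->
        ref = Process.send_after(self(), :flush_timeout, @flush_after_ms)
        {:noreply, %{state | timer: ref}}

      true ->
        {:noreply, state}
    end
  end

  @impl true
  def handle_call(:flush, _from, state), do: {:reply, :ok, do_flush(state)}

  @impl true
  # A stale timeout message only causes an early (harmless) flush.
  def handle_info(:flush_timeout, state), do: {:noreply, do_flush(state)}

  defp do_flush(%{pending: []} = state), do: cancel_timer(state)

  defp do_flush(state) do
    # Entries were prepended, so reverse to restore arrival order.
    state.flush_fun.(Enum.reverse(state.pending))
    cancel_timer(%{state | pending: [], count: 0})
  end

  defp cancel_timer(%{timer: nil} = state), do: state

  defp cancel_timer(%{timer: ref} = state) do
    Process.cancel_timer(ref)
    %{state | timer: nil}
  end
end
```

Starting the sketch with a callback such as `fn batch -> send(parent, {:flushed, batch}) end`, enqueueing a few entries, and then calling `BatchSketch.flush/1` delivers every queued entry to the callback in arrival order, possibly split across more than one batch if the 1 ms timer fires first.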

File rotation handling

The active file path is passed with each write cast, so the writer automatically handles file rotations -- writes to different paths are grouped and flushed separately.
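
One way to picture the per-path grouping is to split the queued writes into contiguous runs that share a path, which keeps the overall write order intact across a rotation. The snippet below is only a sketch: the file paths and the three-element tuples are made up for illustration, and the real writer may group entries differently.

```elixir
# Hypothetical queued writes as {active_file_path, key, value}, in arrival order.
queued = [
  {"/tmp/shard_0/00001.log", "a", "1"},
  {"/tmp/shard_0/00001.log", "b", "2"},
  # A file rotation happened here, so later writes carry the new path.
  {"/tmp/shard_0/00002.log", "c", "3"}
]

# Split into contiguous runs with the same path; each run becomes one flush.
batches =
  queued
  |> Enum.chunk_by(fn {path, _key, _value} -> path end)
  |> Enum.map(fn [{path, _, _} | _] = run ->
    {path, Enum.map(run, fn {_path, key, value} -> {key, value} end)}
  end)

IO.inspect(batches)
```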

Invariants

  • A key with file_id = :pending in ETS has its value in ETS (non-nil). MemoryGuard must NOT evict these entries.
  • After the Bitcask write completes, the writer updates ETS positions 5 (file_id), 6 (offset), and 7 (value_size) via update_element.
  • Large values (>= hot_cache_max_value_size) are NOT routed here -- they use the synchronous path in StateMachine because their ETS value is nil and cold reads need a valid disk offset immediately.
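
The first two invariants can be illustrated with a throwaway ETS table. Only positions 5, 6, and 7 (file_id, offset, value_size) come from this page; the rest of the tuple layout, the table name, and the can_evict helper are assumptions made for the sketch.

```elixir
# Assumed layout: {key, value, expire_at_ms, meta, file_id, offset, value_size}.
# Positions 1-4 are guesses; positions 5-7 follow the invariants above.
table = :ets.new(:keydir_sketch, [:set, :public])

# Apply path: the value is readable immediately, tagged as :pending.
:ets.insert(table, {"user:1", "alice", 0, nil, :pending, 0, 0})

# Hypothetical eviction check: entries still waiting for disk are never evictable.
can_evict = fn {_key, _value, _exp, _meta, file_id, _offset, _size} ->
  file_id != :pending
end

[pending_entry] = :ets.lookup(table, "user:1")
evictable_before = can_evict.(pending_entry)

# Background writer: after the Bitcask write, fill in the real disk location.
:ets.update_element(table, "user:1", [{5, 3}, {6, 1024}, {7, 5}])

[flushed_entry] = :ets.lookup(table, "user:1")
evictable_after = can_evict.(flushed_entry)
```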

Types

flush_error()

@type flush_error() :: {:flush_failed, pos_integer()} | {:flush_exit, term()}

flush_result()

@type flush_result() :: :ok | {:error, flush_error()}
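
A caller can pattern match on flush_result() as sketched below. The module FlushResultSketch and its messages are hypothetical, and the meaning of the integer in {:flush_failed, n} is not stated on this page, so the sketch only echoes it.

```elixir
defmodule FlushResultSketch do
  # Covers every shape allowed by flush_result() and flush_error().
  def describe(:ok), do: "all pending writes flushed"

  def describe({:error, {:flush_failed, n}}) when is_integer(n) and n > 0 do
    "flush failed (#{n})"
  end

  def describe({:error, {:flush_exit, reason}}) do
    "writer exited: #{inspect(reason)}"
  end
end
```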

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete(shard_index, active_file_path, key)

@spec delete(non_neg_integer(), binary(), binary()) :: :ok

Queues a deferred Bitcask tombstone (delete) for background processing.

Called from local/direct delete paths when Raft is not involved. The tombstone is written in the same ordered batch as value writes, so a tombstone for key K will always appear after any preceding value write for K. The ETS entry has already been deleted by the caller before this cast.

delete(instance_ctx, shard_index, active_file_path, key)

@spec delete(FerricStore.Instance.t() | nil, non_neg_integer(), binary(), binary()) ::
  :ok

discard_pending(instance_ctx, shard_index, key)

@spec discard_pending(
  FerricStore.Instance.t() | map() | nil,
  non_neg_integer(),
  binary()
) :: :ok
@spec discard_pending(FerricStore.Instance.t() | map() | nil, non_neg_integer(), [
  binary()
]) :: :ok

Drops queued deferred writes for keys that were rolled back before reaching Bitcask.

Rollback code appends its own compensation tombstone/value synchronously, so leaving the original pending writer entry behind is both obsolete and harmful: if the original path disappeared, the writer would retry forever and poison later flushes.

flush(shard_index)

@spec flush(non_neg_integer()) :: flush_result()

Synchronously flushes all pending writes to Bitcask and returns.

Used in tests and before shard shutdown to ensure all deferred writes are persisted.

flush(shard_index, timeout)

@spec flush(non_neg_integer(), timeout()) :: flush_result()
@spec flush(FerricStore.Instance.t() | map() | nil, non_neg_integer()) ::
  flush_result()

flush(instance_ctx, shard_index, timeout)

@spec flush(FerricStore.Instance.t() | map() | nil, non_neg_integer(), timeout()) ::
  flush_result()

flush_all(shard_count \\ 4, timeout \\ 10000)

@spec flush_all(non_neg_integer(), timeout()) ::
  :ok | {:error, [{non_neg_integer(), term()}]}

Flushes all running BitcaskWriter processes.

Iterates through shard indices 0..N-1 (default N=4) and flushes each writer that is alive. Missing writers are ignored; writer failures are returned with their shard index. Used in tests that need all background writes to be on disk before simulating eviction or verifying disk state.
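
The iteration described above might look roughly like the following. FlushAllSketch and the flush_one callback are stand-ins invented for this sketch; in particular, returning :missing for an absent writer is an assumption, not the library's behaviour.

```elixir
defmodule FlushAllSketch do
  # flush_one is a hypothetical per-shard flush that returns
  # :ok, :missing (no writer running), or {:error, reason}.
  def flush_all(shard_count, flush_one) do
    errors =
      for idx <- 0..(shard_count - 1)//1,
          # Only failures match; :ok and :missing are skipped.
          {:error, reason} <- [flush_one.(idx)],
          do: {idx, reason}

    if errors == [], do: :ok, else: {:error, errors}
  end
end
```

With a callback that fails only for shard 2, the sketch returns the failure paired with its shard index, matching the shape of the documented `{:error, [{non_neg_integer(), term()}]}` result.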

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a BitcaskWriter for the given shard index.

write(shard_index, active_file_path, active_file_id, ets_table, key, value, expire_at_ms)

@spec write(
  non_neg_integer(),
  binary(),
  non_neg_integer(),
  atom(),
  binary(),
  binary(),
  non_neg_integer()
) :: :ok

write(instance_ctx, shard_index, active_file_path, active_file_id, ets_table, key, value, expire_at_ms)

@spec write(
  FerricStore.Instance.t() | nil,
  non_neg_integer(),
  binary(),
  non_neg_integer(),
  atom(),
  binary(),
  binary(),
  non_neg_integer()
) :: :ok

Queues a deferred Bitcask write for background processing.

Called from StateMachine.apply/3 for small values. The write is non-blocking (cast) so replicated apply is not delayed.

Parameters

  • shard_index -- zero-based shard index
  • active_file_path -- the active log file path at the time of the write
  • active_file_id -- the numeric file ID for the active log file
  • ets_table -- the ETS table name (keydir) to update after writing
  • key -- the key being written
  • value -- the value to persist (always a binary, always < 64KB)
  • expire_at_ms -- expiry timestamp in milliseconds (0 = no expiry)

write_batch(shard_index, entries)

@spec write_batch(non_neg_integer(), [
  {binary(), non_neg_integer(), atom(), binary(), binary(), non_neg_integer()}
]) :: :ok

Queues a batch of deferred Bitcask writes in a single cast.

Same semantics as calling write/7 for each entry, but sends one GenServer cast instead of N. Each entry is a tuple: {active_file_path, active_file_id, ets_table, key, value, expire_at_ms}.
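
As an illustration of the entry shape, the snippet below builds a batch in the documented tuple order. The path, file ID, and table name are placeholder values, and the final call is left as a comment because it needs a running writer.

```elixir
# Placeholder values; a real caller takes these from the shard's state.
active_file_path = "/tmp/shard_0/00001.log"
active_file_id = 1
ets_table = :keydir_0

# Each entry: {active_file_path, active_file_id, ets_table, key, value, expire_at_ms}.
entries =
  for {key, value} <- [{"a", "1"}, {"b", "2"}] do
    # expire_at_ms = 0 means no expiry.
    {active_file_path, active_file_id, ets_table, key, value, 0}
  end

# With a writer running for shard 0, one cast would queue both writes:
# Ferricstore.Store.BitcaskWriter.write_batch(0, entries)
```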

writer_name(shard_index)

@spec writer_name(non_neg_integer()) :: atom()

Returns the registered name for a shard's BitcaskWriter.

writer_name(arg1, shard_index)

@spec writer_name(FerricStore.Instance.t() | map() | nil, non_neg_integer()) :: atom()