Ferricstore.Raft.Batcher (ferricstore v0.3.4)


Namespace-aware group commit batcher for a single FerricStore shard.

Per spec sections 2C.5 and 2F.3, each shard has its own Batcher GenServer that accumulates write commands into per-namespace buffers, each with its own commit window and durability mode. When a namespace's timer fires, only that namespace's buffer is flushed.

How it works

  1. A client calls write/2, which sends a GenServer.call to the batcher.
  2. The batcher extracts the key's namespace prefix (e.g. "session" from "session:abc123", "_root" for keys without a colon).
  3. The namespace config (window_ms and durability) for this prefix is resolved from the batcher's in-process cache, which is filled from the :ferricstore_ns_config ETS table on a cache miss.
  4. The command and caller are appended to the namespace's buffer slot, identified by {prefix, durability}.
  5. On the first write to an empty slot, a timer is started using the namespace's window_ms.
  6. When the timer fires (:flush_slot), only that slot's commands are submitted to Raft via ra:pipeline_command/3.
  7. Each caller receives their individual result from the batch once the ra command commits and the batcher receives the ra_event notification.
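Steps 4 and 5 can be sketched as a pure function over a map of slots. This is a minimal illustration under stated assumptions, not the module's code: BatcherSketch.Slots, enqueue/5 and take_slot/2 are hypothetical names, the slot key and window_ms are assumed to be resolved already (steps 2 and 3), and the froms are placeholder atoms rather than real GenServer.from() tuples.

```elixir
defmodule BatcherSketch.Slots do
  # Hypothetical sketch of steps 4-5; not the actual Ferricstore code.
  # slots :: %{slot_key => %{cmds: list, froms: list, timer_ref: reference | nil, window_ms: integer}}

  def enqueue(slots, slot_key, window_ms, cmd, from) do
    slot =
      Map.get(slots, slot_key, %{cmds: [], froms: [], timer_ref: nil, window_ms: window_ms})

    # Only the first write into an empty slot arms the commit-window timer.
    slot =
      if slot.timer_ref == nil do
        ref = Process.send_after(self(), {:flush_slot, slot_key}, window_ms)
        %{slot | timer_ref: ref}
      else
        slot
      end

    # Prepend in O(1); take_slot/2 reverses to restore arrival order.
    Map.put(slots, slot_key, %{slot | cmds: [cmd | slot.cmds], froms: [from | slot.froms]})
  end

  # On {:flush_slot, slot_key}: remove the slot and return its commands and
  # callers in arrival order. A missing slot yields empty lists.
  def take_slot(slots, slot_key) do
    case Map.pop(slots, slot_key) do
      {nil, slots} -> {[], [], slots}
      {slot, slots} -> {Enum.reverse(slot.cmds), Enum.reverse(slot.froms), slots}
    end
  end
end

key = {"session", :quorum}

# Placeholder froms; a real batcher would store the GenServer.from() of each caller.
slots =
  %{}
  |> BatcherSketch.Slots.enqueue(key, 1, {:put, "session:a", "1", 0}, :from_a)
  |> BatcherSketch.Slots.enqueue(key, 1, {:put, "session:b", "2", 0}, :from_b)
```

Only one {:flush_slot, key} message is scheduled for the two writes, which is what lets a whole commit window go to Raft as one batch.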

Pipelined ra submission (non-blocking)

To avoid serializing all writers through one GenServer while the previous batch is in-flight through Raft consensus, the batcher uses ra:pipeline_command/3 instead of the blocking ra:process_command/2.

pipeline_command is a cast -- it returns immediately with :ok, and the batcher receives an async {ra_event, Leader, {applied, [...]}} message when the command is committed and applied by the state machine.

The batcher maintains a pending map keyed by correlation reference, which maps each in-flight batch to the list of callers (froms) that are waiting for a reply. When the ra_event arrives, the batcher extracts the result and calls GenServer.reply/2 for each caller.

This means the GenServer never blocks on Raft. While a batch is in flight, the batcher continues to accept new writes and accumulate them into fresh slots. This removes the throughput bottleneck in which 50 concurrent writers were serialized behind a single blocked GenServer.
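The pending-map bookkeeping described above can be sketched as follows. This is a hypothetical illustration, not the module's implementation: BatcherSketch.Pending is an invented name, and it assumes that the reply carried for each correlation in {:applied, [{corr, reply}, ...]} is a list of per-command results in the same order as the batch's froms.

```elixir
defmodule BatcherSketch.Pending do
  # Hypothetical sketch. Assumption: for each correlation, the applied reply
  # is a list of per-command results in the same order as the froms.

  # Record an in-flight batch right after :ra.pipeline_command/3 returns.
  def track(pending, corr, froms), do: Map.put(pending, corr, froms)

  # Handle the list from {:ra_event, leader, {:applied, applied}}.
  def handle_applied(pending, applied) do
    Enum.reduce(applied, pending, fn {corr, results}, acc ->
      case Map.pop(acc, corr) do
        {nil, acc} ->
          # Unknown correlation (e.g. already reset): nobody to reply to.
          acc

        {froms, acc} ->
          froms
          |> Enum.zip(results)
          |> Enum.each(fn {from, result} -> GenServer.reply(from, result) end)

          acc
      end
    end)
  end
end

corr = make_ref()
from_a = {self(), make_ref()}
from_b = {self(), make_ref()}

pending = BatcherSketch.Pending.track(%{}, corr, [from_a, from_b])
pending = BatcherSketch.Pending.handle_applied(pending, [{corr, [:ok, {:error, :wrongtype}]}])
```

Because replies go out through GenServer.reply/2, each blocked caller gets only its own result even though the batch was committed as a single Raft entry.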

Namespace configuration

Per-prefix configuration is originally sourced from the :ferricstore_ns_config ETS table managed by Ferricstore.NamespaceConfig. To avoid two ETS lookups (~400ns) on every write, the batcher caches namespace config in its process state (ns_cache). The first write for a given prefix fetches from ETS and caches the result; subsequent writes for the same prefix use the cached value with zero ETS overhead.

When namespace config changes (via FERRICSTORE.CONFIG SET or RESET), NamespaceConfig broadcasts :ns_config_changed to all batcher processes, which clears their caches. The next write for any prefix then re-reads from ETS.

If no configuration exists for a prefix, the defaults are used: window_ms = 1, durability = :quorum.
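A cache-then-ETS lookup with the documented defaults might look like this. It is a sketch under an explicit assumption: the ETS row shape {prefix, window_ms, durability} is invented for illustration (the real :ferricstore_ns_config layout may differ, and the text above mentions two lookups per write), and BatcherSketch.NsCache is a hypothetical module.

```elixir
defmodule BatcherSketch.NsCache do
  # Hypothetical sketch. ASSUMED row shape: {prefix, window_ms, durability}.
  @defaults {1, :quorum}

  # Returns {{window_ms, durability}, cache}; ETS is read only on a cache miss.
  def lookup(cache, table, prefix) do
    case cache do
      %{^prefix => cfg} ->
        {cfg, cache}

      _ ->
        cfg =
          case :ets.lookup(table, prefix) do
            [{^prefix, window_ms, durability}] -> {window_ms, durability}
            [] -> @defaults
          end

        {cfg, Map.put(cache, prefix, cfg)}
    end
  end

  # What a batcher would do on :ns_config_changed: drop every cached entry.
  def invalidate(_cache), do: %{}
end

table = :ets.new(:ns_config_sketch, [:set, :public])
:ets.insert(table, {"session", 5, :async})
{cfg, cache} = BatcherSketch.NsCache.lookup(%{}, table, "session")
```

Note that the defaults are cached too, so an unconfigured prefix also costs no ETS read after its first write until the next invalidation.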

For :quorum durability, commands are submitted to ra via :ra.pipeline_command/3 with a correlation reference. Callers are replied to when the ra_event notification arrives confirming the command was applied.

For :async durability (spec 2F.3), there are two entry points:

  • Batcher.async_submit/2 (preferred) is called by Router.async_write_* after Router has already persisted the write locally (ETS, plus Bitcask for big values). Commands accumulate in a dedicated {prefix, :async_origin} slot and are flushed for replication as a single :ra.pipeline_command/3 call carrying {:batch, [{:async, cmd}, ...]}. The state machine's {:async, inner} clause skips the command on the origin node, which already has the ETS entry. There are no callers to reply to: Router has already returned :ok to its caller.

  • Batcher.write/2 on an async namespace (legacy callers) is the blocking entry point: the caller receives :ok as soon as the slot is flushed, and the commands go to Raft as a regular {:batch, [cmds]} (no {:async, ...} wrapper), so the state machine applies them normally on every node.
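The two payload shapes described in these bullets can be written down directly. BatcherSketch.Payload and build/2 are hypothetical names used only to show the difference in wrapping; how the real module assembles the payload is not shown here.

```elixir
defmodule BatcherSketch.Payload do
  # Hypothetical sketch of the two payload shapes described above.

  # Batcher.async_submit/2 path: tag each command so the state machine can
  # skip it on the origin node.
  def build(cmds, :async_origin), do: {:batch, Enum.map(cmds, &{:async, &1})}

  # Quorum slots and legacy Batcher.write/2 on an async namespace:
  # a plain batch that every node applies.
  def build(cmds, _durability), do: {:batch, cmds}
end

BatcherSketch.Payload.build([{:put, "k", "v", 0}], :async_origin)
# => {:batch, [{:async, {:put, "k", "v", 0}}]}
```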

Why a separate GenServer?

The batcher is intentionally separate from the Shard GenServer and the ra state machine. This separation keeps the batching logic independent of the consensus layer and allows the Shard to remain focused on read operations and ETS management.

Configuration

  • :shard_id (required) -- the ra server ID for this shard
  • :shard_index (required) -- zero-based shard index
  • :max_batch_size -- flush a single-write slot once it holds this many commands (default: 50_000)
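A size-triggered flush can be sketched as a check run after each append. This is an illustration, not the module's logic: BatcherSketch.SizeFlush and check/2 are hypothetical, and length/1 is O(n), so a real implementation might keep a counter in the slot instead.

```elixir
defmodule BatcherSketch.SizeFlush do
  # Hypothetical sketch of the :max_batch_size rule.
  # Returns {:flush, slot} when the slot should be submitted now.
  def check(slot, max_batch_size) do
    if length(slot.cmds) >= max_batch_size do
      # Flushing early makes the commit-window timer redundant. If it already
      # fired, its {:flush_slot, _} message may still be queued, so the flush
      # handler must tolerate a missing or empty slot.
      if slot.timer_ref, do: Process.cancel_timer(slot.timer_ref)
      {:flush, %{slot | timer_ref: nil}}
    else
      {:keep, slot}
    end
  end
end
```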

Summary

Types

  • command()
  • slot() -- A slot holds the accumulated commands and callers for a single namespace buffer, along with the timer reference for that slot's commit window.
  • slot_key() -- A slot key identifies a unique batching bucket by namespace prefix and durability mode. Commands with the same prefix but different durability modes (which can happen if config changes mid-flight) are batched separately.

Functions

  • async_submit(shard_index, inner_command) -- Submits an async-durability write. Fire-and-forget.
  • async_submit_batch(shard_index, commands) -- Submits a list of async commands to the batcher in a single cast.
  • batcher_name(shard_index) -- Returns the registered process name for the batcher at shard_index.
  • child_spec(init_arg) -- Returns a specification to start this module under a supervisor.
  • extract_prefix(command) -- Extracts the namespace prefix from a command's key.
  • flush(shard_index) -- Synchronously flushes all pending writes across all namespace slots.
  • reset_pending(shard_index) -- Force-resets the Batcher's pending correlations and flush waiters.
  • start_link(opts) -- Starts a batcher GenServer for the given shard.
  • write(shard_index, command) -- Submits a write command to the batcher for the given shard.
  • write_async(shard_index, command, reply_to) -- Submits a write command asynchronously, replying directly to reply_to.
  • write_async_quorum(shard_index, command, reply_to) -- Like write_async/3 but forces quorum durability regardless of namespace config. Used by RMW operations (INCR, APPEND, GETSET, etc.) that need consensus for atomicity even when the namespace is configured async.
  • write_batch(shard_index, cmds, from) -- Submits multiple quorum write commands in a single message.

Types

command()

@type command() ::
  {:put, binary(), binary(), non_neg_integer()}
  | {:delete, binary()}
  | {:incr, binary(), integer()}
  | {:incr_float, binary(), float()}
  | {:append, binary(), binary()}
  | {:getset, binary(), binary()}
  | {:getdel, binary()}
  | {:getex, binary(), non_neg_integer()}
  | {:setrange, binary(), non_neg_integer(), binary()}
  | {:cas, binary(), binary(), binary(), non_neg_integer() | nil}
  | {:lock, binary(), binary(), non_neg_integer()}
  | {:unlock, binary(), binary()}
  | {:extend, binary(), binary(), non_neg_integer()}
  | {:ratelimit_add, binary(), pos_integer(), pos_integer(), pos_integer()}
  | {:ratelimit_add, binary(), pos_integer(), pos_integer(), pos_integer(),
     non_neg_integer()}
  | {:list_op, binary(), term()}

slot()

@type slot() :: %{
  cmds: [command()],
  froms: [GenServer.from()],
  timer_ref: reference() | nil,
  window_ms: non_neg_integer()
}

A slot holds the accumulated commands and callers for a single namespace buffer, along with the timer reference for that slot's commit window.

slot_key()

@type slot_key() :: {binary(), :quorum | :async}

A slot key identifies a unique batching bucket by namespace prefix and durability mode. Commands with the same prefix but different durability modes (which can happen if config changes mid-flight) are batched separately.

Functions

async_submit(shard_index, inner_command)

@spec async_submit(non_neg_integer(), command()) :: :ok

Submits an async-durability write. Fire-and-forget.

Called by Router on the origin node after it has already written the value locally to ETS (and to Bitcask for large values). The Batcher accumulates async commands in a slot and flushes them for replication as a single batched :ra.pipeline_command/3 call. The caller has already received :ok from Router, so no reply is needed.

Commands are wrapped as {:async, inner_cmd} before submission so the state machine can distinguish them: apply/3 skips the command on the origin node (which already has the entry in ETS) and applies the inner command normally on replicas.

Parameters

  • shard_index -- zero-based shard index
  • inner_command -- the raw write command (e.g. {:put, k, v, exp})

async_submit_batch(shard_index, commands)

@spec async_submit_batch(non_neg_integer(), [command()]) :: :ok

Submits a list of async commands to the batcher in a single cast.

Same semantics as calling async_submit/2 for each command, but sends one GenServer cast instead of N, which reduces message-passing overhead for batched async writes.

batcher_name(shard_index)

@spec batcher_name(non_neg_integer()) :: atom()

Returns the registered process name for the batcher at shard_index.

Examples

iex> Ferricstore.Raft.Batcher.batcher_name(0)
:"Ferricstore.Raft.Batcher.0"

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
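The child_spec/1 derived by use GenServer uses the module name as the child :id, so running one batcher per shard under a single supervisor requires a distinct id for each child. A minimal sketch with a hypothetical stand-in module (BatcherSketch.Worker and its name/1 helper are invented; the real start_link/1 also requires :shard_id, omitted here), overriding the id with Supervisor.child_spec/2:

```elixir
defmodule BatcherSketch.Worker do
  # Hypothetical stand-in for a per-shard batcher process.
  use GenServer

  def start_link(opts) do
    shard_index = Keyword.fetch!(opts, :shard_index)
    GenServer.start_link(__MODULE__, opts, name: name(shard_index))
  end

  # Mirrors the batcher_name/1 convention: one registered atom per shard.
  def name(shard_index), do: :"BatcherSketch.Worker.#{shard_index}"

  @impl true
  def init(opts), do: {:ok, Map.new(opts)}
end

children =
  for i <- 0..3 do
    # Without an explicit :id, all four specs would share the id
    # BatcherSketch.Worker and the supervisor would reject the duplicates.
    Supervisor.child_spec({BatcherSketch.Worker, shard_index: i}, id: {:batcher, i})
  end

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```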

extract_prefix(command)

@spec extract_prefix(command()) :: binary()

Extracts the namespace prefix from a command's key.

The prefix is the portion of the key before the first colon (:). Keys without a colon are assigned to the "_root" namespace.

Parameters

  • command -- a write command tuple

Examples

iex> Ferricstore.Raft.Batcher.extract_prefix({:put, "session:abc", "v", 0})
"session"

iex> Ferricstore.Raft.Batcher.extract_prefix({:delete, "nocolon"})
"_root"

iex> Ferricstore.Raft.Batcher.extract_prefix({:put, "ts:sensor:42", "v", 0})
"ts"
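A minimal reimplementation that reproduces the three examples above, offered as a sketch: BatcherSketch.Prefix is a hypothetical module, and it relies on every command() variant carrying its key as the second tuple element.

```elixir
defmodule BatcherSketch.Prefix do
  # Hypothetical reimplementation matching the documented examples.
  def extract_prefix(command) do
    # :binary.split/2 without :global splits at the first ":" only.
    case :binary.split(elem(command, 1), ":") do
      [prefix, _rest] -> prefix
      [_no_colon] -> "_root"
    end
  end
end
```

Under this sketch a key such as ":abc" yields an empty prefix; how the real function treats that case is not documented here.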

flush(shard_index)

@spec flush(non_neg_integer()) :: :ok

Synchronously flushes all pending writes across all namespace slots.

Used in tests and before shard shutdown to ensure all writes are committed. Waits for all in-flight pipelined ra commands to complete before returning.
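Waiting for in-flight batches can be modeled with a list of parked callers that is drained when the pending map becomes empty. This is a hypothetical sketch (BatcherSketch.FlushWaiters, on_flush/2 and on_batch_done/2 are invented names), and it omits the first half of flush/1, submitting every non-empty slot.

```elixir
defmodule BatcherSketch.FlushWaiters do
  # Hypothetical sketch of "wait for in-flight batches"; slot submission omitted.
  # state :: %{pending: %{corr => term}, flush_waiters: [GenServer.from()]}

  # Reply at once if nothing is in flight; otherwise park the caller.
  def on_flush(from, %{pending: pending} = state) when map_size(pending) == 0 do
    GenServer.reply(from, :ok)
    state
  end

  def on_flush(from, state), do: %{state | flush_waiters: [from | state.flush_waiters]}

  # Called once a batch's callers have been answered.
  def on_batch_done(corr, state) do
    state = %{state | pending: Map.delete(state.pending, corr)}

    if map_size(state.pending) == 0 do
      Enum.each(state.flush_waiters, &GenServer.reply(&1, :ok))
      %{state | flush_waiters: []}
    else
      state
    end
  end
end
```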

reset_pending(shard_index)

@spec reset_pending(non_neg_integer()) :: :ok

Force-resets the Batcher's pending correlations and flush waiters.

Intended for test setup/teardown when a prior test left the Batcher in a stuck state (ra leader crash, disk errors, orphan correlations). Callers blocked on :single/:batch pending entries receive {:error, :reset}; flush waiters receive :ok.

Do not call this from production code: it silently drops replication acks.
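The reset behaviour described above can be sketched as follows. The pending-entry shapes are an assumption made for illustration: each correlation maps to a list of {:single, from} or {:batch, from, count} entries. BatcherSketch.Reset is a hypothetical module.

```elixir
defmodule BatcherSketch.Reset do
  # Hypothetical sketch. ASSUMED pending shape:
  # %{corr => [{:single, from} | {:batch, from, count}]}
  def reset_pending(%{pending: pending, flush_waiters: waiters} = state) do
    # Every blocked writer is released with an error; the from is element 1
    # of both entry shapes.
    for {_corr, entries} <- pending, entry <- entries do
      GenServer.reply(elem(entry, 1), {:error, :reset})
    end

    # Flush waiters are simply released.
    Enum.each(waiters, &GenServer.reply(&1, :ok))
    %{state | pending: %{}, flush_waiters: []}
  end
end
```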

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a batcher GenServer for the given shard.

Options

  • :shard_id (required) -- ra server ID {name, node()} for this shard
  • :shard_index (required) -- zero-based shard index (used for process name)
  • :max_batch_size -- max commands per slot before forced flush (default: 50000)

write(shard_index, command)

@spec write(non_neg_integer(), command()) :: :ok | {:error, term()}

Submits a write command to the batcher for the given shard.

The command is accumulated into the appropriate namespace buffer and submitted when the namespace's commit window expires or the buffer reaches max_batch_size.

For :quorum durability, this call blocks until the ra command is committed and applied. For :async durability, the call returns as soon as the slot is flushed (state machine application continues in the background on the origin and on replicas).

Parameters

  • shard_index -- zero-based shard index
  • command -- a write command tuple, e.g. {:put, key, value, expire_at_ms}

Returns

  • :ok on success
  • {:error, reason} on failure

write_async(shard_index, command, reply_to)

@spec write_async(non_neg_integer(), command(), GenServer.from()) :: :ok

Submits a write command asynchronously, replying directly to reply_to.

Unlike write/2, this function does not block the calling process. The batcher accepts the command via GenServer.cast (non-blocking) and will call GenServer.reply(reply_to, result) when the command is committed.

This is used by the Shard GenServer to avoid blocking on Raft consensus. The Shard returns {:noreply, state} and the Batcher replies directly to the original caller (Router/connection process).

Parameters

  • shard_index -- zero-based shard index
  • command -- a write command tuple
  • reply_to -- the from ref from the caller's GenServer.call
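The hand-off of reply_to from the Shard to the Batcher relies on the standard GenServer pattern of returning {:noreply, state} from handle_call/3 and answering later with GenServer.reply/2 from another process. A self-contained sketch with hypothetical stand-ins (FakeBatcher replies immediately instead of waiting for a Raft commit):

```elixir
defmodule BatcherSketch.FakeBatcher do
  # Hypothetical stand-in: replies at once instead of after a Raft commit.
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)
  def write_async(pid, cmd, reply_to), do: GenServer.cast(pid, {:write, cmd, reply_to})

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_cast({:write, cmd, reply_to}, state) do
    # Reply directly to the original caller of the Shard.
    GenServer.reply(reply_to, {:ok, cmd})
    {:noreply, state}
  end
end

defmodule BatcherSketch.FakeShard do
  use GenServer

  def start_link(batcher), do: GenServer.start_link(__MODULE__, batcher)

  @impl true
  def init(batcher), do: {:ok, batcher}

  @impl true
  def handle_call({:write, cmd}, from, batcher) do
    BatcherSketch.FakeBatcher.write_async(batcher, cmd, from)
    # No reply here; the Shard is free to serve the next message.
    {:noreply, batcher}
  end
end

{:ok, batcher} = BatcherSketch.FakeBatcher.start_link([])
{:ok, shard} = BatcherSketch.FakeShard.start_link(batcher)
GenServer.call(shard, {:write, {:put, "k", "v", 0}})
# => {:ok, {:put, "k", "v", 0}}
```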

write_async_quorum(shard_index, command, reply_to)

@spec write_async_quorum(non_neg_integer(), command(), GenServer.from()) :: :ok

Like write_async/3 but forces quorum durability regardless of namespace config. Used by RMW operations (INCR, APPEND, GETSET, etc.) that need consensus for atomicity even when the namespace is configured async.

write_batch(shard_index, cmds, from)

@spec write_batch(non_neg_integer(), [command()], GenServer.from()) :: :ok

Submits multiple quorum write commands in a single message.

Takes a list of commands and a single from ref. All commands are enqueued into the quorum slot as a batch. The from receives a single {:ok, [results]} reply after the ra command commits: one round-trip per shard, not per command.
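One way to deliver a single {:ok, [results]} reply when a flushed slot mixes individual writes and write_batch/3 entries is sketched below. It assumes, hypothetically, that the slot records {:single, from} and {:batch, from, count} entries in submission order and that the applied result is one flat list in the same order; BatcherSketch.BatchReply is an invented module.

```elixir
defmodule BatcherSketch.BatchReply do
  # Hypothetical sketch. Assumption: results is a flat list in the same
  # order as the commands contributed by each entry.
  def distribute([], []), do: :ok

  def distribute([{:single, from} | entries], [result | results]) do
    GenServer.reply(from, result)
    distribute(entries, results)
  end

  def distribute([{:batch, from, count} | entries], results) do
    # One reply covering all commands of the write_batch/3 call.
    {mine, rest} = Enum.split(results, count)
    GenServer.reply(from, {:ok, mine})
    distribute(entries, rest)
  end
end
```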