Namespace-aware group commit batcher for a single FerricStore shard.
Per spec sections 2C.5 and 2F.3, each shard has its own Batcher GenServer that accumulates write commands into per-namespace buffers, each with its own commit window and durability mode. When a namespace's timer fires, only that namespace's buffer is flushed.
How it works
- A client calls write/2, which sends a GenServer.call to the batcher.
- The batcher extracts the key's namespace prefix (e.g. "session" from "session:abc123", "_root" for keys without a colon).
- The namespace config for this prefix (window_ms and durability) is read from the batcher's ns_cache, falling back to the :ferricstore_ns_config ETS table on a cache miss.
- The command and caller are appended to the namespace's buffer slot, identified by {prefix, durability}.
- On the first write to an empty slot, a timer is started using the namespace's window_ms.
- When the timer fires (:flush_slot), or when the slot reaches max_batch_size, only that slot's commands are submitted to Raft via ra:pipeline_command/3.
- Each caller receives their individual result from the batch once the ra command commits and the batcher receives the ra_event notification.
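The steps above can be modeled as pure state transitions. The following is a minimal, hypothetical sketch. The SlotSketch module, the extra :count field, and the returned action tuples are illustrative assumptions, not the real Batcher internals. It shows three things: a {prefix, durability} slot is created on the first write, that first write is what starts the commit-window timer, and max_batch_size forces an early flush.

```elixir
defmodule SlotSketch do
  # Hypothetical sketch of the enqueue step; not the real Batcher code.
  # Slot fields follow the documented slot() type plus a :count field added
  # here so the max_batch_size check does not have to walk the list.

  def new_state(max_batch_size), do: %{slots: %{}, max_batch_size: max_batch_size}

  # Returns {action, state}. A real GenServer would act on the action, e.g.
  # call Process.send_after/3 for :start_timer and store the reference in timer_ref.
  def enqueue(state, prefix, durability, window_ms, cmd, from) do
    key = {prefix, durability}
    empty = %{cmds: [], froms: [], timer_ref: nil, window_ms: window_ms, count: 0}
    slot = Map.get(state.slots, key, empty)
    first_write? = slot.count == 0

    # Prepend for O(1) appends; take_slot/2 reverses to restore arrival order.
    slot = %{slot | cmds: [cmd | slot.cmds], froms: [from | slot.froms], count: slot.count + 1}
    state = %{state | slots: Map.put(state.slots, key, slot)}

    action =
      cond do
        slot.count >= state.max_batch_size -> {:flush_now, key}
        first_write? -> {:start_timer, key, window_ms}
        true -> :buffered
      end

    {action, state}
  end

  # Removes a slot for submission; commands and callers come back in arrival order.
  def take_slot(state, key) do
    {slot, slots} = Map.pop(state.slots, key)
    {Enum.reverse(slot.cmds), Enum.reverse(slot.froms), %{state | slots: slots}}
  end
end
```

The sketch never touches timer_ref. When a real implementation flushes early on max_batch_size, it would also need to cancel the slot's outstanding timer (for example with Process.cancel_timer/1).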
Pipelined ra submission (non-blocking)
To avoid serializing all writers through one GenServer while the previous
batch is in-flight through Raft consensus, the batcher uses
ra:pipeline_command/3 instead of the blocking ra:process_command/2.
pipeline_command is a cast -- it returns immediately with :ok, and
the batcher receives an async {ra_event, Leader, {applied, [...]}} message
when the command is committed and applied by the state machine.
The batcher maintains a pending map keyed by correlation reference, which
maps each in-flight batch to the list of callers (froms) that are waiting
for a reply. When the ra_event arrives, the batcher extracts the result
and calls GenServer.reply/2 for each caller.
This means the GenServer never blocks on Raft. During the time a batch is in-flight, the batcher continues to accept new writes and accumulate them into fresh slots. This eliminates the throughput bottleneck where 50 writers were serialized through one blocked GenServer.
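Below is a minimal sketch of the pending-map bookkeeping described above. It assumes that each entry in the ra applied notification is a {correlation, results} pair, with one result per command in the batch. PendingSketch and the {:single, froms} entry shape are hypothetical. The sketch only computes the {from, result} pairs that a GenServer would then pass to GenServer.reply/2.

```elixir
defmodule PendingSketch do
  # Hypothetical: pending maps a correlation reference to {:single, froms},
  # the callers of one in-flight batch in the same order as its commands.
  def track(pending, corr, froms), do: Map.put(pending, corr, {:single, froms})

  # Handles the entries of an {:applied, entries} ra event, assuming each
  # entry is {correlation, results} with one result per command. Returns the
  # {from, result} pairs to reply with and the pending map minus resolved entries.
  def handle_applied(pending, entries) do
    {reply_groups, pending} =
      Enum.reduce(entries, {[], pending}, fn {corr, results}, {groups, acc} ->
        case Map.pop(acc, corr) do
          # Unknown correlation (e.g. dropped by a reset): nothing to reply to.
          {nil, acc} -> {groups, acc}
          {{:single, froms}, acc} -> {[Enum.zip(froms, results) | groups], acc}
        end
      end)

    {reply_groups |> Enum.reverse() |> Enum.concat(), pending}
  end
end
```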
Namespace configuration
Per-prefix configuration is originally sourced from the :ferricstore_ns_config
ETS table managed by Ferricstore.NamespaceConfig. To avoid two ETS lookups
(~400ns) on every write, the batcher caches namespace config in its process
state (ns_cache). The first write for a given prefix fetches from ETS and
caches the result; subsequent writes for the same prefix use the cached
value with zero ETS overhead.
When namespace config changes (via FERRICSTORE.CONFIG SET or RESET),
NamespaceConfig broadcasts :ns_config_changed to all batcher processes,
which clears their caches. The next write for any prefix then re-reads
from ETS.
If no configuration exists for a prefix, the defaults are used:
window_ms = 1, durability = :quorum.
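The sketch below shows the cache-then-ETS lookup with the documented defaults. The NsCacheSketch module and the {prefix, window_ms, durability} row shape are assumptions for illustration; the real :ferricstore_ns_config layout may differ.

```elixir
defmodule NsCacheSketch do
  # Documented defaults for prefixes with no configuration.
  @defaults %{window_ms: 1, durability: :quorum}

  # Hypothetical row shape {prefix, window_ms, durability}.
  def lookup(state, table, prefix) do
    case state.ns_cache do
      %{^prefix => config} ->
        # Cache hit: no ETS access.
        {config, state}

      _ ->
        config =
          case :ets.lookup(table, prefix) do
            [{^prefix, window_ms, durability}] -> %{window_ms: window_ms, durability: durability}
            [] -> @defaults
          end

        {config, %{state | ns_cache: Map.put(state.ns_cache, prefix, config)}}
    end
  end

  # What a handle_info(:ns_config_changed, state) clause would do.
  def on_config_changed(state), do: %{state | ns_cache: %{}}
end
```

Caching the defaults for an unconfigured prefix is safe only because a later FERRICSTORE.CONFIG SET broadcasts :ns_config_changed and clears every batcher's cache.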
For :quorum durability, commands are submitted to ra via
:ra.pipeline_command/3 with a correlation reference. Callers are replied to
when the ra_event notification arrives confirming the command was applied.
For :async durability (spec 2F.3), there are two entry points:
- Batcher.async_submit/2 (preferred) is called by Router.async_write_* after Router has already persisted the write locally (ETS, plus Bitcask for big values). Commands accumulate in a dedicated {prefix, :async_origin} slot and are flushed for replication as one :ra.pipeline_command/3 call carrying {:batch, [{:async, cmd}, ...]}. The state machine's {:async, inner} clause skips the command on the origin node, which already has the ETS entry. There are no callers to reply to, because Router has already returned :ok to its caller.
- Batcher.write/2 on an async namespace (legacy callers) is the blocking entry point. The caller is replied :ok as soon as the slot is flushed. The commands go to Raft as a regular {:batch, [cmds]} (no {:async, ...} wrapper), and the state machine applies them normally on every node.
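The sketch below shows how the two async paths could differ, both in the Raft payload and in the per-node apply step. AsyncBatchSketch is hypothetical. The local_has_entry? callback is an assumed stand-in for however the state machine detects that it is running on the origin node.

```elixir
defmodule AsyncBatchSketch do
  # Payload for a flushed slot: only async_submit/2 slots get the {:async, _} wrapper.
  def payload({_prefix, :async_origin}, cmds), do: {:batch, Enum.map(cmds, &{:async, &1})}
  def payload({_prefix, _durability}, cmds), do: {:batch, cmds}

  # Hypothetical per-node apply: wrapped commands are skipped where the entry
  # already exists locally (the origin); everything else is applied.
  def apply_entry({:async, inner}, local_has_entry?) do
    if local_has_entry?.(inner), do: :skipped, else: {:applied, inner}
  end

  def apply_entry(cmd, _local_has_entry?), do: {:applied, cmd}
end
```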
Why a separate GenServer?
The batcher is intentionally separate from the Shard GenServer and the ra state machine. This separation keeps the batching logic independent of the consensus layer and allows the Shard to remain focused on read operations and ETS management.
Configuration
- :shard_id (required) -- the ra server ID for this shard
- :shard_index (required) -- zero-based shard index
- :max_batch_size -- flush a single-write slot when it reaches this size (default: 50_000)
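The sketch below shows option handling for start_link/1, including the process name format from the batcher_name/1 example. BatcherOptsSketch is hypothetical and mirrors only the documented options and default.

```elixir
defmodule BatcherOptsSketch do
  @default_max_batch_size 50_000

  # Raises KeyError if a required option is missing.
  def parse!(opts) do
    shard_index = Keyword.fetch!(opts, :shard_index)

    %{
      shard_id: Keyword.fetch!(opts, :shard_id),
      shard_index: shard_index,
      max_batch_size: Keyword.get(opts, :max_batch_size, @default_max_batch_size),
      name: name(shard_index)
    }
  end

  # Produces the same atom as the documented batcher_name/1 example.
  def name(shard_index), do: String.to_atom("Ferricstore.Raft.Batcher.#{shard_index}")
end
```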
Types
@type command() :: {:put, binary(), binary(), non_neg_integer()} | {:delete, binary()} | {:incr, binary(), integer()} | {:incr_float, binary(), float()} | {:append, binary(), binary()} | {:getset, binary(), binary()} | {:getdel, binary()} | {:getex, binary(), non_neg_integer()} | {:setrange, binary(), non_neg_integer(), binary()} | {:cas, binary(), binary(), binary(), non_neg_integer() | nil} | {:lock, binary(), binary(), non_neg_integer()} | {:unlock, binary(), binary()} | {:extend, binary(), binary(), non_neg_integer()} | {:ratelimit_add, binary(), pos_integer(), pos_integer(), pos_integer()} | {:ratelimit_add, binary(), pos_integer(), pos_integer(), pos_integer(), non_neg_integer()} | {:list_op, binary(), term()}
@type slot() :: %{ cmds: [command()], froms: [GenServer.from()], timer_ref: reference() | nil, window_ms: non_neg_integer() }
A slot holds the accumulated commands and callers for a single namespace buffer, along with the timer reference for that slot's commit window.
@type slot_key() :: {binary(), :quorum | :async}
A slot key identifies a unique batching bucket by namespace prefix and durability mode. Commands with the same prefix but different durability modes (which can happen if config changes mid-flight) are batched separately.
Functions
@spec async_submit(non_neg_integer(), command()) :: :ok
Submits an async-durability write. Fire-and-forget.
Called by Router on the origin node AFTER it has already written the value
locally to ETS (and Bitcask for large values). The Batcher accumulates async
commands in a slot and flushes them as a single batched :ra.pipeline_command/3 call
for replication. The caller already has :ok from Router, so no reply is needed.
Commands are wrapped as {:async, inner_cmd} before submission so the
state machine can distinguish them: on the origin node (which has the entry
in ETS) apply/3 will skip; on replicas the inner command is applied normally.
Parameters
- shard_index -- zero-based shard index
- inner_command -- the raw write command (e.g. {:put, k, v, exp})
@spec async_submit_batch(non_neg_integer(), [command()]) :: :ok
Submits a list of async commands to the batcher in a single cast.
Same semantics as calling async_submit/2 for each command, but sends
one GenServer cast instead of N — reduces message passing overhead for
batch async writes.
@spec await_local_applied(non_neg_integer(), non_neg_integer(), non_neg_integer()) :: :ok | {:error, :timeout}
Blocks until this node's local state machine for shard_index has applied
the entry at ra_index. Used by Router.forward_to_leader/4 to barrier
read-your-write after a quorum write was redirected to a peer leader.
Returns :ok once last_local_applied >= ra_index, or {:error, :timeout}
if the local apply hasn't caught up within timeout_ms.
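The sketch below shows a read-your-write barrier with the documented :ok | {:error, :timeout} contract. It replaces the real Batcher with a bare process and uses made-up {:await, ...} and {:applied, ...} messages. Only two ideas come from the text above: tracking last_local_applied, and releasing waiters once it reaches ra_index.

```elixir
defmodule AppliedBarrierSketch do
  # A bare process standing in for the batcher: it tracks last_local_applied
  # and parks waiters whose ra_index has not been applied yet.
  def start(last_applied), do: spawn(fn -> loop(last_applied, []) end)

  def await_local_applied(server, ra_index, timeout_ms) do
    ref = make_ref()
    send(server, {:await, ra_index, self(), ref})

    receive do
      {^ref, :ok} -> :ok
    after
      timeout_ms -> {:error, :timeout}
    end
  end

  defp loop(last_applied, waiters) do
    receive do
      {:await, ra_index, pid, ref} when ra_index <= last_applied ->
        send(pid, {ref, :ok})
        loop(last_applied, waiters)

      {:await, ra_index, pid, ref} ->
        loop(last_applied, [{ra_index, pid, ref} | waiters])

      {:applied, ra_index} ->
        last_applied = max(last_applied, ra_index)
        {ready, waiting} = Enum.split_with(waiters, fn {i, _, _} -> i <= last_applied end)
        Enum.each(ready, fn {_, pid, ref} -> send(pid, {ref, :ok}) end)
        loop(last_applied, waiting)
    end
  end
end
```

If the timeout fires first, a late {ref, :ok} can still land in the caller's mailbox. A production version would need to discard it.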
@spec batcher_name(non_neg_integer()) :: atom()
Returns the registered process name for the batcher at shard_index.
Examples
iex> Ferricstore.Raft.Batcher.batcher_name(0)
:"Ferricstore.Raft.Batcher.0"
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
extract_prefix/1
Extracts the namespace prefix from a command's key.
The prefix is the portion of the key before the first colon (:).
Keys without a colon are assigned to the "_root" namespace.
Parameters
- command -- a write command tuple
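The prefix rule can be sketched as follows. The sketch assumes, as every variant of the command() type shows, that the key is the second tuple element. PrefixSketch is hypothetical; its test reproduces the documented examples.

```elixir
defmodule PrefixSketch do
  # Every documented command tuple carries its key as the second element.
  def extract_prefix(command) when is_tuple(command) do
    key = elem(command, 1)

    # :binary.split/2 without the :global option splits at the first ":" only.
    case :binary.split(key, ":") do
      [prefix, _rest] -> prefix
      [_no_colon] -> "_root"
    end
  end
end
```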
Examples
iex> Ferricstore.Raft.Batcher.extract_prefix({:put, "session:abc", "v", 0})
"session"
iex> Ferricstore.Raft.Batcher.extract_prefix({:delete, "nocolon"})
"_root"
iex> Ferricstore.Raft.Batcher.extract_prefix({:put, "ts:sensor:42", "v", 0})
"ts"
@spec flush(non_neg_integer()) :: :ok
Synchronously flushes all pending writes across all namespace slots.
Used in tests and before shard shutdown to ensure all writes are committed. Waits for all in-flight pipelined ra commands to complete before returning.
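The sketch below covers the "wait for in-flight commands" part of flush/1. It assumes every buffered slot has already been submitted, so only pending correlations remain. FlushSketch and its pending and flush_waiters fields are hypothetical names.

```elixir
defmodule FlushSketch do
  # Nothing in flight: the flush caller can be answered immediately.
  def request_flush(%{pending: pending} = state, from) when map_size(pending) == 0,
    do: {[{from, :ok}], state}

  # Otherwise park the caller until the pending map drains.
  def request_flush(state, from),
    do: {[], %{state | flush_waiters: [from | state.flush_waiters]}}

  # Called after each correlation resolves: release all waiters once nothing is in flight.
  def after_resolve(%{pending: pending} = state) when map_size(pending) == 0 do
    replies = Enum.map(state.flush_waiters, &{&1, :ok})
    {replies, %{state | flush_waiters: []}}
  end

  def after_resolve(state), do: {[], state}
end
```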
@spec reset_pending(non_neg_integer()) :: :ok
Force-resets the Batcher's pending correlations and flush waiters.
Intended for test setup/teardown when a prior test left the Batcher in a
stuck state (ra leader crash, disk errors, orphan correlations). Callers
blocked on :single/:batch pending entries receive {:error, :reset};
flush waiters receive :ok.
Do not call this from production code — it silently drops replication acks.
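The sketch below captures the reset semantics described above. The {:single, froms} and {:batch, from} entry shapes are assumptions modeled on the :single/:batch kinds named in the text. ResetSketch only computes the replies that would be sent.

```elixir
defmodule ResetSketch do
  # Hypothetical pending entry shapes: {:single, froms} and {:batch, from}.
  def reset_pending(state) do
    pending_replies =
      Enum.flat_map(state.pending, fn
        {_corr, {:single, froms}} -> Enum.map(froms, &{&1, {:error, :reset}})
        {_corr, {:batch, from}} -> [{from, {:error, :reset}}]
      end)

    # Flush waiters are released with :ok, as documented.
    waiter_replies = Enum.map(state.flush_waiters, &{&1, :ok})
    {pending_replies ++ waiter_replies, %{state | pending: %{}, flush_waiters: []}}
  end
end
```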
@spec start_link(keyword()) :: GenServer.on_start()
Starts a batcher GenServer for the given shard.
Options
- :shard_id (required) -- ra server ID {name, node()} for this shard
- :shard_index (required) -- zero-based shard index (used for process name)
- :max_batch_size -- max commands per slot before forced flush (default: 50_000)
@spec write(non_neg_integer(), command()) :: :ok | {:error, term()}
Submits a write command to the batcher for the given shard.
The command is accumulated into the appropriate namespace buffer and
submitted when the namespace's commit window expires or the buffer
reaches max_batch_size.
For :quorum durability, this call blocks until the ra command is
committed and applied. For :async durability, the call returns as
soon as the slot is flushed (state machine application continues in
the background on the origin and on replicas).
Parameters
- shard_index -- zero-based shard index
- command -- a write command tuple, e.g. {:put, key, value, expire_at_ms}
Returns
- :ok on success
- {:error, reason} on failure
@spec write_async(non_neg_integer(), command(), GenServer.from()) :: :ok
Submits a write command asynchronously, replying directly to reply_to.
Unlike write/2, this function does not block the calling process.
The batcher accepts the command via GenServer.cast (non-blocking) and
will call GenServer.reply(reply_to, result) when the command is committed.
This is used by the Shard GenServer to avoid blocking on Raft consensus.
The Shard returns {:noreply, state} and the Batcher replies directly
to the original caller (Router/connection process).
Parameters
- shard_index -- zero-based shard index
- command -- a write command tuple
- reply_to -- the from ref from the caller's GenServer.call
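The deferred-reply pattern above can be shown with two throwaway GenServers. The shard-like process hands its caller's from to the batcher-like process and returns {:noreply, state}. The batcher-like process later calls GenServer.reply/2. The module names and the {:write, ...} message shapes are hypothetical, and the commit is faked by replying immediately.

```elixir
defmodule DeferredReplySketch do
  defmodule BatcherLike do
    use GenServer
    def init(:ok), do: {:ok, :ok}

    def handle_cast({:write, _cmd, reply_to}, state) do
      # Stand-in for "command committed": reply to the original caller directly.
      GenServer.reply(reply_to, :ok)
      {:noreply, state}
    end
  end

  defmodule ShardLike do
    use GenServer
    def init(batcher), do: {:ok, batcher}

    def handle_call({:write, cmd}, from, batcher) do
      # Forward the caller's from and return without replying, so the shard is never blocked.
      GenServer.cast(batcher, {:write, cmd, from})
      {:noreply, batcher}
    end
  end
end
```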
@spec write_async_quorum(non_neg_integer(), command(), GenServer.from()) :: :ok
Like write_async/3 but forces quorum durability regardless of namespace
config. Used by RMW operations (INCR, APPEND, GETSET, etc.) that need
consensus for atomicity even when the namespace is configured async.
@spec write_batch(non_neg_integer(), [command()], GenServer.from()) :: :ok
Submits multiple quorum write commands in a single message.
Takes a list of commands and a single from ref. All commands are
enqueued into the quorum slot as a batch. The from receives a
single {:ok, [results]} reply after Ra commit — one round-trip
per shard, not per command.
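The sketch below shows why write_batch/3 needs only one reply. A per-caller pending entry zips results to callers one by one. A batch entry sends the whole result list to its single caller as {:ok, results}. The entry shapes follow the :single/:batch kinds mentioned under reset_pending/1 but are otherwise assumptions, and BatchReplySketch is hypothetical.

```elixir
defmodule BatchReplySketch do
  # Hypothetical pending entry shapes:
  #   {:single, froms} -- one caller per command, replies zipped with results
  #   {:batch, from}   -- one write_batch/3 caller that owns every command
  def replies({:single, froms}, results), do: Enum.zip(froms, results)
  def replies({:batch, from}, results), do: [{from, {:ok, results}}]
end
```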