Ferricstore.Store.RmwCoordinator (ferricstore v0.3.5)


Per-shard fallback for async read-modify-write (RMW) commands under contention.

See docs/async-rmw-design.md for the full design. In short:

  • Router.async_rmw/4 tries :ets.insert_new(latch_tab, {key, self()}). If it wins the latch, it runs the RMW inline in the caller's process (~15μs p50). This is the fast path.
  • If the latch is already held, async_rmw falls back to this module and calls GenServer.call(RmwCoordinator.name(shard), {:rmw, cmd}). The worker processes RMW commands serially from its mailbox in FIFO order. This is the slow path under heavy same-key contention, but it never loses updates, and queued callers block in receive without consuming CPU.
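The caller-side branch described above can be sketched as follows. This is a minimal illustration, not the real Router.async_rmw/4: the module LatchSketch, the function run/4, and the :rmw_latches table name are hypothetical, and the fallback is passed in as a function instead of calling the coordinator.

```elixir
# Hypothetical sketch of the latch fast path; not ferricstore's Router.
defmodule LatchSketch do
  # Try to win the per-key latch with :ets.insert_new/2. On success, run
  # the RMW inline in the caller's process and always release the latch
  # with :ets.take/2, even if the RMW raises. Otherwise take the fallback.
  def run(latch_tab, key, rmw_fun, fallback_fun) do
    if :ets.insert_new(latch_tab, {key, self()}) do
      try do
        {:fast, rmw_fun.()}
      after
        :ets.take(latch_tab, key)
      end
    else
      {:slow, fallback_fun.()}
    end
  end
end

latch_tab = :ets.new(:rmw_latches, [:set, :public])

# Latch free: the RMW runs inline and the latch is released afterwards.
{:fast, 1} = LatchSketch.run(latch_tab, "k", fn -> 1 end, fn -> :queued end)

# Latch held by another process: the caller takes the fallback path.
:ets.insert(latch_tab, {"k", spawn(fn -> :ok end)})
{:slow, :queued} = LatchSketch.run(latch_tab, "k", fn -> 1 end, fn -> :queued end)
```

Releasing in an `after` clause keeps a raising RMW from leaving the latch behind; only a process that dies outright between insert and take leaves a stale entry.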

The worker also acquires the per-key latch before executing each command. It waits for the latch by spinning, and the spin stays bounded: at most one process holds the latch for any key, and the worker is the only process that ever spins, so there is no thundering herd.
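A serial worker that takes the same latch before each command might look like the sketch below. WorkerSketch, rmw/3, and the {:rmw, key, fun} message are hypothetical. One further assumption goes beyond the text: while spinning, this sketch deletes the latch entry if its holder is dead, so the loop cannot spin forever on a crashed caller. The module's documented recovery path for that case is the periodic sweep.

```elixir
# Hypothetical serial fallback worker; not ferricstore's RmwCoordinator.
defmodule WorkerSketch do
  use GenServer

  def start_link(latch_tab), do: GenServer.start_link(__MODULE__, latch_tab)

  def rmw(worker, key, fun), do: GenServer.call(worker, {:rmw, key, fun})

  @impl true
  def init(latch_tab), do: {:ok, latch_tab}

  # Calls are handled one at a time, in mailbox (FIFO) order.
  @impl true
  def handle_call({:rmw, key, fun}, _from, latch_tab) do
    acquire(latch_tab, key)

    try do
      {:reply, fun.(), latch_tab}
    after
      :ets.take(latch_tab, key)
    end
  end

  # Spin until the current holder (a fast-path caller running one short
  # RMW) releases the latch. Only this worker ever spins.
  defp acquire(latch_tab, key) do
    if :ets.insert_new(latch_tab, {key, self()}) do
      :ok
    else
      case :ets.lookup(latch_tab, key) do
        [{^key, holder}] ->
          # Assumption: clear a dead holder inline instead of waiting for the sweep.
          unless Process.alive?(holder), do: :ets.delete_object(latch_tab, {key, holder})

        [] ->
          :ok
      end

      :erlang.yield()
      acquire(latch_tab, key)
    end
  end
end
```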

A periodic latch sweep (every 5 s) removes entries whose holder pid is dead. This is the recovery path for a caller that crashed between :ets.insert_new and :ets.take.
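A periodic sweep with an on-demand variant for tests could be sketched like this. SweeperSketch, sweep_now/1, and the :sweep message are hypothetical; only the 5-second interval comes from the text above.

```elixir
# Hypothetical stale-latch sweeper; not ferricstore's implementation.
defmodule SweeperSketch do
  use GenServer

  @interval_ms 5_000

  def start_link(latch_tab), do: GenServer.start_link(__MODULE__, latch_tab)

  # Synchronous sweep, useful in tests; returns :ok.
  def sweep_now(server), do: GenServer.call(server, :sweep)

  @impl true
  def init(latch_tab) do
    schedule()
    {:ok, latch_tab}
  end

  @impl true
  def handle_info(:sweep, latch_tab) do
    sweep(latch_tab)
    schedule()
    {:noreply, latch_tab}
  end

  @impl true
  def handle_call(:sweep, _from, latch_tab) do
    sweep(latch_tab)
    {:reply, :ok, latch_tab}
  end

  defp schedule, do: Process.send_after(self(), :sweep, @interval_ms)

  # Delete only the exact {key, pid} object, so a live process that
  # re-acquired the same key in the meantime is not evicted.
  defp sweep(latch_tab) do
    for {key, pid} <- :ets.tab2list(latch_tab), not Process.alive?(pid) do
      :ets.delete_object(latch_tab, {key, pid})
    end

    :ok
  end
end
```

Using :ets.delete_object/2 instead of :ets.delete/2 is what makes the sweep safe to run concurrently with fast-path callers.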

Summary

Functions

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

execute(idx, cmd)
Execute an RMW command via the worker (fallback path).

name(idx)
Registered process name for the coordinator at the given shard index.

start_link(opts)
Starts the coordinator for the given shard index.

sweep_latches(idx)
Force a sweep of stale latches for this shard. Intended for tests.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute(idx, cmd)

@spec execute(non_neg_integer(), tuple()) :: term()

Execute an RMW command via the worker (fallback path).

Callers reach this only after losing the latch CAS (:ets.insert_new/2) on their fast path. Returns the command's natural result, e.g. {:ok, integer} for INCR, the old value or nil for GETSET/GETDEL, and the list's new length after the push for LPUSH.

Accepted command shapes:

  • Plain RMW: {:incr, k, d}, {:incr_float, k, d}, {:append, k, s}, {:getset, k, v}, {:getdel, k}, {:getex, k, e}, {:setrange, k, o, v}.
  • List ops: {:list_op, k, operation}, {:list_op_lmove, src, dst, from, to}.
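To make the plain RMW shapes concrete, here is a sketch that applies a few of them to an in-memory map. CmdSketch and apply_cmd/2 are hypothetical, and the map stands in for the real store. The return values follow the examples given for execute/2 above; shapes the sketch does not model fall through to an explicit error.

```elixir
# Hypothetical command dispatcher over a map; not ferricstore's store.
defmodule CmdSketch do
  # Each clause returns {result, new_store}.
  def apply_cmd(store, {:incr, k, d}) when is_integer(d) do
    new = Map.get(store, k, 0) + d
    {{:ok, new}, Map.put(store, k, new)}
  end

  # GETSET: return the old value (or nil) and store the new one.
  def apply_cmd(store, {:getset, k, v}), do: {Map.get(store, k), Map.put(store, k, v)}

  # GETDEL: Map.pop/2 already returns {old_value_or_nil, new_map}.
  def apply_cmd(store, {:getdel, k}), do: Map.pop(store, k)

  def apply_cmd(store, _other), do: {{:error, :unsupported_in_sketch}, store}
end
```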

Timeouts and worker crashes propagate as :exit to the caller; the caller's async_* function catches them and returns {:error, msg}.
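Converting those exits into an error tuple is a standard Elixir pattern; a sketch follows. SafeCall, call_or_error/3, and the message text are hypothetical, not the actual async_* functions.

```elixir
# Hypothetical wrapper; catches exits from GenServer.call/3 (timeouts,
# missing or crashed process) and returns {:error, msg} instead.
defmodule SafeCall do
  def call_or_error(server, request, timeout \\ 5_000) do
    GenServer.call(server, request, timeout)
  catch
    :exit, reason -> {:error, "RMW coordinator call failed: #{inspect(reason)}"}
  end
end
```

Catching only the `:exit` kind leaves genuine bugs (raised errors in the caller itself) visible instead of masking them.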

name(idx)

@spec name(non_neg_integer()) :: atom()

Registered process name for the coordinator at the given shard index.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the coordinator for the given shard index.
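Running one coordinator per shard means registering one name per shard index and giving each child a distinct supervisor id. The sketch below shows that pattern with a stand-in module. ShardWorker, the :index option, and the name format are hypothetical; the actual option keys accepted by start_link/1 are not documented on this page.

```elixir
# Hypothetical per-shard worker; not ferricstore's RmwCoordinator.
defmodule ShardWorker do
  use GenServer

  # One registered atom per shard; the shard count is small and fixed,
  # so the number of atoms created is bounded.
  def name(idx), do: :"#{__MODULE__}.#{idx}"

  def start_link(opts) do
    idx = Keyword.fetch!(opts, :index)
    GenServer.start_link(__MODULE__, idx, name: name(idx))
  end

  @impl true
  def init(idx), do: {:ok, idx}

  @impl true
  def handle_call(:index, _from, idx), do: {:reply, idx, idx}
end

shard_count = 4

# The child_spec/1 generated by `use GenServer` uses the module as the
# child id, so each shard needs its own id under one supervisor.
children =
  for idx <- 0..(shard_count - 1) do
    Supervisor.child_spec({ShardWorker, index: idx}, id: {ShardWorker, idx})
  end

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)
```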

sweep_latches(idx)

@spec sweep_latches(non_neg_integer()) :: :ok

Force a sweep of stale latches for this shard. Intended for tests.