Handles Redis Stream commands: XADD, XLEN, XRANGE, XREVRANGE, XREAD, XTRIM, XDEL, XINFO STREAM, XGROUP CREATE, XREADGROUP, and XACK.
Storage layout
Stream entries are stored in the shared Bitcask via compound keys:
X:{stream_key}\x00{ms}-{seq} => :erlang.term_to_binary(field_value_pairs)
Stream metadata is tracked in an ETS table (Ferricstore.Stream.Meta) for
fast access without Bitcask reads:
{stream_key} => {length, first_id, last_id, last_ms, last_seq}
Consumer group state is tracked in a second ETS table
(Ferricstore.Stream.Groups):
{stream_key, group_name} => {last_delivered_id, consumers, pending}
Stream IDs
Stream IDs follow the Redis format {milliseconds}-{sequence}. When the
client sends *, the server auto-generates a monotonically increasing ID
using Ferricstore.HLC.now_ms/0 (Hybrid Logical Clock, spec 2G.6) as the
milliseconds component and an incrementing sequence number when multiple
entries arrive in the same millisecond. The HLC ensures monotonicity even
when the wall clock jumps backward and, in multi-node mode, tracks the
cluster-wide max physical time via Raft heartbeat piggyback.
Explicit IDs must be strictly greater than the last entry's ID.
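The ID rules above can be sketched in a few lines. This is an illustration under assumptions, not the module's actual code: StreamIdSketch and its function names are hypothetical, and the now_ms argument stands in for the value that Ferricstore.HLC.now_ms/0 would supply. The fallback of bumping the sequence when the clock has not advanced is also an assumption about how same-millisecond entries are handled.

```elixir
defmodule StreamIdSketch do
  @moduledoc false
  # Hypothetical helper module, not part of the documented API.

  @type stream_id :: {non_neg_integer(), non_neg_integer()}

  # Parses "ms-seq" into {ms, seq}; returns :error on malformed input.
  @spec parse(binary()) :: {:ok, stream_id()} | :error
  def parse(id) when is_binary(id) do
    with [ms_str, seq_str] <- String.split(id, "-", parts: 2),
         {ms, ""} <- Integer.parse(ms_str),
         {seq, ""} <- Integer.parse(seq_str),
         true <- ms >= 0 and seq >= 0 do
      {:ok, {ms, seq}}
    else
      _ -> :error
    end
  end

  # ID for "*": a newer millisecond resets the sequence to 0,
  # otherwise the sequence is incremented on the last millisecond.
  @spec next(non_neg_integer(), stream_id()) :: stream_id()
  def next(now_ms, {last_ms, last_seq}) do
    if now_ms > last_ms, do: {now_ms, 0}, else: {last_ms, last_seq + 1}
  end

  # Explicit IDs must be strictly greater than the last entry's ID.
  # Same-size tuples compare element by element, so {ms, seq} orders correctly.
  @spec valid_explicit?(stream_id(), stream_id()) :: boolean()
  def valid_explicit?(candidate, last), do: candidate > last

  # Renders {ms, seq} back into the Redis "ms-seq" string form.
  @spec format(stream_id()) :: binary()
  def format({ms, seq}), do: "#{ms}-#{seq}"
end
```

Representing the parsed ID as a two-element tuple keeps the "strictly greater" check a plain term comparison, with no custom comparator.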
Types
A stream entry: {id_string, fields}, where fields is a flat list [field, value, ...].
@type stream_id() :: {non_neg_integer(), non_neg_integer()}
A parsed stream ID as {milliseconds, sequence}.
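To connect these types with the storage layout, the sketch below builds the compound key for one entry and round-trips its field list through :erlang.term_to_binary/1. StreamKeySketch and its function names are hypothetical; only the key shape X:{stream_key}\x00{ms}-{seq} and the use of term_to_binary come from the description of the storage layout.

```elixir
defmodule StreamKeySketch do
  @moduledoc false
  # Hypothetical helper module, not part of the documented API.

  # Builds "X:" <> stream_key <> NUL byte <> "ms-seq" for a parsed stream ID.
  @spec entry_key(binary(), {non_neg_integer(), non_neg_integer()}) :: binary()
  def entry_key(stream_key, {ms, seq}) when is_binary(stream_key) do
    "X:" <> stream_key <> <<0>> <> "#{ms}-#{seq}"
  end

  # Serializes the flat [field, value, ...] list for storage.
  @spec encode_fields([binary()]) :: binary()
  def encode_fields(fields) when is_list(fields), do: :erlang.term_to_binary(fields)

  # Restores the flat field list from its stored binary form.
  @spec decode_fields(binary()) :: [binary()]
  def decode_fields(bin) when is_binary(bin), do: :erlang.binary_to_term(bin)
end
```

For example, StreamKeySketch.entry_key("orders", {1700000000000, 2}) yields the prefix "X:orders", a single zero byte, and then "1700000000000-2".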
Functions
@spec cleanup_stream_waiters(pid()) :: :ok
Removes all stream waiters registered by pid across all keys.
Called when a client disconnects.
@spec ensure_meta_table() :: :ok
Ensures the stream metadata ETS tables exist.
Called lazily on first use. The tables are :public and :named_table
so any process can read and write them. When init_tables/0 has been
called at application startup, this is a cheap no-op.
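A lazy "ensure" of this kind is usually a check followed by a guarded create. The following is a minimal sketch of that pattern, assuming a single table with a hypothetical name (:stream_meta_sketch); the real function manages the module's own tables and may be structured differently.

```elixir
defmodule EnsureTableSketch do
  @moduledoc false
  # Hypothetical table name; the real module uses its own table names.
  @table :stream_meta_sketch

  @spec ensure_table() :: :ok
  def ensure_table do
    case :ets.whereis(@table) do
      :undefined ->
        try do
          # :public lets any process read and write; :named_table allows
          # lookup by name instead of by table identifier.
          :ets.new(@table, [:set, :public, :named_table])
          :ok
        rescue
          # Another process created the table between the check and :ets.new/2.
          ArgumentError -> :ok
        end

      _tid ->
        # Table already exists: nothing to do.
        :ok
    end
  end
end
```

Once the table exists, every later call takes the second branch, which is a single :ets.whereis/1 lookup.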
Handles a stream command.
Parameters
cmd - Uppercased command name (e.g. "XADD", "XLEN")
args - List of string arguments
store - Injected store map with get, put, delete, exists? callbacks
Returns
Plain Elixir term: string, integer, list, map, :ok, or {:error, message}.
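For tests, the injected store can be backed by a simple in-memory map. The sketch below is only an illustration: StoreSketch is hypothetical, and the callback arities and return values (get/1 returning the value or nil, put/2 and delete/1 returning :ok, exists?/1 returning a boolean) are assumptions, since the documentation names the four callbacks but not their signatures.

```elixir
defmodule StoreSketch do
  @moduledoc false
  # Hypothetical in-memory store. Callback arities and return values
  # are assumptions, not the documented contract.

  @spec new() :: map()
  def new do
    # An Agent holds the key/value map for the lifetime of the caller.
    {:ok, agent} = Agent.start_link(fn -> %{} end)

    %{
      get: fn key -> Agent.get(agent, &Map.get(&1, key)) end,
      put: fn key, value -> Agent.update(agent, &Map.put(&1, key, value)) end,
      delete: fn key -> Agent.update(agent, &Map.delete(&1, key)) end,
      exists?: fn key -> Agent.get(agent, &Map.has_key?(&1, key)) end
    }
  end
end
```

Passing the store as a map of closures keeps the command handler independent of the storage backend, so the same handler can run against Bitcask in production and against a map like this one in unit tests.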
@spec init_tables() :: :ok
Eagerly creates the stream ETS tables.
Must be called once during application startup (from Application.start/2)
so that the tables are owned by the long-lived application process. This
prevents the tables from being destroyed when short-lived connection
processes exit.
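The ownership concern comes from how ETS works: a table is deleted automatically when the process that created it exits. The sketch below demonstrates that behaviour in isolation; EtsOwnershipSketch and the table name passed to it are hypothetical and unrelated to the module's real tables.

```elixir
defmodule EtsOwnershipSketch do
  @moduledoc false
  # Hypothetical demo: creates a named table in a short-lived process and
  # reports whether the table still exists after that process has exited.

  @spec table_survives_owner_exit?(atom()) :: boolean()
  def table_survives_owner_exit?(table) do
    parent = self()

    {pid, ref} =
      spawn_monitor(fn ->
        :ets.new(table, [:set, :public, :named_table])
        send(parent, :created)
        # The function returns here, so the owning process exits.
      end)

    # Confirm the table was actually created by the short-lived process.
    receive do
      :created -> :ok
    after
      1_000 -> raise "table was not created"
    end

    # Wait for the owner to go down.
    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    after
      1_000 -> raise "owner did not exit"
    end

    :ets.whereis(table) != :undefined
  end
end
```

Creating the tables from a process that lives as long as the application avoids this: the owner never exits while connections come and go.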
Registers pid as a waiter for new entries on stream_key.
When XADD inserts a new entry into this stream, all registered waiters
receive {:stream_waiter_notify, stream_key}.
Parameters
stream_key -- the Redis key of the stream
pid -- the process to notify
last_seen_id -- the last ID the caller has seen (for future filtering)
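The register-then-notify flow can be sketched with an ETS bag keyed by stream. Everything named here is hypothetical (WaiterSketch, its table name, and its function names); only the message shape {:stream_waiter_notify, stream_key} is taken from the description above.

```elixir
defmodule WaiterSketch do
  @moduledoc false
  # Hypothetical registry; the real module's table name and layout may differ.
  @table :stream_waiters_sketch

  @spec init() :: :ok
  def init do
    # A :bag table allows several waiters under the same stream key.
    :ets.new(@table, [:bag, :public, :named_table])
    :ok
  end

  # Stores {stream_key, pid, last_seen_id}; last_seen_id is kept but unused.
  @spec register(binary(), pid(), binary()) :: :ok
  def register(stream_key, pid, last_seen_id) do
    :ets.insert(@table, {stream_key, pid, last_seen_id})
    :ok
  end

  # What an XADD path might do after inserting an entry: message every waiter.
  @spec notify(binary()) :: :ok
  def notify(stream_key) do
    for {_key, pid, _last_seen} <- :ets.lookup(@table, stream_key) do
      send(pid, {:stream_waiter_notify, stream_key})
    end

    :ok
  end
end
```

A blocked reader would register itself and then wait in a receive block with a timeout for {:stream_waiter_notify, stream_key}, re-reading the stream when the message arrives.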
@spec stream_waiter_count(binary()) :: non_neg_integer()
Returns the number of stream waiters for stream_key.
Unregisters pid as a waiter for stream_key.