Handles Redis Stream commands: XADD, XLEN, XRANGE, XREVRANGE, XREAD, XTRIM, XDEL, XINFO STREAM, XGROUP CREATE, XREADGROUP, and XACK.
Storage layout
Stream entries are stored in the shared Bitcask via compound keys:
X:{stream_key}\x00{ms}-{seq} => :erlang.term_to_binary(field_value_pairs)Stream metadata is tracked in an ETS table (Ferricstore.Stream.Meta) for
fast access without Bitcask reads:
{stream_key} => {length, first_id, last_id, last_ms, last_seq}Consumer group state is persisted as stream compound metadata and cached in
a second ETS table (Ferricstore.Stream.Groups):
{stream_key, group_name} => {last_delivered_id, consumers, pending}Stream IDs
Stream IDs follow the Redis format {milliseconds}-{sequence}. When the
client sends *, the server auto-generates a monotonically increasing ID
using Ferricstore.CommandTime.now_ms/0 as the milliseconds component and an
incrementing sequence number when multiple entries arrive in the same
millisecond. Outside Raft this reads the Hybrid Logical Clock; inside Raft it
reads the stamped log-entry time so replicas generate the same IDs.
Explicit IDs must be strictly greater than the last entry's ID.
Summary
Types
A stream entry: {id_string, [field, value, ...]} flat list.
A parsed stream ID as {milliseconds, sequence}.
Functions
See Ferricstore.Commands.Stream.Waiters.cleanup/1.
Clears all local, non-durable stream state.
Ensures the stream metadata ETS tables exist.
Handles a stream command.
Eagerly creates the stream ETS tables.
See Ferricstore.Commands.Stream.Waiters.register/3.
See Ferricstore.Commands.Stream.Waiters.count/1.
See Ferricstore.Commands.Stream.Waiters.unregister/2.
Types
A stream entry: {id_string, [field, value, ...]} flat list.
@type stream_id() :: {non_neg_integer(), non_neg_integer()}
A parsed stream ID as {milliseconds, sequence}.
Functions
@spec cleanup_stream_waiters(pid()) :: :ok
See Ferricstore.Commands.Stream.Waiters.cleanup/1.
@spec clear_local_state() :: :ok
Clears all local, non-durable stream state.
Stream entries live in the store through compound keys. These ETS tables are acceleration/waiter state and must not survive FLUSHDB/FLUSHALL, otherwise a recreated stream can retain stale range-index rows or blocked readers.
@spec ensure_meta_table() :: :ok
Ensures the stream metadata ETS tables exist.
Called lazily on first use. The tables are :public and :named_table
so any process can read and write them. When init_tables/0 has been
called at application startup, this is a cheap no-op.
Handles a stream command.
Parameters
cmd- Uppercased command name (e.g."XADD","XLEN")args- List of string argumentsstore- Injected store map withget,put,delete,exists?callbacks
Returns
Plain Elixir term: string, integer, list, map, :ok, or {:error, message}.
@spec init_tables() :: :ok
Eagerly creates the stream ETS tables.
Must be called once during application startup (from Application.start/2)
so that the tables are owned by the long-lived application process. This
prevents the tables from being destroyed when short-lived connection
processes exit.
See Ferricstore.Commands.Stream.Waiters.register/3.
@spec stream_waiter_count(binary()) :: non_neg_integer()
See Ferricstore.Commands.Stream.Waiters.count/1.
See Ferricstore.Commands.Stream.Waiters.unregister/2.