Ferricstore.Commands.Stream (ferricstore v0.3.1)

Handles Redis Stream commands: XADD, XLEN, XRANGE, XREVRANGE, XREAD, XTRIM, XDEL, XINFO STREAM, XGROUP CREATE, XREADGROUP, and XACK.

Storage layout

Stream entries are stored in the shared Bitcask via compound keys:

X:{stream_key}\x00{ms}-{seq}  =>  :erlang.term_to_binary(field_value_pairs)
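
As a rough illustration of the layout above, the compound key and value could be built as follows. This is a minimal sketch: the module `StreamKeySketch` and its function names are hypothetical and are not part of Ferricstore; only the `X:` prefix, the `\x00` separator, and the use of `:erlang.term_to_binary/1` come from the description above.

```elixir
defmodule StreamKeySketch do
  # Hypothetical helper module for illustration; not part of Ferricstore.

  # Builds the compound key: "X:" <> stream_key <> <<0>> <> "ms-seq".
  def entry_key(stream_key, ms, seq)
      when is_binary(stream_key) and is_integer(ms) and is_integer(seq) do
    "X:" <> stream_key <> <<0>> <> "#{ms}-#{seq}"
  end

  # Serializes the flat [field, value, ...] list into the stored value.
  def encode_fields(fields) when is_list(fields) do
    :erlang.term_to_binary(fields)
  end

  # Reverses encode_fields/1.
  def decode_fields(bin) when is_binary(bin) do
    :erlang.binary_to_term(bin)
  end
end
```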

Stream metadata is tracked in an ETS table (Ferricstore.Stream.Meta) for fast access without Bitcask reads:

{stream_key} => {length, first_id, last_id, last_ms, last_seq}

Consumer group state is tracked in a second ETS table (Ferricstore.Stream.Groups):

{stream_key, group_name} => {last_delivered_id, consumers, pending}
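
The two ETS mappings above might look like this in practice. This is a sketch under stated assumptions: the table names, the `{key, value_tuple}` row shape, the use of ID strings inside the tuples, and the use of maps for `consumers` and `pending` are all guesses for illustration, not the module's confirmed representation.

```elixir
defmodule StreamMetaSketch do
  # Hypothetical illustration; table names and row shapes are assumptions.
  def demo do
    meta = :ets.new(:stream_meta_sketch, [:set, :public])
    groups = :ets.new(:stream_groups_sketch, [:set, :public])

    # stream_key => {length, first_id, last_id, last_ms, last_seq}
    :ets.insert(meta, {"mystream", {2, "1-0", "2-0", 2, 0}})

    # {stream_key, group_name} => {last_delivered_id, consumers, pending}
    :ets.insert(groups, {{"mystream", "workers"}, {"1-0", %{}, %{}}})

    # Both lookups are served from memory, with no Bitcask read.
    [{_key, {length, _first, last_id, _ms, _seq}}] = :ets.lookup(meta, "mystream")

    [{_gkey, {last_delivered, _consumers, _pending}}] =
      :ets.lookup(groups, {"mystream", "workers"})

    {length, last_id, last_delivered}
  end
end
```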

Stream IDs

Stream IDs follow the Redis format {milliseconds}-{sequence}. When the client sends *, the server auto-generates a monotonically increasing ID: the milliseconds component comes from Ferricstore.HLC.now_ms/0 (Hybrid Logical Clock, spec 2G.6), and the sequence number increments when multiple entries arrive in the same millisecond. The HLC keeps IDs monotonic even when the wall clock jumps backward and, in multi-node mode, tracks the cluster-wide maximum physical time piggybacked on Raft heartbeats.

Explicit IDs must be strictly greater than the last entry's ID.
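
The ID rules above can be sketched as follows. The module `StreamIdSketch` is hypothetical, and the clock value is passed in as a plain integer rather than read from `Ferricstore.HLC.now_ms/0`; falling back to the last millisecond when the supplied clock is not ahead is an assumption made so the sketch stays monotonic on its own.

```elixir
defmodule StreamIdSketch do
  # Hypothetical helper module for illustration; not part of Ferricstore.

  # Parses "ms-seq" into {:ok, {ms, seq}}; returns :error on malformed input.
  def parse(id) when is_binary(id) do
    with [ms_str, seq_str] <- String.split(id, "-", parts: 2),
         {ms, ""} <- Integer.parse(ms_str),
         {seq, ""} <- Integer.parse(seq_str),
         true <- ms >= 0 and seq >= 0 do
      {:ok, {ms, seq}}
    else
      _ -> :error
    end
  end

  # Auto-generation for "*": a newer millisecond resets the sequence to 0,
  # otherwise the sequence is bumped within the last known millisecond.
  def next_id({last_ms, _last_seq}, now_ms) when now_ms > last_ms, do: {now_ms, 0}
  def next_id({last_ms, last_seq}, _now_ms), do: {last_ms, last_seq + 1}

  # Explicit IDs must be strictly greater than the last entry's ID.
  # Same-size tuples compare element by element, so ms is compared before seq.
  def valid_explicit?(candidate, last), do: candidate > last
end
```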

Types

entry()

@type entry() :: {binary(), [binary()]}

A stream entry: {id_string, fields}, where fields is a flat list of alternating field names and values ([field, value, ...]).

stream_id()

@type stream_id() :: {non_neg_integer(), non_neg_integer()}

A parsed stream ID as {milliseconds, sequence}.

Functions

cleanup_stream_waiters(pid)

@spec cleanup_stream_waiters(pid()) :: :ok

Removes all stream waiters registered by pid across all keys.

Called when a client disconnects.

ensure_meta_table()

@spec ensure_meta_table() :: :ok

Ensures the stream metadata ETS tables exist.

Called lazily on first use. The tables are :public and :named_table so any process can read and write them. When init_tables/0 has been called at application startup, this is a cheap no-op.
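
A lazy, idempotent table check of this kind could look like the sketch below. The module and table name are hypothetical, and the actual implementation may differ; the sketch only shows one way to make repeated calls cheap once the table exists.

```elixir
defmodule EnsureTableSketch do
  # Hypothetical table name; the real tables are named
  # Ferricstore.Stream.Meta and Ferricstore.Stream.Groups.
  @table :ensure_table_sketch

  def ensure_table do
    case :ets.whereis(@table) do
      :undefined ->
        try do
          :ets.new(@table, [:set, :public, :named_table])
        rescue
          # Another process created the table between the check and the call.
          ArgumentError -> :ok
        end

        :ok

      _tid ->
        # The table already exists, so there is nothing to do.
        :ok
    end
  end
end
```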

handle(cmd, args, store)

@spec handle(binary(), [binary()], map()) :: term()

Handles a stream command.

Parameters

  • cmd - Uppercased command name (e.g. "XADD", "XLEN")
  • args - List of string arguments
  • store - Injected store map with get, put, delete, exists? callbacks

Returns

A plain Elixir term: a string, integer, list, map, :ok, or {:error, message}.
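
For tests, an injected store map could be backed by an in-memory Agent, as in the sketch below. The callback names come from the parameter list above, but their arities and return values are assumptions, and `StoreMapSketch` is a hypothetical module that is not part of Ferricstore.

```elixir
defmodule StoreMapSketch do
  # Hypothetical in-memory store; callback arities and return values
  # are assumptions, not the documented contract.
  def new do
    {:ok, agent} = Agent.start_link(fn -> %{} end)

    %{
      get: fn key -> Agent.get(agent, &Map.get(&1, key)) end,
      put: fn key, value -> Agent.update(agent, &Map.put(&1, key, value)) end,
      delete: fn key -> Agent.update(agent, &Map.delete(&1, key)) end,
      exists?: fn key -> Agent.get(agent, &Map.has_key?(&1, key)) end
    }
  end
end
```

If the assumed callback shapes match what the module expects, a map like this would be passed as the third argument, for example `handle("XLEN", ["mystream"], StoreMapSketch.new())`.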

init_tables()

@spec init_tables() :: :ok

Eagerly creates the stream ETS tables.

Must be called once during application startup (from Application.start/2) so that the tables are owned by the long-lived application process. This prevents the tables from being destroyed when short-lived connection processes exit.
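
The ownership concern can be demonstrated in isolation: an ETS table is deleted when the process that owns it exits. The sketch below uses a hypothetical module and table name and does not touch the Ferricstore tables.

```elixir
defmodule TableOwnershipSketch do
  # Creates a table inside a short-lived process, waits for that process
  # to exit, and reports whether the table still exists afterwards.
  def table_survives_creator? do
    parent = self()

    {pid, ref} =
      spawn_monitor(fn ->
        tid = :ets.new(:short_lived_sketch, [:set, :public])
        send(parent, {:created, tid})
      end)

    tid =
      receive do
        {:created, t} -> t
      end

    # Wait until the creating (owning) process has terminated.
    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    end

    :ets.info(tid) != :undefined
  end
end
```

Creating the tables from a process that lives as long as the application avoids this loss.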

register_stream_waiter(stream_key, pid, last_seen_id)

@spec register_stream_waiter(binary(), pid(), binary()) :: :ok

Registers pid as a waiter for new entries on stream_key.

When XADD inserts a new entry into this stream, all registered waiters receive {:stream_waiter_notify, stream_key}.

Parameters

  • stream_key - The Redis key of the stream
  • pid - The process to notify
  • last_seen_id - The last ID the caller has seen (for future filtering)
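
On the waiter side, a registered process would typically block in a `receive` until the notification arrives. The sketch below shows only that receiving half; `StreamWaiterSketch` and its timeout handling are hypothetical, and only the `{:stream_waiter_notify, stream_key}` message shape is taken from the description above.

```elixir
defmodule StreamWaiterSketch do
  # Blocks until a notification for stream_key arrives or the timeout
  # elapses. Returns :notified or :timeout.
  def await(stream_key, timeout_ms) when is_binary(stream_key) do
    receive do
      {:stream_waiter_notify, ^stream_key} -> :notified
    after
      timeout_ms -> :timeout
    end
  end
end
```

A caller would register itself with register_stream_waiter/3 before waiting and call unregister_stream_waiter/2 afterwards, whether the wait ended in a notification or a timeout.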

stream_waiter_count(stream_key)

@spec stream_waiter_count(binary()) :: non_neg_integer()

Returns the number of stream waiters for stream_key.

unregister_stream_waiter(stream_key, pid)

@spec unregister_stream_waiter(binary(), pid()) :: :ok

Unregisters pid as a waiter for stream_key.