Ferricstore.Commands.Stream (ferricstore v0.4.2)

Copy Markdown View Source

Handles Redis Stream commands: XADD, XLEN, XRANGE, XREVRANGE, XREAD, XTRIM, XDEL, XINFO STREAM, XGROUP CREATE, XREADGROUP, and XACK.

Storage layout

Stream entries are stored in the shared Bitcask via compound keys:

X:{stream_key}\x00{ms}-{seq}  =>  :erlang.term_to_binary(field_value_pairs)

Stream metadata is tracked in an ETS table (Ferricstore.Stream.Meta) for fast access without Bitcask reads:

{stream_key} => {length, first_id, last_id, last_ms, last_seq}

Consumer group state is persisted as stream compound metadata and cached in a second ETS table (Ferricstore.Stream.Groups):

{stream_key, group_name} => {last_delivered_id, consumers, pending}

Stream IDs

Stream IDs follow the Redis format {milliseconds}-{sequence}. When the client sends *, the server auto-generates a monotonically increasing ID using Ferricstore.CommandTime.now_ms/0 as the milliseconds component and an incrementing sequence number when multiple entries arrive in the same millisecond. Outside Raft this reads the Hybrid Logical Clock; inside Raft it reads the stamped log-entry time so replicas generate the same IDs.

Explicit IDs must be strictly greater than the last entry's ID.

Summary

Types

A stream entry: {id_string, [field, value, ...]} flat list.

A parsed stream ID as {milliseconds, sequence}.

Functions

See Ferricstore.Commands.Stream.Waiters.cleanup/1.

Clears all local, non-durable stream state.

Ensures the stream metadata ETS tables exist.

Handles a stream command.

Eagerly creates the stream ETS tables.

See Ferricstore.Commands.Stream.Waiters.register/3.

See Ferricstore.Commands.Stream.Waiters.count/1.

See Ferricstore.Commands.Stream.Waiters.unregister/2.

Types

entry()

@type entry() :: {binary(), [binary()]}

A stream entry: {id_string, [field, value, ...]} flat list.

stream_id()

@type stream_id() :: {non_neg_integer(), non_neg_integer()}

A parsed stream ID as {milliseconds, sequence}.

Functions

cleanup_stream_waiters(pid)

@spec cleanup_stream_waiters(pid()) :: :ok

See Ferricstore.Commands.Stream.Waiters.cleanup/1.

clear_local_state()

@spec clear_local_state() :: :ok

Clears all local, non-durable stream state.

Stream entries live in the store through compound keys. These ETS tables are acceleration/waiter state and must not survive FLUSHDB/FLUSHALL, otherwise a recreated stream can retain stale range-index rows or blocked readers.

ensure_meta_table()

@spec ensure_meta_table() :: :ok

Ensures the stream metadata ETS tables exist.

Called lazily on first use. The tables are :public and :named_table so any process can read and write them. When init_tables/0 has been called at application startup, this is a cheap no-op.

handle(cmd, args, store)

@spec handle(binary(), [binary()], map()) :: term()

Handles a stream command.

Parameters

  • cmd - Uppercased command name (e.g. "XADD", "XLEN")
  • args - List of string arguments
  • store - Injected store map with get, put, delete, exists? callbacks

Returns

Plain Elixir term: string, integer, list, map, :ok, or {:error, message}.

handle_ast(ast, store)

@spec handle_ast(term(), map()) :: term()

init_tables()

@spec init_tables() :: :ok

Eagerly creates the stream ETS tables.

Must be called once during application startup (from Application.start/2) so that the tables are owned by the long-lived application process. This prevents the tables from being destroyed when short-lived connection processes exit.

register_stream_waiter(stream_key, pid, last_seen_id)

@spec register_stream_waiter(binary(), pid(), binary()) :: :ok

See Ferricstore.Commands.Stream.Waiters.register/3.

stream_waiter_count(stream_key)

@spec stream_waiter_count(binary()) :: non_neg_integer()

See Ferricstore.Commands.Stream.Waiters.count/1.

unregister_stream_waiter(stream_key, pid)

@spec unregister_stream_waiter(binary(), pid()) :: :ok

See Ferricstore.Commands.Stream.Waiters.unregister/2.