reckon_db_streams (reckon_db v2.3.7)
Streams API facade for reckon-db
Provides the public API for stream operations:
- append: Write events to a stream with optimistic concurrency
- read: Read events from a stream
- get_version: Get the current stream version
- exists: Check if a stream exists
- list_streams: List all streams in the store
Summary
Functions
Append events to a stream with expected version check
Delete a stream and all its events
Check if a stream exists
Get current version of a stream
Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.
List all streams in the store
Read events from a stream
Read events from a stream with explicit options.
Read all events from a stream
Read all events across all streams in global epoch_us order.
Read all events of specific types from all streams using Khepri native filtering.
Read all events matching tags from all streams.
Types
-type direction() :: forward | backward.
-type event() :: #event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary(), prev_event_hash :: binary() | undefined, mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined, signature :: binary() | undefined}.
-type integrity_ctx() :: disabled | {enabled, Key :: binary(), ChainStart :: non_neg_integer()}.
-type read_opts() :: #{verify => verify_mode()}.
-type verify_mode() :: skip_legacy | strict | skip_all.
Functions
-spec append(atom(), binary(), integer(), [new_event()]) -> {ok, non_neg_integer()} | {error, term()}.
Append events to a stream with expected version check
Expected version semantics:
- -1 (NO_STREAM): The stream must not exist (first write)
- -2 (ANY_VERSION): No version check, always append
- N >= 0: The stream version must equal N
Returns {ok, NewVersion} on success or {error, Reason} on failure.
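The expected-version rule above can be sketched as a small pure function, next to a wrapper for the first-write case. This is only an illustrative model, not the library's implementation: the module name, `expected_version_ok/2`, and `append_new/3` are hypothetical, and the model assumes that a current version of -1 stands for "no stream".

```erlang
-module(append_sketch).
-export([expected_version_ok/2, append_new/3]).

-define(NO_STREAM, -1).    %% stream must not exist yet
-define(ANY_VERSION, -2).  %% skip the version check

%% Pure model of the documented rule (hypothetical helper).
%% Current is the stream's current version; -1 is assumed to mean
%% "no stream" for the purposes of this sketch.
-spec expected_version_ok(integer(), integer()) -> boolean().
expected_version_ok(?ANY_VERSION, _Current) -> true;
expected_version_ok(?NO_STREAM, Current) -> Current =:= -1;
expected_version_ok(N, Current) when is_integer(N), N >= 0 -> N =:= Current;
expected_version_ok(_Other, _Current) -> false.

%% First write to a stream: passes NO_STREAM so the append is rejected
%% if the stream already exists. Events is a list of new_event() values.
append_new(StoreId, StreamId, Events) ->
    reckon_db_streams:append(StoreId, StreamId, ?NO_STREAM, Events).
```

A caller that has previously observed version N would pass N as the expected version instead, and treat `{error, Reason}` as a signal to re-read and retry.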
Delete a stream and all its events
Check if a stream exists
Get current version of a stream
Returns:
- -1 if the stream doesn't exist or is empty
- N >= 0, the version of the latest event
Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.
List all streams in the store
-spec read(atom(), binary(), non_neg_integer(), pos_integer(), direction()) -> {ok, [event()]} | {error, term()}.
Read events from a stream
Parameters:
- StoreId: The store identifier
- StreamId: The stream identifier
- StartVersion: Starting version (0-based)
- Count: Maximum number of events to read
- Direction: forward or backward
Returns {ok, [Event]} or {error, Reason}
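As a rough illustration of how StartVersion and Count combine, the sketch below folds over a stream in fixed-size forward batches. The module name, `fold_stream/5`, and `store_reader/2` are hypothetical helpers, not part of reckon_db. The sketch assumes that forward reads return events in ascending, contiguous version order and that reading past the end yields `{ok, []}`.

```erlang
-module(read_sketch).
-export([fold_stream/5, store_reader/2]).

%% ReadFun(StartVersion, Count) -> {ok, [Event]} | {error, Reason}.
%% Fun(Event, Acc) -> NewAcc is applied to every event in read order.
fold_stream(ReadFun, Start, Count, Fun, Acc0) when is_integer(Count), Count > 0 ->
    case ReadFun(Start, Count) of
        {ok, Events} ->
            Acc1 = lists:foldl(Fun, Acc0, Events),
            case length(Events) of
                %% A full batch: more events may follow, so keep reading.
                Count -> fold_stream(ReadFun, Start + Count, Count, Fun, Acc1);
                %% A short (or empty) batch: the end of the stream was reached.
                _Short -> {ok, Acc1}
            end;
        {error, _Reason} = Error ->
            Error
    end.

%% Builds a ReadFun bound to one stream, reading forward.
store_reader(StoreId, StreamId) ->
    fun(Start, Count) ->
        reckon_db_streams:read(StoreId, StreamId, Start, Count, forward)
    end.
```

For example, `read_sketch:fold_stream(read_sketch:store_reader(my_store, <<"order-1">>), 0, 100, fun(_E, N) -> N + 1 end, 0)` would count the events in a stream, where `my_store` and `<<"order-1">>` are placeholder identifiers.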
-spec read(atom(), binary(), non_neg_integer(), pos_integer(), direction(), read_opts()) -> {ok, [event()]} | {error, term()}.
Read events from a stream with explicit options.
Currently supported options:
verify :: skip_legacy | strict | skip_all
Tamper-resistance enforcement mode. Default: skip_legacy.
- skip_legacy (default): events with a version below the per-stream chain_start watermark are returned untouched (legacy data); events at or above the watermark are verified strictly, and an integrity_violation is returned on any failure.
- strict: every event must carry integrity fields and verify; legacy events surface as missing_integrity.
- skip_all: no verification (dangerous; intended for migration tooling only).
Backward-direction reads always bypass chain verification in 2.1.0; the MAC alone could still be checked but is not in this release. Forward reads receive full chain + MAC verification.
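A minimal sketch of a strict forward read follows. The module name, `read_opts/1`, and `read_verified/4` are hypothetical, and so is the `read_failed` wrapper tag. The exact shape of the error term returned on an integrity failure is not specified here, so the sketch matches on `{error, Reason}` generically rather than guessing at it.

```erlang
-module(verify_sketch).
-export([read_opts/1, read_verified/4]).

%% Builds a read_opts() map, rejecting anything that is not one of the
%% three documented verify modes.
read_opts(Mode) when Mode =:= skip_legacy; Mode =:= strict; Mode =:= skip_all ->
    #{verify => Mode}.

%% Forward read with strict verification. The direction is forward
%% because backward reads are documented as bypassing chain verification.
read_verified(StoreId, StreamId, StartVersion, Count) ->
    Opts = read_opts(strict),
    case reckon_db_streams:read(StoreId, StreamId, StartVersion, Count, forward, Opts) of
        {ok, Events} ->
            {ok, Events};
        {error, Reason} ->
            %% Reason may describe an integrity failure or any other read
            %% error; the caller decides how to react.
            {error, {read_failed, Reason}}
    end.
```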
Read all events from a stream
-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events across all streams in global epoch_us order.
Returns events sorted by epoch_us, skipping Offset events and returning up to BatchSize events. Used by catch-up subscriptions to replay historical events to a subscriber.
Parameters:
- StoreId: The store identifier
- Offset: Number of events to skip (0-based)
- BatchSize: Maximum number of events to return
Returns events sorted by epoch_us (global ordering).
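The Offset and BatchSize parameters lend themselves to a simple replay loop, sketched below under stated assumptions. The module name, `replay/4`, and `store_pager/1` are hypothetical helpers; the sketch assumes that an Offset at or past the end of the store yields `{ok, []}`. It returns the next offset so that a subscriber could resume from that point later.

```erlang
-module(replay_sketch).
-export([replay/4, store_pager/1]).

%% PageFun(Offset, BatchSize) -> {ok, [Event]} | {error, Reason}.
%% Deliver(Event) is called once per event, in the order pages return them.
%% Returns {ok, NextOffset} once an empty page is seen.
replay(PageFun, Offset, BatchSize, Deliver) when is_integer(BatchSize), BatchSize > 0 ->
    case PageFun(Offset, BatchSize) of
        {ok, []} ->
            {ok, Offset};
        {ok, Events} ->
            lists:foreach(Deliver, Events),
            replay(PageFun, Offset + length(Events), BatchSize, Deliver);
        {error, _Reason} = Error ->
            Error
    end.

%% Builds a PageFun bound to one store.
store_pager(StoreId) ->
    fun(Offset, BatchSize) ->
        reckon_db_streams:read_all_global(StoreId, Offset, BatchSize)
    end.
```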
-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events of specific types from all streams using Khepri native filtering.
This function uses Khepri's built-in #if_data_matches condition to filter events by type at the database level, avoiding loading all events into memory.
Parameters:
- StoreId: The store identifier
- EventTypes: List of event type binaries to match
- BatchSize: Maximum number of events to return (for pagination)
Returns events sorted by epoch_us (global ordering).
-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events matching tags from all streams.
Tags provide a mechanism for cross-stream querying without affecting stream-based concurrency control. This is useful for the process-centric model where you want to find all events related to specific participants.
Match Modes
any (default): Returns events containing ANY of the specified tags (union).
Example: read_by_tags(Store, [<<"student:456">>, <<"student:789">>], any, 100)
Returns events for either student.
all: Returns events containing ALL of the specified tags (intersection).
Example: read_by_tags(Store, [<<"student:456">>, <<"course:CS101">>], all, 100)
Returns only events tagged with both student 456 AND course CS101.
Parameters
- StoreId: The store identifier
- Tags: List of tag binaries to match
- Match: any | all (matching strategy)
- BatchSize: Maximum number of events to return
Returns
Events sorted by epoch_us (global ordering).
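The difference between the two match modes can be illustrated with a small pure predicate over tag lists. This is only a model of the union and intersection semantics described above, not how the library filters internally. The module name and `matches/3` are hypothetical, and treating `undefined` tags as "no tags" is an assumption of the sketch.

```erlang
-module(tag_match_sketch).
-export([matches/3]).

%% EventTags is the event's tag list (or undefined), QueryTags the tags
%% being searched for, and Mode is any | all.
-spec matches([binary()] | undefined, [binary()], any | all) -> boolean().
matches(undefined, _QueryTags, _Mode) ->
    %% Assumption: an event without tags never matches.
    false;
matches(EventTags, QueryTags, any) ->
    %% Union: at least one queried tag is present on the event.
    lists:any(fun(Tag) -> lists:member(Tag, EventTags) end, QueryTags);
matches(EventTags, QueryTags, all) ->
    %% Intersection: every queried tag is present on the event.
    lists:all(fun(Tag) -> lists:member(Tag, EventTags) end, QueryTags).
```

With an event tagged `[<<"student:456">>, <<"course:CS101">>]`, a query for `[<<"student:456">>, <<"student:789">>]` matches in `any` mode but not in `all` mode.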