evoq_event_store (evoq v1.19.0)

View Source

Wrapper for event store operations via adapter.

Provides a consistent interface for event store operations, delegating to a configured adapter.

Configuration (Required)

You must configure an adapter in your application config:

  {evoq, [
      {event_store_adapter, evoq_esdb_gater_adapter}
  ]}

Available adapters: - evoq_esdb_gater_adapter (from reckon_evoq package)

Summary

Functions

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

Check if a stream exists.

Get the configured event store adapter. Crashes if no adapter is configured.

Check if a store contains at least one event.

List all streams in the store.

Read all events from a stream.

Read all events from a stream with batch size.

Read all events from all streams, sorted by global position. This is useful for projection rebuild.

Read all events across all streams in global order.

Read events across streams by tag match. Match is the atom any (union) or all (intersection).

Read all events of specific types from all streams.

Set the event store adapter (primarily for testing).

Get the current version of a stream.

Types

evoq_event/0

-type evoq_event() ::
          #evoq_event{event_id :: binary(),
                      event_type :: binary(),
                      stream_id :: binary(),
                      version :: non_neg_integer(),
                      data :: map() | binary(),
                      metadata :: map(),
                      tags :: [binary()] | undefined,
                      timestamp :: integer(),
                      epoch_us :: integer(),
                      data_content_type :: binary(),
                      metadata_content_type :: binary(),
                      prev_event_hash :: binary() | undefined}.

Functions

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream.

append_if_no_tag_matches(StoreId, TagFilter, SeqCutoff, Events)

-spec append_if_no_tag_matches(atom(), term(), integer(), [map()]) ->
                                  {ok, non_neg_integer()} |
                                  {error, {context_changed, non_neg_integer()}} |
                                  {error, term()}.

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

TagFilter is the consistency context query (a backend-defined tag-filter term). SeqCutoff is the highest seq the caller saw (or -1 for "saw nothing"). Returns {error, {context_changed, MaxSeq}} if any event matching TagFilter has seq above SeqCutoff.

The typical caller is evoq_decision_runtime; user code uses the evoq_decision behaviour rather than calling this directly.

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if a stream exists.

get_adapter()

-spec get_adapter() -> module().

Get the configured event store adapter. Crashes if no adapter is configured.

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams in the store.

read(StoreId, StreamId, FromVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) ->
              {ok, [map()]} | {error, term()}.

Read events from a stream.

read_all(StoreId, StreamId, Direction)

-spec read_all(atom(), binary(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream.

read_all(StoreId, StreamId, BatchSize, Direction)

-spec read_all(atom(), binary(), pos_integer(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream with batch size.

read_all_events(StoreId, BatchSize)

-spec read_all_events(atom(), pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events from all streams, sorted by global position. This is useful for projection rebuild.

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) ->
                         {ok, [evoq_event()]} | {error, term()}.

Read all events across all streams in global order.

Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay. Falls back to read_all_events/2 if adapter does not implement the optional read_all_global/3 callback.

read_by_tags(StoreId, Tags, Match, BatchSize)

-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [map()]} | {error, term()}.

Read events across streams by tag match. Match is the atom any (union) or all (intersection).

read_events_by_types(StoreId, EventTypes, BatchSize)

-spec read_events_by_types(atom(), [binary()], pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events of specific types from all streams.

Routes through the adapter which uses native filtering when available. Returns events sorted by epoch_us (global ordering).

set_adapter(Adapter)

-spec set_adapter(module()) -> ok.

Set the event store adapter (primarily for testing).

version(StoreId, StreamId)

-spec version(atom(), binary()) -> integer().

Get the current version of a stream.