reckon_evoq_adapter (reckon_evoq v2.4.0)

View Source

Gateway adapter implementation for evoq

Implements the adapter behaviors using reckon_gater_api to route all operations through the reckon-gater load balancer.

This adapter ensures that evoq never directly calls reckon-db modules, instead routing through the gateway for: - Automatic retry with exponential backoff - Load balancing across workers - High availability

Summary

Functions

Append events to a stream via gateway.

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

Delete all snapshots via gateway.

Delete snapshot at version via gateway.

Delete a stream via gateway.

Check if stream exists via gateway.

Get subscription by name via gateway.

Get checkpoint for subscription via gateway.

Check if a store contains at least one event.

List subscriptions via gateway.

List all streams via gateway.

List snapshot versions via gateway.

Read the latest snapshot via gateway.

Read events from a stream via gateway.

Read all events from a stream via gateway.

Read all events across all streams in global order via gateway.

Read snapshot at specific version via gateway.

Read events by type via gateway.

Read events by a metadata key=value pair via gateway.

Read events by tags via gateway (default: ANY match).

Read events by tags via gateway with match mode.

Unsubscribe from events via gateway.

Get current stream version via gateway.

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary(),
                 prev_event_hash :: binary() | undefined,
                 mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined,
                 signature :: binary() | undefined}.

evoq_event/0

-type evoq_event() ::
          #evoq_event{event_id :: binary(),
                      event_type :: binary(),
                      stream_id :: binary(),
                      version :: non_neg_integer(),
                      data :: map() | binary(),
                      metadata :: map(),
                      tags :: [binary()] | undefined,
                      timestamp :: integer(),
                      epoch_us :: integer(),
                      data_content_type :: binary(),
                      metadata_content_type :: binary(),
                      prev_event_hash :: binary() | undefined}.

evoq_snapshot/0

-type evoq_snapshot() ::
          #evoq_snapshot{stream_id :: binary(),
                         version :: non_neg_integer(),
                         data :: map() | binary(),
                         metadata :: map(),
                         timestamp :: integer()}.

evoq_subscription/0

-type evoq_subscription() ::
          #evoq_subscription{id :: binary(),
                             type :: evoq_subscription_type(),
                             selector :: binary() | map(),
                             subscription_name :: binary(),
                             subscriber_pid :: pid() | undefined,
                             created_at :: integer(),
                             pool_size :: pos_integer(),
                             checkpoint :: non_neg_integer() | undefined,
                             options :: map()}.

evoq_subscription_type/0

-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.

Functions

ack(StoreId, SubscriptionName, StreamId, Position)

-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.

Acknowledge event via gateway.

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream via gateway.

Events from evoq have a flat structure, but reckon_db expects events with a nested data field. This function transforms the events to the format expected by reckon_db.

append_if_no_tag_matches(StoreId, TagFilter, SeqCutoff, Events)

-spec append_if_no_tag_matches(atom(), term(), integer(), [map()]) ->
                                  {ok, non_neg_integer()} |
                                  {error, {context_changed, non_neg_integer()}} |
                                  {error, no_events} |
                                  {error, integrity_not_supported_in_dcb_v1} |
                                  {error, term()}.

Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).

Unlike append/4, the precondition is a tag-filter context query rather than a stream-version check. Returns {error, {context_changed, MaxSeq}} when any event matching TagFilter has seq above SeqCutoff.

TagFilter is reckon_gater_types:tag_filter() (see reckon-gater 2.3.0+ for the canonical type). SeqCutoff is an integer; -1 means "saw nothing yet".

Events are transformed via the same evoq_to_reckon_event/1 shim as append/4, so callers pass evoq-shaped events and the adapter bridges to the reckon_db payload shape.

delete(StoreId, StreamId)

-spec delete(atom(), binary()) -> ok | {error, term()}.

Delete all snapshots via gateway.

delete_at_version(StoreId, StreamId, Version)

-spec delete_at_version(atom(), binary(), non_neg_integer()) -> ok | {error, term()}.

Delete snapshot at version via gateway.

delete_stream(StoreId, StreamId)

-spec delete_stream(atom(), binary()) -> ok | {error, term()}.

Delete a stream via gateway.

event_type_summary(StoreId)

-spec event_type_summary(atom()) -> {ok, [map()]} | {error, term()}.

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if stream exists via gateway.

get_by_name(StoreId, SubscriptionName)

-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.

Get subscription by name via gateway.

get_checkpoint(StoreId, SubscriptionName)

-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.

Get checkpoint for subscription via gateway.

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

list(StoreId)

-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.

List subscriptions via gateway.

list_all_snapshots(StoreId)

-spec list_all_snapshots(atom()) -> {ok, [map()]} | {error, term()}.

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams via gateway.

list_subscriptions(StoreId)

-spec list_subscriptions(atom()) -> {ok, [map()]} | {error, term()}.

list_versions(StoreId, StreamId)

-spec list_versions(atom(), binary()) -> {ok, [non_neg_integer()]} | {error, term()}.

List snapshot versions via gateway.

read(StoreId, StreamId)

-spec read(atom(), binary()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.

Read the latest snapshot via gateway.

read(StoreId, StreamId, StartVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) ->
              {ok, [evoq_event()]} | {error, term()}.

Read events from a stream via gateway.

For event sourcing, a non-existent stream just means no events yet, so we translate stream_not_found to an empty list.

read_all(StoreId, StreamId, Direction)

-spec read_all(atom(), binary(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.

Read all events from a stream via gateway.

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) ->
                         {ok, [evoq_event()]} | {error, term()}.

Read all events across all streams in global order via gateway.

Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay.

read_at_version(StoreId, StreamId, Version)

-spec read_at_version(atom(), binary(), non_neg_integer()) ->
                         {ok, evoq_snapshot()} | {error, not_found | term()}.

Read snapshot at specific version via gateway.

read_by_event_types(StoreId, EventTypes, BatchSize)

-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.

Read events by type via gateway.

Uses the server-side native Khepri filtering for efficient type-based queries. Events are filtered at the database level, avoiding loading all events into memory.

read_by_metadata(StoreId, Key, Value)

-spec read_by_metadata(atom(), binary(), binary()) -> {ok, [evoq_event()]} | {error, term()}.

Read events by a metadata key=value pair via gateway.

The cross-cutting lineage primitive: events whose metadata Key equals Value (e.g. all events with causation_id == some event id). O(matches) when the store declared the {meta, Key} index, else a server-side scan.

read_by_tags(StoreId, Tags, BatchSize)

-spec read_by_tags(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.

Read events by tags via gateway (default: ANY match).

Tags provide cross-stream querying for the process-centric model. Use this to find all events related to specific participants.

read_by_tags(StoreId, Tags, Match, BatchSize)

-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) ->
                      {ok, [evoq_event()]} | {error, term()}.

Read events by tags via gateway with match mode.

Tags provide cross-stream querying for the process-centric model. Events are filtered at the database level.

Match modes: any - Return events matching ANY of the tags (union) all - Return events matching ALL of the tags (intersection)

save(StoreId, StreamId, Version, Data, Metadata)

-spec save(atom(), binary(), non_neg_integer(), map() | binary(), map()) -> ok | {error, term()}.

Save a snapshot via gateway.

store_stats(StoreId)

-spec store_stats(atom()) -> {ok, map()} | {error, term()}.

stream_info(StoreId, StreamId)

-spec stream_info(atom(), binary()) -> {ok, map()} | {error, term()}.

subscribe(StoreId, Type, Selector, SubscriptionName, Opts)

-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) ->
                   {ok, binary()} | {error, term()}.

Subscribe to events via gateway.

When a subscriber_pid is provided, a bridge process is spawned that receives raw #event{} records from ReckonDB and translates them to #evoq_event{} records before forwarding to the subscriber.

The subscriber always receives: {events, [#evoq_event{}]}

subscription_lag(StoreId, SubscriptionName)

-spec subscription_lag(atom(), binary()) -> {ok, map()} | {error, term()}.

unsubscribe(StoreId, SubscriptionId)

-spec unsubscribe(atom(), binary()) -> ok | {error, term()}.

Unsubscribe from events via gateway.

version(StoreId, StreamId)

-spec version(atom(), binary()) -> integer().

Get current stream version via gateway.