reckon_evoq_adapter (reckon_evoq v2.0.0)

View Source

Gateway adapter implementation for evoq

Implements the adapter behaviors using reckon_gater_api to route all operations through the reckon-gater load balancer.

This adapter ensures that evoq never directly calls reckon-db modules, instead routing through the gateway for: - Automatic retry with exponential backoff - Load balancing across workers - High availability

Summary

Functions

Append events to a stream via gateway.

Delete all snapshots via gateway.

Delete snapshot at version via gateway.

Delete a stream via gateway.

Check if stream exists via gateway.

Get subscription by name via gateway.

Get checkpoint for subscription via gateway.

Check if a store contains at least one event.

List subscriptions via gateway.

List all streams via gateway.

List snapshot versions via gateway.

Read the latest snapshot via gateway.

Read events from a stream via gateway.

Read all events from a stream via gateway.

Read all events across all streams in global order via gateway.

Read snapshot at specific version via gateway.

Read events by type via gateway.

Read events by tags via gateway (default: ANY match).

Read events by tags via gateway with match mode.

Unsubscribe from events via gateway.

Get current stream version via gateway.

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

evoq_event/0

-type evoq_event() ::
          #evoq_event{event_id :: binary(),
                      event_type :: binary(),
                      stream_id :: binary(),
                      version :: non_neg_integer(),
                      data :: map() | binary(),
                      metadata :: map(),
                      tags :: [binary()] | undefined,
                      timestamp :: integer(),
                      epoch_us :: integer(),
                      data_content_type :: binary(),
                      metadata_content_type :: binary()}.

evoq_snapshot/0

-type evoq_snapshot() ::
          #evoq_snapshot{stream_id :: binary(),
                         version :: non_neg_integer(),
                         data :: map() | binary(),
                         metadata :: map(),
                         timestamp :: integer()}.

evoq_subscription/0

-type evoq_subscription() ::
          #evoq_subscription{id :: binary(),
                             type :: evoq_subscription_type(),
                             selector :: binary() | map(),
                             subscription_name :: binary(),
                             subscriber_pid :: pid() | undefined,
                             created_at :: integer(),
                             pool_size :: pos_integer(),
                             checkpoint :: non_neg_integer() | undefined,
                             options :: map()}.

evoq_subscription_type/0

-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.

Functions

ack(StoreId, SubscriptionName, StreamId, Position)

-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.

Acknowledge event via gateway.

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream via gateway.

Events from evoq have a flat structure, but reckon_db expects events with a nested data field. This function transforms the events to the format expected by reckon_db.

delete(StoreId, StreamId)

-spec delete(atom(), binary()) -> ok | {error, term()}.

Delete all snapshots via gateway.

delete_at_version(StoreId, StreamId, Version)

-spec delete_at_version(atom(), binary(), non_neg_integer()) -> ok | {error, term()}.

Delete snapshot at version via gateway.

delete_stream(StoreId, StreamId)

-spec delete_stream(atom(), binary()) -> ok | {error, term()}.

Delete a stream via gateway.

event_type_summary(StoreId)

-spec event_type_summary(atom()) -> {ok, [map()]} | {error, term()}.

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if stream exists via gateway.

get_by_name(StoreId, SubscriptionName)

-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.

Get subscription by name via gateway.

get_checkpoint(StoreId, SubscriptionName)

-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.

Get checkpoint for subscription via gateway.

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

list(StoreId)

-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.

List subscriptions via gateway.

list_all_snapshots(StoreId)

-spec list_all_snapshots(atom()) -> {ok, [map()]} | {error, term()}.

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams via gateway.

list_subscriptions(StoreId)

-spec list_subscriptions(atom()) -> {ok, [map()]} | {error, term()}.

list_versions(StoreId, StreamId)

-spec list_versions(atom(), binary()) -> {ok, [non_neg_integer()]} | {error, term()}.

List snapshot versions via gateway.

read(StoreId, StreamId)

-spec read(atom(), binary()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.

Read the latest snapshot via gateway.

read(StoreId, StreamId, StartVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) ->
              {ok, [evoq_event()]} | {error, term()}.

Read events from a stream via gateway.

For event sourcing, a non-existent stream just means no events yet, so we translate stream_not_found to an empty list.

read_all(StoreId, StreamId, Direction)

-spec read_all(atom(), binary(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.

Read all events from a stream via gateway.

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) ->
                         {ok, [evoq_event()]} | {error, term()}.

Read all events across all streams in global order via gateway.

Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay.

read_at_version(StoreId, StreamId, Version)

-spec read_at_version(atom(), binary(), non_neg_integer()) ->
                         {ok, evoq_snapshot()} | {error, not_found | term()}.

Read snapshot at specific version via gateway.

read_by_event_types(StoreId, EventTypes, BatchSize)

-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.

Read events by type via gateway.

Uses the server-side native Khepri filtering for efficient type-based queries. Events are filtered at the database level, avoiding loading all events into memory.

read_by_tags(StoreId, Tags, BatchSize)

-spec read_by_tags(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.

Read events by tags via gateway (default: ANY match).

Tags provide cross-stream querying for the process-centric model. Use this to find all events related to specific participants.

read_by_tags(StoreId, Tags, Match, BatchSize)

-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) ->
                      {ok, [evoq_event()]} | {error, term()}.

Read events by tags via gateway with match mode.

Tags provide cross-stream querying for the process-centric model. Events are filtered at the database level.

Match modes: any - Return events matching ANY of the tags (union) all - Return events matching ALL of the tags (intersection)

save(StoreId, StreamId, Version, Data, Metadata)

-spec save(atom(), binary(), non_neg_integer(), map() | binary(), map()) -> ok | {error, term()}.

Save a snapshot via gateway.

store_stats(StoreId)

-spec store_stats(atom()) -> {ok, map()} | {error, term()}.

stream_info(StoreId, StreamId)

-spec stream_info(atom(), binary()) -> {ok, map()} | {error, term()}.

subscribe(StoreId, Type, Selector, SubscriptionName, Opts)

-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) ->
                   {ok, binary()} | {error, term()}.

Subscribe to events via gateway.

When a subscriber_pid is provided, a bridge process is spawned that receives raw #event{} records from ReckonDB and translates them to #evoq_event{} records before forwarding to the subscriber.

The subscriber always receives: {events, [#evoq_event{}]}

subscription_lag(StoreId, SubscriptionName)

-spec subscription_lag(atom(), binary()) -> {ok, map()} | {error, term()}.

unsubscribe(StoreId, SubscriptionId)

-spec unsubscribe(atom(), binary()) -> ok | {error, term()}.

Unsubscribe from events via gateway.

version(StoreId, StreamId)

-spec version(atom(), binary()) -> integer().

Get current stream version via gateway.