reckon_evoq_adapter (reckon_evoq v2.5.0)
View SourceGateway adapter implementation for evoq
Implements the adapter behaviors using reckon_gater_api to route all operations through the reckon-gater load balancer.
This adapter ensures that evoq never directly calls reckon-db modules, instead routing through the gateway for: - Automatic retry with exponential backoff - Load balancing across workers - High availability
Summary
Functions
Acknowledge event via gateway.
Append events to a stream via gateway.
Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).
Delete all snapshots via gateway.
Delete snapshot at version via gateway.
Delete a stream via gateway.
Check if stream exists via gateway.
Get subscription by name via gateway.
Get checkpoint for subscription via gateway.
Check if a store contains at least one event.
List subscriptions via gateway.
List all streams via gateway.
List snapshot versions via gateway.
Read the latest snapshot via gateway.
Read events from a stream via gateway.
Read all events from a stream via gateway.
Read all events across all streams in global order via gateway.
Read snapshot at specific version via gateway.
Read events by type via gateway.
Read events by a metadata key=value pair via gateway.
Read events by tags via gateway (default: ANY match).
Read events by tags via gateway with match mode.
Save a snapshot via gateway.
Subscribe to events via gateway.
Unsubscribe from events via gateway.
Get current stream version via gateway.
Types
-type event() :: #event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary(), prev_event_hash :: binary() | undefined, mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined, signature :: binary() | undefined}.
-type evoq_event() :: #evoq_event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary(), prev_event_hash :: binary() | undefined}.
-type evoq_subscription() :: #evoq_subscription{id :: binary(), type :: evoq_subscription_type(), selector :: binary() | map(), subscription_name :: binary(), subscriber_pid :: pid() | undefined, created_at :: integer(), pool_size :: pos_integer(), checkpoint :: non_neg_integer() | undefined, options :: map()}.
-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.
Functions
-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.
Acknowledge event via gateway.
Append events to a stream via gateway.
Events from evoq have a flat structure, but reckon_db expects events with a nested data field. This function transforms the events to the format expected by reckon_db.
-spec append_if_no_tag_matches(atom(), term(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, {context_changed, non_neg_integer()}} | {error, no_events} | {error, integrity_not_supported_in_dcb_v1} | {error, term()}.
Conditionally append events under the DCB pseudo-stream (Dynamic Consistency Boundary, paired with reckon-db 3.1.0+).
Unlike append/4, the precondition is a tag-filter context query rather than a stream-version check. Returns {error, {context_changed, MaxSeq}} when any event matching TagFilter has seq above SeqCutoff.
TagFilter is reckon_gater_types:tag_filter() (see reckon-gater 2.3.0+ for the canonical type). SeqCutoff is an integer; -1 means "saw nothing yet".
Events are transformed via the same evoq_to_reckon_event/1 shim as append/4, so callers pass evoq-shaped events and the adapter bridges to the reckon_db payload shape.
Delete all snapshots via gateway.
-spec delete_at_version(atom(), binary(), non_neg_integer()) -> ok | {error, term()}.
Delete snapshot at version via gateway.
Delete a stream via gateway.
Check if stream exists via gateway.
-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.
Get subscription by name via gateway.
-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.
Get checkpoint for subscription via gateway.
Check if a store contains at least one event.
-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.
List subscriptions via gateway.
List all streams via gateway.
-spec list_versions(atom(), binary()) -> {ok, [non_neg_integer()]} | {error, term()}.
List snapshot versions via gateway.
-spec read(atom(), binary()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.
Read the latest snapshot via gateway.
-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.
Read events from a stream via gateway.
For event sourcing, a non-existent stream just means no events yet, so we translate stream_not_found to an empty list.
-spec read_all(atom(), binary(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.
Read all events from a stream via gateway.
-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read all events across all streams in global order via gateway.
Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay.
-spec read_at_version(atom(), binary(), non_neg_integer()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.
Read snapshot at specific version via gateway.
-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by type via gateway.
Uses the server-side native Khepri filtering for efficient type-based queries. Events are filtered at the database level, avoiding loading all events into memory.
-spec read_by_metadata(atom(), binary(), binary()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by a metadata key=value pair via gateway.
The cross-cutting lineage primitive: events whose metadata Key equals Value (e.g. all events with causation_id == some event id). O(matches) when the store declared the {meta, Key} index, else a server-side scan.
-spec read_by_tags(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by tags via gateway (default: ANY match).
Tags provide cross-stream querying for the process-centric model. Use this to find all events related to specific participants.
-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by tags via gateway with match mode.
Tags provide cross-stream querying for the process-centric model. Events are filtered at the database level.
Match modes: any - Return events matching ANY of the tags (union) all - Return events matching ALL of the tags (intersection)
Save a snapshot via gateway.
-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) -> {ok, binary()} | {error, term()}.
Subscribe to events via gateway.
When a subscriber_pid is provided, a bridge process is spawned that receives raw #event{} records from ReckonDB and translates them to #evoq_event{} records before forwarding to the subscriber.
The subscriber always receives: {events, [#evoq_event{}]}
Unsubscribe from events via gateway.
Get current stream version via gateway.