reckon_db_links (reckon_db v2.3.7)

View Source

Server-side stream linking and projections for reckon-db.

A *link* is a derived stream computed from one or more source streams by applying an optional filter predicate and an optional transform function. The result is persisted under a stable name and exposed as just-another-subscribable-stream — consumers read it like any user stream.

This is the same primitive Greg Young's EventStoreDB exposes as *projections*: $ce-account (category events for account-* streams), $et-UserCreated (all events of one type), or arbitrary user-defined JavaScript projections that emit into custom destination streams. reckon-db's links cover the same conceptual ground with native Erlang funs instead of a scripting layer.

Reckon-db already has rich typed subscription filters (by_event_type, by_event_pattern, by_event_payload, by_tags) that filter the event stream on the fly. So why a separate "link" mechanism?

  • **Transform.** Typed subscriptions can only filter. Links can rewrite events (add fields, re-key, redact, denormalise) before delivery. The transform is server-side, deterministic, and replayable.
  • **Materialisation.** Typed subscriptions are recomputed for every consumer. A link computes once, persists the derived events on disk, and serves them like any other stream — N consumers cost N stream-reads, not N×(source-scans + filter + transform).
  • **Stable named subscription targets.** Dynamically-created source streams (order-018f6a..., order-018f6b..., ...) can't be subscribed to as a group via the bare-id selector; you'd need a wildcard pattern in the consumer. A link ($link:orders filtered to source pattern order-*) gives you one durable, named, listable target.
  • **Replayability.** A new consumer attaching to a link replays the persisted derived events from the start, in order, deterministically — same shape every time, regardless of how the source streams have grown. Typed subscriptions replay by re-scanning the entire global log and re-applying the predicate, which is correct but expensive at scale.
  • **Composability.** Links can source from other links — $link:high-value-orders built on top of $link:orders. Useful for incremental refinement of a view.
  • **Operational visibility.** Named derived streams show up in store listings (lazyreckon's streams pane, for example) — operators can see "what derived views does this store maintain" at a glance. Typed subscriptions are ephemeral client state and invisible to tooling.

Stream-id namespace

Link streams live in the $ system namespace:

$link:<human-readable-name>

e.g. $link:high-value-orders. The $ prefix is the reckon-db convention for "system / projected stream, not user data" — same role as EventStoreDB's $ce- / $et- / $by_category / $stats- prefixes. User streams follow the <prefix>-<hex> format (e.g. account-018f6a7b8c9d4abc...). Both forms are accepted as subscription selectors via reckon_db_filters:by_stream/1.

Link subscriptions themselves get a related namespace:

$link-sub:<link-name>

so subscriptions managed by the link engine are distinguishable from user subscriptions in operational tooling.

Use cases (worked examples)

1. Materialised category view. Collate every event from all order-<hex> streams into one subscribable stream:

  reckon_db_links:create(my_store, #{
      name   => <<"orders">>,
      source => #{type => stream_pattern, pattern => <<"order-*">>}
  }).
  reckon_db_subscriptions:subscribe(
      my_store, stream, <<"$link:orders">>, <<"orders-projector">>).

2. Filtered derived stream. Just the high-value subset:

  reckon_db_links:create(my_store, #{
      name   => <<"high-value-orders">>,
      source => #{type => stream_pattern, pattern => <<"order-*">>},
      filter => fun(E) ->
          maps:get(total, E#event.data, 0) > 1000
      end
  }).

3. Transform / re-shape. Strip PII from outbound webhook events:

  reckon_db_links:create(my_store, #{
      name      => <<"webhook-feed">>,
      source    => #{type => stream_pattern, pattern => <<"order-*">>},
      transform => fun(E) ->
          D2 = maps:without([customer_email, customer_phone],
                            E#event.data),
          E#event{data = D2}
      end
  }).

4. Composition. Layer a filter on top of an existing link:

  reckon_db_links:create(my_store, #{
      name   => <<"high-value-webhook">>,
      source => #{type => stream, name => <<"$link:webhook-feed">>},
      filter => fun(E) ->
          maps:get(total, E#event.data, 0) > 1000
      end
  }).
  • Single-consumer one-off filter — a by_event_type or by_tags subscription is cheaper and avoids persistent derived data.
  • Read model that lives in a database (postgres, sqlite, etc) — subscribe directly with the appropriate typed filter and project to the database. The link layer adds no value when the materialised target is elsewhere.
  • Aggregation across many sources where the math is non-trivial — links emit one derived event per source event; they don't fold. For aggregation, use a process manager.

Status

This module ships with the initial reckon-db release. The feature is wired but **not yet exercised in production**; treat as preview while we accumulate real usage.

Summary

Functions

Create a new link.

Delete a link.

Get a link by name.

Get detailed link info.

List all links.

Start processing a link.

Stop processing a link.

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary(),
                 prev_event_hash :: binary() | undefined,
                 mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined,
                 signature :: binary() | undefined}.

filter_fun/0

-type filter_fun() :: fun((event()) -> boolean()).

source_spec/0

-type source_spec() ::
          #{type := stream | stream_pattern | all, stream_id => binary(), pattern => binary()}.

transform_fun/0

-type transform_fun() :: fun((event()) -> event()).

Functions

create(StoreId, Spec)

-spec create(atom(), link_spec()) -> ok | {error, term()}.

Create a new link.

Options: - name: Link name (required, will create stream with $link prefix) - source: Source specification (stream, stream_pattern, or all) - filter: Predicate function to filter events - transform: Function to transform events - backfill: Process existing events (default: false)

delete(StoreId, Name)

-spec delete(atom(), binary()) -> ok | {error, term()}.

Delete a link.

get(StoreId, Name)

-spec get(atom(), binary()) -> {ok, link_info()} | {error, not_found}.

Get a link by name.

info(StoreId, Name)

-spec info(atom(), binary()) -> {ok, map()} | {error, not_found}.

Get detailed link info.

list(StoreId)

-spec list(atom()) -> {ok, [link_info()]} | {error, term()}.

List all links.

start(StoreId, Name)

-spec start(atom(), binary()) -> ok | {error, term()}.

Start processing a link.

This will: 1. Subscribe to source stream(s) 2. Optionally backfill existing events 3. Apply filter and transform to each event 4. Write matching events to the link stream

stop(StoreId, Name)

-spec stop(atom(), binary()) -> ok | {error, term()}.

Stop processing a link.