Counterpoint.ReadAppender (counterpoint v0.1.0)


Tracks event reads so appends can enforce optimistic concurrency.

Each call to read_events/2 records the query and the highest position seen. When append_event/2 or append_events/2 is called later, the DCB (Dynamic Consistency Boundary) store checks that no events matching any recorded query have been appended after the recorded positions. If any have, the append fails with {:error, :append_condition_failed} and Counterpoint.CommandRunner retries the entire command from scratch.
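The check described above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Counterpoint's implementation: ToyStore, conflict?/2, and append/3 are hypothetical names, positions are plain integers rather than binaries, and a query is reduced to a single tag.

```elixir
defmodule ToyStore do
  # Hypothetical in-memory model, not Counterpoint's implementation.
  # Events are {position, tags} tuples; positions are increasing integers.

  # A recorded read is {tag, last_position_seen}; nil means nothing matched.
  def conflict?(events, recorded_reads) do
    Enum.any?(recorded_reads, fn {tag, seen} ->
      Enum.any?(events, fn {pos, tags} ->
        tag in tags and (seen == nil or pos > seen)
      end)
    end)
  end

  # Appends only when no recorded read has been invalidated.
  def append(events, recorded_reads, tags) do
    if conflict?(events, recorded_reads) do
      {:error, :append_condition_failed}
    else
      {:ok, events ++ [{length(events) + 1, tags}]}
    end
  end
end

events = [{1, ["order_id:1"]}, {2, ["order_id:2"]}]
reads = [{"order_id:1", 1}]

# No matching event after position 1, so the append succeeds.
{:ok, events} = ToyStore.append(events, reads, ["order_id:1"])

# The same, now stale, read conflicts with the event just written at position 3.
{:error, :append_condition_failed} = ToyStore.append(events, reads, ["order_id:1"])
```

In this model the second append fails because the recorded position was never refreshed, which is the situation a command retry resolves by reading again.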

You typically import Counterpoint.ReadAppender inside a command module to bring read_events/2, append_event/2, and append_events/2 into scope without qualifying them.

Example

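defmodule PlaceOrder do
  # Hypothetical command module; OrderPlaced is assumed to be an event struct defined elsewhere.
  defstruct [:order_id]

  import Counterpoint.ReadAppender
  alias Counterpoint.Query

  def run(%__MODULE__{order_id: id}, ra) do
    # Record the read so the append below is rejected if matching events arrive in between.
    {events, ra} = read_events(ra, Query.new() |> Query.add_item(tags: ["order_id:#{id}"]))
    # ... validate against events ...
    append_event(ra, %OrderPlaced{order_id: id})
  end
end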
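When an append is rejected, the whole command is run again so that it reads fresh events. A minimal sketch of such a retry loop follows. RetrySketch and run_with_retry/2 are hypothetical names, and the attempt limit and the :too_many_conflicts error are assumptions made for illustration; the source does not say how Counterpoint.CommandRunner bounds its retries.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not Counterpoint.CommandRunner itself.
  # `fun` is a zero-arity function that performs the reads and the append.
  def run_with_retry(fun, attempts_left \\ 3)

  # Assumed behavior: give up once the attempt budget is spent.
  def run_with_retry(_fun, 0), do: {:error, :too_many_conflicts}

  def run_with_retry(fun, attempts_left) do
    case fun.() do
      # A concurrent write invalidated a read: run the command again from scratch.
      {:error, :append_condition_failed} -> run_with_retry(fun, attempts_left - 1)
      # Any other result, success or failure, is returned unchanged.
      other -> other
    end
  end
end

# Simulate a command that conflicts on its first attempt only.
{:ok, counter} = Agent.start_link(fn -> 0 end)

result =
  RetrySketch.run_with_retry(fn ->
    attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
    if attempt < 2, do: {:error, :append_condition_failed}, else: {:ok, attempt}
  end)

# result is {:ok, 2}: the first attempt conflicted, the second succeeded.
```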

Summary

Functions

append_event(ra, event)

Serialize and append a single event, guarded by all positions recorded so far.

append_events(ra, events)

Append multiple pre-serialized events atomically, guarded by recorded positions.

new(store_name)

Create a new ReadAppender bound to the given store.

read_events(ra, query)

Read events matching query and record the read position for concurrency checks.

Types

t()

@type t() :: %Counterpoint.ReadAppender{
  records: [{map(), binary() | nil}],
  store_name: atom()
}

Functions

append_event(ra, event)

Serialize and append a single event, guarded by all positions recorded so far.

Returns {:ok, _} on success or {:error, :append_condition_failed} on a concurrent-write conflict.

append_events(ra, events)

Append multiple pre-serialized events atomically, guarded by recorded positions.

Prefer append_event/2 for single events. Use this when you need to emit several events in one atomic write.

new(store_name)

Create a new ReadAppender bound to the given store.

read_events(ra, query)

Read events matching query and record the read position for concurrency checks.

Returns {envelopes, updated_ra}. Always pass the returned updated_ra to subsequent reads and appends, because it carries the accumulated read positions.
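The reason for threading the returned value through can be seen with a stand-in struct. ToyReader and read/3 are hypothetical and only mimic the {envelopes, updated_ra} return shape; the real read_events/2 takes a query and talks to the store.

```elixir
defmodule ToyReader do
  # Hypothetical stand-in for a read-tracking struct; not the library's code.
  defstruct records: []

  # Returns {results, updated_reader}, mirroring the {envelopes, updated_ra} shape.
  def read(%ToyReader{records: records} = reader, query, position) do
    {[], %{reader | records: records ++ [{query, position}]}}
  end
end

r0 = %ToyReader{}
{_, r1} = ToyReader.read(r0, "order_id:1", 5)

# Correct: pass r1 into the next read so both queries are recorded.
{_, r2} = ToyReader.read(r1, "customer_id:7", 9)
# r2.records == [{"order_id:1", 5}, {"customer_id:7", 9}]

# Mistake: reusing r0 silently drops the first recorded read.
{_, stale} = ToyReader.read(r0, "customer_id:7", 9)
# stale.records == [{"customer_id:7", 9}]
```

Because Elixir data is immutable, a struct that was not rebound still holds the old list of records, so an append guarded by it would not be protected against writes matching the dropped query.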