Counterpoint is a CQRS / event-sourcing toolkit built on top of the DCB event store.
Core concepts
| Building block | Role |
|---|---|
| `Counterpoint.Event` | Domain event: serialisable struct stored in the log |
| `Counterpoint.Command` | Write side: reads events, validates, appends new ones |
| `Counterpoint.CommandWithEffect` | Like `Command` but with external deps injected at runtime |
| `Counterpoint.OnDemandProjection` | Read side: folds events into state on every call |
| `Counterpoint.Projection` | Simpler read side without limit/reverse support |
| `Counterpoint.Query` | Composable filter for event types and tags |
| `Counterpoint.Envelope` | Wraps a deserialized event with store metadata |
| `Counterpoint.ReadAppender` | Captures read positions for optimistic-concurrency appends |
| `Counterpoint.EventRegistry` | Maps type-strings back to modules for deserialization |

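
To make the read-side rows of the table concrete, here is a minimal sketch in plain Elixir of the two ideas they describe: looking up a module from a type-string, and folding events into state. It does not use Counterpoint itself. The `Sketch.*` modules, the `@registry` map, and the `%{type: ..., data: ...}` record shape are assumptions made for illustration, not the library's actual internals.

```elixir
defmodule Sketch.OrderPlaced do
  # Stand-in for an event module; a real one would `use Counterpoint.Event`.
  defstruct [:order_id, :total]

  def to_map(%__MODULE__{order_id: id, total: t}), do: %{"order_id" => id, "total" => t}
  def from_map(%{"order_id" => id, "total" => t}), do: %__MODULE__{order_id: id, total: t}
end

defmodule Sketch.ReadSide do
  alias Sketch.OrderPlaced

  # Hypothetical registry: type-string => event module.
  @registry %{"OrderPlaced" => OrderPlaced}

  # Turn a stored record back into an event struct via the registry.
  def decode(%{type: type, data: data}) do
    module = Map.fetch!(@registry, type)
    module.from_map(data)
  end

  # Fold: start from an empty state and apply each event in log order.
  def summarize(records) do
    records
    |> Enum.map(&decode/1)
    |> Enum.reduce(%{order_id: nil, total: nil}, &step/2)
  end

  defp step(%OrderPlaced{order_id: id, total: t}, state),
    do: %{state | order_id: id, total: t}
end

# A "stored" record as it might look after serialisation (assumed shape).
event = %Sketch.OrderPlaced{order_id: "1", total: 42}
stored = [%{type: "OrderPlaced", data: Sketch.OrderPlaced.to_map(event)}]

summary = Sketch.ReadSide.summarize(stored)
IO.inspect(summary)
```

Because the state is rebuilt from the log each time `summarize/1` is called, nothing has to be kept in sync between calls, which is the trade-off the "on every call" wording in the table points at.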
Quick start
1. Define an event

```elixir
defmodule MyApp.Events.OrderPlaced do
  use Counterpoint.Event

  defstruct [:order_id, :total]

  # Tags let queries select the events that belong to one order.
  def tags(%__MODULE__{order_id: id}), do: ["order_id:#{id}"]

  # Convert to and from the string-keyed map that is stored in the log.
  def to_map(%__MODULE__{order_id: id, total: t}), do: %{"order_id" => id, "total" => t}
  def from_map(%{"order_id" => id, "total" => t}), do: %__MODULE__{order_id: id, total: t}
end
```

2. Define a command

```elixir
defmodule MyApp.Commands.PlaceOrder do
  use Counterpoint.Command
  import Counterpoint.ReadAppender

  alias Counterpoint.Query
  alias MyApp.Events.OrderPlaced

  defstruct [:order_id, :total]

  @impl Counterpoint.Command
  def run(%__MODULE__{order_id: id, total: total}, ra) do
    # Read any OrderPlaced events already recorded for this order.
    {existing, ra} =
      read_events(ra, Query.new() |> Query.add_item(types: [OrderPlaced], tags: ["order_id:#{id}"]))

    # Validate, then append only if the order has not been placed yet.
    if Enum.any?(existing) do
      {:error, :already_placed}
    else
      append_event(ra, %OrderPlaced{order_id: id, total: total})
    end
  end
end
```

3. Define a projection

```elixir
defmodule MyApp.Views.OrderSummary do
  use Counterpoint.OnDemandProjection

  alias Counterpoint.Query
  alias MyApp.Events.OrderPlaced

  defstruct [:order_id, :total]

  # Which events to fold: OrderPlaced events tagged with this order.
  def query(order_id),
    do: Query.new() |> Query.add_item(types: [OrderPlaced], tags: ["order_id:#{order_id}"])

  # Initial state before any event has been applied.
  def init, do: %__MODULE__{}

  # Fold one event (wrapped in its envelope) into the state.
  def apply(state, %Counterpoint.Envelope{data: %OrderPlaced{order_id: id, total: t}}),
    do: %{state | order_id: id, total: t}
end
```

4. Wire up the supervisor

```elixir
Counterpoint.Supervisor.start_link(
  store: [name: :my_store, namespace: "my_app"],
  events: [MyApp.Events.OrderPlaced]
)
```

5. Run a command and query the projection

```elixir
Counterpoint.CommandRunner.run(:my_store, %MyApp.Commands.PlaceOrder{order_id: "1", total: 42})
Counterpoint.OnDemandProjection.run(MyApp.Views.OrderSummary, :my_store, "1")
```
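
The command in step 2 reads events, decides, and then appends, and the core-concepts table describes `Counterpoint.ReadAppender` as capturing read positions for optimistic-concurrency appends. The sketch below illustrates that general idea with an in-memory list in plain Elixir. It is not Counterpoint's or the DCB store's implementation: `Sketch.Store`, its function names, and the `:stale_read` error are all hypothetical.

```elixir
defmodule Sketch.Store do
  # In-memory stand-in for an event log: a list of {position, tag, event},
  # oldest first. Purely illustrative.

  def new, do: []

  # Return the events carrying `tag`, plus the log position the read observed.
  def read(log, tag) do
    matching = for {_pos, ^tag, event} <- log, do: event
    {matching, length(log)}
  end

  # Append only if no event with `tag` was written after `seen_position`.
  def append(log, tag, event, seen_position) do
    conflict? = Enum.any?(log, fn {pos, t, _event} -> t == tag and pos > seen_position end)

    if conflict? do
      {:error, :stale_read}
    else
      {:ok, log ++ [{length(log) + 1, tag, event}]}
    end
  end
end

log = Sketch.Store.new()
tag = "order_id:1"

# Two writers both read before either has written, so both see no events.
{[], seen_a} = Sketch.Store.read(log, tag)
{[], seen_b} = Sketch.Store.read(log, tag)

# Writer A appends first; nothing changed since its read, so it succeeds.
{:ok, log} = Sketch.Store.append(log, tag, {:order_placed, "1", 42}, seen_a)

# Writer B decided on a read that is now stale, so its append is rejected.
result_b = Sketch.Store.append(log, tag, {:order_placed, "1", 42}, seen_b)
IO.inspect(result_b)

# An append under a different tag is unaffected by the write above.
{:ok, log} = Sketch.Store.append(log, "order_id:2", {:order_placed, "2", 7}, seen_b)
IO.inspect(length(log))
```

In this sketch the conflict check is scoped to the tag that was read, so unrelated orders do not block each other. A rejected writer would typically re-read and re-run its validation, at which point the `:already_placed` branch from step 2 would apply.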