An aggregate is a consistency boundary: a single entity (an account, an order, a
user) that decides whether a command is allowed and what should happen as a result.
In X3m.System an aggregate is event-sourced by default — its state is not stored
directly but rebuilt by replaying the events it has produced. (You can also persist
state directly instead; see State persistence.)
This guide builds a small bank-account aggregate. It assumes you're comfortable with the messaging layer; aggregates plug into the same routers and dispatcher.
The moving parts
| Piece | Responsibility |
|---|---|
X3m.System.Aggregate | decides how a command is handled and how events change state |
X3m.System.MessageHandler | loads the aggregate, runs the command, persists events, replies |
X3m.System.Aggregate.Repo | reads and writes the event stream (you implement it) |
X3m.System.Router | routes a service call to the message handler |
How they collaborate when a command is dispatched — either the aggregate is already in memory, or it must be rehydrated from its event stream first — then the new events are persisted:
sequenceDiagram
participant D as Dispatcher
participant R as Router
participant MH as MessageHandler
participant Repo as Aggregate.Repo
participant Agg as Aggregate
D->>R: command message
R->>R: authorize/1
R->>MH: command message
alt aggregate already in memory (pid registered)
MH->>Agg: handle_msg(message, current state)
else not running (when_pid_is_not_registered/3)
MH->>Repo: load event stream
Repo-->>MH: past events
MH->>Agg: apply_event/2 per event (rebuild state)
MH->>Agg: handle_msg(message, rebuilt state)
end
Agg-->>MH: {:block, message + events, state}
Note over Agg: blocked — no further commands until commit + apply
MH->>Repo: save_events
MH->>Agg: apply_event/2 for new events (new state), then unblock
MH-->>D: response, e.g. {:created, id, version}With :block, the aggregate process holds further commands until the message handler has
committed and applied these events — so commands are serialized per aggregate and never run
against not-yet-persisted state. (:noblock returns immediately; nothing is persisted.)
1. The aggregate
use X3m.System.Aggregate and declare the starting state, one command handler per
command, and one apply_event/2 clause per event:
defmodule MyApp.Accounts.Aggregate do
use X3m.System.Aggregate
alias X3m.System.Message, as: SysMsg
alias MyApp.Accounts.{Commands, Events, State}
@impl X3m.System.Aggregate
def initial_state, do: %State{}
handle_msg :open_account, &Commands.Open.new/2, &handle_open/2
handle_msg :deposit, &Commands.Deposit.new/2, &handle_deposit/2
defp handle_open(%SysMsg{request: %Commands.Open{} = cmd} = msg, %State{status: :new} = state) do
event = %Events.Opened{id: cmd.id, owner: cmd.owner}
msg =
msg
|> SysMsg.add_event(event)
|> SysMsg.created(cmd.id)
{:block, msg, state}
end
defp handle_deposit(%SysMsg{request: %Commands.Deposit{} = cmd} = msg, %State{} = state) do
event = %Events.Deposited{id: state.id, amount: cmd.amount}
msg =
msg
|> SysMsg.add_event(event)
|> SysMsg.ok()
{:block, msg, state}
end
def apply_event(%Events.Opened{} = e, %State{} = state),
do: %State{state | id: e.id, owner: e.owner, status: :open}
def apply_event(%Events.Deposited{} = e, %State{} = state),
do: %State{state | balance: state.balance + e.amount}
endCommand handlers and the block/noblock contract
handle_msg/3 takes a validate function and a process function:
- The validate function (
Commands.Open.new/2) castsmessage.raw_requestinto a structured request and stores it withX3m.System.Message.put_request/2. If the request is invalid,put_request/2halts the message with a:validation_errorand the process function never runs. - The process function returns one of:
{:block, message, state}— there are events to persist. The message handler savesmessage.events, commits, and only then replies.{:noblock, message, state}— nothing to persist; the response is returned as-is (e.g. an idempotent no-op or a rejected command).
A validate function looks like this (using a plain struct here; an Ecto.Changeset
works too, since put_request/2 checks valid?):
defmodule MyApp.Accounts.Commands.Open do
defstruct [:id, :owner, valid?: true]
alias X3m.System.Message, as: SysMsg
def new(%SysMsg{raw_request: raw} = msg, _state) do
cmd = %__MODULE__{
id: raw["id"],
owner: raw["owner"],
valid?: is_binary(raw["id"]) and is_binary(raw["owner"])
}
SysMsg.put_request(cmd, msg)
end
endUse the two-argument handle_msg/2 when a command needs no separate validation step —
pass a single function that returns the {:block | :noblock, message, state} tuple.
Applying events
apply_event/2 is the only place state changes. It runs both when a command produces a
new event and when the aggregate is rehydrated from history, so it must be pure. Events
your aggregate doesn't recognise fall through to a default clause that leaves state
unchanged.
Idempotency
The macros skip a command whose message.id was already processed, returning :ok
without re-running it. Override processed_message_id/1 to pull the originating id out
of your event metadata so this survives restarts.
2. The message handler
The message handler wires the aggregate to a persistence backend and exposes one function per command:
defmodule MyApp.Accounts.MessageHandler do
use X3m.System.MessageHandler,
aggregate_mod: MyApp.Accounts.Aggregate,
aggregate_repo: MyApp.Accounts.AggregateRepo,
stream: "accounts",
pid_facade_mod: X3m.System.AggregatePidFacade,
event_metadata: %{app_version: "1.0.0"}
on_new_aggregate :open_account # id read from raw_request["id"]
on_aggregate :deposit, id: "account_id" # id read from raw_request["account_id"]
endPick the macro that matches the command's intent:
on_new_aggregate— creates a fresh aggregate. The id may be generated if missing.on_aggregate— loads an existing aggregate; responds{:error, :not_found}if there is no stream for that id.on_maybe_new_aggregate— loads the aggregate, creating it if it doesn't exist yet.
Each macro reads the aggregate id from message.raw_request (under "id" by default,
or the :id option), loads or spawns the aggregate process, runs the command, persists
any events, and enriches the response with the new version — {:ok, version} or
{:created, id, version}.
:event_metadata is merged into the metadata stored with every event (handy for
schema/app versions). :commit_timeout (per macro) bounds how long persistence may
take.
3. The event store (Aggregate.Repo)
X3m.System does not ship a concrete store — you implement X3m.System.Aggregate.Repo
against whatever you use (EventStoreDB, Postgres, an in-memory store for tests):
defmodule MyApp.Accounts.AggregateRepo do
use X3m.System.Aggregate.Repo
@impl true
def has?(stream_name), do: # ...
@impl true
def stream_events(stream_name, start_at, per_page), do: # ... enumerable of {event, number, metadata}
@impl true
def delete_stream(stream_name, hard_delete?, expected_version), do: :ok
@impl true
def save_events(stream_name, message, events_metadata), do: {:ok, _last_event_number}
endStream names are built from the handler's :stream option and the aggregate id, e.g.
"accounts-acc-1".
4. Wiring it into your app
Register the router's services, and start the per-node aggregate processes by adding
X3m.System.LocalAggregatesSupervision to your supervision tree. First, list the
aggregate modules to run locally:
defmodule MyApp.LocalAggregates do
use X3m.System.LocalAggregates, [MyApp.Accounts.Aggregate]
endThen in your application:
def start(_type, _args) do
:ok = MyApp.Router.register_services()
children = [
MyApp.Accounts.AggregateRepo,
{X3m.System.LocalAggregatesSupervision, [MyApp.LocalAggregates, MyApp]}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
endNow a dispatch flows end to end:
:open_account
|> X3m.System.Message.new(raw_request: %{"id" => "acc-1", "owner" => "Ada"})
|> X3m.System.Dispatcher.dispatch()
#=> %X3m.System.Message{response: {:created, "acc-1", 0}, ...}Testing the aggregate
Because an aggregate is just a module — no process, no event store, no dispatch — you
test it by calling its command functions directly and replaying the events they return.
X3m.System.Aggregate.TestSupport gives you the given/when/then pieces:
command_message/3 builds the command message, state_from_events/3 rebuilds the
given state from prior events, and apply_events/4 folds the events a command emitted
back onto that state so you can assert the result:
defmodule MyApp.Accounts.AggregateTest do
use ExUnit.Case, async: true
import X3m.System.Aggregate.TestSupport
alias MyApp.Accounts.{Aggregate, Events, State}
alias X3m.System.Message, as: SysMsg
test "open_account emits Opened and responds :created" do
# given
state = X3m.System.Aggregate.initial_state(Aggregate)
# when
msg = command_message(:open_account, %{"id" => "acc-1", "owner" => "Ada"})
assert {:block, %SysMsg{events: events, response: {:created, "acc-1"}}, _state} =
Aggregate.open_account(msg, state)
assert [%Events.Opened{id: "acc-1", owner: "Ada"}] = events
# then — replay the emitted events to assert the resulting state
assert %{client_state: %State{status: :open, owner: "Ada"}} =
apply_events(Aggregate, state, events)
end
test "open_account with a missing owner is a validation error" do
state = X3m.System.Aggregate.initial_state(Aggregate)
msg = command_message(:open_account, %{"id" => "acc-1"})
assert {:noblock,
%SysMsg{events: [], response: {:validation_error, _request}, halted?: true},
^state} = Aggregate.open_account(msg, state)
end
test "deposit adds to the balance of an open account" do
# given an already-opened account
state = state_from_events(Aggregate, [%Events.Opened{id: "acc-1", owner: "Ada"}])
# when
deposit = command_message(:deposit, %{"account_id" => "acc-1", "amount" => 100})
assert {:block, %SysMsg{events: events, response: :ok}, _state} =
Aggregate.deposit(deposit, state)
# then
assert [%Events.Deposited{amount: 100}] = events
assert %{client_state: %State{balance: 100}} = apply_events(Aggregate, state, events)
end
endA few things to note:
- The functions you call (
open_account/2,deposit/2) are the oneshandle_msggenerates — named after the message and taking(message, state). command_message/3builds the message (withraw_requestandassigns),state_from_events/3is the given state, andapply_events/4is the then step.- A command's validate function receives
(message, state), so it can validate against state — e.g. rejecting a deposit into a closed account with avalidation_error. - A command may emit events and return an error (
{:block, error, state}) — e.g. recording a rejection while replying{:error, _}— and a single command may emit several events (closing an account can return the remaining balance and then close it). So assert the emitted events, not just the response. - A failed validation comes back as
{:noblock, message, state}withmessage.events == [],halted?: trueand a{:validation_error, request}response (authorization guards look the same with an{:error, _}response) — no setup required to assert. - Calling a successful handler returns the events but does not apply them; state
changes happen in
apply_event/2at commit time. Replaying the returned events withapply_events(events, version, state)both gives you the resulting state to assert on and exercises yourapply_event/2clauses. - No message handler, repo, or running process is involved, so these are plain, fast,
async: trueunit tests.
Testing the full lifecycle
The tests above cover your command logic. To exercise the runtime — loading and spawning
the aggregate process, persisting events, idempotency, and your handler's overrides
(save_events/1, save_state/3, when_pid_is_not_registered/3, :unload_aggregate_on) —
you need a running aggregate and a repo. In tests, back it with an in-memory
Aggregate.Repo instead of a real event store:
defmodule MyApp.Accounts.InMemoryEventStore do
@moduledoc false
use X3m.System.Aggregate.Repo
use Agent
alias X3m.System.Message
def start_link(_opts \\ []),
do: Agent.start_link(fn -> %{} end, name: __MODULE__)
def reset(),
do: Agent.update(__MODULE__, fn _state -> %{} end)
@impl true
def has?(stream_name),
do: Agent.get(__MODULE__, &Map.has_key?(&1, stream_name))
@impl true
def stream_events(stream_name, _start_at \\ 0, _per_page \\ 1_000),
do: Agent.get(__MODULE__, &Map.get(&1, stream_name, []))
@impl true
def delete_stream(stream_name, _hard_delete?, _expected_version) do
Agent.update(__MODULE__, &Map.delete(&1, stream_name))
:ok
end
@impl true
def save_events(stream_name, %Message{} = message, metadata) do
__MODULE__
|> Agent.get_and_update(fn streams ->
existing = Map.get(streams, stream_name, [])
current = length(existing) - 1
# optimistic concurrency: the expected version is the aggregate's loaded version
# (-1 for a new aggregate), so a "create" against an existing stream is rejected.
if message.aggregate_meta.version != current do
{{:error, :wrong_expected_version, current}, streams}
else
# stamp message.id into metadata so idempotency survives a rehydrate
meta = Map.put(metadata, :message_id, message.id)
appended =
message.events
|> Enum.with_index(current + 1)
|> Enum.map(fn {event, number} -> {event, number, meta} end)
last = current + length(message.events)
{{:ok, last}, Map.put(streams, stream_name, existing ++ appended)}
end
end)
end
endStart it alongside the aggregate's supervision tree in test/test_helper.exs:
{:ok, _} = MyApp.Accounts.InMemoryEventStore.start_link()
{:ok, _} =
X3m.System.LocalAggregatesSupervision.start_link([MyApp.LocalAggregates, MyApp])Point a test message handler at the in-memory store, then drive it by calling the generated functions directly and asserting on both the reply and the stored events:
defmodule MyApp.Accounts.MessageHandlerTest do
# the pid facade and registry are named per aggregate module
use ExUnit.Case, async: false
import X3m.System.Aggregate.TestSupport
alias MyApp.Accounts.{MessageHandler, InMemoryEventStore, Events}
setup do
InMemoryEventStore.reset()
:ok
end
test "opens an account and persists the event" do
id = UUID.uuid4()
msg = command_message(:open_account, %{"id" => id, "owner" => "Ada"})
assert {:reply, replied} = MessageHandler.open_account(msg)
assert {:created, ^id, 0} == replied.response
assert [{%Events.Opened{}, 0, _}] = InMemoryEventStore.stream_events("accounts-" <> id)
end
endNotes:
- Use a fresh aggregate id per test; reuse an id only when you are deliberately testing
rehydration or idempotency (unload the process —
AggregatePidFacade.exit_process/3— to force the next command to reload). save_events/3stampsmessage.idinto each event's metadata so idempotency survives a rehydrate (the aggregate re-reads it viaprocessed_message_id/1while replaying).- This is how you test your overrides —
save_events/1reacting to or filtering events,:unload_aggregate_onrules, the save/load callbacks below — rather than the library internals. For a non-event-sourced handler, pointsave_state/3andwhen_pid_is_not_registered/3at a plain in-memory store the same way.
Validating without persisting (dry_run)
X3m.System.Dispatcher.validate/1 dispatches the command with dry_run: true: the
aggregate runs the command exactly as it would normally, but the message handler does
not persist any events and the aggregate's rollback/2 callback is invoked so it
can undo any side effects. The response tells you whether the command would have
succeeded. If a command performs side effects during handling (e.g. reserving a unique
value), implement the commit/2 and rollback/2 callbacks on the aggregate.
State persistence: event sourcing or not
Event sourcing is the default, but how an aggregate's state is saved and restored is governed by two overridable message-handler callbacks:
save_state/3— called after each successful commit. Override it to persist the aggregate's state.when_pid_is_not_registered/3— called when a command targets an aggregate whose process isn't currently running. Override it to decide how that process is hydrated.
Together they support a spectrum:
- Pure event sourcing (default):
when_pid_is_not_registered/3replays the event stream throughapply_event/2;save_state/3does nothing. - Snapshots on top of events:
save_state/3stores a periodic snapshot;when_pid_is_not_registered/3loads the latest snapshot and replays only the events after it. - Non-event-sourced (state-based) aggregates:
save_state/3persists the full current state (it receives the wrapped%X3m.System.Aggregate.State{}, so you can store itsclient_stateandversion);when_pid_is_not_registered/3loads that row, spawns the process and seeds it withX3m.System.GenAggregate.set_state(pid, loaded_state, version)— no event replay at all.set_state/3rebuilds the process'sclient_statethrough your aggregate'sset_state/1callback (the state-based analogue ofinitial_state/0— "for this loaded data, give me back your state"; defaults to the identity) and restores the version.
Your command logic (handle_msg) is identical in all three cases; only how state is
saved and restored differs.
To unload idle aggregates and reclaim memory, pass :unload_aggregate_on to
use X3m.System.MessageHandler with rules keyed on emitted events or current state.
When several nodes can host the same aggregate, see Distribution for how a request is routed to the node where the aggregate already lives.