Behaviour and macros for defining an aggregate: the unit that decides how a command is handled and how events change its state.
An aggregate is a plain module that uses this one. It declares its starting state
with initial_state/0, one handle_msg/2 (or handle_msg/3) clause per command,
and an apply_event/2 clause per event:
    defmodule MyApp.Accounts.Aggregate do
      use X3m.System.Aggregate
      alias X3m.System.Message, as: SysMsg

      defmodule State, do: defstruct [:id, balance: 0]

      @impl X3m.System.Aggregate
      def initial_state, do: %State{}

      # validate with Command.new/2, then run Handler.process/2 on success
      handle_msg :open_account, &Command.Open.new/2, &Handler.Open.process/2

      # or a single function that returns {:block | :noblock, message, state}
      handle_msg :deposit, fn %SysMsg{} = msg, %State{} = state ->
        event = %Events.Deposited{id: state.id, amount: msg.request.amount}

        msg =
          msg
          |> SysMsg.add_event(event)
          |> SysMsg.ok()

        {:block, msg, state}
      end

      def apply_event(%Events.Opened{} = e, %State{} = state),
        do: %State{state | id: e.id}

      def apply_event(%Events.Deposited{} = e, %State{} = state),
        do: %State{state | balance: state.balance + e.amount}
    end

A command handler returns one of:
  {:block, message, state} - there are events to persist; the message handler saves them and replies once the commit succeeds.
  {:noblock, message, state} - nothing to persist; the response is returned as-is.
The macros wrap your functions with idempotency (a message whose id was already
processed returns :ok without re-running) and telemetry. State is rebuilt from the
event stream by replaying apply_event/2, so aggregates are event-sourced by default;
override processed_message_id/1, commit/2 and rollback/2 for finer control.
See the "Aggregates & Event Sourcing" guide for the full flow.
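The replay described above amounts to a left fold of apply_event/2 over the event stream, starting from initial_state/0. The following is a minimal, self-contained sketch of that idea in plain Elixir; it does not use X3m. The module ReplaySketch, its replay/1 function, and the local Opened, Deposited and State structs are hypothetical names introduced only for illustration, and the library's actual replay code may differ.

```elixir
defmodule ReplaySketch do
  # Hypothetical structs that mirror the shapes used in the example above.
  defmodule Opened, do: defstruct([:id])
  defmodule Deposited, do: defstruct([:id, :amount])
  defmodule State, do: defstruct([:id, balance: 0])

  # Starting state, before any events are applied.
  def initial_state, do: %State{}

  def apply_event(%Opened{} = e, %State{} = state),
    do: %State{state | id: e.id}

  def apply_event(%Deposited{} = e, %State{} = state),
    do: %State{state | balance: state.balance + e.amount}

  # Hypothetical helper: fold the events, oldest first, into the state.
  # Enum.reduce/3 calls the function as fun.(event, state), which matches
  # the argument order of apply_event/2.
  def replay(events), do: Enum.reduce(events, initial_state(), &apply_event/2)
end
```

Replaying an Opened event for "acc-1" followed by deposits of 50 and 20 yields a state with id "acc-1" and a balance of 70. Because the state is derived purely from the events, the same stream always produces the same state.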
Callbacks
@callback initial_state() :: map()
Returns the aggregate's starting client state (before any events are applied).
Rebuilds the aggregate's client state from loaded_state — the state-based
(non-event-sourced) analogue of initial_state/0: "for this loaded data, give me back
your state".
Called by X3m.System.GenAggregate.set_state/2 when a message handler hydrates a process
from saved state instead of replaying events (see the when_pid_is_not_registered/3 override
in X3m.System.MessageHandler). Defaults to the identity — loaded_state is taken as the
client state — so override it only when the persisted shape differs from your client state.
Functions
Defines a command handler msg_name/2 from a single fun.
fun receives the X3m.System.Message and the current client state and must return
{:block | :noblock, message, state}.
Defines a command handler msg_name/2 that first validates, then processes.
validate_fun receives the message and the client state. If it returns a halted message
(e.g. via X3m.System.Message.put_request/2 on an invalid changeset), processing
stops and that message is returned. Otherwise, on_success runs and must return
{:block | :noblock, message, state}.
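The validate-then-process flow can be pictured as a short-circuit around on_success. The sketch below is plain Elixir and only illustrates the control flow; it is not the macro's implementation. The Msg struct, its halted? field, and the run/4 helper are hypothetical stand-ins, and two points are assumptions the text above does not settle: that validate_fun returns a message, and that a halted message is handed back as {:noblock, message, state}.

```elixir
defmodule ValidateThenProcessSketch do
  # Hypothetical stand-in for a message; this is not X3m.System.Message.
  defmodule Msg, do: defstruct(request: nil, response: nil, halted?: false)

  # Runs validate_fun first. on_success is called only when the message
  # comes back not halted.
  def run(%Msg{} = msg, state, validate_fun, on_success) do
    case validate_fun.(msg, state) do
      # Assumption: a halted message has nothing to persist, so it is
      # returned with the state untouched.
      %Msg{halted?: true} = halted -> {:noblock, halted, state}
      %Msg{} = valid -> on_success.(valid, state)
    end
  end
end
```

With a validator that halts the message, on_success is never invoked and the state is unchanged; with one that passes the message through, the return value is whatever on_success produces.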
@spec initial_state(aggregate_mod :: module()) :: X3m.System.Aggregate.State.t()
Builds the initial wrapped State for aggregate_mod, seeding client_state from its
initial_state/0.
Used by the aggregate process when it spawns, and handy in tests as the starting state you pass to a command function.