X3m.System.Aggregate behaviour (X3m System v0.9.1)

Copy Markdown View Source

Behaviour and macros for defining an aggregate: the unit that decides how a command is handled and how events change its state.

An aggregate is a plain module that uses this one. It declares its starting state with initial_state/0, one handle_msg/2 (or handle_msg/3) clause per command, and an apply_event/2 clause per event:

defmodule MyApp.Accounts.Aggregate do
  use X3m.System.Aggregate
  alias X3m.System.Message, as: SysMsg

  defmodule State, do: defstruct [:id, balance: 0]

  @impl X3m.System.Aggregate
  def initial_state, do: %State{}

  # validate with Command.new/2, then run Handler.process/2 on success
  handle_msg :open_account, &Command.Open.new/2, &Handler.Open.process/2

  # or a single function that returns {:block | :noblock, message, state}
  handle_msg :deposit, fn %SysMsg{} = msg, %State{} = state ->
    event = %Events.Deposited{id: state.id, amount: msg.request.amount}

    msg =
      msg
      |> SysMsg.add_event(event)
      |> SysMsg.ok()

    {:block, msg, state}
  end

  def apply_event(%Events.Opened{} = e, %State{} = state),
    do: %State{state | id: e.id}

  def apply_event(%Events.Deposited{} = e, %State{} = state),
    do: %State{state | balance: state.balance + e.amount}
end

A command handler returns one of:

  • {:block, message, state} - there are events to persist; the message handler saves them and replies once the commit succeeds.
  • {:noblock, message, state} - nothing to persist; the response is returned as-is.

The macros wrap your functions with idempotency (a message whose id was already processed returns :ok without re-running) and telemetry. State is rebuilt from the event stream by replaying apply_event/2, so aggregates are event-sourced by default; override processed_message_id/1, commit/2 and rollback/2 for finer control.

See the "Aggregates & Event Sourcing" guide for the full flow.

Summary

Callbacks

Returns the aggregate's starting client state (before any events are applied).

Rebuilds the aggregate's client state from loaded_state — the state-based (non-event-sourced) analogue of initial_state/0: "for this loaded data, give me back your state".

Functions

Defines a command handler msg_name/2 from a single fun.

Defines a command handler msg_name/2 that first validates, then processes.

Builds the initial wrapped State for aggregate_mod, seeding client_state from its initial_state/0.

Callbacks

initial_state()

@callback initial_state() :: map()

Returns the aggregate's starting client state (before any events are applied).

set_state(loaded_state)

@callback set_state(loaded_state :: term()) :: client_state :: map()

Rebuilds the aggregate's client state from loaded_state — the state-based (non-event-sourced) analogue of initial_state/0: "for this loaded data, give me back your state".

Called by X3m.System.GenAggregate.set_state/2 when a message handler hydrates a process from saved state instead of replaying events (see the when_pid_is_not_registered/3 override in X3m.System.MessageHandler). Defaults to the identity — loaded_state is taken as the client state — so override it only when the persisted shape differs from your client state.

Functions

handle_msg(msg_name, fun)

(macro)

Defines a command handler msg_name/2 from a single fun.

fun receives the X3m.System.Message and the current client state and must return {:block | :noblock, message, state}.

handle_msg(msg_name, validate_fun, on_success)

(macro)

Defines a command handler msg_name/2 that first validates, then processes.

validate_fun receives the message and client state. If it returns a halted message (e.g. via X3m.System.Message.put_request/2 on an invalid changeset) processing stops and that message is returned. Otherwise on_success runs and must return {:block | :noblock, message, state}.

initial_state(aggregate_mod)

@spec initial_state(aggregate_mod :: module()) :: X3m.System.Aggregate.State.t()

Builds the initial wrapped State for aggregate_mod, seeding client_state from its initial_state/0.

Used by the aggregate process when it spawns, and handy in tests as the starting state you pass to a command function.