X3m.System.MessageHandler (X3m System v0.9.1)


Connects a router service to an aggregate, taking care of loading the aggregate, running the command, persisting the produced events and replying to the caller.

A message handler is the glue between X3m.System.Router (which routes a service call) and X3m.System.Aggregate (which decides what happens). You use X3m.System.MessageHandler in your own module, passing the collaborators it needs as options, then declare one function per service with the on_*_aggregate macros:

defmodule MyApp.Accounts.MessageHandler do
  use X3m.System.MessageHandler,
    aggregate_mod: MyApp.Accounts.Aggregate,
    aggregate_repo: MyApp.Accounts.AggregateRepo,
    stream: "accounts",
    pid_facade_mod: X3m.System.AggregatePidFacade,
    event_metadata: %{app_version: "1.0.0"}

  on_new_aggregate :open_account, id: :id
  on_aggregate :deposit, id: :account_id
  on_maybe_new_aggregate :ensure_account, id: :id
end

use options

  • :aggregate_mod (required) - the X3m.System.Aggregate module that handles the command and applies events.
  • :aggregate_repo (required) - a module implementing X3m.System.Aggregate.Repo, used to read and persist the event stream.
  • :pid_facade_mod (required) - the process facade that spawns/locates the aggregate process. Use X3m.System.AggregatePidFacade, as in the example above, unless you provide your own (e.g. to use a distributed registry such as Horde).
  • :stream - prefix for the per-aggregate stream name ("#{stream}-#{id}").
  • :event_metadata - a map merged into the metadata stored with every event.
  • :unload_aggregate_on - rules for tearing the aggregate process down after certain events or states, to free memory.
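
As a rough illustration of how the :stream and :event_metadata options come into play, the sketch below builds a stream name following the "#{stream}-#{id}" pattern and merges a default metadata map into per-event metadata. The module StreamNameSketch and its functions are hypothetical and not part of X3m.System. The merge direction (per-event keys win on conflict) is an assumption made only for this example.

```elixir
defmodule StreamNameSketch do
  # Hypothetical helper module for illustration; not part of X3m.System.

  # Builds the per-aggregate stream name from the configured prefix and the id.
  def stream_name(stream, id), do: "#{stream}-#{id}"

  # Merges the configured metadata into the metadata of a single event.
  # Assumption: keys given for the individual event override the defaults.
  def event_metadata(defaults, per_event), do: Map.merge(defaults, per_event)
end
```

With the options from the example above, StreamNameSketch.stream_name("accounts", 42) yields "accounts-42".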

Lifecycle

Each declared function:

  1. extracts the aggregate id from message.raw_request (see X3m.System.Message.prepare_aggregate_id/3),
  2. loads or spawns the aggregate process and runs the command on it,
  3. on a {:block, ...} result, persists the events via the repo and commits,
  4. enriches the response with the new aggregate version ({:ok, version} / {:created, id, version}) and replies to message.reply_to.
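
The four steps above can be pictured with a self-contained toy model. This is only a sketch of the flow, not the generated code: LifecycleSketch and every function in it are hypothetical, the aggregate is replaced by a plain function, the repo by an in-memory list, and the version is assumed to be the number of stored events.

```elixir
defmodule LifecycleSketch do
  # Illustrative only: none of these functions exist in X3m.System.

  # Step 1: pull the aggregate id out of the raw request.
  def extract_id(raw_request, id_field), do: Map.fetch(raw_request, id_field)

  # Step 2: stand-in for running the command on the aggregate.
  # Returns {:block, events} when there is something to persist.
  def run_command(:deposit, %{amount: amount}) when amount > 0,
    do: {:block, [%{type: :deposited, amount: amount}]}

  def run_command(_cmd, _raw_request), do: {:error, :rejected}

  # Step 3: stand-in for the repo. Appends the events and returns the new
  # version (assumed here to be the total number of stored events).
  def persist(stored_events, new_events) do
    all = stored_events ++ new_events
    {all, length(all)}
  end

  # Steps 1 to 4 combined; step 4 replies to the caller's pid.
  def handle(cmd, raw_request, id_field, stored_events, reply_to) do
    with {:ok, _id} <- extract_id(raw_request, id_field),
         {:block, events} <- run_command(cmd, raw_request) do
      {_all, version} = persist(stored_events, events)
      send(reply_to, {:ok, version})
    else
      :error -> send(reply_to, {:error, :missing_id})
      {:error, reason} -> send(reply_to, {:error, reason})
    end
  end
end
```

Calling LifecycleSketch.handle(:deposit, %{account_id: "a1", amount: 50}, :account_id, [], self()) sends {:ok, 1} to the calling process, mirroring the {:ok, version} reply described in step 4.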

Snapshotting and other persistence strategies are supported by overriding the generated save_state/3 and when_pid_is_not_registered/3 callbacks.

See the "Aggregates & Event Sourcing" guide for a full walk-through.

Summary

Functions

on_aggregate(cmd, opts \\ [])

(macro)

on_maybe_new_aggregate(cmd, opts \\ [])

(macro)

on_new_aggregate(cmd, opts \\ [])

(macro)