The messaging layer — X3m.System.Message, X3m.System.Router and
X3m.System.Dispatcher — is the foundation everything else builds on. It works on its
own: you do not need aggregates or an event store to use it.
The message
X3m.System.Message is the envelope that travels from the caller to a service and
back. You create one with X3m.System.Message.new/2:
alias X3m.System.Message, as: SysMsg
msg = SysMsg.new(:open_account, raw_request: %{"id" => "acc-1", "owner" => "Ada"})
Useful fields:
- service_name: which service should handle it (:open_account above).
- raw_request: the request as received (e.g. controller params).
- request: a validated/structured request, set with put_request/2 once you've cast raw_request into a changeset or struct.
- assigns: a map for values you want to carry alongside the request; set with assign/3 (e.g. the authenticated user).
- response: set by the handler, read by the caller.
- correlation_id / causation_id: ids that tie a conversation together (see below).
msg = SysMsg.assign(msg, :invoked_by, current_user)
Correlating and causing messages
Every message has a correlation_id (the id of the message that started the
conversation) and a causation_id (the id of the message that directly caused this
one). When one service needs to call another, build the child message with
new_caused_by/3 so the chain is preserved:
:get_owner_details
|> SysMsg.new_caused_by(msg, raw_request: %{"owner_id" => owner_id})
|> X3m.System.Dispatcher.dispatch()
The router
A router registers services and decides who is allowed to call them.
defmodule MyApp.Router do
  use X3m.System.Router
  service :open_account, MyApp.Accounts
  service :get_account, MyApp.Accounts, :read # different function name
  servicep :rebuild_projection, MyApp.Accounts # private to this node
  def authorize(%X3m.System.Message{service_name: :open_account, assigns: %{invoked_by: %{admin?: true}}}),
    do: :ok
  def authorize(_message), do: :forbidden
end
- service/2 and service/3 register a public service: one that is announced to other nodes in the cluster.
- servicep/2 and servicep/3 register a private service: usable on the local node but never advertised to peers.
- authorize/1 runs before the handler. Return :ok to proceed; any other value becomes the response sent back to the caller. If you don't define a catch-all clause, the router denies by default.
Announce the services at runtime (typically from start/2):
:ok = MyApp.Router.register_services()
You can introspect what a router registered with registered_services/1
(:public, :private, or :all).
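The authorization rule described above (authorize/1 runs first, :ok lets the handler run, any other value becomes the response) can be pictured with a small stand-in that uses plain maps instead of X3m.System.Message. This is only a sketch of the control flow, not the router's actual implementation; AuthSketch, call/3 and the map-based messages are hypothetical names introduced for illustration.

```elixir
defmodule AuthSketch do
  # Hypothetical stand-in for the router's control flow; not X3m.System code.
  # `authorize` and `handler` are plain functions that take the message map.
  def call(message, authorize, handler) do
    case authorize.(message) do
      # :ok lets the handler run and produce the response.
      :ok -> handler.(message)
      # Any other value is handed back to the caller as the response.
      denied -> Map.put(message, :response, denied)
    end
  end
end

authorize = fn
  %{service_name: :open_account, assigns: %{invoked_by: %{admin?: true}}} -> :ok
  # Catch-all clause: everything else is rejected.
  _message -> :forbidden
end

handler = fn message -> Map.put(message, :response, {:ok, :opened}) end

admin = %{service_name: :open_account, assigns: %{invoked_by: %{admin?: true}}}
guest = %{service_name: :open_account, assigns: %{invoked_by: %{admin?: false}}}

allowed = AuthSketch.call(admin, authorize, handler)
rejected = AuthSketch.call(guest, authorize, handler)
```

Here allowed carries the handler's response, while rejected carries :forbidden and the handler is never invoked.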
Handlers
A handler is just a function that takes an X3m.System.Message and returns either:
- {:reply, message}, which sends message back to the caller, or
- :noreply, which sends nothing.
defmodule MyApp.Accounts do
  alias X3m.System.Message
  def read(%Message{} = message) do
    # lookup/1 is not shown; it stands for your own data access function
    account = lookup(message.raw_request["id"])
    {:reply, Message.ok(message, account)}
  end
end
X3m.System.Message provides helpers that set the response and mark the message done:
ok/1, ok/2, created/2, error/2, and the lower-level return/2.
Dispatching
X3m.System.Dispatcher.dispatch/2 discovers a node offering the service, invokes it,
and waits for the reply:
:open_account
|> SysMsg.new(raw_request: %{"id" => "acc-1"})
|> X3m.System.Dispatcher.dispatch(timeout: 10_000)
sequenceDiagram
    participant Caller
    participant Dispatcher
    participant Registry as Service registry
    participant Handler as Service handler
    Caller->>Dispatcher: dispatch(message)
    Dispatcher->>Registry: which node offers service_name?
    alt no provider
        Registry-->>Dispatcher: none
        Dispatcher-->>Caller: response {:service_unavailable, name}
    else provider found
        Registry-->>Dispatcher: node
        Dispatcher->>Handler: authorize/1, then invoke
        Handler-->>Caller: %Message{response: ...}
    end
- The default timeout is 5000 ms; on expiry the response becomes {:service_timeout, service_name, message_id, timeout}.
- If no node offers the service, the response is {:service_unavailable, service_name}.
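Callers usually want to tell these two transport-level outcomes apart from ordinary replies. The sketch below matches only on the two tuple shapes listed above. The module name DispatchFailure and the wording of the messages are illustrative assumptions, and it works on the bare response value so that it runs without the library.

```elixir
defmodule DispatchFailure do
  # Illustrative helper, not part of X3m.System.
  # Takes the value found in message.response after dispatch/2 returns.
  def describe({:service_timeout, service_name, message_id, timeout}) do
    {:failed, "#{service_name} did not reply to #{message_id} within #{timeout} ms"}
  end

  def describe({:service_unavailable, service_name}) do
    {:failed, "no node currently offers #{service_name}"}
  end

  # Anything else came back from the router or the handler.
  def describe(response), do: {:replied, response}
end
```

In a caller this would typically be applied as DispatchFailure.describe(message.response) on the message returned by dispatch/2.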
To check a command without committing its effects, use validate/1 — it dispatches
with dry_run: true, so aggregate-backed services run the command and report whether
it would succeed without persisting events. See
Aggregates & event sourcing for dry_run details.
Responses
By the time dispatch/2 returns, message.response holds one of the shapes described
by X3m.System.Response. Callers pattern-match on it:
case X3m.System.Dispatcher.dispatch(msg) do
  %SysMsg{response: {:ok, account}} -> ...
  %SysMsg{response: {:created, id, _version}} -> ...
  %SysMsg{response: {:validation_error, changeset}} -> ...
  %SysMsg{response: {:error, reason}} -> ...
end
To run services across more than one node, continue with Distribution.
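For a web-facing caller, the response shapes matched in the case expression above often end up as HTTP status codes. The following is a minimal sketch of one such mapping. The module ResponseStatus and the chosen status codes are assumptions made for illustration, and X3m.System.Response may define further shapes that are not covered here.

```elixir
defmodule ResponseStatus do
  # Hypothetical mapping from a response tuple to {http_status, body}.
  def to_http({:ok, payload}), do: {200, payload}
  def to_http({:created, id, _version}), do: {201, %{id: id}}
  def to_http({:validation_error, details}), do: {422, details}
  def to_http({:error, reason}), do: {500, %{error: reason}}
  # Shapes this sketch does not know about are reported as server errors.
  def to_http(other), do: {500, %{unexpected: other}}
end
```

Keeping the mapping in one function means a controller only has to call ResponseStatus.to_http(message.response) instead of repeating the case expression for every service.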