System Message.
This module defines a X3m.System.Message struct and the main functions
for working with it.
Fields:
service_name- the name of the service that should handle this message. Example::create_job.id- unique id of the message.correlation_id- id of the message that "started" conversation.causation_id- id of the message that "caused" this message.logger_metadata- In each new processLogger.metadatashould be set to this value.invoked_at- utc time when message was generated.dry_run- specifies dry run option. It can be eitherfalse,trueor:verbose.request- request structure converted to Ecto.Changeset (or anything else useful).raw_request- request as it is received before converting to Message (i.e.paramsfrom controller action).assigns- shared Data as a map.response- the response for invoker.events- list of generated events.aggregate_meta- metadata for aggregate.valid?-trueby default on a new message; set tofalsebyput_request/2when the structured request fails validation. It means "not known to be invalid" rather than "already validated".origin_node- Node.self() of invokerreply_to- Pid of process that is waiting for response.halted?- when set totrueit means that response should be returned to the invoker without further processing of Message.
Summary
Functions
Adds event in message.events list. If event is nil
it behaves as noop.
Assigns a value to a key in the message. The "assigns" storage is meant to be used to store values in the message so that others in pipeline can use them when needed. The assigns storage is a map.
Returns message it received with Response.created(id) result set.
Returns message with a Response.error/1 result wrapping the given reason, with
halted? = true.
Generates a new, URL-safe, unique message id.
Creates new message with given service_name and provided opts
Creates new message with given service_name that is caused by other msg.
Returns message carrying Response.ok/0's bare :ok result, with halted? = true.
Returns message with a Response.ok/1 result wrapping the given value, with
halted? = true.
Extracts the aggregate id from message.raw_request under id_field and copies it
into message.aggregate_meta.id.
Puts value under key in message.raw_request map.
Stores a validated request (e.g. an Ecto.Changeset or a command struct) on the
message.
Returns sys_msg with provided response and as halted? = true.
Returns sys_msg re-targeted at a different service_name, leaving its ids,
payload and assigns intact. Useful for re-dispatching the same request to another
service.
Types
@type dry_run() :: boolean() | :verbose
@type t() :: %X3m.System.Message{ aggregate_meta: map(), assigns: assigns(), causation_id: String.t(), correlation_id: String.t(), dry_run: dry_run(), events: [map()], halted?: boolean(), id: String.t(), invoked_at: DateTime.t(), logger_metadata: Keyword.t(), origin_node: Node.t(), raw_request: map(), reply_to: pid(), request: nil | request(), response: nil | X3m.System.Response.t(), service_name: atom(), valid?: boolean() }
Functions
Adds event in message.events list. If event is nil
it behaves as noop.
After return/2 (and friends) order of msg.events will be the same as
they've been added.
Examples
Events are prepended, so they accumulate in reverse-insertion order until return/2
reverses them back into the order they were added:
iex> msg = X3m.System.Message.new(:open_account)
iex> msg = msg |> X3m.System.Message.add_event(:opened) |> X3m.System.Message.add_event(:limit_set)
iex> msg.events
[:limit_set, :opened]A nil event is ignored:
iex> msg = X3m.System.Message.new(:open_account)
iex> X3m.System.Message.add_event(msg, nil).events
[]
Assigns a value to a key in the message. The "assigns" storage is meant to be used to store values in the message so that others in pipeline can use them when needed. The assigns storage is a map.
Examples
iex> sys_msg = X3m.System.Message.new(:create_user)
iex> sys_msg.assigns[:user_id]
nil
iex> sys_msg = X3m.System.Message.assign(sys_msg, :user_id, 123)
iex> sys_msg.assigns[:user_id]
123
Returns message it received with Response.created(id) result set.
Examples
iex> msg = X3m.System.Message.new(:open_account) |> X3m.System.Message.created("acc-1")
iex> {msg.response, msg.halted?}
{{:created, "acc-1"}, true}
Returns message with a Response.error/1 result wrapping the given reason, with
halted? = true.
Examples
iex> msg = X3m.System.Message.new(:open_account) |> X3m.System.Message.error(:not_found)
iex> {msg.response, msg.halted?}
{{:error, :not_found}, true}
@spec gen_msg_id() :: String.t()
Generates a new, URL-safe, unique message id.
This is the same id format used by new/2 when no :id is given.
Creates new message with given service_name and provided opts:
id- id of the message. If not provided it generates random one.correlation_id- id of "conversation". If not provided it is set toid.causation_id- id of message that "caused" this message. If not provided it is set toid.reply_to- sets pid of process that expects response. If not provided it is set toself().raw_request- sets raw request as it is received (i.e.paramsfrom controller action).logger_metadata- if not providedLogger.metadatais used by default.
Examples
iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> {msg.service_name, msg.raw_request, msg.valid?, msg.halted?}
{:open_account, %{"id" => "acc-1"}, true, false}
iex> msg.correlation_id == msg.id and msg.causation_id == msg.id
true
Creates new message with given service_name that is caused by other msg.
The child keeps the parent's correlation_id (the id of the message that started
the conversation) and sets its causation_id to the parent's id.
Examples
iex> parent = X3m.System.Message.new(:open_account)
iex> child = X3m.System.Message.new_caused_by(:notify_owner, parent)
iex> {child.service_name, child.correlation_id == parent.correlation_id, child.causation_id == parent.id}
{:notify_owner, true, true}
iex> child.id == parent.id
false
Returns message carrying Response.ok/0's bare :ok result, with halted? = true.
Examples
iex> msg = X3m.System.Message.new(:ping) |> X3m.System.Message.ok()
iex> {msg.response, msg.halted?}
{:ok, true}
Returns message with a Response.ok/1 result wrapping the given value, with
halted? = true.
Examples
iex> msg = X3m.System.Message.new(:get_account) |> X3m.System.Message.ok(%{balance: 10})
iex> {msg.response, msg.halted?}
{{:ok, %{balance: 10}}, true}
Extracts the aggregate id from message.raw_request under id_field and copies it
into message.aggregate_meta.id.
Options:
:generate_if_missing- whentrueand the id is absent, a fresh UUID is generated and written into bothraw_requestandaggregate_meta.
When the id is missing and :generate_if_missing is false (the default), the
message is halted with a Response.missing_id/1 response. This is what the
X3m.System.MessageHandler on_new_aggregate / on_aggregate macros call for you.
Examples
When the id is present under id_field it is copied into aggregate_meta:
iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> X3m.System.Message.prepare_aggregate_id(msg, "id").aggregate_meta.id
"acc-1"When it is missing and :generate_if_missing is not set, the message is halted:
iex> msg = X3m.System.Message.new(:open_account, raw_request: %{})
iex> msg = X3m.System.Message.prepare_aggregate_id(msg, "id")
iex> {msg.halted?, msg.response}
{true, {:missing_id, "id"}}With generate_if_missing: true a fresh id is written into both raw_request and
aggregate_meta:
iex> msg = X3m.System.Message.new(:open_account, raw_request: %{})
iex> msg = X3m.System.Message.prepare_aggregate_id(msg, "id", generate_if_missing: true)
iex> is_binary(msg.aggregate_meta.id) and msg.raw_request["id"] == msg.aggregate_meta.id
true
Puts value under key in message.raw_request map.
Examples
iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> X3m.System.Message.put_in_raw_request(msg, "owner", "Ada").raw_request
%{"id" => "acc-1", "owner" => "Ada"}When raw_request is nil it is treated as an empty map:
iex> msg = X3m.System.Message.new(:open_account)
iex> X3m.System.Message.put_in_raw_request(msg, :owner, "Ada").raw_request
%{owner: "Ada"}
Stores a validated request (e.g. an Ecto.Changeset or a command struct) on the
message.
If request carries valid?: false, the message is halted with a
Response.validation_error/1 so dispatch returns the error immediately. Otherwise
the request is stored and message.valid? is set to true.
Examples
A valid request is stored and the message stays open:
iex> msg = X3m.System.Message.new(:open_account)
iex> msg = X3m.System.Message.put_request(%{owner: "Ada"}, msg)
iex> {msg.valid?, msg.request, msg.halted?}
{true, %{owner: "Ada"}, false}An invalid request (e.g. an invalid Ecto.Changeset) halts with a validation error:
iex> msg = X3m.System.Message.new(:open_account)
iex> msg = X3m.System.Message.put_request(%{valid?: false}, msg)
iex> {msg.valid?, msg.halted?, msg.response}
{false, true, {:validation_error, %{valid?: false}}}
@spec return(t(), X3m.System.Response.t()) :: t()
Returns sys_msg with provided response and as halted? = true.
Events accumulated with add_event/2 are reversed back into the order they were
added.
Examples
iex> alias X3m.System.{Message, Response}
iex> msg = Message.new(:open_account) |> Message.add_event(:opened) |> Message.add_event(:limit_set)
iex> returned = Message.return(msg, Response.ok(:done))
iex> {returned.response, returned.halted?, returned.events}
{{:ok, :done}, true, [:opened, :limit_set]}
Returns sys_msg re-targeted at a different service_name, leaving its ids,
payload and assigns intact. Useful for re-dispatching the same request to another
service.
Examples
iex> msg = X3m.System.Message.new(:open_account, id: "m-1", raw_request: %{"id" => "acc-1"})
iex> retargeted = X3m.System.Message.to_service(msg, :close_account)
iex> {retargeted.service_name, retargeted.id, retargeted.raw_request}
{:close_account, "m-1", %{"id" => "acc-1"}}