X3m.System.Message (X3m System v0.9.1)

System Message.

This module defines an X3m.System.Message struct and the main functions for working with it.

Fields:

  • service_name - the name of the service that should handle this message. Example: :create_job.
  • id - unique id of the message.
  • correlation_id - id of the message that "started" conversation.
  • causation_id - id of the message that "caused" this message.
  • logger_metadata - the value that Logger.metadata should be set to in each new process.
  • invoked_at - UTC time when the message was generated.
  • dry_run - specifies dry run option. It can be either false, true or :verbose.
  • request - request structure converted to Ecto.Changeset (or anything else useful).
  • raw_request - the request as it was received, before conversion to a Message (e.g. params from a controller action).
  • assigns - shared data as a map.
  • response - the response for invoker.
  • events - list of generated events.
  • aggregate_meta - metadata for aggregate.
  • valid? - true by default on a new message; set to false by put_request/2 when the structured request fails validation. It means "not known to be invalid" rather than "already validated".
  • origin_node - Node.self() of the invoker.
  • reply_to - pid of the process that is waiting for the response.
  • halted? - when true, the response should be returned to the invoker without further processing of the message.

Summary

Functions

add_event(message, event)

Adds event to the message.events list. If event is nil, it is a no-op.

assign(sys_msg, key, val)

Assigns a value to a key in the message. The "assigns" storage is meant to be used to store values in the message so that others in the pipeline can use them when needed. The assigns storage is a map.

created(message, id)

Returns the given message with its response set to Response.created(id) and halted? = true.

error(message, any)

Returns message with a Response.error/1 result wrapping the given reason, with halted? = true.

gen_msg_id()

Generates a new, URL-safe, unique message id.

new(service_name, opts \\ [])

Creates a new message with the given service_name and the provided opts.

new_caused_by(service_name, msg, opts \\ [])

Creates a new message with the given service_name that is caused by another message, msg.

ok(message)

Returns message carrying Response.ok/0's bare :ok result, with halted? = true.

ok(message, any)

Returns message with a Response.ok/1 result wrapping the given value, with halted? = true.

prepare_aggregate_id(message, id_field, opts \\ [])

Extracts the aggregate id from message.raw_request under id_field and copies it into message.aggregate_meta.id.

put_in_raw_request(message, key, value)

Puts value under key in message.raw_request map.

put_request(request, message)

Stores a structured request (e.g. an Ecto.Changeset or a command struct) on the message.

return(sys_msg, response)

Returns sys_msg with the provided response and halted? = true.

to_service(sys_msg, service_name)

Returns sys_msg re-targeted at a different service_name, leaving its ids, payload and assigns intact. Useful for re-dispatching the same request to another service.

Types

dry_run()

@type dry_run() :: boolean() | :verbose

error()

@type error() :: {String.t(), Keyword.t()}

errors()

@type errors() :: [{atom(), error()}]
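
The shape of errors() resembles the error list of an Ecto.Changeset: a keyword list mapping a field to a message and its options. The sketch below builds a hypothetical value of that shape and renders it by interpolating the options into the message. The field names, messages and the %{key} placeholder convention are assumptions borrowed from Ecto, not something this module prescribes.

```elixir
# Hypothetical value matching errors(): [{atom(), {String.t(), Keyword.t()}}]
errors = [
  name: {"can't be blank", [validation: :required]},
  age: {"must be greater than %{number}", [validation: :number, number: 18]}
]

# Replace every %{key} placeholder in the message with the matching option.
# The placeholder convention is an assumption taken from Ecto.
render = fn {message, opts} ->
  Enum.reduce(opts, message, fn {key, value}, acc ->
    String.replace(acc, "%{#{key}}", to_string(value))
  end)
end

rendered = for {field, error} <- errors, do: {field, render.(error)}
# rendered == [name: "can't be blank", age: "must be greater than 18"]
```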

t()

@type t() :: %X3m.System.Message{
  aggregate_meta: map(),
  assigns: assigns(),
  causation_id: String.t(),
  correlation_id: String.t(),
  dry_run: dry_run(),
  events: [map()],
  halted?: boolean(),
  id: String.t(),
  invoked_at: DateTime.t(),
  logger_metadata: Keyword.t(),
  origin_node: Node.t(),
  raw_request: map(),
  reply_to: pid(),
  request: nil | request(),
  response: nil | X3m.System.Response.t(),
  service_name: atom(),
  valid?: boolean()
}

Functions

add_event(message, event)

@spec add_event(message :: t(), event :: nil | any()) :: t()

Adds event to the message.events list. If event is nil, it is a no-op.

After return/2 (and friends), msg.events is in the same order in which the events were added.

Examples

Events are prepended, so they accumulate in reverse-insertion order until return/2 reverses them back into the order they were added:

iex> msg = X3m.System.Message.new(:open_account)
iex> msg = msg |> X3m.System.Message.add_event(:opened) |> X3m.System.Message.add_event(:limit_set)
iex> msg.events
[:limit_set, :opened]

A nil event is ignored:

iex> msg = X3m.System.Message.new(:open_account)
iex> X3m.System.Message.add_event(msg, nil).events
[]

assign(sys_msg, key, val)

@spec assign(t(), atom(), any()) :: t()

Assigns a value to a key in the message. The "assigns" storage is meant to be used to store values in the message so that others in the pipeline can use them when needed. The assigns storage is a map.

Examples

iex> sys_msg = X3m.System.Message.new(:create_user)
iex> sys_msg.assigns[:user_id]
nil
iex> sys_msg = X3m.System.Message.assign(sys_msg, :user_id, 123)
iex> sys_msg.assigns[:user_id]
123
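
As a rough illustration of how assigns and halted? can work together in a pipeline, the sketch below runs a list of steps over a message and stops at the first step that halts it. The run function and the three steps are hypothetical helpers written for this example; only new/2, assign/3 and error/2 come from this module.

```elixir
alias X3m.System.Message

# Hypothetical runner: apply each step in order, stop once the message is halted.
run = fn msg, steps ->
  Enum.reduce_while(steps, msg, fn step, acc ->
    acc = step.(acc)
    if acc.halted?, do: {:halt, acc}, else: {:cont, acc}
  end)
end

# Hypothetical steps.
load_user = &Message.assign(&1, :user_id, 123)
reject = &Message.error(&1, :forbidden)
mark_done = &Message.assign(&1, :done, true)

# Every step runs when nothing halts the message.
ok = run.(Message.new(:create_user), [load_user, mark_done])
{ok.assigns[:user_id], ok.assigns[:done], ok.halted?}
# {123, true, false}

# Once reject halts the message, mark_done is never applied.
halted = run.(Message.new(:create_user), [load_user, reject, mark_done])
{halted.assigns[:user_id], halted.assigns[:done], halted.response}
# {123, nil, {:error, :forbidden}}
```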

created(message, id)

@spec created(t(), any()) :: t()

Returns the given message with its response set to Response.created(id) and halted? = true.

Examples

iex> msg = X3m.System.Message.new(:open_account) |> X3m.System.Message.created("acc-1")
iex> {msg.response, msg.halted?}
{{:created, "acc-1"}, true}

error(message, any)

@spec error(t(), any()) :: t()

Returns message with a Response.error/1 result wrapping the given reason, with halted? = true.

Examples

iex> msg = X3m.System.Message.new(:open_account) |> X3m.System.Message.error(:not_found)
iex> {msg.response, msg.halted?}
{{:error, :not_found}, true}

gen_msg_id()

@spec gen_msg_id() :: String.t()

Generates a new, URL-safe, unique message id.

This is the same id format used by new/2 when no :id is given.
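
A short usage sketch based on the spec and description above. It checks only what they state: the id is a string, two generated ids differ, and a generated id can be passed to new/2 through the :id option.

```elixir
iex> id = X3m.System.Message.gen_msg_id()
iex> is_binary(id)
true
iex> id == X3m.System.Message.gen_msg_id()
false
iex> X3m.System.Message.new(:ping, id: id).id == id
true
```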

new(service_name, opts \\ [])

@spec new(atom(), Keyword.t()) :: t()

Creates a new message with the given service_name and the provided opts:

  • id - id of the message. If not provided, a random one is generated.
  • correlation_id - id of the "conversation". If not provided, it is set to id.
  • causation_id - id of the message that "caused" this message. If not provided, it is set to id.
  • reply_to - pid of the process that expects the response. If not provided, it is set to self().
  • raw_request - the raw request as it was received (e.g. params from a controller action).
  • logger_metadata - if not provided, Logger.metadata is used by default.

Examples

iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> {msg.service_name, msg.raw_request, msg.valid?, msg.halted?}
{:open_account, %{"id" => "acc-1"}, true, false}
iex> msg.correlation_id == msg.id and msg.causation_id == msg.id
true
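
The ids can also be passed explicitly. The following sketch assumes the options behave as listed above; the id values are made up for illustration.

```elixir
iex> msg = X3m.System.Message.new(:open_account, id: "m-1", correlation_id: "c-1", causation_id: "m-0")
iex> {msg.id, msg.correlation_id, msg.causation_id}
{"m-1", "c-1", "m-0"}
iex> msg.reply_to == self()
true
```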

new_caused_by(service_name, msg, opts \\ [])

@spec new_caused_by(atom(), t(), Keyword.t()) :: t()

Creates a new message with the given service_name that is caused by another message, msg.

The child keeps the parent's correlation_id (the id of the message that started the conversation) and sets its causation_id to the parent's id.

Examples

iex> parent = X3m.System.Message.new(:open_account)
iex> child = X3m.System.Message.new_caused_by(:notify_owner, parent)
iex> {child.service_name, child.correlation_id == parent.correlation_id, child.causation_id == parent.id}
{:notify_owner, true, true}
iex> child.id == parent.id
false
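
Following the rule above across a longer chain, the correlation_id should stay fixed at the id of the first message while the causation_id moves one step at a time. A sketch with hypothetical service names:

```elixir
iex> alias X3m.System.Message
iex> root = Message.new(:open_account)
iex> child = Message.new_caused_by(:notify_owner, root)
iex> grandchild = Message.new_caused_by(:send_email, child)
iex> grandchild.correlation_id == root.id
true
iex> grandchild.causation_id == child.id
true
iex> grandchild.causation_id == root.id
false
```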

ok(message)

@spec ok(t()) :: t()

Returns message carrying Response.ok/0's bare :ok result, with halted? = true.

Examples

iex> msg = X3m.System.Message.new(:ping) |> X3m.System.Message.ok()
iex> {msg.response, msg.halted?}
{:ok, true}

ok(message, any)

@spec ok(t(), any()) :: t()

Returns message with a Response.ok/1 result wrapping the given value, with halted? = true.

Examples

iex> msg = X3m.System.Message.new(:get_account) |> X3m.System.Message.ok(%{balance: 10})
iex> {msg.response, msg.halted?}
{{:ok, %{balance: 10}}, true}

prepare_aggregate_id(message, id_field, opts \\ [])

@spec prepare_aggregate_id(t(), id_field :: term(), opts :: Keyword.t()) :: t()

Extracts the aggregate id from message.raw_request under id_field and copies it into message.aggregate_meta.id.

Options:

  • :generate_if_missing - when true and the id is absent, a fresh UUID is generated and written into both raw_request and aggregate_meta.

When the id is missing and :generate_if_missing is false (the default), the message is halted with a Response.missing_id/1 response. This is what the X3m.System.MessageHandler on_new_aggregate / on_aggregate macros call for you.

Examples

When the id is present under id_field it is copied into aggregate_meta:

iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> X3m.System.Message.prepare_aggregate_id(msg, "id").aggregate_meta.id
"acc-1"

When it is missing and :generate_if_missing is not set, the message is halted:

iex> msg = X3m.System.Message.new(:open_account, raw_request: %{})
iex> msg = X3m.System.Message.prepare_aggregate_id(msg, "id")
iex> {msg.halted?, msg.response}
{true, {:missing_id, "id"}}

With generate_if_missing: true a fresh id is written into both raw_request and aggregate_meta:

iex> msg = X3m.System.Message.new(:open_account, raw_request: %{})
iex> msg = X3m.System.Message.prepare_aggregate_id(msg, "id", generate_if_missing: true)
iex> is_binary(msg.aggregate_meta.id) and msg.raw_request["id"] == msg.aggregate_meta.id
true

put_in_raw_request(message, key, value)

@spec put_in_raw_request(t(), key :: term(), value :: term()) :: t()

Puts value under key in message.raw_request map.

Examples

iex> msg = X3m.System.Message.new(:open_account, raw_request: %{"id" => "acc-1"})
iex> X3m.System.Message.put_in_raw_request(msg, "owner", "Ada").raw_request
%{"id" => "acc-1", "owner" => "Ada"}

When raw_request is nil it is treated as an empty map:

iex> msg = X3m.System.Message.new(:open_account)
iex> X3m.System.Message.put_in_raw_request(msg, :owner, "Ada").raw_request
%{owner: "Ada"}

put_request(request, message)

@spec put_request(request :: map(), t()) :: t()

Stores a structured request (e.g. an Ecto.Changeset or a command struct) on the message.

If request carries valid?: false, the message is halted with a Response.validation_error/1 so dispatch returns the error immediately. Otherwise the request is stored and message.valid? is set to true.

Examples

A valid request is stored and the message stays open:

iex> msg = X3m.System.Message.new(:open_account)
iex> msg = X3m.System.Message.put_request(%{owner: "Ada"}, msg)
iex> {msg.valid?, msg.request, msg.halted?}
{true, %{owner: "Ada"}, false}

An invalid request (e.g. an invalid Ecto.Changeset) halts with a validation error:

iex> msg = X3m.System.Message.new(:open_account)
iex> msg = X3m.System.Message.put_request(%{valid?: false}, msg)
iex> {msg.valid?, msg.halted?, msg.response}
{false, true, {:validation_error, %{valid?: false}}}

return(sys_msg, response)

@spec return(t(), X3m.System.Response.t()) :: t()

Returns sys_msg with the provided response and halted? = true.

Events accumulated with add_event/2 are reversed back into the order they were added.

Examples

iex> alias X3m.System.{Message, Response}
iex> msg = Message.new(:open_account) |> Message.add_event(:opened) |> Message.add_event(:limit_set)
iex> returned = Message.return(msg, Response.ok(:done))
iex> {returned.response, returned.halted?, returned.events}
{{:ok, :done}, true, [:opened, :limit_set]}

to_service(sys_msg, service_name)

@spec to_service(t(), service_name :: atom()) :: t()

Returns sys_msg re-targeted at a different service_name, leaving its ids, payload and assigns intact. Useful for re-dispatching the same request to another service.

Examples

iex> msg = X3m.System.Message.new(:open_account, id: "m-1", raw_request: %{"id" => "acc-1"})
iex> retargeted = X3m.System.Message.to_service(msg, :close_account)
iex> {retargeted.service_name, retargeted.id, retargeted.raw_request}
{:close_account, "m-1", %{"id" => "acc-1"}}