X3m.System.Dispatcher (X3m System v0.9.1)

Sends an X3m.System.Message to whichever node offers its service and waits for the response.

This is the client-facing entry point of the messaging layer. You build a message with X3m.System.Message.new/2, optionally assign values onto it, and then call dispatch/2:

:open_account
|> X3m.System.Message.new(raw_request: %{"id" => id})
|> X3m.System.Dispatcher.dispatch()

Service discovery is transparent: the dispatcher asks the (internal) service registry which nodes provide message.service_name. A local provider is invoked directly; remote providers are invoked over :rpc. The reply is delivered back to the calling process, so dispatch/2 returns the resolved X3m.System.Message with its response set. See the "Distribution" guide for how nodes are chosen and how a service can ask the dispatcher to :try_another_node.

Using the dispatcher does not require aggregates or event sourcing — any module registered through X3m.System.Router can be a dispatch target.

Summary

Functions

authorized?(message)

Returns whether the (discovered) service authorizes message.

discover_service(message)

Looks up which nodes offer message.service_name.

dispatch(message, opts \\ [])

Discovers a node offering message.service_name, invokes the service there and returns the resolved message with its response set.

validate(message)

Sets message.dry_run to true if it was false (the default) and dispatches the service call.

Functions

authorized?(message)

@spec authorized?(X3m.System.Message.t()) ::
  boolean() | {:service_unavailable, atom()}

Returns whether the (discovered) service authorizes message.

Discovery is performed first; if no node offers the service, a {:service_unavailable, atom()} tuple is returned instead of a boolean. Otherwise authorization is delegated to the providing node's router (X3m.System.Router authorize/1).
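The three result shapes in the spec above can be handled with one function clause each. The following is a minimal sketch, not part of X3m.System: the module name AuthorizationSketch, the function to_status/1 and the HTTP-like status codes are all assumptions made for illustration.

```elixir
defmodule AuthorizationSketch do
  # Hypothetical helper, not part of X3m.System. It takes the value returned
  # by X3m.System.Dispatcher.authorized?/1 and maps it to an HTTP-like status.
  @type result :: boolean() | {:service_unavailable, atom()}

  @spec to_status(result()) :: {pos_integer(), String.t()}
  def to_status(true), do: {200, "authorized"}
  def to_status(false), do: {403, "forbidden"}

  # No node offers the service, so authorization could not be checked at all.
  def to_status({:service_unavailable, info}),
    do: {503, "service unavailable: #{inspect(info)}"}
end
```

Assuming the library is available, it could be used as `message |> X3m.System.Dispatcher.authorized?() |> AuthorizationSketch.to_status()`.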

discover_service(message)

@spec discover_service(X3m.System.Message.t()) ::
  :not_found | [{:local | atom(), router_mod :: module()}]

Looks up which nodes offer message.service_name.

Returns :not_found when no node provides it, or a list of {:local | node, router_module} pairs otherwise. Used internally by dispatch/2 and authorized?/1; exposed for introspection.
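For introspection, the return value can be pattern matched directly. This is a small sketch under stated assumptions: DiscoverySketch and describe/1 are hypothetical names, and the input is assumed to have exactly the shape given in the spec above.

```elixir
defmodule DiscoverySketch do
  # Hypothetical helper, not part of X3m.System. It turns the value returned
  # by X3m.System.Dispatcher.discover_service/1 into human-readable lines.
  @type providers :: :not_found | [{:local | atom(), module()}]

  @spec describe(providers()) :: [String.t()]
  def describe(:not_found), do: []

  def describe(providers) when is_list(providers) do
    Enum.map(providers, fn
      # The service is offered by the current node.
      {:local, router} -> "local via #{inspect(router)}"
      # The service is offered by a remote node, identified by its node name.
      {node, router} when is_atom(node) -> "#{node} via #{inspect(router)}"
    end)
  end
end
```

An empty list for :not_found is a design choice of this sketch; callers that need to distinguish the two cases should match on :not_found themselves.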

dispatch(message, opts \\ [])

@spec dispatch(X3m.System.Message.t(), opts :: Keyword.t()) :: X3m.System.Message.t()

Discovers a node offering message.service_name, invokes the service there and returns the resolved message with its response set.

A halted message (halted?: true) is returned untouched.

Options:

  • :timeout - milliseconds to wait for the service reply (default 5_000). On expiry the response is set to Response.service_timeout/3.

If no node offers the service the response is set to Response.service_unavailable/1. When several nodes offer it, one is picked at random; a node may reply with {:error, {:try_another_node, reason}} to make the dispatcher try the next one.
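The "pick at random, then try the next node" behaviour described for dispatch/2 can be pictured roughly as follows. This is only an illustrative sketch and not the dispatcher's actual implementation: FallbackSketch, call/2 and the invoke callback are hypothetical, and returning :service_unavailable once every provider has declined is an assumption of the sketch.

```elixir
defmodule FallbackSketch do
  # Illustrative only; the real dispatcher's internals are not shown here.
  @type provider :: {:local | atom(), module()}

  # `invoke` is a caller-supplied function that performs the actual call on
  # one provider and returns that provider's reply.
  @spec call([provider()], (provider() -> term())) :: term()
  def call(providers, invoke) when is_list(providers) and is_function(invoke, 1) do
    providers
    # Randomize the order so that no provider is preferred.
    |> Enum.shuffle()
    |> try_each(invoke)
  end

  # Every provider declined (or there were none to begin with).
  defp try_each([], _invoke), do: :service_unavailable

  defp try_each([provider | rest], invoke) do
    case invoke.(provider) do
      # The node asked us to go elsewhere: move on to the next provider.
      {:error, {:try_another_node, _reason}} -> try_each(rest, invoke)
      # Any other reply is final.
      reply -> reply
    end
  end
end
```

Shuffling once and walking the list keeps the sketch simple and guarantees that each provider is tried at most once.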

validate(message)

Sets message.dry_run to true if it was false (the default) and dispatches the service call.

Take care if your command handling has side effects (such as persisting unique values in a DB). Such validations should either be avoided, or the aggregate needs to implement the rollback/2 and commit/2 callbacks.

If the service call is valid, message.response will be in {:ok, aggregate_version} format; otherwise the response will hold the same error it would have held if dispatch/2 had been called.
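A caller might branch on the response of the validated message along these lines. This is a sketch under stated assumptions: ValidationSketch and summarize/1 are hypothetical, and because the error shapes are not listed here, anything other than {:ok, aggregate_version} is simply treated as invalid.

```elixir
defmodule ValidationSketch do
  # Hypothetical helper, not part of X3m.System. It expects the `response`
  # field of the message returned by X3m.System.Dispatcher.validate/1.
  @spec summarize(term()) :: {:valid, term()} | {:invalid, term()}
  def summarize({:ok, aggregate_version}), do: {:valid, aggregate_version}

  # Error shapes are not enumerated in this page, so they are passed through
  # unchanged for the caller to inspect.
  def summarize(other), do: {:invalid, other}
end
```

Assuming the library is available, usage could look like `message |> X3m.System.Dispatcher.validate() |> Map.get(:response) |> ValidationSketch.summarize()`.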