distribute

distribute: Typed messaging for the BEAM.

This library provides a safety layer over Erlang’s native distribution. It focuses on hardening the Node Boundary: the physical edge where typed Gleam values become raw binary terms and vice versa.

The Typed Boundary Concept

In standard Erlang distribution, messages cross the wire as raw terms without type information. distribute forces you to define a protocol using TypedName and Codec before any data is sent.

  1. At the Sender: messages are encoded, checked against payload limits, and transmitted via BEAM distribution.
  2. At the Receiver: binaries have their size verified before decoding. If they exceed limits, they are rejected. If they are valid, they are decoded into typed Gleam values.

This architecture ensures that an actor’s mailbox is never flooded with unparseable or oversized data, and the compiler can prevent protocol mismatches at the call site.

Core Philosophy

Typical Usage

// 1. Configure once at application startup
let assert Ok(Nil) = distribute.configure(config.default())

// 2. Define a protocol (Name + Codec)
let counter = distribute.named("counter", codec.int())

// 3. Start a globally registered, supervised singleton
let assert Ok(pid) = distribute.start_supervised(counter, 0, handler)

// 4. Call from any node. Monitor-based TargetDown detection,
   default timeout from config
let assert Ok(val) = distribute.call(gs, Get, codec.int_decoder())

Default vs explicit timeout

Every long-running operation has two shapes:

Error handling without extra imports

Every public error type (and its *_to_string formatter) is re-exported here. Pattern-matching on CallError, RegisterError, LookupError, etc., requires only import distribute.

Reply-Subject helpers live in distribute/receiver

Handlers that answer a call typically need receiver.receive_typed or receiver.selecting_typed to operate on the raw reply-Subject. These are intra-handler primitives: import the module directly.

import distribute/receiver
case receiver.receive_typed(reply_to, codec.int_decoder(), 1000) {
  Ok(value) -> ...
  Error(receiver.ReceiveTimeout) -> ...
  Error(receiver.DecodeError(_)) -> ...
}

API surface, by concern

Most users only need import distribute. Low-level modules (distribute/actor, distribute/global, distribute/registry, distribute/codec, distribute/receiver, distribute/cluster, distribute/cluster_monitor, distribute/codec/composite, distribute/codec/tagged, distribute/config) remain available for advanced cases not covered by the facade. e.g. whereis, register_pid, register_typed, from_pid, from_subject, lookup_with_timeout, lookup_async, start_registered_observed, receive_typed, selecting_typed.

Types

pub type ActorStartError =
  actor.StartError
pub type Config =
  config.Config
pub type GlobalSubject(msg) =
  global.GlobalSubject(msg)
pub type HandlerStep(state) =
  receiver.HandlerStep(state)
pub type NodeStartError =
  cluster.StartError
pub type TelemetryEvent =
  telemetry.Event
pub type TelemetrySink =
  fn(telemetry.Event) -> Nil
pub type TypedName(msg) =
  registry.TypedName(msg)

Values

pub fn actor_start_error_to_string(
  err: actor.StartError,
) -> String

Render a gleam_otp/actor.StartError as a human-readable string.

pub fn call(
  target: global.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
) -> Result(resp, global.CallError)

Synchronous request/response with monitor-based TargetDown detection. Uses config.get().default_call_timeout_ms. Use call_with_timeout/4 for an explicit timeout.

Late-reply caveat: choose call_isolated for long-running callers

call is the cheap default. It is safe for short-lived callers (CLI tools, request handlers, scripts) whose process exits shortly after the call returns: orphan late-replies die with the process.

It is not the right choice for long-lived processes (OTP actors, supervisors, manager loops) that issue many calls under sustained timeouts. A reply that arrives after call returns Error(Timeout) cannot be evicted from the caller’s mailbox by the BEAM (no erlang:alias/0-aware Subject layout in the current gleam_erlang); selective receive scans every orphan on every subsequent process.receive, and tens of thousands of orphans quietly degrade the caller’s throughput.

For that shape, prefer call_isolated/3: it runs each call inside a short-lived unlinked proxy process whose mailbox is reaped on exit. See global.call/4 and docs/safety_and_limits.md for the full design rationale.

pub fn call_error_to_string(err: global.CallError) -> String
pub fn call_isolated(
  target: global.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
) -> Result(resp, global.CallError)

Mailbox-safe variant of call using config.get().default_call_timeout_ms. Each invocation runs inside a short-lived unlinked proxy process so orphan late-replies die with the proxy instead of polluting the caller’s mailbox. Recommended for long-running callers issuing many RPCs under sustained timeouts. See global.call_isolated for the full design rationale.

pub fn call_isolated_with_timeout(
  target: global.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
  timeout_ms: Int,
) -> Result(resp, global.CallError)

Like call_isolated, with an explicit timeout.

pub fn call_with_timeout(
  target: global.GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
  timeout_ms: Int,
) -> Result(resp, global.CallError)

Like call, with an explicit timeout. Inherits the same late-reply caveat. See call/3.

pub fn child_spec(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> supervision.ChildSpecification(global.GlobalSubject(msg))

OTP child spec using config.get().default_init_timeout_ms.

pub fn child_spec_with_timeout(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  init_timeout_ms: Int,
) -> supervision.ChildSpecification(global.GlobalSubject(msg))

Like child_spec, with an explicit init timeout.

pub fn config_error_to_string(err: config.ConfigError) -> String
pub fn configure(
  cfg: config.Config,
) -> Result(Nil, config.ConfigError)

Set global runtime configuration. Call once at application startup.

pub fn connect(node: String) -> Result(Nil, cluster.ConnectError)
pub fn connect_error_to_string(
  err: cluster.ConnectError,
) -> String
pub fn decode_error_to_string(err: codec.DecodeError) -> String
pub fn encode_error_to_string(err: codec.EncodeError) -> String
pub fn get_config() -> config.Config

Read the current global configuration (or defaults if never configured).

pub fn has_peers() -> Bool

Whether this node has at least one connected peer. Not a health check. A single-node deployment is operationally fine and will return False here. Use this only to gate cluster-wide operations.

pub fn health() -> cluster.ClusterHealth

Full cluster health snapshot with parallel pings.

pub fn install_telemetry(sink: fn(telemetry.Event) -> Nil) -> Nil

Install (or replace) the global telemetry sink for observability. This single opt-in sink receives all load-bearing events from the library (registry, atom budget, payload limits, codec failures, timeouts). See distribute/telemetry for the full semantics and event structure.

pub fn is_distributed() -> Bool

Whether this node is running BEAM distribution (via erlang:is_alive/0).

pub fn lookup(
  typed_name: registry.TypedName(msg),
) -> Result(global.GlobalSubject(msg), Nil)

Look up a GlobalSubject by its TypedName. For polling variants (blocking and async) see registry.lookup_with_timeout / registry.lookup_async.

pub fn lookup_error_to_string(
  err: registry.LookupError,
) -> String
pub fn named(
  name: String,
  c: codec.Codec(msg),
) -> registry.TypedName(msg)

Create a TypedName from a bundled Codec.

let counter = distribute.named("counter", codec.int())
pub fn new_subject(
  c: codec.Codec(msg),
) -> global.GlobalSubject(msg)

Create a new GlobalSubject owned by the current process, from a bundled Codec. For separate encoder/decoder, drop down to global.new directly.

pub fn node_start_error_to_string(
  err: cluster.StartError,
) -> String
pub fn nodes() -> List(String)
pub fn pool(
  typed_name: registry.TypedName(msg),
  size: Int,
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(process.Pid, actor.StartError)

Start N supervised actors using config.get().default_init_timeout_ms.

pub fn pool_with_timeout(
  typed_name: registry.TypedName(msg),
  size: Int,
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(process.Pid, actor.StartError)

Like pool, with an explicit init timeout.

pub fn receive(
  subject: global.GlobalSubject(msg),
) -> Result(msg, codec.DecodeError)

Receive a typed message using config.get().default_call_timeout_ms. Use receive_with_timeout/2 for an explicit timeout.

pub fn receive_error_to_string(
  err: receiver.ReceiveError,
) -> String
pub fn receive_with_timeout(
  subject: global.GlobalSubject(msg),
  timeout_ms: Int,
) -> Result(msg, codec.DecodeError)

Like receive, with an explicit timeout.

pub fn register(
  typed_name: registry.TypedName(msg),
  subject: global.GlobalSubject(msg),
) -> Result(Nil, registry.RegisterError)

Register a GlobalSubject under its TypedName. The typed pair is the recommended path; for raw-PID registration import distribute/registry and call registry.register/2 directly.

pub fn register_error_to_string(
  err: registry.RegisterError,
) -> String
pub fn reply(
  reply_to: process.Subject(BitArray),
  response: resp,
  encoder: fn(resp) -> Result(BitArray, codec.EncodeError),
) -> Result(Nil, global.SendError)

Send a response through a reply subject. Used by handlers to answer a call.

pub fn self_node() -> String
pub fn send(
  subject: global.GlobalSubject(msg),
  message: msg,
) -> Result(Nil, global.SendError)
pub fn send_error_to_string(err: global.SendError) -> String
pub fn start_actor(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(global.GlobalSubject(msg), actor.StartError)

Start a named actor using config.get().default_init_timeout_ms. Use start_actor_with_timeout/4 for an explicit timeout.

pub fn start_actor_observed(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  on_decode_error: fn(codec.DecodeError) -> Nil,
) -> Result(global.GlobalSubject(msg), actor.StartError)

Start a named actor with a decode-error callback. Useful for logging or metering malformed messages across nodes (e.g. during rolling deploys with mismatched codec versions). Uses config.get().default_init_timeout_ms. If you need a custom init timeout, drop down to actor.start_observed directly.

pub fn start_actor_with_timeout(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(global.GlobalSubject(msg), actor.StartError)

Start a named actor with an explicit init timeout.

pub fn start_monitor() -> Result(
  process.Subject(cluster_monitor.Message),
  actor.StartError,
)

Start the cluster monitor actor. It listens for Erlang node events and broadcasts them to all Gleam subscribers.

pub fn start_node(
  name: String,
  cookie: String,
) -> Result(Nil, cluster.StartError)

Start a distributed BEAM node.

name must contain @ (e.g. "myapp@127.0.0.1"). cookie must be [a-zA-Z0-9_-]+ and 1..255 bytes (validated by FFI).

pub fn start_registered(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(global.GlobalSubject(msg), actor.StartRegisteredError)

Start an actor and register it globally in one step. Uses config.get().default_init_timeout_ms. Use start_registered_with_timeout/4 for an explicit timeout, or actor.start_registered_observed for the decode-error hook variant.

pub fn start_registered_error_to_string(
  err: actor.StartRegisteredError,
) -> String
pub fn start_registered_with_timeout(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(global.GlobalSubject(msg), actor.StartRegisteredError)

Like start_registered, with an explicit init timeout.

pub fn start_supervised(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
) -> Result(process.Pid, actor.StartError)

Start a supervised actor using config.get().default_init_timeout_ms.

pub fn start_supervised_with_timeout(
  typed_name: registry.TypedName(msg),
  initial_state: state,
  handler: fn(msg, state) -> receiver.HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(process.Pid, actor.StartError)

Like start_supervised, with an explicit init timeout.

pub fn unregister(
  name: String,
) -> Result(Nil, registry.UnregisterError)

Unregister a globally registered name. Idempotent cleanup paths can let _ = unregister(name).

pub fn unregister_error_to_string(
  err: registry.UnregisterError,
) -> String
pub fn unregister_typed(
  typed_name: registry.TypedName(msg),
) -> Result(Nil, registry.UnregisterError)

Type-safe sibling of unregister/1: pulls the name string from the TypedName the caller already holds. Recommended for graceful shutdown paths so the cleanup site never has to hardcode the name.

pub fn version() -> String

Library version, hardcoded. Must stay in sync with gleam.toml at release time. Gleam has no compile-time API to read the manifest, so this is a release-time discipline, not a runtime invariant.

Search Document