distribute/global

Types

pub type CallError {
  Timeout
  TargetDown
  CallEncodeFailed(codec.EncodeError)
  CallDecodeFailed(codec.DecodeError)
  CallPayloadTooLarge(Int)
}

Constructors

  • Timeout

    No reply arrived within the timeout.

  • TargetDown

    The target process was not alive at call time, or died before replying.

  • CallEncodeFailed(codec.EncodeError)

    Encoding the request failed.

  • CallDecodeFailed(codec.DecodeError)

    The response could not be decoded.

  • CallPayloadTooLarge(Int)

    The request or response payload exceeds config.max_payload_size_bytes. The Int is the actual byte size of the offending payload.

A subject bundled with its codec, usable across nodes.

pub opaque type GlobalSubject(msg)

pub type SendError {
  SendEncodeFailed(codec.EncodeError)
  PayloadTooLarge(Int)
}

Constructors

  • SendEncodeFailed(codec.EncodeError)

    The encoder returned an error.

  • PayloadTooLarge(Int)

    The encoded payload exceeds config.max_payload_size_bytes. The Int is the actual byte size.

Values

pub fn call(
  target: GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
  timeout_ms: Int,
) -> Result(resp, CallError)

Synchronous request/response with monitor-based TargetDown detection.

See also: call_default/3 (uses configured timeout), call_isolated/4 (mailbox-safe variant for long-running callers).

Flow:

  1. Resolve target PID and set a monitor. Dead target → TargetDown
  2. Encode request. Failure → CallEncodeFailed
  3. Size-check request. Oversized → CallPayloadTooLarge
  4. Send request
  5. Wait for reply OR monitor Down. Timeout → Timeout, Down → TargetDown
  6. Size-check response. Oversized → CallPayloadTooLarge
  7. Decode response. Failure → CallDecodeFailed
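At the call site, the flow above maps onto a single case expression. The following is a minimal sketch rather than code from the library: CounterRequest and fetch_count are hypothetical names, the distribute/codec import path is assumed from the codec.DecodeError references in the signature, and the response decoder is taken as a parameter because the codec module's own API is not shown here.

```gleam
import distribute/codec
import distribute/global
import gleam/erlang/process
import gleam/int

// Hypothetical request type. The handler replies on the embedded Subject.
pub type CounterRequest {
  GetCount(reply_to: process.Subject(BitArray))
}

// decode_count is supplied by the caller; its body depends on the codec in use.
pub fn fetch_count(
  counter: global.GlobalSubject(CounterRequest),
  decode_count: fn(BitArray) -> Result(Int, codec.DecodeError),
) -> Result(Int, String) {
  case global.call(counter, GetCount, decode_count, 1000) {
    Ok(count) -> Ok(count)
    // Step 5: neither a reply nor a monitor Down arrived within 1000 ms.
    Error(global.Timeout) -> Error("counter did not reply within 1000 ms")
    // Steps 1 and 5: dead at call time, or died before replying.
    Error(global.TargetDown) -> Error("counter is not running")
    // Steps 3 and 6: request or response over the configured limit.
    Error(global.CallPayloadTooLarge(size)) ->
      Error("payload of " <> int.to_string(size) <> " bytes is too large")
    // Steps 2 and 7: encode or decode failure.
    Error(other) -> Error(global.call_error_to_string(other))
  }
}
```

The GetCount constructor is passed directly as make_request, since a record constructor with a single Subject field already has the type fn(process.Subject(BitArray)) -> CounterRequest.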

Late-reply caveat (read carefully)

drain_reply clears reply messages that are already in the caller’s mailbox at the moment the timeout fires, but the BEAM offers no API for dropping messages that arrive after call returns (short of erlang:alias/0-aware Subjects, which would require bypassing gleam_erlang’s Subject layout). A late reply lands in the caller’s mailbox tagged with the orphan reply Subject and stays there forever, because no selector ever matches it again.

gleam/otp/actor is NOT immune. Its receive loop is built on process.selector_receive, so unmatched messages are not dropped: they accumulate in the actor’s mailbox. The BEAM then pays the “selective receive penalty”: every subsequent selector_receive scans past every orphan first. A long-running actor that issues thousands of timed-out calls will gradually grind to a halt and can run out of memory.

The only safe mitigations:

  • Short-lived callers (CLI tools, scripts, request handlers that exit after returning their response): orphan messages die with the process, so using call directly is fine.
  • High-volume callers, including long-running OTP actors that issue many calls: isolate every call in a short-lived proxy process. The proxy makes the call, forwards the result to the real caller via a one-shot Subject, then exits, at which point its mailbox (and any orphan reply) is reaped. call_isolated/4 packages this pattern; see also the “Isolated call” recipe in docs/recipes.md. This is the only design that bounds the caller’s mailbox under sustained timeouts.

pub fn call_default(
  target: GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
) -> Result(resp, CallError)

Like call, but uses config.get().default_call_timeout_ms as the timeout.

pub fn call_error_to_string(error: CallError) -> String

pub fn call_isolated(
  target: GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
  timeout_ms: Int,
) -> Result(resp, CallError)

Mailbox-safe variant of call: each invocation runs the actual call inside a short-lived proxy process. The caller waits on a one-shot result Subject; when the proxy exits, its mailbox and any orphan late reply tagged with the proxy-owned reply Subject are reaped by the BEAM at zero cost.

Use this from long-running callers that issue many calls and would otherwise accumulate orphan messages under sustained timeouts. See the “Late-reply caveat” on call/4. For one-shot callers (CLI, scripts) and short-lived processes, plain call/4 is cheaper.

The proxy is spawned unlinked and tracked via process.monitor. An earlier draft used process.spawn (linked), but a linked proxy that exits abnormally would propagate its exit signal back to the caller, defeating the isolation guarantee. With unlinked + monitor, the caller observes one of three outcomes: the result, an unexpected proxy DOWN (returned as Error(Timeout)), or its own receive timeout buffer expiring, which means the inner call could not produce a reply. On a caller-side timeout, call_isolated kills the proxy, waits for its DOWN, and only then drains the result Subject; this closes the late-send race in which a still-running proxy could otherwise pollute the caller’s mailbox after return. In every path the caller stays alive.

Costs: one extra process spawn + one cross-process hop per call. On the BEAM this is microseconds.

pub fn call_isolated_default(
  target: GlobalSubject(req),
  make_request: fn(process.Subject(BitArray)) -> req,
  response_decoder: fn(BitArray) -> Result(
    resp,
    codec.DecodeError,
  ),
) -> Result(resp, CallError)

Like call_isolated, but uses config.get().default_call_timeout_ms as the timeout.
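A long-running caller might use call_isolated as in the sketch below. It is an illustration under assumptions, not library code: HealthRequest, Ping, and count_failed_pings are hypothetical names, the distribute/codec import path is assumed, and the response decoder is passed in by the caller.

```gleam
import distribute/codec
import distribute/global
import gleam/erlang/process

// Hypothetical health-check request carrying the reply Subject.
pub type HealthRequest {
  Ping(reply_to: process.Subject(BitArray))
}

// Issues `remaining` pings and returns how many of them failed.
// Each call runs in its own proxy process, so a late reply to a
// timed-out ping never reaches this process's mailbox.
pub fn count_failed_pings(
  target: global.GlobalSubject(HealthRequest),
  decode_pong: fn(BitArray) -> Result(Nil, codec.DecodeError),
  remaining: Int,
  failed: Int,
) -> Int {
  case remaining <= 0 {
    True -> failed
    False ->
      case global.call_isolated(target, Ping, decode_pong, 500) {
        Ok(Nil) ->
          count_failed_pings(target, decode_pong, remaining - 1, failed)
        Error(_) ->
          count_failed_pings(target, decode_pong, remaining - 1, failed + 1)
      }
  }
}
```

Swapping call_isolated for call here would compile unchanged, since the two share a signature; the difference is only in where orphan replies end up.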

pub fn decoder(
  global: GlobalSubject(msg),
) -> fn(BitArray) -> Result(msg, codec.DecodeError)

pub fn encoder(
  global: GlobalSubject(msg),
) -> fn(msg) -> Result(BitArray, codec.EncodeError)

pub fn from_pid(
  pid: process.Pid,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
) -> GlobalSubject(msg)

Send-only. A GlobalSubject built via from_pid must only be used with send. Calling receive, call, or call_isolated on it is a logic error: the shared Nil tag collides with the actor’s own mailbox selectors and produces silent message interleaving. For any bidirectional path, register the actor and use registry.lookup (which goes through the internal unsafe_from_name constructor with a verified codec pairing).

When to use

Diagnostic and probe paths only, for example calling on a PID you have just confirmed dead to test TargetDown semantics. Production messaging should always go through registry.lookup.

What goes wrong otherwise

All subjects produced by from_pid on the same PID share the same Nil tag. The BEAM does not distinguish them: messages sent through any one of them land in the same mailbox slot, interleaved with the actor’s default subject. Two from_pid Subjects with different msg types pointing at the same actor will silently mix on the wire. The compiler catches the type mismatch only if you use the same GlobalSubject value at both ends, which is exactly what from_pid makes hard to guarantee.

The unsafe_from_name path used by registry.lookup avoids this: each name produces a distinct deterministic tag, so messages routed through the registry cannot collide with each other or with the actor’s own selectors.

pub fn from_subject(
  subject: process.Subject(BitArray),
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
) -> GlobalSubject(msg)

Wrap an existing Subject(BitArray), keeping its tag.

pub fn new(
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
) -> GlobalSubject(msg)

New subject owned by the current process, with a unique tag.
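As a quick smoke test of a codec pairing, a process can create a subject with new, send to it, and receive from it. The sketch below assumes that a send to a locally owned subject is delivered to the current process's mailbox; round_trip is a hypothetical helper and the distribute/codec import path is assumed.

```gleam
import distribute/codec
import distribute/global

pub fn round_trip(
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  message: msg,
) -> Result(msg, codec.DecodeError) {
  // The current process owns `inbox`, so it is allowed to receive on it.
  let inbox = global.new(encoder, decoder)
  // Crash loudly if encoding or the size check fails; acceptable in a smoke test.
  let assert Ok(Nil) = global.send(inbox, message)
  // Wait up to 100 ms for the message just sent, then decode it.
  global.receive(inbox, 100)
}
```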

pub fn owner(
  global: GlobalSubject(msg),
) -> Result(process.Pid, Nil)

pub fn receive(
  global: GlobalSubject(msg),
  timeout_ms: Int,
) -> Result(msg, codec.DecodeError)

Receive and decode a message. Only works on subjects you own.

Returns Error(codec.PayloadTooLarge(size)) when the received binary exceeds config.get().max_payload_size_bytes. In that case decoding is skipped.

A negative timeout_ms is clamped to 0 (poll once and return) rather than passed through to the after clause of Erlang’s receive, where it would raise timeout_value and crash the caller. A call-site bug that produces a negative number therefore surfaces as an immediate DecodeTimeout instead of a process exit.
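The clamping rule amounts to taking the larger of the supplied timeout and zero. The helper below only mirrors the documented behaviour; clamp_timeout is a hypothetical name and this is not the library's source.

```gleam
import gleam/int

// Illustrative only: negative timeouts become 0, everything else passes through.
pub fn clamp_timeout(timeout_ms: Int) -> Int {
  int.max(timeout_ms, 0)
}
```

With this rule, clamp_timeout(-5) evaluates to 0 and clamp_timeout(250) stays 250.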

pub fn receive_default(
  global: GlobalSubject(msg),
) -> Result(msg, codec.DecodeError)

Like receive, but uses config.get().default_call_timeout_ms as the timeout. Receive and call share the same “wait for a message” semantics, so they share the configured default rather than introducing a third timeout knob.

pub fn reply(
  reply_to: process.Subject(BitArray),
  response: resp,
  encoder: fn(resp) -> Result(BitArray, codec.EncodeError),
) -> Result(Nil, SendError)

Send a response through a reply subject. Used by the handler to answer a call.

Returns Error(PayloadTooLarge(size)) when the encoded payload exceeds config.get().max_payload_size_bytes. The message is never sent.
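On the handler side, receive and reply pair up roughly as follows. This is a sketch under assumptions: CounterRequest and serve_one are hypothetical names, the distribute/codec import path is assumed, and the response encoder is passed in because the codec module's API is not shown here.

```gleam
import distribute/codec
import distribute/global
import gleam/erlang/process
import gleam/io

// Hypothetical request type. The caller's reply Subject travels inside it.
pub type CounterRequest {
  GetCount(reply_to: process.Subject(BitArray))
}

// Handles at most one request, then returns.
pub fn serve_one(
  inbox: global.GlobalSubject(CounterRequest),
  count: Int,
  encode_count: fn(Int) -> Result(BitArray, codec.EncodeError),
) -> Nil {
  case global.receive(inbox, 5000) {
    Ok(GetCount(reply_to)) ->
      case global.reply(reply_to, count, encode_count) {
        Ok(Nil) -> Nil
        // Nothing was sent, so the caller gets no reply to this request.
        Error(err) -> io.println_error(global.send_error_to_string(err))
      }
    // Nothing arrived in time, or the payload could not be decoded.
    Error(_) -> Nil
  }
}
```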

pub fn send(
  global: GlobalSubject(msg),
  message: msg,
) -> Result(Nil, SendError)

Encode and send a message.

Returns Error(PayloadTooLarge(size)) when the encoded payload exceeds config.get().max_payload_size_bytes. The message is never enqueued.

Fire-and-forget semantics

Ok(Nil) means the BEAM accepted the message for local dispatch. It does not mean the target received it. The receiver may have died after a successful lookup, the inter-node connection may drop before the payload is on the wire, or the receiving node may panic before its mailbox is read. A successful send carries the BEAM contract of erlang:send/2: “queued for delivery”, not “delivered”.

If you need delivery confirmation, use call (synchronous, monitor-backed: Error(TargetDown) fires immediately when the PID is dead, Error(Timeout) fires when the reply does not arrive in time). For higher-level reliability (at-least-once, idempotency tokens, durable queues), build it on top of call, or on send plus your own ack protocol. The library does not bake retries in because the right strategy is application-specific.
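A caller that treats send as fire-and-forget still has two local failure modes to handle. The sketch below uses a hypothetical helper named notify and logs to standard error purely for illustration.

```gleam
import distribute/global
import gleam/int
import gleam/io

pub fn notify(target: global.GlobalSubject(msg), message: msg) -> Nil {
  case global.send(target, message) {
    // Accepted for dispatch. This is not a delivery confirmation.
    Ok(Nil) -> Nil
    // The encoded payload was over the configured limit; nothing was enqueued.
    Error(global.PayloadTooLarge(size)) ->
      io.println_error(
        "dropped: " <> int.to_string(size) <> " bytes exceeds the limit",
      )
    // The encoder itself returned an error.
    Error(global.SendEncodeFailed(_)) ->
      io.println_error("dropped: encoding failed")
  }
}
```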

pub fn send_error_to_string(error: SendError) -> String

pub fn subject(
  global: GlobalSubject(msg),
) -> process.Subject(BitArray)