distribute/receiver

Types

Result of a typed message handler. Renamed in v4.0.0 from Next so it no longer collides with gleam/otp/actor.Next when both modules are imported unqualified.

pub type HandlerStep(state) {
  Continue(state)
  Stop
  StopAbnormal(reason: String)
}

Constructors

  • Continue(state)
  • Stop
  • StopAbnormal(reason: String)
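As an illustration of how the three constructors map to handler outcomes, here is a minimal sketch of a handler with the fn(msg, state) -> HandlerStep(state) shape used by the start_* functions in this module. CounterMsg, max_count and handle are hypothetical names invented for the example; only HandlerStep and its constructors come from this module.

```gleam
import distribute/receiver.{type HandlerStep, Continue, Stop, StopAbnormal}

// Hypothetical message type, for illustration only.
pub type CounterMsg {
  Increment(by: Int)
  Shutdown
}

// Assumed policy: the counter must never exceed this value.
const max_count = 1000

pub fn handle(msg: CounterMsg, count: Int) -> HandlerStep(Int) {
  case msg {
    Increment(by) -> {
      let next = count + by
      case next > max_count {
        // Stop the actor and report a reason.
        True -> StopAbnormal(reason: "counter overflow")
        // Keep running with the updated state.
        False -> Continue(next)
      }
    }
    // Normal shutdown requested by the sender.
    Shutdown -> Stop
  }
}
```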

pub type ReceiveError {
  DecodeError(codec.DecodeError)
  ReceiveTimeout
}

Constructors

  • DecodeError(codec.DecodeError)
  • ReceiveTimeout

    Receive timed out before a message arrived. Renamed in v4.0.0 from Timeout to disambiguate from global.CallError.Timeout.

Values

pub fn receive_error_to_string(err: ReceiveError) -> String

pub fn receive_typed(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
  timeout_ms: Int,
) -> Result(a, ReceiveError)

Receive and decode one message.

Creates a fresh selector on each call. For tight loops, prefer building a selector once with selecting_typed and reusing it with process.selector_receive.

Rejects payloads that exceed config.get().max_payload_size_bytes with Error(DecodeError(PayloadTooLarge(size))). The decoder is never invoked on oversized binaries.

Negative timeout_ms is clamped to 0 (poll once and return). Passing a negative value to Erlang’s receive ... after Timeout -> ... would otherwise raise timeout_value and crash the caller; returning a typed ReceiveTimeout is a better contract.
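A minimal usage sketch, assuming the codec module is importable as distribute/codec (the import path is not stated on this page). await_one and on_message are hypothetical names, and the decoder is taken as a parameter so that no codec constructors are invented.

```gleam
import distribute/codec
import distribute/receiver.{ReceiveTimeout}
import gleam/erlang/process
import gleam/io

pub fn await_one(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
  on_message: fn(a) -> Nil,
) -> Nil {
  // Wait up to 500 ms for one message; a negative value would be clamped to 0.
  case receiver.receive_typed(subject, decoder, 500) {
    Ok(message) -> on_message(message)
    Error(ReceiveTimeout) -> io.println("no message within 500 ms")
    // Remaining case: the payload was oversized or failed to decode.
    Error(err) ->
      io.println("receive failed: " <> receiver.receive_error_to_string(err))
  }
}
```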

pub fn selecting_typed(
  selector: process.Selector(b),
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
  mapper: fn(Result(a, ReceiveError)) -> b,
) -> process.Selector(b)

Add a typed handler to a Selector.

Rejects payloads exceeding config.get().max_payload_size_bytes with Error(DecodeError(PayloadTooLarge(size))). The decoder is never invoked on oversized binaries.
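The build-once, reuse-many pattern recommended under receive_typed might look like the sketch below. It assumes the distribute/codec import path; build_selector, drain and on_result are hypothetical helpers. The mapper here is the identity function, so the selector yields the Result unchanged.

```gleam
import distribute/codec
import distribute/receiver.{type ReceiveError}
import gleam/erlang/process

// Build the selector once, outside the receive loop.
pub fn build_selector(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
) -> process.Selector(Result(a, ReceiveError)) {
  process.new_selector()
  |> receiver.selecting_typed(subject, decoder, fn(result) { result })
}

// Handle up to `remaining` messages, reusing the same selector each time.
pub fn drain(
  selector: process.Selector(Result(a, ReceiveError)),
  remaining: Int,
  on_result: fn(Result(a, ReceiveError)) -> Nil,
) -> Nil {
  case remaining > 0 {
    False -> Nil
    True ->
      case process.selector_receive(selector, 100) {
        Ok(result) -> {
          on_result(result)
          drain(selector, remaining - 1, on_result)
        }
        // Nothing arrived within 100 ms; stop draining.
        Error(Nil) -> Nil
      }
  }
}
```

In a real program the mapper would usually convert the Result into the caller's own message type so that several handlers can share one Selector.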

pub fn start_distributed_worker(
  name: String,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(
  actor.Started(global.GlobalSubject(msg)),
  actor.StartError,
)

Start a distributed actor whose subject carries a deterministic name-based tag. Remote nodes can reconstruct the subject via registry.lookup.

Decode errors (including PayloadTooLarge) are silently dropped; use start_distributed_worker_observed to log or meter them.

init_timeout_ms is the OTP initialiser timeout passed to actor.new_with_initialiser.

pub fn start_distributed_worker_observed(
  name: String,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  on_decode_error: fn(codec.DecodeError) -> Nil,
  init_timeout_ms: Int,
) -> Result(
  actor.Started(global.GlobalSubject(msg)),
  actor.StartError,
)

Like start_distributed_worker but calls on_decode_error for every binary that fails decoding. Use this to log codec mismatches across nodes (e.g. during rolling deploys).

Oversized payloads (over config.get().max_payload_size_bytes) trigger on_decode_error(PayloadTooLarge(size)) and are NOT forwarded to the decoder. This is a critical defence against cross-node OOM attacks.

Non-Subject mailbox terms (raw erlang:send(global:whereis_name(...), garbage) from any cluster node) are silently dropped via select_other. Without this drain, the unmatched terms would accumulate in the mailbox forever and incur the linear selective-receive penalty on every subsequent message, which would be a remote DoS vector open to any peer that knows the registered name.
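A hedged sketch of an observed worker that distinguishes oversized payloads from other decode failures. It assumes the distribute/codec import path and that the size carried by PayloadTooLarge is an Int. The worker name "message_counter", the 5000 ms initialiser timeout, and the helpers log_decode_error and start_counting_worker are all illustrative. The return type is left to inference; it is the Result shown in the signature above.

```gleam
import distribute/codec
import distribute/receiver.{Continue}
import gleam/int
import gleam/io

// Hypothetical logger; a real system might emit a metric instead.
fn log_decode_error(err: codec.DecodeError) -> Nil {
  case err {
    // Assumption: `size` is an Int byte count.
    codec.PayloadTooLarge(size) ->
      io.println(
        "rejected oversized payload: " <> int.to_string(size) <> " bytes",
      )
    _ -> io.println("payload failed to decode (possible codec mismatch)")
  }
}

// Starts a worker whose state is the number of messages handled so far.
pub fn start_counting_worker(
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
) {
  receiver.start_distributed_worker_observed(
    "message_counter",
    0,
    encoder,
    decoder,
    fn(_msg, handled) { Continue(handled + 1) },
    log_decode_error,
    5000,
  )
}
```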

pub fn start_receiver(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
) -> Result(process.Subject(BitArray), actor.StartError)

Start an OTP actor that decodes binary messages and forwards them to handler. Returns the raw Subject(BitArray).

Decode errors (including PayloadTooLarge) are silently dropped. Use start_receiver_observed to log or meter malformed messages.

pub fn start_receiver_observed(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  on_decode_error: fn(codec.DecodeError) -> Nil,
) -> Result(process.Subject(BitArray), actor.StartError)

Like start_receiver but calls on_decode_error for every binary that fails decoding, before continuing with the current state.

Oversized payloads (over config.get().max_payload_size_bytes) trigger on_decode_error(PayloadTooLarge(size)) and are NOT forwarded to the decoder. This protects the actor from OOM on malicious or buggy senders.

Non-Subject mailbox terms (raw Pid ! garbage from anywhere) are silently dropped via select_other so they cannot accumulate and pay the selective-receive penalty.
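For a local receiver, a minimal sketch might start the actor and push one binary at it. It assumes the distribute/codec import path; run and payload are hypothetical. The state is a list of decoded messages, newest first.

```gleam
import distribute/codec
import distribute/receiver.{Continue}
import gleam/erlang/process
import gleam/io

pub fn run(
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  payload: BitArray,
) -> Nil {
  let started =
    receiver.start_receiver_observed(
      [],
      decoder,
      // Prepend each decoded message to the state.
      fn(msg, seen) { Continue([msg, ..seen]) },
      // Called for undecodable or oversized binaries; the state is unchanged.
      fn(_err) { io.println("decode failed; state unchanged") },
    )
  case started {
    // The returned subject accepts raw binaries.
    Ok(subject) -> process.send(subject, payload)
    Error(_) -> io.println("receiver failed to start")
  }
}
```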
