distribute/receiver
Types
Result of a typed message handler. Renamed in v4.0.0 from Next so
it no longer collides with gleam/otp/actor.Next when both modules
are imported unqualified.
pub type HandlerStep(state) {
  Continue(state)
  Stop
  StopAbnormal(reason: String)
}
Constructors
- Continue(state)
- Stop
- StopAbnormal(reason: String)
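As a minimal sketch of how the three constructors might be used, the handler below follows the fn(msg, state) -> HandlerStep(state) shape that the start_* functions in this module expect. The CounterMsg type and the handle function are hypothetical names invented for this illustration; they are not part of the library.

```gleam
import distribute/receiver.{type HandlerStep, Continue, Stop, StopAbnormal}

// Hypothetical message type, used only for this illustration.
pub type CounterMsg {
  Increment(by: Int)
  Shutdown
}

// Matches the documented handler shape: fn(msg, state) -> HandlerStep(state).
pub fn handle(msg: CounterMsg, count: Int) -> HandlerStep(Int) {
  case msg {
    // Treat a negative increment as a fault and stop with a reason.
    Increment(by) if by < 0 -> StopAbnormal(reason: "negative increment")
    // Keep running with the updated state.
    Increment(by) -> Continue(count + by)
    // Stop normally.
    Shutdown -> Stop
  }
}
```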
pub type ReceiveError {
  DecodeError(codec.DecodeError)
  ReceiveTimeout
}
Constructors
- DecodeError(codec.DecodeError)
- ReceiveTimeout
  Receive timed out before a message arrived. Renamed in v4.0.0 from
  Timeout to disambiguate from global.CallError.Timeout.
Values
pub fn receive_error_to_string(err: ReceiveError) -> String
pub fn receive_typed(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
  timeout_ms: Int,
) -> Result(a, ReceiveError)
Receive and decode one message.
Creates a fresh selector on each call. For tight loops, prefer building
a selector once with selecting_typed and reusing it with
process.selector_receive.
Rejects payloads that exceed config.get().max_payload_size_bytes with
Error(DecodeError(PayloadTooLarge(size))). The decoder is never invoked
on oversized binaries.
Negative timeout_ms is clamped to 0 (poll-once-and-return). Erlang’s
receive after Timeout -> ... would otherwise raise timeout_value
and crash the caller; a typed ReceiveTimeout is a better contract.
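A single non-blocking poll could look roughly like the sketch below. It assumes the codec module lives at distribute/codec (the import path is not shown on this page) and takes the decoder as a parameter, so no particular codec is implied. poll_once is a hypothetical helper name.

```gleam
import distribute/codec
import distribute/receiver
import gleam/erlang/process

// `decoder` is supplied by the caller; any function of this shape works.
pub fn poll_once(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(String, codec.DecodeError),
) -> Result(String, String) {
  // A timeout of 0 checks the mailbox once and returns immediately.
  case receiver.receive_typed(subject, decoder, 0) {
    Ok(text) -> Ok(text)
    // Covers both DecodeError(_) and ReceiveTimeout.
    Error(err) -> Error(receiver.receive_error_to_string(err))
  }
}
```

Passing a negative timeout here would behave the same as 0, per the clamping rule described above.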
pub fn selecting_typed(
  selector: process.Selector(b),
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
  mapper: fn(Result(a, ReceiveError)) -> b,
) -> process.Selector(b)
Add a typed handler to a Selector.
Rejects payloads exceeding config.get().max_payload_size_bytes with
Error(DecodeError(PayloadTooLarge(size))). The decoder is never invoked
on oversized binaries.
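For a receive loop, the selector can be built once and reused on every iteration. The following is a sketch under the same assumption that codec is importable as distribute/codec; build_selector and next are hypothetical helper names, and the mapper simply passes the result through unchanged.

```gleam
import distribute/codec
import distribute/receiver.{type ReceiveError}
import gleam/erlang/process

// Build the selector once, outside the loop.
pub fn build_selector(
  subject: process.Subject(BitArray),
  decoder: fn(BitArray) -> Result(a, codec.DecodeError),
) -> process.Selector(Result(a, ReceiveError)) {
  process.new_selector()
  |> receiver.selecting_typed(subject, decoder, fn(result) { result })
}

// Reuse the selector on each iteration. The outer Error(Nil) means that
// no message arrived within timeout_ms; the inner Result is the decode
// outcome produced by selecting_typed.
pub fn next(
  selector: process.Selector(Result(a, ReceiveError)),
  timeout_ms: Int,
) -> Result(Result(a, ReceiveError), Nil) {
  process.selector_receive(selector, timeout_ms)
}
```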
pub fn start_distributed_worker(
  name: String,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  init_timeout_ms: Int,
) -> Result(
  actor.Started(global.GlobalSubject(msg)),
  actor.StartError,
)
Start a distributed actor whose subject carries a deterministic name-based
tag. Remote nodes can reconstruct the subject via registry.lookup.
Decode errors (including PayloadTooLarge) are silently dropped; use
start_distributed_worker_observed to log or meter them.
init_timeout_ms is the OTP initialiser timeout passed to
actor.new_with_initialiser.
pub fn start_distributed_worker_observed(
  name: String,
  initial_state: state,
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  on_decode_error: fn(codec.DecodeError) -> Nil,
  init_timeout_ms: Int,
) -> Result(
  actor.Started(global.GlobalSubject(msg)),
  actor.StartError,
)
Like start_distributed_worker but calls on_decode_error for every
binary that fails decoding. Use this to log codec mismatches across nodes
(e.g. during rolling deploys).
Oversized payloads (over config.get().max_payload_size_bytes) trigger
on_decode_error(PayloadTooLarge(size)) and are NOT forwarded to the
decoder. This is a critical defence against cross-node OOM attacks.
Non-Subject mailbox terms (for example a raw
erlang:send(global:whereis_name(...), garbage) from any cluster node)
are silently dropped via select_other. Without this drain, unmatched
terms would accumulate in the mailbox forever and impose the linear
selective-receive penalty on every subsequent message, which is a
remote DoS vector open to any peer that knows the registered name.
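As a rough sketch, a worker that logs every decode failure might be started as follows. The registered name "counter", the initial state 0, the 5000 ms initialiser timeout, and the start_logged_worker wrapper are all illustrative choices, and the distribute/codec import path is an assumption. The encoder, decoder, and handler are taken as parameters so that no specific codec is implied.

```gleam
import distribute/codec
import distribute/receiver.{type HandlerStep}
import gleam/io
import gleam/string

pub fn start_logged_worker(
  encoder: fn(msg) -> Result(BitArray, codec.EncodeError),
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, Int) -> HandlerStep(Int),
) {
  receiver.start_distributed_worker_observed(
    // Hypothetical registered name.
    "counter",
    // Initial state.
    0,
    encoder,
    decoder,
    handler,
    // Called for every binary that fails decoding, including
    // oversized payloads reported as PayloadTooLarge.
    fn(err) { io.println("decode error: " <> string.inspect(err)) },
    // Initialiser timeout in milliseconds (illustrative value).
    5000,
  )
}
```

The return annotation is omitted so the sketch does not have to guess the import path of the global module; the compiler infers the Result type shown in the signature above.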
pub fn start_receiver(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
) -> Result(process.Subject(BitArray), actor.StartError)
Start an OTP actor that decodes binary messages and forwards
them to handler. Returns the raw Subject(BitArray).
Decode errors (including PayloadTooLarge) are silently dropped.
Use start_receiver_observed to log or meter malformed messages.
pub fn start_receiver_observed(
  initial_state: state,
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, state) -> HandlerStep(state),
  on_decode_error: fn(codec.DecodeError) -> Nil,
) -> Result(process.Subject(BitArray), actor.StartError)
Like start_receiver but calls on_decode_error for every binary
that fails decoding, before continuing with the current state.
Oversized payloads (over config.get().max_payload_size_bytes) trigger
on_decode_error(PayloadTooLarge(size)) and are NOT forwarded to the
decoder. This protects the actor from OOM on malicious or buggy senders.
Non-Subject mailbox terms (a raw Pid ! garbage from anywhere) are
silently dropped via select_other so they cannot accumulate in the
mailbox and impose the selective-receive penalty on later messages.
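One way to use the observer is to separate oversized payloads from ordinary codec mismatches, for example to meter them differently. The sketch below assumes that PayloadTooLarge is a constructor exported from a codec module at distribute/codec, as the DecodeError(PayloadTooLarge(size)) notation on this page suggests. The function names and the initial state 0 are hypothetical.

```gleam
import distribute/codec
import distribute/receiver.{type HandlerStep}
import gleam/io
import gleam/string

// Hypothetical observer: reports oversized payloads separately from
// other decode failures.
fn on_decode_error(err: codec.DecodeError) -> Nil {
  case err {
    codec.PayloadTooLarge(size) ->
      io.println("dropped oversized payload: " <> string.inspect(size))
    other ->
      io.println("dropped undecodable payload: " <> string.inspect(other))
  }
}

pub fn start_observed(
  decoder: fn(BitArray) -> Result(msg, codec.DecodeError),
  handler: fn(msg, Int) -> HandlerStep(Int),
) {
  // Returns the raw Subject(BitArray) on success, as documented above.
  receiver.start_receiver_observed(0, decoder, handler, on_decode_error)
}
```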