distribute/telemetry
Single-event observability sink.
distribute emits structured Event values at every load-bearing
boundary: registry operations, atom-budget exhaustion, payload
rejection, call lifecycle, decode failures, orphan cleanup. The
sink is opt-in: with no sink installed, every emit point is a
branch on a missing ETS entry and a return: a lock-free lookup
that costs microseconds and allocates nothing.
Why one global sink instead of _observed variants
_observed start variants already cover decode errors at the
per-actor boundary. They are kept for that use case, where the
hook needs to capture the exact handler context. The telemetry
sink is for cross-cutting observability: a single subscriber that
wants visibility into all registry/cluster/payload/call events
without instrumenting every call site.
No structured error types in Event
Event variants carry stringified failure reasons (via
*_error_to_string) instead of structured RegisterError /
DecodeError / UnregisterError references. Two reasons:
- No import cycles. telemetry is a leaf module that every other module can depend on; the moment we re-exported registry.RegisterError here, the cycle registry → telemetry → registry would close. Stringified reasons keep telemetry independent.
- Operationally, downstream sinks log/meter strings anyway. A Prometheus exporter increments a counter labelled with the failure kind; a structured logger writes the rendered message. Pattern-matching on a typed sum is rarely the right shape for observability output.
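To make the second point concrete, here is a minimal sketch of the projection a metrics sink might perform: failure events become a #(metric_name, label) pair, with the rendered reason string used directly as the label. The helper name failure_counter_key and the metric names are hypothetical; incrementing the actual counter is left to whichever metrics library you use.

```gleam
import distribute/telemetry

/// Hypothetical helper (not part of distribute): maps failure events to
/// a #(metric_name, label) pair. The label is the already-rendered
/// reason string carried by the event.
pub fn failure_counter_key(
  event: telemetry.Event,
) -> Result(#(String, String), Nil) {
  case event {
    telemetry.ActorRegistrationFailed(_, reason) ->
      Ok(#("distribute_registration_failures_total", reason))
    telemetry.DecodeFailed(error_message, _) ->
      Ok(#("distribute_decode_failures_total", error_message))
    telemetry.ConflictResolverFailed(_, reason) ->
      Ok(#("distribute_conflict_resolver_failures_total", reason))
    // Not a failure we meter; also keeps the match forward-compatible.
    _ -> Error(Nil)
  }
}
```

The exact text produced by the *_error_to_string renderers is not specified here; if it embeds variable data, consider truncating or bucketing it before using it as a label so that label cardinality stays bounded.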
Building on top
Downstream packages (distribute_telemetry, distribute_audit,
distribute_metrics, distribute_otel) install a single sink and
fan out from there. The contract is intentionally narrow: Event
is a pure data sum, the sink is fn(Event) -> Nil. No subscription
manager, no filtering, no async fan-out. Compose with whatever
your stack already has.
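Because the contract has no filtering, a filter is simply a sink that wraps another sink. A sketch that relies only on Event and the fn(Event) -> Nil shape; filtered and is_call_event are hypothetical names, not part of distribute:

```gleam
import distribute/telemetry

/// Hypothetical combinator: forwards an event to `sink` only when
/// `keep` returns True; otherwise the event is dropped.
pub fn filtered(
  sink: fn(telemetry.Event) -> Nil,
  keep: fn(telemetry.Event) -> Bool,
) -> fn(telemetry.Event) -> Nil {
  fn(event) {
    case keep(event) {
      True -> sink(event)
      False -> Nil
    }
  }
}

/// Example predicate: keep only call-lifecycle events.
pub fn is_call_event(event: telemetry.Event) -> Bool {
  case event {
    telemetry.CallTimedOut(_)
    | telemetry.CallTargetDown
    | telemetry.CallProxyCrashed -> True
    _ -> False
  }
}
```

For example, telemetry.install(filtered(my_sink, is_call_event)) would forward only call-lifecycle events, where my_sink stands for any sink of your own.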
Example
```gleam
import distribute/telemetry
import gleam/int
import gleam/io

pub fn main() {
  telemetry.install(fn(event) {
    case event {
      telemetry.ActorRegistered(name) ->
        io.println("registered: " <> name)
      telemetry.PayloadRejected(size, _, _) ->
        io.println("payload rejected: " <> int.to_string(size))
      // Catch-all keeps the sink forward-compatible with new variants.
      _ -> Nil
    }
  })
  // ... rest of your app
}
```
Contract for downstream consumers
Stability. Event variants are append-only within a major
version: a 4.x release will never remove or rename a variant, but
it may add new ones. Downstream sinks must include a _ -> Nil
catch-all to stay forward-compatible across minor releases. New
variants will appear in CHANGELOG.md under “Added”.
Field semantics, by event:
- ActorRegistered(name), ActorRegistrationFailed(name, reason), ActorUnregistered(name, removed): name is the binary name string the caller passed; reason is the rendered register_error_to_string; removed is True only when :global actually removed the entry on this VM (i.e. the local ownership ACL passed and whereis succeeded).
- AtomBudgetExhausted(attempted_input, where): attempted_input is the raw binary the caller passed (treat it as untrusted: log the bytes, never feed them back into atom creation). where distinguishes connect/start_node/ping. All three paths emit on budget refusal; ping's emit is wired at the FFI layer (cluster_ffi:ping/1) because its public Gleam return type is Bool, with no error variant to carry the typed refusal.
- PayloadRejected(size_bytes, cap_bytes, where): size_bytes is the encoded payload size (post-codec), not the source value's heap size. cap_bytes is the active config.max_payload_size_bytes at the moment of rejection.
- DecodeFailed(error_message, where): error_message is the rendered decode_error_to_string. where distinguishes the one-shot receive, call-reply, and actor-inbound paths.
- CallTimedOut(elapsed_ms): elapsed_ms is the effective timeout used by the receive path (user input clamped to non-negative), not the real wall-clock elapsed time (the BEAM does not expose that without monotonic measurement at the call site). For real elapsed measurement, time the call yourself at the call site; the sink only sees the event after the fact. For call_isolated, treat this as at-least-once telemetry: in a timeout race you may observe two CallTimedOut emits for one user call (the inner call timeout plus the outer caller-side timeout).
- CallTargetDown carries no payload. Two emit paths fold here: the caller observed a dead target at call time, or the monitor fired during the wait.
- OrphanKillEscalated(pid): the carried Pid is the orphan actor that ignored exit(_, shutdown) past the grace window and was force-killed. Operationally significant: log and alert.
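The field semantics above suggest a few rules for a logging sink: record only the size of attempted_input rather than its bytes, render the Pid of an escalated orphan so an alert can point at it, and describe CallTimedOut's value as the effective timeout rather than a measurement. A minimal sketch, where io.println stands in for a real logger and render, log_sink and the message wording are hypothetical:

```gleam
import distribute/telemetry
import gleam/int
import gleam/io
import gleam/string

/// Hypothetical renderer for a few operationally relevant events.
pub fn render(event: telemetry.Event) -> Result(String, Nil) {
  case event {
    // attempted_input is untrusted: this sketch logs only its size.
    telemetry.AtomBudgetExhausted(attempted_input, _) ->
      Ok(
        "atom budget exhausted; input was "
        <> int.to_string(string.byte_size(attempted_input))
        <> " bytes",
      )
    // Force-killed orphan: worth an alert, so include the Pid.
    telemetry.OrphanKillEscalated(pid) ->
      Ok("ALERT orphan force-killed: " <> string.inspect(pid))
    // elapsed_ms is the clamped timeout, not a wall-clock measurement.
    telemetry.CallTimedOut(elapsed_ms) ->
      Ok(
        "call timed out (effective timeout "
        <> int.to_string(elapsed_ms)
        <> " ms)",
      )
    _ -> Error(Nil)
  }
}

/// Sink that prints rendered events and ignores everything else.
pub fn log_sink(event: telemetry.Event) -> Nil {
  case render(event) {
    Ok(line) -> io.println(line)
    Error(Nil) -> Nil
  }
}
```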
Concurrency model. emit/1 runs the installed sink inline
in the calling process. There is no buffering, no async
dispatch, and no retry. If the sink raises, the FFI catches the
exception and logs it via logger:warning/2 (event tag plus
stack trace, no payload, so sensitive attempted_input strings
do not leak into shared logs). The library-internal process that
emitted the event survives. This protects actors, the call
selector, and the call_isolated proxy from being taken down by
a buggy sink, while still surfacing the bug operationally.
Production sinks should still be written to be fast and total:
the catch is a safety net for shipped bugs, not a license to
raise. Async fan-out (e.g. routing to a metrics aggregator) is best
done by process.send-ing the event to a background worker
from inside the sink itself.
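One way to follow that advice is a sink whose only work is a process.send to a worker that runs the real handler. A sketch, assuming the gleam_erlang 1.x process API (spawn, new_subject, send, receive, receive_forever); start_async_sink and worker_loop are hypothetical helpers, not part of distribute:

```gleam
import distribute/telemetry
import gleam/erlang/process.{type Subject}

/// Hypothetical helper: spawns a linked worker that runs `handle` on
/// every event and returns a sink that only sends to that worker.
pub fn start_async_sink(
  handle: fn(telemetry.Event) -> Nil,
) -> fn(telemetry.Event) -> Nil {
  // A Subject can only be received on by the process that created it,
  // so the worker creates its own inbox and hands it back to us.
  let handshake = process.new_subject()
  let _ =
    process.spawn(fn() {
      let inbox = process.new_subject()
      process.send(handshake, inbox)
      worker_loop(inbox, handle)
    })
  let assert Ok(inbox) = process.receive(handshake, 1000)
  // The sink itself: a non-blocking send into the worker's mailbox.
  fn(event) { process.send(inbox, event) }
}

fn worker_loop(
  inbox: Subject(telemetry.Event),
  handle: fn(telemetry.Event) -> Nil,
) -> Nil {
  handle(process.receive_forever(inbox))
  worker_loop(inbox, handle)
}
```

You would install it once at boot, e.g. telemetry.install(start_async_sink(my_handler)). Trade-offs of this sketch: the worker is linked to whichever process called start_async_sink (so call it from a long-lived, supervised process), a panic in the handler kills the worker and that linked caller, and the worker's mailbox is unbounded, so a handler slower than the event rate lets its queue grow.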
Visibility model. install/1 is last-wins: the most recent
caller replaces the previous sink. There is no subscription list,
no priority, no filter chain. A library that wants multiple
downstream consumers (e.g. logger + metrics + audit) installs
one sink that fans out internally to whatever it cares about.
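A minimal sketch of such an internally fanning-out sink, assuming only the fn(Event) -> Nil contract; fan_out is a hypothetical helper, not something distribute exports:

```gleam
import distribute/telemetry
import gleam/list

/// Hypothetical combinator: one installable sink that forwards each
/// event to every sink in `sinks`, in list order.
pub fn fan_out(
  sinks: List(fn(telemetry.Event) -> Nil),
) -> fn(telemetry.Event) -> Nil {
  fn(event) { list.each(sinks, fn(sink) { sink(event) }) }
}
```

Install it once, e.g. telemetry.install(fan_out([log_sink, metrics_sink, audit_sink])) with sinks of your own. Because everything still runs inline in the emitting process, the members' costs add up, and if one member raises, the members after it are skipped for that event (the library's catch-and-log safety net still applies to the composite sink).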
Storage. The sink reference is held in an ETS table
(distribute_telemetry_sink_table, public, read_concurrency).
Reads on the hot path are lock-free; writes (install/reset)
trigger no global GC. Production code is expected to call
install exactly once at boot.
Types
Where in the system an atom-budget reservation was refused.
```gleam
pub type AtomBudgetOrigin {
  AtomBudgetOnConnect
  AtomBudgetOnPing
  AtomBudgetOnStartNode
}
```
Constructors
- AtomBudgetOnConnect
- AtomBudgetOnPing
- AtomBudgetOnStartNode
Where in the system a decode failure happened.
```gleam
pub type DecodeOrigin {
  DecodeOnReceive
  DecodeOnCallReply
  DecodeOnActorInbound
}
```
Constructors
- DecodeOnReceive
- DecodeOnCallReply
- DecodeOnActorInbound
A structured observability event emitted by distribute at every
load-bearing boundary. Pattern-match in your sink to project the
fields you care about and drop the rest.
Variants are added in minor versions; downstream sinks should
always have a _ -> Nil catch-all to stay forward-compatible.
```gleam
pub type Event {
  ActorRegistered(name: String)
  ActorRegistrationFailed(name: String, reason: String)
  ActorUnregistered(name: String, removed: Bool)
  AtomBudgetExhausted(
    attempted_input: String,
    where: AtomBudgetOrigin,
  )
  PayloadRejected(
    size_bytes: Int,
    cap_bytes: Int,
    where: PayloadOrigin,
  )
  DecodeFailed(error_message: String, where: DecodeOrigin)
  CallTimedOut(elapsed_ms: Int)
  CallTargetDown
  CallProxyCrashed
  OrphanKillEscalated(pid: process.Pid)
  ConflictResolved(
    name: String,
    winner: option.Option(process.Pid),
  )
  ConflictResolverFailed(name: String, reason: String)
}
```
Constructors
- ActorRegistered(name: String)
  A :global registration succeeded for name.
- ActorRegistrationFailed(name: String, reason: String)
  A :global registration attempt failed. reason is the rendered register_error_to_string; the library kills the orphan actor automatically.
- ActorUnregistered(name: String, removed: Bool)
  :global unregister returned a result. removed is True when the entry was successfully removed; False for NotFound/NotOwned outcomes.
- AtomBudgetExhausted(attempted_input: String, where: AtomBudgetOrigin)
  cluster.connect/ping/start_node rejected a fresh-atom reservation past config.max_distribution_atoms.
- PayloadRejected(size_bytes: Int, cap_bytes: Int, where: PayloadOrigin)
  A typed boundary refused a payload that exceeded config.max_payload_size_bytes. cap_bytes is the active limit at the moment of rejection; where distinguishes which boundary.
- DecodeFailed(error_message: String, where: DecodeOrigin)
  A binary failed decode_checked or a top-level decoder. error_message is the rendered decode_error_to_string.
- CallTimedOut(elapsed_ms: Int)
  A call returned Error(Timeout). elapsed_ms is the effective timeout used by the receive path (input clamped to non-negative), not the real elapsed time, which the BEAM does not expose without monotonic measurement at the call site. For call_isolated, emits are at-least-once: a timeout race can produce two CallTimedOut events for one user call.
- CallTargetDown
  A call returned Error(TargetDown). The target was either dead at call time or died during the call.
- CallProxyCrashed
  call_isolated's proxy process died before sending its result. The caller still sees Error(Timeout) for typed-error consistency, but this event distinguishes the cause: a crash inside make_request, the response codec, or another component evaluated by the proxy. Operationally significant: a steady stream of these events points at user-side bugs, not at network or peer-side latency.
- OrphanKillEscalated(pid: process.Pid)
  terminate_orphan_gracefully exhausted the shutdown grace window and escalated to process.kill. Operationally significant: an orphan kill means a registered actor either trapped exits and failed to unregister, or registration races outpaced cleanup.
- ConflictResolved(name: String, winner: option.Option(process.Pid))
  :global invoked the conflict resolver because the same name was claimed by two PIDs (typically after a partition heal). winner is the PID the resolver kept: Some(pid) for Keep(pid), None for KillBoth. Operationally critical: a steady stream of these events means the cluster is “flapping”, i.e. partitions are healing repeatedly. One-off events after a known network incident are normal; high frequency points at infrastructure instability that application code cannot fix on its own.
- ConflictResolverFailed(name: String, reason: String)
  The user-supplied conflict resolver crashed or ran past its timeout. reason carries the rendered exception class and reason, or the literal "resolver timed out". The library applied a deterministic fallback (the lowest term-ordered PID wins) so the global_name_server worker never blocks; the fallback's pick is emitted as a separate ConflictResolved event immediately afterwards.
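A sketch of how a sink might render the two conflict events, following the winner semantics described for ConflictResolved (Some(pid) for Keep(pid), None for KillBoth). describe_conflict and its message wording are hypothetical:

```gleam
import distribute/telemetry
import gleam/option.{None, Some}
import gleam/string

/// Hypothetical renderer for :global name-conflict events.
pub fn describe_conflict(event: telemetry.Event) -> Result(String, Nil) {
  case event {
    // The resolver kept one claimant.
    telemetry.ConflictResolved(name, Some(pid)) ->
      Ok("conflict on " <> name <> ": kept " <> string.inspect(pid))
    // KillBoth: neither claimant survived.
    telemetry.ConflictResolved(name, None) ->
      Ok("conflict on " <> name <> ": both claimants killed")
    // The fallback pick arrives later as its own ConflictResolved.
    telemetry.ConflictResolverFailed(name, reason) ->
      Ok(
        "resolver failed for "
        <> name
        <> " ("
        <> reason
        <> "); fallback applied",
      )
    _ -> Error(Nil)
  }
}
```

Since a ConflictResolverFailed is followed by a separate ConflictResolved for the fallback pick, a rate-based flapping alert should probably count ConflictResolved events only, so that a resolver failure is not counted as two conflicts.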
A telemetry sink. Pure side-effect from the library’s perspective: the return value is discarded. The sink runs inline in the emit point’s process, so a slow sink slows the emit point. Write async fan-out into the sink itself if you need it.
```gleam
pub type EventSink =
  fn(Event) -> Nil
```
Where in the system a payload was rejected for size.
```gleam
pub type PayloadOrigin {
  PayloadOnSend
  PayloadOnCallRequest
  PayloadOnCallResponse
  PayloadOnReply
  PayloadOnReceive
  PayloadOnActorInbound
}
```
Constructors
- PayloadOnSend
- PayloadOnCallRequest
- PayloadOnCallResponse
- PayloadOnReply
- PayloadOnReceive
- PayloadOnActorInbound
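A sketch of a sink-side summary for PayloadRejected that labels the boundary and reports how far the encoded size overshot the cap. payload_origin_label, payload_rejection_summary and the label strings are hypothetical. The match on PayloadOrigin is deliberately exhaustive, so if the origin type ever gains a constructor this helper stops compiling instead of silently mislabelling; the stability contract above is stated for Event variants and does not say whether the origin types can grow.

```gleam
import distribute/telemetry
import gleam/int

/// Hypothetical label for each payload boundary.
pub fn payload_origin_label(origin: telemetry.PayloadOrigin) -> String {
  case origin {
    telemetry.PayloadOnSend -> "send"
    telemetry.PayloadOnCallRequest -> "call_request"
    telemetry.PayloadOnCallResponse -> "call_response"
    telemetry.PayloadOnReply -> "reply"
    telemetry.PayloadOnReceive -> "receive"
    telemetry.PayloadOnActorInbound -> "actor_inbound"
  }
}

/// Summarises a rejection as "<boundary>: N bytes over the C-byte cap".
pub fn payload_rejection_summary(event: telemetry.Event) -> Result(String, Nil) {
  case event {
    telemetry.PayloadRejected(size_bytes, cap_bytes, where) ->
      Ok(
        payload_origin_label(where)
        <> ": "
        <> int.to_string(size_bytes - cap_bytes)
        <> " bytes over the "
        <> int.to_string(cap_bytes)
        <> "-byte cap",
      )
    _ -> Error(Nil)
  }
}
```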
Values
```gleam
pub fn install(sink: fn(Event) -> Nil) -> Nil
```
Install (or replace) the telemetry sink. Last-wins: a second
install call replaces the previous sink without warning. This is
intentional: tests that swap sinks between cases stay terse, and
production code should call install exactly once at boot.
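For the test use case mentioned above, a common pattern is a sink that forwards every event to a Subject owned by the test process, which can then assert on what arrived. A sketch, assuming the gleam_erlang process API; capture_events is a hypothetical test helper, not part of distribute:

```gleam
import distribute/telemetry
import gleam/erlang/process.{type Subject}

/// Hypothetical test helper: installs a sink that forwards every event
/// to a Subject owned by the calling (test) process. Because install is
/// last-wins, this replaces whatever sink an earlier test installed.
pub fn capture_events() -> Subject(telemetry.Event) {
  let events = process.new_subject()
  // The sink runs inline in whichever process emits, but any process
  // may send to a Subject; only its owner (this process) can receive.
  telemetry.install(fn(event) { process.send(events, event) })
  events
}
```

In a test you would call capture_events(), exercise the behaviour under test, then process.receive(events, 100) and pattern-match on the expected variant.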