The pure core behind the Messages panel: a bounded record of messages moving
through a network. Each event is either one edge traversal (a [:bloccs, :emit]
event correlated with the emitting node's outcome and latency) or a failure/drop.
It keeps a ring of the most recent events for the activity feed and per-second
buckets for the throughput chart. Like the rest of the collector core, it owns
no clock: now (ms) is passed in, so it is deterministically testable.
Types
@type event() :: %{
  node: atom(),
  out_port: atom() | nil,
  to: endpoint() | nil,
  outcome: outcome(),
  duration_ms: number() | nil,
  reason: term() | nil,
  payload: String.t() | nil,
  msg_id: pos_integer() | nil,
  parents: [pos_integer()],
  trace_id: pos_integer() | nil
}
A normalized flow event (before timestamping). msg_id / parents /
trace_id are the emitted message's Bloccs.Lineage (bloccs 0.5+): msg_id
is this message, parents the input id(s) that caused it (many on a fan-in),
trace_id the root correlation. They let one message be tracked across hops.
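As an illustration of the lineage fields, here is a sketch of what a fan-in event might look like. The module name EventSketch, the helper fan_in?/1, and all field values (node, port, ids, duration) are made up for the example; only the keys come from the event() type.

```elixir
defmodule EventSketch do
  # Hypothetical example data: a join node emitting message 4, which was
  # caused by inputs 2 and 3, all within trace 1.
  def fan_in_event do
    %{
      node: :join,
      out_port: :out,
      to: nil,
      outcome: :ok,
      duration_ms: 1.8,
      reason: nil,
      payload: nil,
      msg_id: 4,
      parents: [2, 3],
      trace_id: 1
    }
  end

  # Hypothetical helper: more than one parent id indicates a fan-in.
  def fan_in?(%{parents: parents}), do: length(parents) > 1
end
```

Following parents from one event to the events with those msg_id values is what allows a message to be tracked across hops.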
@type outcome() :: :ok | :failed | :dropped | :skipped | :retry | :dispatch_error
Functions
@spec journey([map()], pos_integer() | nil) :: [map()]
The lineage journey of msg_id: every recorded event in its connected
causal component, that is, its ancestors (via parents) and its descendants
(events that list it as a parent), followed transitively and ordered
oldest-first. The journey branches and merges to match the topology, so a
fan-in (batch/join) journey includes all the inputs that were combined. It is
bounded by what is still in the recent ring.
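The traversal described above can be sketched as a connected-component walk over the parent links. This is a minimal illustration, not the module's actual implementation: the module name JourneySketch is hypothetical, and the :at timestamp key used for ordering is an assumption about how events are stamped.

```elixir
defmodule JourneySketch do
  # Hypothetical stand-in for journey/2. Assumes every event map carries
  # :msg_id, :parents and an :at timestamp in ms (the :at key is assumed).
  def journey(_events, nil), do: []

  def journey(events, msg_id) do
    # Undirected adjacency: each parent link can be walked in both
    # directions, so ancestors and descendants are both reachable.
    adjacency =
      for %{msg_id: id, parents: parents} <- events, id != nil, parent <- parents, reduce: %{} do
        acc ->
          acc
          |> Map.update(id, [parent], &[parent | &1])
          |> Map.update(parent, [id], &[id | &1])
      end

    component = walk([msg_id], adjacency, MapSet.new())

    events
    |> Enum.filter(&MapSet.member?(component, &1.msg_id))
    |> Enum.sort_by(& &1.at)
  end

  # Worklist traversal collecting every id reachable from the start id.
  defp walk([], _adjacency, seen), do: seen

  defp walk([id | rest], adjacency, seen) do
    if MapSet.member?(seen, id) do
      walk(rest, adjacency, seen)
    else
      walk(Map.get(adjacency, id, []) ++ rest, adjacency, MapSet.put(seen, id))
    end
  end
end

# Message 4 is a join of 2 and 3; message 9 is unrelated.
events = [
  %{msg_id: 4, parents: [2, 3], at: 30},
  %{msg_id: 3, parents: [], at: 20},
  %{msg_id: 2, parents: [1], at: 15},
  %{msg_id: 9, parents: [], at: 12},
  %{msg_id: 1, parents: [], at: 10}
]

events |> JourneySketch.journey(2) |> Enum.map(& &1.msg_id) |> IO.inspect()
# => [1, 2, 3, 4]
```

Starting from message 2, the walk reaches its ancestor 1, its descendant 4, and through the join also the sibling input 3, while the unrelated message 9 is left out.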
@spec new() :: t()
@spec snapshot(t(), integer()) :: %{ events: [map()], series: [map()], rate: non_neg_integer() }
A snapshot for the panel: the recent events (newest first), a per-second series for the chart, and the current rate (events in the last full second).
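To make the ring, the buckets, and the passed-in clock concrete, here is a minimal sketch of how such a state could be recorded and read. Everything in it is an assumption for illustration: the module name FlowSketch, the record/3 function, the :at, :second and :count keys, the ring size, and the window length are not taken from the documented module, and "last full second" is interpreted as the second before the one containing now.

```elixir
defmodule FlowSketch do
  # Hypothetical sketch, not the module's real internals.
  @ring_size 200
  @window_s 60

  defstruct ring: [], buckets: %{}

  def new, do: %__MODULE__{}

  # Hypothetical recorder: stamps the event with the caller's clock,
  # pushes it onto the bounded ring, and bumps the per-second bucket.
  def record(%__MODULE__{} = state, event, now_ms) do
    second = div(now_ms, 1000)
    stamped = Map.put(event, :at, now_ms)
    bumped = Map.update(state.buckets, second, 1, &(&1 + 1))

    # Drop buckets that have fallen out of the chart window.
    pruned = for {s, count} <- bumped, s > second - @window_s, into: %{}, do: {s, count}

    %{state | ring: Enum.take([stamped | state.ring], @ring_size), buckets: pruned}
  end

  # Reads the state at the caller-supplied time; no clock is consulted.
  def snapshot(%__MODULE__{} = state, now_ms) do
    current = div(now_ms, 1000)

    series =
      for s <- (current - @window_s + 1)..current do
        %{second: s, count: Map.get(state.buckets, s, 0)}
      end

    %{events: state.ring, series: series, rate: Map.get(state.buckets, current - 1, 0)}
  end
end

state =
  FlowSketch.new()
  |> FlowSketch.record(%{node: :a}, 1_000)
  |> FlowSketch.record(%{node: :b}, 1_500)
  |> FlowSketch.record(%{node: :c}, 2_100)

# Two events fell in second 1, the last full second before now = 2_900 ms.
IO.inspect(FlowSketch.snapshot(state, 2_900).rate)
# => 2
```

Because now is an argument, a test can replay any sequence of timestamps and get the same snapshot every time.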