Bloccs.Web.Telemetry.Flow (bloccs_web v0.3.0)

The pure core behind the Messages panel: a bounded record of messages moving through a network. Each event is either one edge traversal (a [:bloccs, :emit] telemetry event correlated with the emitting node's outcome and latency) or a failure/drop.

Keeps a ring of the most recent events for the activity feed and per-second buckets for the throughput chart. Like the rest of the collector core, it owns no clock: now (in ms) is passed in, so it can be tested deterministically.
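To make the clock-free design concrete, here is a minimal sketch of a bounded ring plus per-second buckets. It is not the module's actual implementation: the `FlowSketch` module, the `@max_recent` limit, and the `:at` and `:count` keys are assumptions made for illustration only.

```elixir
defmodule FlowSketch do
  # Hypothetical bound; the real ring size is not documented on this page.
  @max_recent 3

  def new, do: %{recent: [], buckets: %{}}

  # `now` is a millisecond timestamp supplied by the caller, so the
  # function is pure and the result depends only on its arguments.
  def record(state, event, now) do
    second = div(now, 1000)
    stamped = Map.put(event, :at, now)

    %{
      # Newest first, truncated so the ring stays bounded.
      recent: Enum.take([stamped | state.recent], @max_recent),
      # One counter map per whole second (assumed bucket shape).
      buckets:
        Map.update(state.buckets, second, %{count: 1}, fn bucket ->
          %{bucket | count: bucket.count + 1}
        end)
    }
  end
end

# Five events at 1300, 1600, 1900, 2200 and 2500 ms.
state =
  Enum.reduce(1..5, FlowSketch.new(), fn i, acc ->
    FlowSketch.record(acc, %{node: :parse, outcome: :ok}, 1_000 + i * 300)
  end)

IO.inspect(state.buckets)
# %{1 => %{count: 3}, 2 => %{count: 2}}
IO.inspect(Enum.map(state.recent, & &1.at))
# [2500, 2200, 1900]
```

Because the timestamps are plain arguments, a test can replay the same sequence and always get the same state.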

Types

endpoint()

@type endpoint() :: {atom(), atom()}

event()

@type event() :: %{
  node: atom(),
  out_port: atom() | nil,
  to: endpoint() | nil,
  outcome: outcome(),
  duration_ms: number() | nil,
  reason: term() | nil,
  payload: String.t() | nil,
  msg_id: pos_integer() | nil,
  parents: [pos_integer()],
  trace_id: pos_integer() | nil
}

A normalized flow event (before timestamping). msg_id / parents / trace_id are the emitted message's Bloccs.Lineage (bloccs 0.5+): msg_id is this message, parents the input id(s) that caused it (many on a fan-in), trace_id the root correlation. They let one message be tracked across hops.
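As an illustration, a map matching the event() type for a fan-in emit might look like the following. The node names, port names, payload, and id values are all made up for the example.

```elixir
# A hypothetical event: node :join emits on port :out towards {:store, :in},
# combining two input messages (a fan-in), so `parents` holds two ids.
event = %{
  node: :join,
  out_port: :out,
  to: {:store, :in},
  outcome: :ok,
  duration_ms: 1.8,
  reason: nil,
  payload: "%{order: 42}",
  msg_id: 103,
  parents: [101, 102],
  trace_id: 100
}

IO.inspect(event.parents)
# [101, 102]
```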

outcome()

@type outcome() :: :ok | :failed | :dropped | :skipped | :retry | :dispatch_error

t()

@type t() :: %{recent: [map()], buckets: %{required(integer()) => map()}}

Functions

journey(events, msg_id)

@spec journey([map()], pos_integer() | nil) :: [map()]

The lineage journey of msg_id: every recorded event in its connected causal component, that is, its ancestors (via parents) and its descendants (events that list it as a parent), followed transitively and ordered oldest-first. The journey branches and merges to match the topology, so a fan-in (batch/join) journey includes all the inputs that were combined. It is bounded by what is still in the recent ring.
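One way to compute such a connected component is to grow a set of ids until it stops changing. The sketch below is an assumption about how this could work, not the library's code; `JourneySketch` and the `:at` timestamp key are hypothetical.

```elixir
defmodule JourneySketch do
  def journey(_events, nil), do: []

  def journey(events, msg_id) do
    ids = expand(events, MapSet.new([msg_id]))

    events
    |> Enum.filter(fn ev -> Enum.any?(linked_ids(ev), &MapSet.member?(ids, &1)) end)
    |> Enum.sort_by(& &1.at)
  end

  # Ids an event ties together: its own id and its parents (nil ids ignored).
  defp linked_ids(ev), do: Enum.reject([ev.msg_id | ev.parents], &is_nil/1)

  # Repeat passes over the events until no new id is added (fixed point).
  defp expand(events, ids) do
    grown =
      Enum.reduce(events, ids, fn ev, acc ->
        linked = linked_ids(ev)

        if Enum.any?(linked, &MapSet.member?(acc, &1)) do
          MapSet.union(acc, MapSet.new(linked))
        else
          acc
        end
      end)

    if MapSet.equal?(grown, ids), do: ids, else: expand(events, grown)
  end
end

events = [
  %{msg_id: 1, parents: [], at: 10},
  %{msg_id: 9, parents: [], at: 15},
  %{msg_id: 2, parents: [], at: 20},
  %{msg_id: 3, parents: [1, 2], at: 30},
  %{msg_id: 4, parents: [3], at: 40}
]

journey_ids = events |> JourneySketch.journey(4) |> Enum.map(& &1.msg_id)
IO.inspect(journey_ids)
# [1, 2, 3, 4]
```

Message 3 is a fan-in of 1 and 2, so asking for the journey of 4 pulls in both inputs, while the unrelated message 9 is left out.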

new()

@spec new() :: t()

record(state, event, now)

@spec record(t(), event(), integer()) :: t()

snapshot(state, now)

@spec snapshot(t(), integer()) :: %{
  events: [map()],
  series: [map()],
  rate: non_neg_integer()
}

A snapshot for the panel: the recent events (newest first), a per-second series for the chart, and the current rate (events in the last full second).
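The "last full second" rule can be sketched as follows, assuming buckets are keyed by whole seconds and each holds a `:count`. The `SnapshotSketch` module, the `@window` length, and the shape of the series entries are assumptions for illustration, not the documented output format.

```elixir
defmodule SnapshotSketch do
  # Hypothetical chart width, in seconds.
  @window 5

  def snapshot(state, now) do
    # The second containing `now` is still filling up, so the rate is
    # read from the second before it.
    last_full = div(now, 1000) - 1

    series =
      for second <- (last_full - @window + 1)..last_full do
        %{second: second, count: count_at(state, second)}
      end

    %{events: state.recent, series: series, rate: count_at(state, last_full)}
  end

  # Seconds with no recorded events count as zero.
  defp count_at(state, second) do
    state.buckets |> Map.get(second, %{count: 0}) |> Map.fetch!(:count)
  end
end

state = %{
  recent: [%{node: :parse, outcome: :ok, at: 101_900}],
  buckets: %{100 => %{count: 3}, 101 => %{count: 2}}
}

snap = SnapshotSketch.snapshot(state, 102_200)
IO.inspect(snap.rate)
# 2
IO.inspect(Enum.map(snap.series, & &1.count))
# [0, 0, 0, 3, 2]
```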