Bloccs.Web.Telemetry.Flow (bloccs_web v0.1.0)

The pure core behind the Messages panel: a bounded record of messages moving through a network. Each event is either one edge traversal (a [:bloccs, :emit] telemetry event correlated with the emitting node's outcome and latency) or a failure/drop.

Keeps a ring of the most recent events for the activity feed and per-second buckets for the throughput chart. Like the rest of the collector core, it owns no clock: the current time, now (in milliseconds), is passed in by the caller, so it can be tested deterministically.
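The ring-plus-buckets idea can be sketched as follows. This is a minimal stand-in, not the module's source: FlowSketch, the :at timestamp key, the %{count: n} bucket shape, the ring size, and the retention window are all assumptions made for illustration.

```elixir
defmodule FlowSketch do
  # Hypothetical stand-in, NOT the Bloccs.Web.Telemetry.Flow implementation.
  # Assumed ring size, kept tiny so the trimming is visible in the example.
  @max_recent 3
  # Assumed number of seconds of buckets to retain.
  @keep_seconds 60

  def new, do: %{recent: [], buckets: %{}}

  # `now` is a millisecond timestamp supplied by the caller; no clock is read here.
  def record(state, event, now) do
    second = div(now, 1000)

    # Timestamp the event (the :at key is an assumed name) and push it onto
    # the front of the ring, dropping anything beyond the bound.
    stamped = Map.put(event, :at, now)
    recent = Enum.take([stamped | state.recent], @max_recent)

    # Count the event in its one-second bucket.
    counted =
      Map.update(state.buckets, second, %{count: 1}, fn b -> %{b | count: b.count + 1} end)

    # Discard buckets that have fallen out of the retention window.
    buckets =
      for {sec, b} <- counted, sec > second - @keep_seconds, into: %{}, do: {sec, b}

    %{state | recent: recent, buckets: buckets}
  end
end

# Because time is an argument, the same inputs always give the same state.
state =
  FlowSketch.new()
  |> FlowSketch.record(%{node: :a, outcome: :ok}, 1_000)
  |> FlowSketch.record(%{node: :b, outcome: :ok}, 1_400)
  |> FlowSketch.record(%{node: :c, outcome: :failed}, 2_100)
  |> FlowSketch.record(%{node: :d, outcome: :ok}, 2_200)
```

With a ring of three, the oldest event (:a) has been dropped from recent, while both one-second buckets still hold their full counts.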

Types

endpoint()

@type endpoint() :: {atom(), atom()}

event()

@type event() :: %{
  node: atom(),
  out_port: atom() | nil,
  to: endpoint() | nil,
  outcome: outcome(),
  duration_ms: number() | nil,
  reason: term() | nil,
  payload: String.t() | nil
}

A normalized flow event (before timestamping).
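As an illustration of the shape, here are two maps that satisfy the event() type. The node and port names, the reason, and the payload string are made up, and reading endpoint() as {node, port} is an assumption.

```elixir
# A successful edge traversal: node :parse emitted on port :out towards
# {:store, :in}. All names here are hypothetical.
traversal = %{
  node: :parse,
  out_port: :out,
  to: {:store, :in},
  outcome: :ok,
  duration_ms: 1.8,
  reason: nil,
  payload: "%{id: 42}"
}

# A drop: nothing was delivered, so the edge-related fields are nil and
# the reason carries the (made-up) cause.
drop = %{
  node: :parse,
  out_port: nil,
  to: nil,
  outcome: :dropped,
  duration_ms: nil,
  reason: :queue_full,
  payload: nil
}
```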

outcome()

@type outcome() :: :ok | :failed | :dropped | :skipped | :retry | :dispatch_error

t()

@type t() :: %{recent: [map()], buckets: %{required(integer()) => map()}}

Functions

new()

@spec new() :: t()

record(state, event, now)

@spec record(t(), event(), integer()) :: t()

snapshot(state, now)

@spec snapshot(t(), integer()) :: %{
  events: [map()],
  series: [map()],
  rate: non_neg_integer()
}

A snapshot for the panel: the recent events (newest first), a per-second series for the chart, and the current rate (events in the last full second).
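One way to derive such a snapshot from per-second buckets is sketched below. SnapshotSketch, the %{count: n} bucket shape, the five-second chart window, and the choice to leave the current partial second out of the series are all assumptions. "Last full second" is read here as the second before the one containing now.

```elixir
defmodule SnapshotSketch do
  # Hypothetical helper, NOT the Bloccs.Web.Telemetry.Flow implementation.
  # Assumed number of seconds shown in the chart.
  @window 5

  # `now` is a millisecond timestamp supplied by the caller.
  def snapshot(%{recent: recent, buckets: buckets}, now) do
    current = div(now, 1000)
    # The second containing `now` is still filling, so the most recent
    # complete second is the one before it.
    last_full = current - 1

    # One point per second, oldest first; seconds with no bucket count as 0.
    series =
      for sec <- (current - @window)..last_full do
        %{second: sec, count: count_at(buckets, sec)}
      end

    %{events: recent, series: series, rate: count_at(buckets, last_full)}
  end

  defp count_at(buckets, sec), do: Map.get(buckets, sec, %{count: 0}).count
end

state = %{
  recent: [%{node: :b}, %{node: :a}],
  buckets: %{3 => %{count: 4}, 4 => %{count: 7}, 5 => %{count: 1}}
}

# now = 5_300 ms falls in second 5, so the last full second is 4.
snap = SnapshotSketch.snapshot(state, 5_300)
```

Here snap.rate is 7 (the count for second 4), and the single event already recorded in the partial second 5 does not affect it.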