Monitorex.Cluster (monitorex v0.3.0)

Cluster support for Monitorex. Provides multi-node data aggregation across distributed Erlang nodes.

Use fetch_from_all_nodes/2 to query all nodes in the cluster, then pass the results to the appropriate merge_* function to produce consolidated aggregates.
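The fetch-then-merge flow described above might look like the following sketch. It pairs :list_hosts with merge_hosts/1, as the merge_hosts/1 documentation suggests. The Code.ensure_loaded?/1 guard is an addition for illustration only, so the snippet also evaluates in a session where the library is not compiled.

```elixir
# Sketch: query every node for host aggregates, then merge them.
merged_hosts =
  if Code.ensure_loaded?(Monitorex.Cluster) do
    :list_hosts
    |> Monitorex.Cluster.fetch_from_all_nodes([])
    |> Monitorex.Cluster.merge_hosts()
  else
    # Library not available in this session; nothing to merge.
    []
  end
```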

Summary

Functions

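connected_nodes()

Returns all reachable nodes including Node.self().

fetch_from_all_nodes(func_name, args)

Calls the given Storage function on all connected nodes via RPC.

merge_consumers(node_consumers)

Merges consumer aggregates from multiple nodes.

merge_endpoints(node_endpoints)

Merges endpoint aggregates from multiple nodes.

merge_hosts(node_hosts)

Merges host aggregates collected from multiple nodes.

merge_recent(node_events, top_n \\ 50)

Merges recent event lists from multiple nodes.

merge_routes(node_routes)

Merges route aggregates from multiple nodes.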

Functions

connected_nodes()

@spec connected_nodes() :: [node()]

Returns all reachable nodes including Node.self().

When the cluster_mode config is :single, returns only [Node.self()]. Otherwise returns [Node.self() | Node.list()], that is, the local node followed by every node it is currently connected to.
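A minimal sketch of this branching, not the library's actual code. The module name ConnectedNodesSketch is hypothetical, and the mode is passed as an argument here instead of being read from application config, since the config lookup itself is not shown in this documentation.

```elixir
defmodule ConnectedNodesSketch do
  # `mode` stands in for the value of the cluster_mode config.
  # :single limits the result to the local node.
  def connected_nodes(:single), do: [Node.self()]

  # Any other mode: the local node plus all currently connected nodes.
  def connected_nodes(_mode), do: [Node.self() | Node.list()]
end
```

On a node that is not part of a cluster, Node.list() is empty, so both clauses return a single-element list.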

fetch_from_all_nodes(func_name, args)

@spec fetch_from_all_nodes(atom(), list()) :: [{node(), term()}]

Calls the given Storage function on all connected nodes via RPC.

Returns a list of {node, result} tuples for successful calls. Nodes whose call returns {:badrpc, _} are silently omitted, so the result may contain fewer entries than connected_nodes/0.

Parameters

  • func_name — atom name of a function on Monitorex.Storage (e.g. :list_hosts, :list_routes, :list_recent_outbound)
  • args — list of arguments to pass to the function

Configuration

  • :cluster_max_concurrency — max concurrent RPC calls (default 3)
  • :cluster_rpc_timeout — per-call timeout in ms (default 5_000)
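The behaviour described above (bounded concurrency, a per-call timeout, and dropping {:badrpc, _} results) could be sketched as follows. This is an illustration under assumptions, not Monitorex's implementation: the module FetchSketch, its fetch/5 signature, and the use of Task.async_stream/3 with :rpc.call/5 are choices made for this example. The defaults mirror the documented ones.

```elixir
defmodule FetchSketch do
  # Calls mod.func_name(args...) on each node and keeps successful results.
  def fetch(nodes, mod, func_name, args, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, 3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    nodes
    |> Task.async_stream(
      fn node -> {node, :rpc.call(node, mod, func_name, args, timeout)} end,
      max_concurrency: max_concurrency,
      # Give the task slightly longer than the RPC so :rpc reports the timeout.
      timeout: timeout + 1_000,
      on_timeout: :kill_task
    )
    |> Enum.flat_map(fn
      # Failed RPC calls are dropped without raising.
      {:ok, {_node, {:badrpc, _reason}}} -> []
      {:ok, {node, result}} -> [{node, result}]
      # A task killed by the stream timeout is dropped as well.
      {:exit, _reason} -> []
    end)
  end
end
```

For example, FetchSketch.fetch([Node.self()], Enum, :sum, [[1, 2]]) runs the call on the local node only.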

merge_consumers(node_consumers)

@spec merge_consumers([{node(), [map()]}]) :: [map()]

Merges consumer aggregates from multiple nodes.

Same merge strategy as merge_hosts/1 but for consumer data.

merge_endpoints(node_endpoints)

@spec merge_endpoints([{node(), [map()]}]) :: [map()]

Merges endpoint aggregates from multiple nodes.

Same merge strategy as merge_hosts/1 but for endpoint data.

merge_hosts(node_hosts)

@spec merge_hosts([{node(), [map()]}]) :: [map()]

Merges host aggregates collected from multiple nodes.

Input

A list of {node, [host_map]} tuples — as returned by fetch_from_all_nodes(:list_hosts, []).

Merge strategy

  • requests, errors, total_duration are summed
  • avg_latency is recomputed as total_duration / requests
  • p50, p95, p99 are averaged across nodes, weighted by each node's request count
  • :node is set to a list of all source nodes that contributed
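The merge strategy above can be sketched as below. This is a hedged illustration, not the library's code: the module MergeHostsSketch is hypothetical, the grouping key :host is an assumed field name (the real key is not documented here), and the percentile step is interpreted as a request-weighted average.

```elixir
defmodule MergeHostsSketch do
  @sum_keys [:requests, :errors, :total_duration]
  @pct_keys [:p50, :p95, :p99]

  def merge(node_hosts) do
    node_hosts
    |> Enum.flat_map(fn {node, hosts} -> Enum.map(hosts, &{node, &1}) end)
    # Assumption: each host map identifies its host under the :host key.
    |> Enum.group_by(fn {_node, row} -> row.host end)
    |> Enum.map(fn {name, entries} -> merge_group(name, entries) end)
  end

  defp merge_group(name, entries) do
    rows = Enum.map(entries, fn {_node, row} -> row end)

    # requests, errors, total_duration are summed across nodes.
    sums = Map.new(@sum_keys, fn key -> {key, rows |> Enum.map(& &1[key]) |> Enum.sum()} end)

    # Percentiles are averaged, weighted by each node's request count.
    pcts = Map.new(@pct_keys, fn key -> {key, weighted_avg(rows, key, sums.requests)} end)

    sums
    |> Map.merge(pcts)
    |> Map.merge(%{
      host: name,
      avg_latency: safe_div(sums.total_duration, sums.requests),
      node: entries |> Enum.map(fn {node, _row} -> node end) |> Enum.uniq()
    })
  end

  defp weighted_avg(rows, key, total_requests) do
    weighted = rows |> Enum.map(&(&1[key] * &1.requests)) |> Enum.sum()
    safe_div(weighted, total_requests)
  end

  # Avoid dividing by zero when no requests were recorded.
  defp safe_div(_num, 0), do: 0.0
  defp safe_div(num, den), do: num / den
end
```

With one node reporting 100 requests at p50 = 10 and another reporting 300 requests at p50 = 20, the weighted p50 is (10 * 100 + 20 * 300) / 400 = 17.5. Note that a weighted average of per-node percentiles only approximates the percentile of the combined data.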

merge_recent(node_events, top_n \\ 50)

@spec merge_recent([{node(), [map()]}], pos_integer()) :: [map()]

Merges recent event lists from multiple nodes.

Input

A list of {node, [event_struct]} tuples.

Merge strategy

  • All events are flattened into a single list
  • Each event is tagged with its source :node
  • Sorted by timestamp descending
  • Returns the first top_n events of the sorted list, i.e. the most recent ones (default 50)
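The four steps above could be sketched like this. The module MergeRecentSketch is hypothetical, and the sketch assumes each event carries a :timestamp field holding a plain comparable value such as an integer; for DateTime values, the sorter would need to be {:desc, DateTime} instead.

```elixir
defmodule MergeRecentSketch do
  def merge(node_events, top_n \\ 50) do
    node_events
    # Flatten into one list, tagging each event with its source node.
    |> Enum.flat_map(fn {node, events} ->
      Enum.map(events, &Map.put(&1, :node, node))
    end)
    # Newest first. Assumes :timestamp is an integer-like value.
    |> Enum.sort_by(& &1.timestamp, :desc)
    |> Enum.take(top_n)
  end
end
```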

merge_routes(node_routes)

@spec merge_routes([{node(), [map()]}]) :: [map()]

Merges route aggregates from multiple nodes.

Same merge strategy as merge_hosts/1 but for route data.