Supervisor that manages API workers and handles remote method calls.
Supports:
- Default unicast (first available worker)
- Unicast with node selector
- Multicast to multiple nodes
Multicast strategies
Every per-node response is a {node, value} pair, where value is the body's
return value, verbatim. A node whose call failed at the transport level yields
{node, {:nebula_error, reason}}.
- :all - Wait for every targeted node (or the timeout). Returns one {node, value} per targeted node; nodes that did not answer in time are reported as {node, {:nebula_error, :timeout}}, never silently dropped.
- :first - Return the first response that counts as a success (see the :success / :failure options) as a single {node, value}. If no response qualifies: {:nebula_error, :no_success, results} (never a bare list).
- :quorum - Wait for N successes. N is at_least: when given, otherwise a strict majority of the quorum set chosen by quorum:, where :configured (the default) = the configured nodes serving the method that match the selector (connected or not) and :available = the connected workers. Reached: the list of collected {node, value} responses. Not reached: {:nebula_error, :quorum_not_reached, results} or {:nebula_error, :quorum_timeout, results}. Impossible quorum (required > available workers, including a :configured majority no live set can reach): {:nebula_error, :quorum_unreachable, %{workers: n, required: m}}, returned before any call is made.
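The quorum arithmetic and the :quorum result shapes described above can be sketched in a few lines. This is an illustration only: QuorumSketch, required/2 and classify/1 are hypothetical names that do not exist in the library, and the sketch assumes nothing beyond the tuple shapes listed in this section.

```elixir
defmodule QuorumSketch do
  # Hypothetical helper, not part of the library. Mirrors the documented rule:
  # at_least: when given, otherwise a strict majority of the quorum set.
  def required(set_size, at_least \\ nil)

  def required(_set_size, at_least) when is_integer(at_least) and at_least > 0,
    do: at_least

  def required(set_size, nil) when is_integer(set_size) and set_size >= 0,
    do: div(set_size, 2) + 1

  # Classifies the documented :quorum result shapes.
  def classify({:nebula_error, :quorum_unreachable, %{workers: w, required: r}}),
    do: {:unreachable, w, r}

  def classify({:nebula_error, reason, results})
      when reason in [:quorum_not_reached, :quorum_timeout] and is_list(results),
    do: {:failed, reason, results}

  # A plain list means the quorum was reached.
  def classify(results) when is_list(results), do: {:ok, results}
end
```

With five nodes in the quorum set, QuorumSketch.required(5) gives 3; with four nodes it also gives 3, because the majority is strict.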
Node Info Cache
Node information is cached in an ETS table (:nebula_nodes_cache) to avoid
expensive RPC calls on every request. Each node entry includes a last_seen_at
timestamp that indicates when the node was last successfully contacted.
This allows detection of stale nodes: a node may still be marked connected: true,
but if its last_seen_at is old, it might be unresponsive.
Intelligent Node Selection Examples
# Select the node with lowest memory usage
call_on_node fn nodes_info ->
  nodes_info
  |> Enum.filter(fn {_, info} -> info.connected && info.runtime end)
  |> Enum.min_by(fn {_, info} -> info.runtime.memory_percent end)
  |> elem(0)
end do
  MyAPI.heavy_task()
end

# Select nodes seen in the last 30 seconds
call_on_nodes fn nodes_info ->
  thirty_seconds_ago = DateTime.add(DateTime.utc_now(), -30, :second)

  nodes_info
  |> Enum.filter(fn {_, info} ->
    info.last_seen_at && DateTime.compare(info.last_seen_at, thirty_seconds_ago) == :gt
  end)
  |> Enum.map(fn {node, _} -> node end)
end do
  MyAPI.broadcast_update()
end
Summary
Functions
Builds the nodes_info map with metadata about each node.
Results are cached in ETS and updated with last_seen_at timestamps.
Caches node info in ETS.
Calls a remote method with optional routing options.
Returns a specification to start this module under a supervisor.
Collects complete health data for the current node. This is used by build_nodes_info/0 via direct RPC calls to avoid circular dependencies.
Collects runtime info for the current node. This is a helper function used by collect_node_health_data_local/0.
Gets all available workers for a method across all nodes.
Gets nodes_info for workers that are actually available for a method. Only includes nodes that have registered workers.
Gets cached node info from ETS. Returns empty map if not found.
Returns the latest cluster node-info snapshot — a pure ETS read.
Refreshes the nodes cache by fetching fresh runtime info from all nodes.
Functions
Builds the nodes_info map with metadata about each node.
Results are cached in ETS and updated with last_seen_at timestamps.
Returns a map like:
%{
  :"worker@host.example" => %{
    short_name: :worker,
    long_name: :"worker@host.example",
    host: "host.example",
    tags: [:worker, :video],
    connected: true,
    last_seen_at: ~U[2024-01-15 10:30:00Z],
    runtime: %{
      memory_used_mb: 256,
      memory_total_mb: 1024,
      memory_percent: 25.0,
      process_count: 1234,
      schedulers: 8,
      otp_release: "26",
      uptime_seconds: 3600
    }
  }
}

Runtime info and last_seen_at are only updated for connected/reachable nodes.
If a node becomes unreachable, last_seen_at keeps its last value, allowing
detection of stale nodes.
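As a rough illustration of stale-node detection over such a map, the sketch below lists nodes that are still marked connected but whose last_seen_at is older than a threshold. StaleSketch and stale_nodes/3 are hypothetical names, and the choice to treat a nil last_seen_at as "unknown" rather than stale is an assumption made for this example.

```elixir
defmodule StaleSketch do
  # Hypothetical helper, not part of the library. Returns the nodes marked
  # connected whose last_seen_at is older than max_age_seconds.
  def stale_nodes(nodes_info, max_age_seconds, now \\ DateTime.utc_now()) do
    cutoff = DateTime.add(now, -max_age_seconds, :second)

    for {node, info} <- nodes_info,
        info.connected,
        # Assumption: nil means "never refreshed yet", which is skipped here.
        not is_nil(info.last_seen_at) and
          DateTime.compare(info.last_seen_at, cutoff) == :lt,
        do: node
  end
end
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to exercise with fixed timestamps.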
Caches node info in ETS.
Calls a remote method with optional routing options.
Options
For every option, nil means "not set": the call behaves as if the option
were absent (a computed strategy: maybe_strategy holding nil resolves to
the default). Any other malformed value raises ArgumentError up front, and
so does any unknown option key — the option set below is closed, a typo'd
key must not be silently dropped.
- :timeout - Timeout in milliseconds. Default: the module's default_timeout, then config :nebula_api, default_timeout:, then 5000. nil means "not set" (the default resolution applies); any other non-integer raises.
- :node_selector - 1-arity function that takes the nodes_info map and returns node(s) to call. Anything else (besides nil, "not set") raises ArgumentError up front, like every other malformed call opt.
- :multicast - If true, calls multiple nodes and returns a list of results.
- :strategy - Multicast strategy: :all, :first, :quorum (default: :all).
- :quorum - (:quorum strategy only) which set the default majority is taken over: :configured (the default) = the configured nodes serving the method that match the selector, div(set, 2) + 1, connected or not; :available = the connected workers, div(present, 2) + 1. A function selector has no static configured set, so with strategy: :quorum it must declare quorum: :available or at_least:; the :configured default is a compile error there, never silently downgraded. Mutually exclusive with :at_least.
- :at_least - Positive integer: an exact number of successes required by the :quorum strategy, overriding the quorum: majority. Mutually exclusive with :quorum.
- :success - (:first / :quorum only) predicate fn value -> boolean defining what counts as a business success. Default: any worker that replied counts (a {:nebula_error, _} never does). Mutually exclusive with :failure.
- :failure - mirror of :success: fn value -> boolean returning true for values that must NOT count as successes. Mutually exclusive with :success.
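The :timeout resolution order can be pictured with a small sketch. TimeoutSketch and resolve/2 are hypothetical names, and module_default stands in for the module's default_timeout; how the library actually stores that value is not shown in this document.

```elixir
defmodule TimeoutSketch do
  # Final fallback, as documented for :timeout.
  @fallback 5000

  # Hypothetical helper, not part of the library.
  # nil means "not set": fall through module default, app config, then 5000.
  def resolve(opts, module_default \\ nil) do
    case Keyword.get(opts, :timeout) do
      nil ->
        module_default ||
          Application.get_env(:nebula_api, :default_timeout) ||
          @fallback

      timeout when is_integer(timeout) ->
        timeout

      other ->
        raise ArgumentError, "invalid :timeout option: #{inspect(other)}"
    end
  end
end
```

An explicit timeout: nil therefore behaves exactly like an absent key, while a value such as "1s" raises before any call is made.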
Returns
- For unicast: the body's return value, verbatim. A library/transport failure
  (timeout, no worker available, worker crash) yields {:nebula_error, reason}.
- For multicast: per-node {node, value} pairs; see "Multicast strategies" in
  the moduledoc for the exact shape per strategy.
Returns a specification to start this module under a supervisor.
See Supervisor.
Collects complete health data for the current node. This is used by build_nodes_info/0 via direct RPC calls to avoid circular dependencies.
Collects runtime info for the current node. This is a helper function used by collect_node_health_data_local/0.
- memory_used: Total memory allocated by the Erlang VM (:erlang.memory(:total))
- memory_total: Total system RAM (from /proc/meminfo on Linux, falls back to VM total)
- memory_percent: VM memory usage as percentage of system RAM
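A minimal sketch of how these three values could be derived follows. MemorySketch and its functions are hypothetical, the values are kept in bytes, and rounding the percentage to one decimal is an assumption; the library's actual units and rounding may differ.

```elixir
defmodule MemorySketch do
  # Extracts MemTotal from /proc/meminfo content; the file reports kB.
  def parse_mem_total(meminfo) when is_binary(meminfo) do
    case Regex.run(~r/^MemTotal:\s+(\d+)\s+kB/m, meminfo) do
      [_, kb] -> {:ok, String.to_integer(kb) * 1024}
      _ -> :error
    end
  end

  # Hypothetical equivalent of the runtime memory fields, in bytes.
  def memory_info do
    used = :erlang.memory(:total)

    total =
      with {:ok, content} <- File.read("/proc/meminfo"),
           {:ok, bytes} <- parse_mem_total(content) do
        bytes
      else
        # Non-Linux or unreadable file: fall back to the VM total.
        _ -> used
      end

    %{
      memory_used: used,
      memory_total: total,
      memory_percent: Float.round(used / total * 100, 1)
    }
  end
end
```

When the fallback applies, memory_total equals memory_used, so memory_percent reads 100.0 and should not be taken as real memory pressure.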
Gets all available workers for a method across all nodes.
Gets nodes_info for workers that are actually available for a method. Only includes nodes that have registered workers.
pg is the source of truth for WHO serves the method; the snapshot only
enriches HOW they're doing. A node whose worker just registered but is not
in the background snapshot yet gets a synthesized entry (runtime: nil,
last_seen_at: nil until the next refresh).
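The merge described above, where pg decides membership and the snapshot only adds detail, might look roughly like this. AvailableSketch and nodes_info_for/2 are hypothetical, `serving` stands for the list of nodes pg reports for the method, and every synthesized field other than runtime: nil and last_seen_at: nil is an assumption.

```elixir
defmodule AvailableSketch do
  # Hypothetical helper, not part of the library.
  # serving:  nodes that pg reports as serving the method (source of truth).
  # snapshot: the cached nodes_info map (enrichment only).
  def nodes_info_for(serving, snapshot) when is_list(serving) and is_map(snapshot) do
    Map.new(serving, fn node ->
      {node, Map.get_lazy(snapshot, node, fn -> synthesize(node) end)}
    end)
  end

  # Placeholder entry for a node the background refresh has not seen yet.
  # Fields besides runtime and last_seen_at are assumed for illustration.
  defp synthesize(node) do
    %{long_name: node, connected: true, runtime: nil, last_seen_at: nil}
  end
end
```

Nodes present in the snapshot but absent from `serving` are dropped, which matches the statement that only nodes with registered workers are included.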
Gets cached node info from ETS. Returns empty map if not found.
Returns the latest cluster node-info snapshot — a pure ETS read.
The snapshot is written exclusively by NebulaAPI.APIServer.NodesInfoCache
on its background interval; readers NEVER build it themselves, so there is
no fan-out on the read path, ever. Before the first refresh completes (boot
window) this returns %{} — selectors still see every node with a
registered worker through synthesized entries (runtime: nil, see
get_available_nodes_info/2).
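A pure ETS read with an empty-map fallback can be sketched as below. The table name :nebula_nodes_cache comes from the moduledoc; SnapshotSketch, read/0 and the :nodes_info key are hypothetical, since the actual key layout of the table is not documented here.

```elixir
defmodule SnapshotSketch do
  @table :nebula_nodes_cache
  # Hypothetical key; the real table layout is not documented here.
  @key :nodes_info

  # Reads the snapshot without building anything; returns %{} when the
  # table or the entry does not exist yet (boot window).
  def read do
    case :ets.whereis(@table) do
      :undefined ->
        %{}

      _tid ->
        case :ets.lookup(@table, @key) do
          [{@key, nodes_info}] -> nodes_info
          [] -> %{}
        end
    end
  end
end
```

Checking :ets.whereis/1 first avoids the ArgumentError that :ets.lookup/2 raises when a named table has not been created.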
Refreshes the nodes cache by fetching fresh runtime info from all nodes.