PhiAccrualUdp.Listener (phi_accrual_udp v0.1.0)

UDP listener that receives phi_accrual_udp heartbeats and feeds them into the PhiAccrual core detector.

Opens a UDP socket on a configurable port and decodes each incoming packet with PhiAccrualUdp.Packet.decode/1. On success it calls PhiAccrual.observe(node, receipt_ts), where receipt_ts is read from :erlang.monotonic_time(:millisecond) at the moment the packet is pulled from the socket. Packets that fail to decode are dropped and reported via telemetry.
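The receive path described above can be sketched as a small GenServer. This is an illustration under stated assumptions, not the library's implementation: SketchListener and the :decode and :observe options are hypothetical stand-ins for PhiAccrualUdp.Packet.decode/1 and PhiAccrual.observe/2, the {:ok, _} / {:error, _} return shape of the decoder is assumed, and telemetry is left out.

```elixir
defmodule SketchListener do
  # Hypothetical stand-in for the real listener; names and options are assumptions.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Returns the port the socket is actually bound to (useful when opened on port 0).
  def port(pid), do: GenServer.call(pid, :port)

  @impl true
  def init(opts) do
    # Active mode delivers each datagram to this process as a {:udp, ...} message.
    {:ok, socket} = :gen_udp.open(Keyword.get(opts, :port, 0), [:binary, active: true])

    {:ok,
     %{
       socket: socket,
       decode: Keyword.fetch!(opts, :decode),
       observe: Keyword.fetch!(opts, :observe)
     }}
  end

  @impl true
  def handle_call(:port, _from, state) do
    {:ok, port} = :inet.port(state.socket)
    {:reply, port, state}
  end

  @impl true
  def handle_info({:udp, _socket, ip, port, data}, state) do
    # Take the receipt timestamp first, before any decoding work.
    receipt_ts = :erlang.monotonic_time(:millisecond)

    case state.decode.(data) do
      {:ok, _packet} -> state.observe.({ip, port}, receipt_ts)
      # The real listener reports decode failures via telemetry; here they are only dropped.
      {:error, _reason} -> :ok
    end

    {:noreply, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

# Usage: a toy decoder that accepts packets starting with "hb".
decode = fn
  <<"hb", _rest::binary>> = packet -> {:ok, packet}
  _other -> {:error, :bad_magic}
end

parent = self()
observe = fn node, ts -> send(parent, {:observed, node, ts}) end

{:ok, listener} = SketchListener.start_link(decode: decode, observe: observe)
{:ok, client} = :gen_udp.open(0, [:binary, active: false])
:ok = :gen_udp.send(client, {127, 0, 0, 1}, SketchListener.port(listener), "hb-1")

observed =
  receive do
    {:observed, node, ts} -> {node, ts}
  after
    1_000 -> :timeout
  end
```

Reading the monotonic clock before decoding keeps decode time out of the measured inter-arrival interval, which is the reason the timestamp is taken as soon as the packet is pulled from the socket.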

Mapping packet to node

By default, the listener uses the sender's IP and port as the node identifier, in the form {ip_tuple, port}. Pass a :node_resolver function to map the IP and port to your own node atoms. This is useful for static cluster topologies where you want stable node ids across sender restarts.

PhiAccrualUdp.Listener.start_link(
  port: 4370,
  node_resolver: fn
    {10, 0, 0, 1}, _ -> :node_a
    {10, 0, 0, 2}, _ -> :node_b
    ip, port -> {ip, port}
  end
)
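For larger static topologies, the resolver can be built from a map instead of hand-written clauses. A minimal sketch, assuming only the two-argument resolver shape shown above; ResolverSketch and from_map/1 are hypothetical names, not part of phi_accrual_udp.

```elixir
defmodule ResolverSketch do
  # Hypothetical helper: builds a 2-arity resolver from an IP => node map.
  # Unknown IPs fall back to the default {ip, port} identifier.
  def from_map(topology) when is_map(topology) do
    fn ip, port -> Map.get(topology, ip, {ip, port}) end
  end
end

resolver =
  ResolverSketch.from_map(%{
    {10, 0, 0, 1} => :node_a,
    {10, 0, 0, 2} => :node_b
  })

known = resolver.({10, 0, 0, 1}, 51_000)
unknown = resolver.({10, 0, 0, 9}, 51_000)
```

Here known is :node_a regardless of the sender's source port, while unknown falls through to {{10, 0, 0, 9}, 51000}. The resulting function could then be passed as node_resolver: resolver.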

Telemetry

[:phi_accrual_udp, :listener, :started]
  measurements: %{}
  metadata:     %{port}

[:phi_accrual_udp, :sample, :received]
  measurements: %{packet_timestamp_ms}
  metadata:     %{node, peer}
  # peer is {ip, port}; node is whatever node_resolver returned

[:phi_accrual_udp, :decode, :error]
  measurements: %{packet_size}
  metadata:     %{reason, peer}
  # reason ∈ [:wrong_size, :bad_magic, :unsupported_version, :reserved_flags_set]
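One way to consume these events is a handler attached with :telemetry.attach_many/4. The sketch below assumes the :telemetry package is available and that the measurement and metadata maps use the key names listed above; HeartbeatTelemetry, describe/3, and the handler id are hypothetical.

```elixir
defmodule HeartbeatTelemetry do
  # Hypothetical handler module; event names are taken from the list above.
  require Logger

  @events [
    [:phi_accrual_udp, :listener, :started],
    [:phi_accrual_udp, :sample, :received],
    [:phi_accrual_udp, :decode, :error]
  ]

  # Call once at application start. Requires the :telemetry dependency.
  def attach do
    :telemetry.attach_many("heartbeat-telemetry-sketch", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting, kept separate so it can be exercised without :telemetry.
  def describe([:phi_accrual_udp, :listener, :started], _measurements, %{port: port}) do
    "listener started on port #{port}"
  end

  def describe([:phi_accrual_udp, :sample, :received], %{packet_timestamp_ms: ts}, meta) do
    "heartbeat from #{inspect(meta.node)} (peer #{inspect(meta.peer)}), packet_timestamp_ms=#{ts}"
  end

  def describe([:phi_accrual_udp, :decode, :error], %{packet_size: size}, meta) do
    "dropped #{size}-byte packet from #{inspect(meta.peer)}: #{inspect(meta.reason)}"
  end
end

message =
  HeartbeatTelemetry.describe(
    [:phi_accrual_udp, :decode, :error],
    %{packet_size: 3},
    %{reason: :wrong_size, peer: {{10, 0, 0, 9}, 40_000}}
  )
```

Telemetry detaches a handler that raises, so a production handler would probably want a catch-all describe/3 clause for unexpected shapes.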

Security caveat

UDP is unauthenticated. Anyone who can reach the listener port can send packets that pass Packet.decode/1 and feed observations into the estimator, potentially poisoning detection. In hostile networks, bind the socket to a private interface, firewall the port, or layer authentication on top via a node_resolver that rejects unknown peers. Keep in mind that UDP source addresses can be spoofed, so filtering on the peer address alone narrows exposure but is not a substitute for network isolation.

Types

opts()

@type opts() :: [
  port: :inet.port_number(),
  node_resolver: (:inet.ip_address(), :inet.port_number() -> term()),
  name: GenServer.name()
]

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
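The wording above matches the child_spec/1 that `use GenServer` generates, so the listener can presumably be placed in a supervision tree as a {module, opts} tuple. The sketch below uses StubListener, a hypothetical stand-in for PhiAccrualUdp.Listener, so that it runs without the dependency; it also shows that two listeners under one supervisor need distinct child ids.

```elixir
defmodule StubListener do
  # Hypothetical stand-in for PhiAccrualUdp.Listener: same opts shape, no socket.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  @impl true
  def init(opts), do: {:ok, opts}
end

# The default child id is the module name, so a second instance needs its own id.
children = [
  Supervisor.child_spec({StubListener, port: 4370, name: :listener_a}, id: :listener_a),
  Supervisor.child_spec({StubListener, port: 4371, name: :listener_b}, id: :listener_b)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

With a single listener, the plain tuple {PhiAccrualUdp.Listener, port: 4370} in the children list should be enough, assuming the module's child_spec/1 is the generated default.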

start_link(opts \\ [])

@spec start_link(opts()) :: GenServer.on_start()