PhiAccrualUdp.Listener (phi_accrual_udp v1.0.0)

UDP listener that receives phi_accrual_udp heartbeats and feeds them into the PhiAccrual core detector.

Opens a UDP socket on a configurable port, decodes incoming packets via PhiAccrualUdp.Packet.decode/1 (dual-decoding both v1 and v2 wire formats throughout 1.x), and on success calls PhiAccrual.observe(node, receipt_ts) where receipt_ts comes from :erlang.monotonic_time(:millisecond) at the moment the packet was pulled from the socket. Decode failures and resolver rejections are dropped and reported via telemetry.
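The receive path described above can be sketched roughly as follows. This is not the library's source. The module name ReceiveSketch and the injected decode, resolve, and observe functions are hypothetical stand-ins for Packet.decode/1, the configured resolver, and PhiAccrual.observe/2. The {:ok, packet} / {:error, reason} decode result and the :sender_id key on the decoded packet are assumptions made for illustration only.

```elixir
defmodule ReceiveSketch do
  # Illustrative only: every function in `deps` is a hypothetical stand-in,
  # and the decoded packet shape is assumed, not taken from the library.
  def handle_packet(ip, port, payload, deps) do
    # Take the receipt timestamp first, before any decoding work.
    receipt_ts = :erlang.monotonic_time(:millisecond)

    case deps.decode.(payload) do
      {:ok, packet} ->
        # v1 packets carry no sender_id, so the resolver sees nil.
        case deps.resolve.(ip, port, Map.get(packet, :sender_id)) do
          {:reject, reason} ->
            # Dropped; the real listener also reports this via telemetry.
            {:rejected, reason}

          node ->
            deps.observe.(node, receipt_ts)
            {:observed, node}
        end

      {:error, reason} ->
        # Dropped; the real listener also reports this via telemetry.
        {:decode_error, reason}
    end
  end
end
```

The point of the ordering is that the timestamp reflects when the packet was taken off the socket, not when decoding or resolution finished.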

Mapping packets to nodes

The listener resolves each packet's source to a node term before calling PhiAccrual.observe/2. Resolution is configurable via :node_resolver.

Default resolver

The default resolver returns tagged tuples that distinguish the identity source:

  • v2 packets → {:sender_id, sender_id}
  • v1 packets → {:peer, ip, port}

Tagged tuples prevent accidental collision (a bare u64 could shadow other integer-shaped node identities) and let downstream code pattern-match on the identity source.
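As an illustration of pattern-matching on the identity source, downstream code might turn a node term into a log label like this. The IdentityLabel module and its label format are hypothetical; only the {:sender_id, _} and {:peer, _, _} shapes come from the default resolver described above.

```elixir
defmodule IdentityLabel do
  # Hypothetical helper: render a node term as a short label for logs.

  # v2 identity from the default resolver.
  def label({:sender_id, id}) when is_integer(id), do: "sender:#{id}"

  # v1 identity from the default resolver; :inet.ntoa/1 formats the IP tuple.
  def label({:peer, ip, port}), do: "peer:#{:inet.ntoa(ip)}:#{port}"

  # Anything else came from a custom resolver.
  def label(other), do: "custom:#{inspect(other)}"
end
```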

Custom resolver

Custom resolvers receive three arguments and return either a node term to use as the identity, or {:reject, reason} to drop the packet:

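# lookup_by_id/1 and lookup_by_peer/2 are application-defined helpers
# (not part of this library), assumed to return a node term or nil on a miss.
resolver = fn
  _ip, _port, sender_id when is_integer(sender_id) ->
    # v2 packet: resolve by the sender-supplied ID
    lookup_by_id(sender_id) || {:reject, :unknown_sender}

  ip, port, nil ->
    # v1 packet, no sender_id available
    lookup_by_peer(ip, port) || {:reject, :unknown_peer}
end

PhiAccrualUdp.Listener.start_link(port: 4370, node_resolver: resolver)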

Resolver safety

The :node_resolver runs synchronously in the Listener process on every received packet. Keep it cheap and non-blocking: use :persistent_term for static lookup tables, ETS for dynamic ones. Avoid GenServer.call/2, network I/O, or anything else that can block — these will stall packet processing for every peer, not just the one being resolved.
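A minimal sketch of a resolver backed by :persistent_term, assuming a static table of sender IDs loaded once at boot. The StaticResolver module, its load/1 function, and the :v1_not_allowed reason are hypothetical names chosen for this example.

```elixir
defmodule StaticResolver do
  # Hypothetical resolver: a read-only lookup with no message passing.
  @key {__MODULE__, :senders}

  # Call once at boot, before the Listener starts. Updating a persistent
  # term is expensive, so this suits tables that rarely change.
  def load(table) when is_map(table), do: :persistent_term.put(@key, table)

  def resolve(_ip, _port, sender_id) when is_integer(sender_id) do
    case :persistent_term.get(@key, %{}) do
      %{^sender_id => node} -> node
      _ -> {:reject, :unknown_sender}
    end
  end

  # This sketch refuses v1 packets, which carry no sender_id.
  def resolve(_ip, _port, nil), do: {:reject, :v1_not_allowed}
end
```

It would be passed to the listener as node_resolver: &StaticResolver.resolve/3.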

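Resolvers must return either a term to use as the node identity or {:reject, reason} to drop the packet. nil is treated as a valid node identity, so a lookup that returns nil on a miss will record observations under the node nil rather than dropping the packet. To drop, explicitly return {:reject, reason}.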

Exceptions raised by the resolver crash the Listener; the supervisor will restart it, but estimator state for all peers will reset. Validate inputs in your resolver and use {:reject, reason} for rejection paths, not exceptions.
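One defensive option, sketched here under the assumption that a rejected packet is preferable to a Listener restart, is to wrap the resolver so raised exceptions become rejections. SafeResolver and the :resolver_raised reason are hypothetical names. Note that rescue catches exceptions only, not throws or exits.

```elixir
defmodule SafeResolver do
  # Hypothetical wrapper: converts exceptions raised by a 3-arity resolver
  # into {:reject, :resolver_raised} instead of crashing the caller.
  def wrap(resolver) when is_function(resolver, 3) do
    fn ip, port, sender_id ->
      try do
        resolver.(ip, port, sender_id)
      rescue
        _exception -> {:reject, :resolver_raised}
      end
    end
  end
end
```

A wrapper like this hides bugs behind a telemetry reason, so it complements input validation in the resolver rather than replacing it.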

Address family and binding

  • :inet6 (default false) — when true, opens the socket with the IPv6 family AND sets {:ipv6_v6only, true} explicitly. The library does not rely on OS defaults for IPV6_V6ONLY (which vary across Linux/BSD/Windows). The socket accepts ONLY IPv6 traffic.

  • :ip (default: bind to all interfaces) — bind address, passed through to :gen_udp.open/2. Must be an :inet address tuple (4-element for v4, 8-element for v6). Hostnames are not accepted by :gen_udp for this option; resolve them in the caller if needed.
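If the bind address is configured as a hostname or an IP string, the caller can convert it to a tuple first. A minimal sketch using :inet.getaddr/2 from OTP; the BindAddress module is a hypothetical helper, not part of this library.

```elixir
defmodule BindAddress do
  # Hypothetical helper: resolve a hostname or IP literal to an address
  # tuple suitable for the :ip option. `family` is :inet or :inet6.
  # Returns {:ok, tuple} or {:error, posix_reason}.
  def resolve(host, family \\ :inet) when is_binary(host) do
    :inet.getaddr(String.to_charlist(host), family)
  end
end
```

The resulting tuple can then be passed as ip: when starting the listener. Hostname lookups can block, so do this at startup rather than inside a resolver.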

Dual-stack deployments (a single host serving both v4 and v6 peers) should run two listeners, one per family, supervised together. Do not attempt to multiplex via OS-level dual-stack on a single v6 socket: v4 peers would arrive as v4-mapped-v6 addresses, mutating the {:peer, ip, port} identity shape and breaking the stable-identity contract.

IP tuples in resolver arguments differ in shape:

  • v4 → 4-element tuple, e.g. {10, 0, 0, 1}
  • v6 → 8-element tuple, e.g. {0, 0, 0, 0, 0, 0, 0, 1} (::1)

Resolvers running across mixed-family deployments must handle both shapes. The cleanest answer is to configure stable :sender_id values on senders so the resolver can match on the third argument regardless of the source address family.
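A resolver that prefers sender_id and only falls back to the address shape might look like the sketch below. The FamilyAwareResolver module and the :peer_v4, :peer_v6, and :bad_address terms are hypothetical, chosen here to make the family explicit in the identity.

```elixir
defmodule FamilyAwareResolver do
  # Hypothetical resolver for mixed v4/v6 deployments.

  # Preferred path: sender_id is independent of the source address family.
  def resolve(_ip, _port, sender_id) when is_integer(sender_id),
    do: {:sender_id, sender_id}

  # v1 fallback: distinguish families by tuple size.
  def resolve(ip, port, nil) when tuple_size(ip) == 4, do: {:peer_v4, ip, port}
  def resolve(ip, port, nil) when tuple_size(ip) == 8, do: {:peer_v6, ip, port}

  # Anything unexpected is rejected rather than raised on.
  def resolve(_ip, _port, _sender_id), do: {:reject, :bad_address}
end
```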

Flow control

The listener opens its UDP socket with active: N (rather than active: true), so the runtime delivers at most N packets to the GenServer mailbox before the socket switches to passive mode and sends {:udp_passive, socket}. On that message, the listener re-arms with another batch of N. Packets that arrive while the socket is passive wait in the kernel's socket receive buffer (and are dropped there if it overflows) instead of piling up in the mailbox. This bounds per-burst mailbox growth under packet floods, which matters because every packet that reaches the mailbox must be decoded before any downstream phi_accrual shedding can help.

Tune via the :active_count option (default 100). Higher values amortize the cost of re-arming across more packets at the price of larger worst-case mailbox bursts; lower values give tighter back-pressure but more re-arm overhead.
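The re-arm cycle can be reproduced with plain :gen_udp. The module below is a standalone sketch of the active: N pattern, not the Listener's implementation; ActiveNDemo and drain/3 are names invented for this example.

```elixir
defmodule ActiveNDemo do
  # Receive up to `total` packets on `socket`, re-arming with `n` each time
  # the socket goes passive. Must run in the socket's controlling process.
  # Returns {payloads_in_arrival_order, rearm_count}.
  def drain(socket, n, total), do: loop(socket, n, total, [], 0)

  defp loop(_socket, _n, 0, acc, rearms), do: {Enum.reverse(acc), rearms}

  defp loop(socket, n, remaining, acc, rearms) do
    receive do
      {:udp, ^socket, _ip, _port, payload} ->
        loop(socket, n, remaining - 1, [payload | acc], rearms)

      {:udp_passive, ^socket} ->
        # The active counter reached zero; grant another batch of n.
        :ok = :inet.setopts(socket, active: n)
        loop(socket, n, remaining, acc, rearms + 1)
    after
      # Give up if nothing arrives for a second.
      1_000 -> {Enum.reverse(acc), rearms}
    end
  end
end
```

To try it, open a socket with :gen_udp.open(0, [:binary, active: 2]), send it a few datagrams from a second socket, and call ActiveNDemo.drain(socket, 2, count). At no point are more than two undelivered packets sitting in the mailbox.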

Telemetry

[:phi_accrual_udp, :listener, :started]
  measurements: %{}
  metadata:     %{port, inet6, ip}
  # ip is nil when not explicitly set (bind to all interfaces)

[:phi_accrual_udp, :listener, :passive]
  measurements: %{}
  metadata:     %{port}
  # emitted each time the listener re-arms after consuming
  # `active_count` packets; useful for observing ingress
  # saturation

[:phi_accrual_udp, :sample, :received]
  measurements: %{packet_timestamp_ms}
  metadata:     %{node, peer, wire_version}
  # wire_version is 1 or 2; group by wire_version to track
  # fleet migration progress (v1 ratio decays to 0 as senders
  # upgrade)

[:phi_accrual_udp, :sample, :rejected]
  measurements: %{}
  metadata:     %{peer, sender_id, reason, wire_version}
  # emitted when the resolver returns {:reject, reason};
  # `reason` is whatever the resolver returned. Prefer
  # atoms or short tagged tuples to keep telemetry
  # cardinality bounded. `sender_id` is nil for rejected
  # v1 packets.

[:phi_accrual_udp, :decode, :error]
  measurements: %{packet_size}
  metadata:     %{reason, peer}
  # reason ∈ [:wrong_size, :bad_magic, :unsupported_version,
  #           :reserved_flags_set, :reserved_sender_id]
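A sketch of consuming some of these events with :telemetry.attach_many/4. The ListenerTelemetry module, the handler ID, and the message wording are hypothetical; the event names and metadata keys are the ones listed above. The sketch assumes the :telemetry package is available and simply logs; a production handler would more likely increment counters.

```elixir
defmodule ListenerTelemetry do
  require Logger

  @events [
    [:phi_accrual_udp, :sample, :rejected],
    [:phi_accrual_udp, :decode, :error],
    [:phi_accrual_udp, :listener, :passive]
  ]

  # Attach once at application start. Requires the :telemetry package.
  def attach do
    :telemetry.attach_many("listener-telemetry-log", @events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers take (event, measurements, metadata, config).
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting, kept separate so it can be tested without telemetry.
  def describe([:phi_accrual_udp, :sample, :rejected], _m, meta),
    do: "rejected v#{meta.wire_version} packet: #{inspect(meta.reason)}"

  def describe([:phi_accrual_udp, :decode, :error], m, meta),
    do: "undecodable #{m.packet_size}-byte packet: #{inspect(meta.reason)}"

  def describe([:phi_accrual_udp, :listener, :passive], _m, meta),
    do: "listener on port #{meta.port} re-armed"
end
```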

Security caveat

UDP is unauthenticated. Anyone who can reach the listener port can send packets that pass Packet.decode/1 and feed observations into the estimator. With v2, a hostile peer can also mint arbitrary sender_id values, creating unbounded cold-start estimator state on the receiver. In hostile networks, bind the socket to a private interface, firewall the port, or use a :node_resolver that returns {:reject, reason} for unknown peers. A resolver-based allowlist bounds estimator state but does not authenticate senders, because the resolver sees only the source address, port, and sender_id, all of which can be forged.
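To bound cold-start state when membership changes at runtime, an ETS-backed allowlist is one option. This is a sketch under assumptions: the AllowlistResolver module, the table name, and the rejection reasons are all hypothetical, and the table must be created by a long-lived process because ETS tables are deleted when their owner exits.

```elixir
defmodule AllowlistResolver do
  # Hypothetical dynamic allowlist keyed by sender_id.
  @table :phi_udp_allowlist

  # Create the table once from a long-lived owner process.
  def init do
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    :ok
  end

  # :protected means only the owner process may call this.
  def allow(sender_id, node) when is_integer(sender_id) do
    true = :ets.insert(@table, {sender_id, node})
    :ok
  end

  # Reads are allowed from any process, including the Listener.
  def resolve(_ip, _port, sender_id) when is_integer(sender_id) do
    case :ets.lookup(@table, sender_id) do
      [{^sender_id, node}] -> node
      [] -> {:reject, :unknown_sender}
    end
  end

  # This sketch refuses v1 packets, which carry no sender_id.
  def resolve(_ip, _port, nil), do: {:reject, :v1_not_allowed}
end
```

Unknown sender_id values are rejected before they reach the estimator, so forged IDs cannot grow its state, although a forged known ID is still accepted.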

Types

node_resolver()

@type node_resolver() ::
        (:inet.ip_address(), :inet.port_number(), non_neg_integer() | nil ->
           term() | {:reject, term()})

opts()

@type opts() :: [
  port: :inet.port_number(),
  node_resolver: node_resolver(),
  active_count: pos_integer(),
  inet6: boolean(),
  ip: :inet.ip_address(),
  name: GenServer.name()
]

Functions

child_spec(init_arg)

@spec child_spec(opts()) :: Supervisor.child_spec()

Build a child specification for use under a Supervisor.

Honors the standard supervisor options :id, :restart, and :shutdown when present in the keyword list, alongside the Listener's own options. Useful for running multiple Listener instances under one supervisor — e.g., one per address family for dual-stack deployments:

children = [
  {PhiAccrualUdp.Listener, port: 4370, id: :listener_v4},
  {PhiAccrualUdp.Listener,
    port: 4370, inet6: true, id: :listener_v6}
]

Defaults: id: PhiAccrualUdp.Listener, restart: :permanent, shutdown: 5_000, type: :worker.

start_link(opts \\ [])

@spec start_link(opts()) :: GenServer.on_start()