PhiAccrualUdp.Listener
(phi_accrual_udp v0.1.2)
UDP listener that receives phi_accrual_udp heartbeats and feeds
them into the PhiAccrual core detector.
Opens a UDP socket on a configurable port, decodes incoming packets
via PhiAccrualUdp.Packet.decode/1, and on success calls
PhiAccrual.observe(node, receipt_ts) where receipt_ts comes from
:erlang.monotonic_time(:millisecond) at the moment the packet was
pulled from the socket. Decode failures are dropped and reported via
telemetry.
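The receive path described above can be sketched roughly as follows. This is an illustration only, not the listener's actual code: ReceivePathSketch and handle_datagram/6 are hypothetical names, decode stands in for PhiAccrualUdp.Packet.decode/1 and is assumed to return {:ok, packet} or {:error, reason}, and observe stands in for PhiAccrual.observe/2.

```elixir
defmodule ReceivePathSketch do
  # Hypothetical illustration; the real listener's internals may differ.
  # `decode` is ASSUMED to return {:ok, packet} | {:error, reason}.
  # `observe` is a 2-arity function standing in for PhiAccrual.observe/2.
  def handle_datagram(data, ip, port, resolver, decode, observe) do
    # Take the receipt timestamp first so decode time does not skew it.
    receipt_ts = :erlang.monotonic_time(:millisecond)

    case decode.(data) do
      {:ok, _packet} ->
        # Map the sender's address to a node identifier, then record the sample.
        node = resolver.(ip, port)
        observe.(node, receipt_ts)
        {:ok, node}

      {:error, reason} ->
        # The real listener drops the packet and reports it via telemetry.
        {:dropped, reason}
    end
  end
end
```

Taking the timestamp before decoding keeps the recorded arrival time independent of how long decoding takes.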
Mapping packet to node
By default, the listener uses the sender's IP+port tuple as the node
identifier ({ip_tuple, port}). Pass a :node_resolver function to
map IP+port to your own node identifiers. This is the recommended
setup for production deployments; see "Operational
considerations" in the README for the failure modes the default
resolver exposes (sender restarts producing new ephemeral source
ports, NAT session recycling, container IP changes).
Listener.start_link(
  port: 4370,
  node_resolver: fn
    {10, 0, 0, 1}, _ -> :node_a
    {10, 0, 0, 2}, _ -> :node_b
    ip, port -> {ip, port}
  end
)
Flow control
The listener opens its UDP socket with active: N (rather than
active: true), so the kernel delivers at most N packets to the
GenServer mailbox before falling back to passive mode. On
:udp_passive, the listener re-arms with another batch of N.
This bounds per-burst mailbox growth under packet floods —
important because decode cost is paid as soon as the packet enters
the mailbox, well before any downstream phi_accrual shedding can
help.
Tune via the :active_count option (default 100). Higher values
amortize the re-arm syscall across more packets at the cost of
larger worst-case mailbox bursts; lower values give tighter
back-pressure but more re-arm overhead.
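The active: N re-arm pattern can be sketched with plain :gen_udp, independent of this library. ActiveNSketch and its stats/1 helper are hypothetical names used only for illustration, and the real listener's state and handlers may look different.

```elixir
defmodule ActiveNSketch do
  # Hypothetical module for illustration; not part of phi_accrual_udp.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Exposes the bound port and counters so the behaviour can be inspected.
  def stats(pid), do: GenServer.call(pid, :stats)

  @impl true
  def init(opts) do
    active_count = Keyword.get(opts, :active_count, 100)
    # Port 0 asks the OS for any free port.
    port = Keyword.get(opts, :port, 0)
    {:ok, socket} = :gen_udp.open(port, [:binary, active: active_count])
    {:ok, bound_port} = :inet.port(socket)

    {:ok,
     %{socket: socket, port: bound_port, active_count: active_count, seen: 0, rearms: 0}}
  end

  @impl true
  def handle_info({:udp, _socket, _ip, _port, _data}, state) do
    # A real listener would decode the packet and record the heartbeat here.
    {:noreply, %{state | seen: state.seen + 1}}
  end

  def handle_info({:udp_passive, socket}, state) do
    # The socket delivered `active_count` packets and went passive; re-arm it.
    :ok = :inet.setopts(socket, active: state.active_count)
    {:noreply, %{state | rearms: state.rearms + 1}}
  end

  @impl true
  def handle_call(:stats, _from, state) do
    {:reply, Map.take(state, [:port, :seen, :rearms]), state}
  end
end
```

While the socket is passive, further datagrams wait in the OS receive buffer rather than the process mailbox, which is what bounds mailbox growth between re-arms.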
Telemetry
[:phi_accrual_udp, :listener, :started]
  measurements: %{}
  metadata: %{port}
[:phi_accrual_udp, :listener, :passive]
  measurements: %{}
  metadata: %{port}
  # emitted each time the listener re-arms after consuming
  # `active_count` packets; useful for observing ingress saturation
[:phi_accrual_udp, :sample, :received]
  measurements: %{packet_timestamp_ms}
  metadata: %{node, peer}
  # peer is {ip, port}; node is whatever node_resolver returned
[:phi_accrual_udp, :decode, :error]
  measurements: %{packet_size}
  metadata: %{reason, peer}
  # reason ∈ [:wrong_size, :bad_magic, :unsupported_version, :reserved_flags_set]
Security caveat
UDP is unauthenticated. Anyone who can reach the listener port can
send packets that pass Packet.decode/1 and feed observations into
the estimator, potentially poisoning detection. In hostile networks,
bind the socket to a private interface, firewall the port, or layer
authentication on top via a node_resolver that rejects unknown
peers.
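Whichever mitigation is used, it can help to watch for unwanted traffic through the telemetry events listed earlier. The following is a minimal sketch, assuming the :telemetry package is available (the listener emits through it) and that the events carry the measurement and metadata keys documented above. ListenerTelemetryLogger and the handler id are hypothetical names.

```elixir
defmodule ListenerTelemetryLogger do
  # Hypothetical helper; not part of phi_accrual_udp.
  require Logger

  @events [
    [:phi_accrual_udp, :decode, :error],
    [:phi_accrual_udp, :listener, :passive]
  ]

  # Attaches one handler to both events. A captured module function is used
  # because :telemetry discourages anonymous functions as handlers.
  def attach do
    :telemetry.attach_many(
      "listener-telemetry-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Malformed packets: log the size, the sender, and why decoding failed.
  def handle_event(
        [:phi_accrual_udp, :decode, :error],
        %{packet_size: size},
        %{reason: reason, peer: peer},
        _config
      ) do
    Logger.warning("dropped #{size}-byte packet from #{inspect(peer)}: #{inspect(reason)}")
  end

  # Frequent re-arms suggest the socket is seeing sustained bursts.
  def handle_event([:phi_accrual_udp, :listener, :passive], _measurements, %{port: port}, _config) do
    Logger.debug("listener on port #{port} re-armed")
  end
end
```

Calling ListenerTelemetryLogger.attach() once at application start would be enough. Note that packets which decode successfully but come from an unexpected peer appear only in the [:phi_accrual_udp, :sample, :received] event, which this sketch does not handle.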
Types
@type opts() :: [
  port: :inet.port_number(),
  node_resolver: (:inet.ip_address(), :inet.port_number() -> term()),
  active_count: pos_integer(),
  name: GenServer.name()
]
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec start_link(opts()) :: GenServer.on_start()
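Since the module provides a child specification, it can presumably be placed in a supervision tree as a {module, opts} tuple. This is a minimal sketch, assuming the phi_accrual_udp dependency is installed and that the child specification forwards the options to start_link/1, which is the default for GenServer-based modules. Whether the PhiAccrual core must be started first is not stated here, so check the README before relying on this ordering.

```elixir
# Assumes phi_accrual_udp is listed as a dependency of the application.
children = [
  # Options come from the opts() type: port and active_count are optional tuning knobs.
  {PhiAccrualUdp.Listener, port: 4370, active_count: 100}
]

# The supervisor restarts the listener if it crashes.
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```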