PhiAccrualUdp.Sender (phi_accrual_udp v1.0.0)

Periodic UDP heartbeat sender.

Opens a UDP socket and periodically transmits a phi_accrual_udp v2 heartbeat packet to each configured target. Fire-and-forget — UDP delivery failure is the receiver's problem to detect (which is exactly what phi_accrual is for).

Required configuration

Sender.start_link(
  sender_id: 0xA1B2C3D4_E5F60718,
  targets: [
    {{10, 0, 0, 2}, 4370},
    {~c"peer-c.internal", 4370}
  ],
  interval_ms: 1_000
)

:sender_id

Required. A non-zero unsigned 64-bit integer that identifies this node on the wire. 0 is reserved at the packet-format level.

Pick a stable identifier: a hash of the node name, a partner ID, a terminal ID, whatever your topology provides. The receiver uses this value (not the packet's source IP/port) as the default node identity, so a stable sender_id survives:

  • Sender restarts (which change the ephemeral source port)
  • NAT session recycling
  • Container reschedules that change IP

start_link/1 raises if :sender_id is missing or zero. That's intentional: there is no "anonymous" Sender mode.
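
One way to derive such an identifier is to hash a stable name and keep 64 bits of the digest. The sketch below is illustrative only: SenderIdSketch and from_name/1 are hypothetical names, not part of phi_accrual_udp, and MD5 is used purely as a dependency-free way to spread names over 64 bits, not for security.

```elixir
defmodule SenderIdSketch do
  @moduledoc false
  # Hypothetical helper; not part of phi_accrual_udp.

  @doc "Derives a stable, non-zero unsigned 64-bit id from a name."
  @spec from_name(String.t()) :: pos_integer()
  def from_name(name) when is_binary(name) do
    # :erlang.md5/1 is a BIF returning a 16-byte digest; keep the first 8 bytes.
    <<id::unsigned-integer-size(64), _rest::binary>> = :erlang.md5(name)

    # 0 is reserved on the wire, so remap the (very unlikely) zero result.
    if id == 0, do: 1, else: id
  end
end
```

Assuming the node name itself is stable across restarts, this could be wired in as sender_id: SenderIdSketch.from_name(Atom.to_string(node())). Any other stable string your topology provides would work the same way.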

:targets

Each target is a {host, port} tuple. Host can be an IP tuple, a charlist, or an atom; port is an integer. Resolution happens on every send so DNS changes are picked up without restart, at the cost of a resolver lookup per target per tick. For deployments where DNS reliability is uncertain, prefer pre-resolved IP tuples — see "Operational considerations" in the README.
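
If you want to pre-resolve hostnames yourself, a minimal sketch might look like the following. TargetResolverSketch and pre_resolve/2 are hypothetical names, not part of phi_accrual_udp; the only library call used is :inet.getaddr/2 from OTP.

```elixir
defmodule TargetResolverSketch do
  # Hypothetical helper; not part of phi_accrual_udp.

  @doc "Resolves hostname targets once, leaving IP tuples untouched."
  def pre_resolve(targets, family \\ :inet) when family in [:inet, :inet6] do
    Enum.map(targets, fn
      # Already an IP tuple: nothing to resolve.
      {host, port} when is_tuple(host) ->
        {host, port}

      # Charlist or atom hostname: resolve once, fail loudly on error.
      {host, port} when is_list(host) or is_atom(host) ->
        case :inet.getaddr(host, family) do
          {:ok, ip} ->
            {ip, port}

          {:error, reason} ->
            raise ArgumentError, "cannot resolve #{inspect(host)}: #{inspect(reason)}"
        end
    end)
  end
end
```

The trade-off is the reverse of per-send resolution: sends no longer depend on the resolver, but DNS changes are only picked up when the targets are resolved again (for example, on restart).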

Address family and binding

  • :inet6 (default false) — when true, opens the source socket with the IPv6 family AND sets {:ipv6_v6only, true} explicitly. All IP-tuple targets must be 8-element; start_link/1 raises ArgumentError on mismatch.

  • :ip (default: kernel-chosen) — bind address for the Sender's source socket. Operationally different from PhiAccrualUdp.Listener's :ip. The Listener's :ip only filters incoming traffic; the Sender's :ip sets the source address of outbound packets, which affects the kernel's routing-table choice. A misconfigured :ip on the Sender can cause packets to fail delivery silently (wrong gateway, no route to host). Ensure the configured source is on a routable path to all targets.

    During v1/v2 migration, changing the Sender's :ip also changes the {:peer, ip, port} identity seen by v1 receivers (the v1 default node resolver keys on source address). v2 deployments using :sender_id are unaffected — that's one of the reasons :sender_id exists.

Dual-stack deployments run two Senders, one per family. Mixing v4 and v6 targets in a single Sender is not supported — :gen_udp cannot send to both families from a single family-bound socket.

Hostname targets are not validated against :inet6 at start_link/1 (resolution happens per-send, see :targets above). A hostname that resolves to the wrong family will surface as a per-target [:phi_accrual_udp, :sender, :send, :error] event with reason: :eafnosupport (or similar). IP-tuple targets are validated at start.
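
For a dual-stack deployment, a mixed list of IP-tuple targets can be partitioned by tuple size before it is handed to the two Senders. This is a sketch under that assumption; FamilySplitSketch is a hypothetical name, and hostname targets are rejected here because their family is only known after resolution.

```elixir
defmodule FamilySplitSketch do
  # Hypothetical helper; not part of phi_accrual_udp.

  @doc "Splits IP-tuple targets into {v4_targets, v6_targets}."
  def split(targets) do
    Enum.split_with(targets, fn
      # 4-element tuples are IPv4 addresses.
      {ip, _port} when is_tuple(ip) and tuple_size(ip) == 4 -> true
      # 8-element tuples are IPv6 addresses.
      {ip, _port} when is_tuple(ip) and tuple_size(ip) == 8 -> false
      other -> raise ArgumentError, "not an IP-tuple target: #{inspect(other)}"
    end)
  end
end
```

The two resulting lists can then be given to one Sender without :inet6 and one with inet6: true.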

Concurrency and timeouts

On each tick, a heartbeat is sent to every target in parallel via Task.async_stream/3. A slow target (e.g., one whose DNS lookup stalls) only affects itself; it does not delay sends to other targets in the same tick.

  • :max_send_concurrency — maximum number of concurrent sends per tick. Default: 64. The actual concurrency for a given tick is min(length(targets), max_send_concurrency).
  • :send_timeout_ms — per-target send timeout. Default: max(50, div(interval_ms, 2)). Must be strictly less than :interval_ms; start_link/1 raises otherwise, because a timeout >= interval means slow targets can pile up across ticks. Tasks that exceed the timeout are killed and surfaced as [:phi_accrual_udp, :sender, :send, :timeout] telemetry.
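
The general shape of such a bounded, per-target-timeout fan-out can be sketched with Task.async_stream/3 and on_timeout: :kill_task. This is not the library's implementation: FanOutSketch, tick/3, and send_fun are hypothetical, and send_fun stands in for "resolve the target and send one packet", returning :ok or {:error, reason}.

```elixir
defmodule FanOutSketch do
  # Hypothetical sketch; not the library's actual send loop.

  @doc "Runs send_fun for each target in parallel and counts the outcomes."
  def tick(targets, send_fun, opts) when is_function(send_fun, 1) do
    max_concurrency = Keyword.get(opts, :max_send_concurrency, 64)
    timeout = Keyword.fetch!(opts, :send_timeout_ms)

    targets
    |> Task.async_stream(send_fun,
      max_concurrency: max_concurrency,
      timeout: timeout,
      # Kill a task that exceeds the timeout instead of crashing the caller.
      on_timeout: :kill_task,
      ordered: false
    )
    |> Enum.reduce(%{sent: 0, errors: 0, timeouts: 0}, fn
      {:ok, :ok}, acc -> Map.update!(acc, :sent, &(&1 + 1))
      {:ok, {:error, _reason}}, acc -> Map.update!(acc, :errors, &(&1 + 1))
      {:exit, :timeout}, acc -> Map.update!(acc, :timeouts, &(&1 + 1))
    end)
  end
end
```

Because each target runs in its own task, a stalled target consumes one concurrency slot until its timeout fires and is then counted under :timeouts, while the other targets complete normally.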

Timestamp source

By default the sender stamps packets with :erlang.system_time(:millisecond) (wall clock, NTP-corrected on most systems). Pass :timestamp_fn to override — for example, to use monotonic time if you don't trust the wall clock. Receivers do not use the packet timestamp for the EWMA; it is diagnostic-only.
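
A possible :timestamp_fn based on monotonic time is sketched below. Note that System.monotonic_time/1 can return negative values, while the opts() type expects a non_neg_integer(), so this sketch reports milliseconds elapsed since the function was built. TimestampSketch is a hypothetical name, and the resulting values are not comparable across nodes or restarts.

```elixir
defmodule TimestampSketch do
  # Hypothetical helper; not part of phi_accrual_udp.

  @doc "Returns a zero-arity fun yielding non-negative monotonic milliseconds."
  def monotonic_fn do
    # Capture a baseline once so later readings are never negative.
    base = System.monotonic_time(:millisecond)
    fn -> System.monotonic_time(:millisecond) - base end
  end
end
```

It would be passed as timestamp_fn: TimestampSketch.monotonic_fn() alongside the other options.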

Telemetry

[:phi_accrual_udp, :sender, :started]
  measurements: %{}
  metadata:     %{interval_ms, target_count, sender_id,
                  max_send_concurrency, send_timeout_ms,
                  inet6, ip}
  # ip is nil when not explicitly set (kernel-chosen source)

[:phi_accrual_udp, :sender, :send, :ok]
  measurements: %{duration}
  metadata:     %{target, sender_id}
  # one event per successful send per target per tick
  # duration in native time units (see System.convert_time_unit/3)
  # HIGH VOLUME: 1 event per target per tick. Subscribe only
  # if you need per-target latency histograms.

[:phi_accrual_udp, :sender, :send, :error]
  measurements: %{duration}
  metadata:     %{target, sender_id, reason}
  # reason is whatever :gen_udp.send/4 returned, e.g.
  # :ehostunreach, :enetunreach, :emsgsize.

[:phi_accrual_udp, :sender, :send, :timeout]
  measurements: %{duration}
  metadata:     %{target, sender_id}
  # the Task was killed by :send_timeout_ms. duration is the
  # configured timeout, in native time units.

[:phi_accrual_udp, :sender, :tick]
  measurements: %{sent, errors, timeouts, duration}
  metadata:     %{sender_id}
  # aggregate counts across all targets for this tick
  # sent + errors + timeouts == target_count
  # duration is wall-clock of the parallel send phase, native units
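
A handler for the lower-volume failure events could be attached as sketched below. The event names and metadata keys are the ones listed above; SenderTelemetryLogger, the handler id, and the log format are hypothetical, and the sketch assumes the :telemetry package is available in your application.

```elixir
defmodule SenderTelemetryLogger do
  # Hypothetical handler module; not part of phi_accrual_udp.
  require Logger

  @events [
    [:phi_accrual_udp, :sender, :send, :error],
    [:phi_accrual_udp, :sender, :send, :timeout]
  ]

  @doc "Attaches one handler to the error and timeout events."
  def attach do
    :telemetry.attach_many("sender-failure-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  @doc false
  def handle_event(event, measurements, metadata, _config) do
    Logger.warning(format(event, measurements, metadata))
  end

  @doc "Builds the log line; duration is converted from native units to ms."
  def format([:phi_accrual_udp, :sender, :send, kind], %{duration: native}, metadata) do
    ms = System.convert_time_unit(native, :native, :millisecond)
    # :timeout events carry no :reason key, so fall back to :none.
    reason = Map.get(metadata, :reason, :none)
    "heartbeat #{kind} target=#{inspect(metadata.target)} reason=#{inspect(reason)} after #{ms}ms"
  end
end
```

Subscribing only to :error and :timeout keeps handler load independent of the per-tick success volume.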

Types

opts()

@type opts() :: [
  sender_id: pos_integer(),
  targets: [target()],
  interval_ms: pos_integer(),
  send_timeout_ms: pos_integer(),
  max_send_concurrency: pos_integer(),
  inet6: boolean(),
  ip: :inet.ip_address(),
  timestamp_fn: (-> non_neg_integer()),
  name: GenServer.name()
]

target()

@type target() :: {:inet.ip_address() | charlist() | atom(), :inet.port_number()}

Functions

child_spec(init_arg)

@spec child_spec(opts()) :: Supervisor.child_spec()

Build a child specification for use under a Supervisor.

Honors the standard supervisor options :id, :restart, and :shutdown when present in the keyword list, alongside the Sender's own options. Useful for running multiple Sender instances under one supervisor — e.g., one per family for dual-stack deployments, or one per logical target group:

children = [
  {PhiAccrualUdp.Sender,
    sender_id: 0xA1, targets: v4_peers, id: :sender_v4},
  {PhiAccrualUdp.Sender,
    sender_id: 0xA1, targets: v6_peers,
    inet6: true, id: :sender_v6}
]

Defaults: id: PhiAccrualUdp.Sender, restart: :permanent, shutdown: 5_000, type: :worker.
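
To illustrate how supervisor options and Sender options can share one keyword list, here is a sketch of a child_spec/1 that splits them apart. It is not the library's source: ChildSpecSketch is a hypothetical module, and only the defaults listed above are taken from this page.

```elixir
defmodule ChildSpecSketch do
  # Hypothetical sketch; not the library's actual child_spec/1.

  @supervisor_keys [:id, :restart, :shutdown]

  @doc "Builds a child spec, letting :id, :restart and :shutdown override defaults."
  def child_spec(opts) when is_list(opts) do
    # Separate supervisor-level keys from the options meant for start_link/1.
    {sup_opts, sender_opts} = Keyword.split(opts, @supervisor_keys)

    defaults = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [sender_opts]},
      restart: :permanent,
      shutdown: 5_000,
      type: :worker
    }

    Map.merge(defaults, Map.new(sup_opts))
  end
end
```

With this shape, two entries that differ only in :id (as in the dual-stack example above) produce distinct child ids and can run under the same supervisor.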

start_link(opts \\ [])

@spec start_link(opts()) :: GenServer.on_start()