SwimEx.GossipQueue (SwimEx v0.1.0)

Priority gossip event queue with transmit-count tracking.

Events are packed into outgoing messages in priority order: dead (0) > suspect (1) > alive (2). Within the same priority, events with the lowest transmit count are packed first.
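
The ordering rule above can be illustrated with a small sketch. It is not taken from the SwimEx source: `GossipOrderSketch` and the atom events are hypothetical placeholders, and the maps only mirror the fields of `entry()`. It assumes the ordering is equivalent to sorting on the pair `{priority, transmit_count}`.

```elixir
defmodule GossipOrderSketch do
  # Hypothetical helper, not part of SwimEx: orders entries by priority
  # value first (0 = dead, 1 = suspect, 2 = alive), then by transmit count.
  def pack_order(entries) do
    Enum.sort_by(entries, fn e -> {e.priority, e.transmit_count} end)
  end
end

# Placeholder events; the real event type is SwimEx.Codec.event().
sample_entries = [
  %{event: :alive_a, priority: 2, transmit_count: 0, multiplier: 1},
  %{event: :suspect_b, priority: 1, transmit_count: 3, multiplier: 1},
  %{event: :dead_c, priority: 0, transmit_count: 5, multiplier: 1},
  %{event: :suspect_d, priority: 1, transmit_count: 1, multiplier: 1}
]

ordered = sample_entries |> GossipOrderSketch.pack_order() |> Enum.map(& &1.event)
# ordered == [:dead_c, :suspect_d, :suspect_b, :alive_a]
```

The dead event goes first despite its high transmit count, because priority is compared before transmit count.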

Transmit multiplier: ceil(log2(N+1)), where N is the number of alive and suspect members. An event is dropped once its transmit count reaches the resulting limit.

Higher-incarnation events for the same node supersede older entries immediately on enqueue.

Summary

Functions

enqueue(q, event, multiplier \\ 1)

Enqueues a gossip event.

entries(q)

Returns all entries in the queue in priority order.

new()

Creates a new, empty gossip queue.

pack(q, n, mtu)

Packs events into a message payload of at most mtu bytes. Returns {packed_events, updated_queue}.

size(q)

Returns the number of events currently in the queue.

transmit_limit(n)

Calculates the transmit limit for events based on the cluster size n.

Types

entry()

@type entry() :: %{
  event: event(),
  priority: 0 | 1 | 2,
  transmit_count: non_neg_integer(),
  multiplier: pos_integer()
}

event()

@type event() :: SwimEx.Codec.event()

node_id()

@type node_id() :: {String.t(), :inet.port_number()}

t()

@type t() :: %SwimEx.GossipQueue{
  by_node: %{required(node_id()) => entry()},
  sorted_keys: term()
}

Functions

enqueue(q, event, multiplier \\ 1)

@spec enqueue(t(), event(), pos_integer()) :: t()

Enqueues a gossip event.

If an event for the same node already exists, the one with the higher incarnation wins. If incarnations are equal, the event with higher priority (lower priority value) wins.

Returns the updated queue.
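
The conflict rule described above might be sketched as follows. This is an illustration under assumptions, not the module's code: `SupersedeSketch` is hypothetical, the candidates are plain maps with `:incarnation` and `:priority` keys (the real event shape is `SwimEx.Codec.event()`, which is not shown here), and keeping the existing entry on a full tie is a guess, since the documentation does not specify that case.

```elixir
defmodule SupersedeSketch do
  # Hypothetical helper: decides which of two candidates for the same node
  # should stay in the queue.
  def winner(existing, incoming) do
    cond do
      # Higher incarnation always wins.
      incoming.incarnation > existing.incarnation -> incoming
      incoming.incarnation < existing.incarnation -> existing
      # Equal incarnation: lower priority value (higher priority) wins.
      incoming.priority < existing.priority -> incoming
      # Assumption: on a full tie, keep the existing entry.
      true -> existing
    end
  end
end

old = %{incarnation: 3, priority: 2}
newer = SupersedeSketch.winner(old, %{incarnation: 4, priority: 2})
escalated = SupersedeSketch.winner(old, %{incarnation: 3, priority: 1})
stale = SupersedeSketch.winner(old, %{incarnation: 2, priority: 0})
```

Here `newer` and `escalated` are the incoming candidates, while `stale` is the existing entry, because a dead event with a lower incarnation does not override newer information.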

entries(q)

@spec entries(t()) :: [entry()]

Returns all entries in the queue in priority order.

Used primarily for testing and debugging.

new()

@spec new() :: t()

Creates a new, empty gossip queue.

pack(q, n, mtu)

@spec pack(t(), non_neg_integer(), non_neg_integer()) :: {[event()], t()}

Packs events into a message payload of at most mtu bytes. Returns {packed_events, updated_queue}.

Increments the transmit count of each packed event and drops events whose count has reached the transmit limit for n, the current alive+suspect count.
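
A rough sketch of the packing step follows, under several assumptions that are not confirmed by the source: `PackSketch` is hypothetical, the entries arrive already in packing order, each event is an already-encoded binary whose cost is its `byte_size/1`, the limit is passed in directly rather than derived from n, and an event that does not fit is skipped so that smaller later events can still be tried.

```elixir
defmodule PackSketch do
  # Hypothetical sketch of packing under an MTU budget. Returns
  # {packed_events, remaining_entries}.
  def pack(entries, limit, mtu) do
    {packed, kept, _used} =
      Enum.reduce(entries, {[], [], 0}, fn entry, {packed, kept, used} ->
        size = byte_size(entry.event)

        if used + size <= mtu do
          # The event fits: count this transmission.
          bumped = %{entry | transmit_count: entry.transmit_count + 1}
          # Drop the entry once its count has reached the limit.
          kept = if bumped.transmit_count >= limit, do: kept, else: [bumped | kept]
          {[entry.event | packed], kept, used + size}
        else
          # The event does not fit: keep it unchanged for a later message.
          {packed, [entry | kept], used}
        end
      end)

    {Enum.reverse(packed), Enum.reverse(kept)}
  end
end

queue_entries = [
  %{event: "dead:c", priority: 0, transmit_count: 2, multiplier: 1},
  %{event: "suspect:d", priority: 1, transmit_count: 0, multiplier: 1},
  %{event: "alive:a", priority: 2, transmit_count: 0, multiplier: 1}
]

{packed, remaining} = PackSketch.pack(queue_entries, 3, 16)
```

With a limit of 3 and a 16-byte budget, the first two events are packed (6 + 9 bytes). The dead event reaches the limit and is dropped, the suspect event stays with a count of 1, and the alive event stays untouched because it would exceed the budget.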

size(q)

@spec size(t()) :: non_neg_integer()

Returns the number of events currently in the queue.

transmit_limit(n)

@spec transmit_limit(non_neg_integer()) :: non_neg_integer()

Calculates the transmit limit for events based on the cluster size n (the alive+suspect count).

Returns the integer transmit limit.
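
As a worked illustration of the formula ceil(log2(n+1)) from the module description, the sketch below computes it with integer arithmetic only. `TransmitLimitSketch` is a hypothetical stand-in, not the library function, and whether the real `transmit_limit/1` also applies a per-entry multiplier is not stated here.

```elixir
defmodule TransmitLimitSketch do
  # Hypothetical re-implementation of ceil(log2(n + 1)).
  # Counts how many doublings of 1 are needed to reach at least n + 1,
  # which avoids floating-point rounding near powers of two.
  def limit(n) when is_integer(n) and n >= 0, do: doublings(n + 1, 1, 0)

  defp doublings(target, pow, acc) when pow >= target, do: acc
  defp doublings(target, pow, acc), do: doublings(target, pow * 2, acc + 1)
end

limits = Enum.map([0, 1, 3, 4, 7, 100], &TransmitLimitSketch.limit/1)
# limits == [0, 1, 2, 3, 3, 7]
```

The limit grows logarithmically: moving from 7 to 100 members only raises it from 3 to 7.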