The replicated substrate
View Sourcebarrel_p2p_replica is the low-level gossip/CRDT engine behind
barrel_p2p_map, the service registry,
leader election, sharded placement, and durable reminders. It drives a
gossiped OR-Map: it broadcasts add/remove deltas, routes incoming deltas
to your merge callback, full-syncs state to peers on connect (and pulls
state on start), and drops a node's entries on peer_down.
Reach for it directly only when barrel_p2p_map does not fit, that is when
you need custom merge or snapshot semantics: layering extra invariants
on top of the OR-Map (leader election layers fencing tokens), a different
projection, or a tailored full-sync. For an ordinary replicated key-value
map, use barrel_p2p_map. Stability: beta.
Starting an instance
barrel_p2p_replica:start_link(#{name => my_instance, callback => my_module}).name is BOTH the registered process name AND the Plumtree tag that
scopes this instance's broadcasts, so it must be a unique atom. All
instances share the one Plumtree bus; each ignores payloads carrying
another instance's tag. One callback module can back several
independently-named instances, because every callback receives the
instance name as its first argument (this is how every barrel_p2p_map
shares the barrel_p2p_map module).
The OWNER process holds the actual OR-Map and implements the callbacks, so
it can run its side effects synchronously. Start the owner BEFORE its
replica instance: the callbacks run in the replica process and cast into
the owner, which must already exist. The per-instance supervisor uses
rest_for_one (owner then replica) to enforce that on every restart.
The callback contract
-callback replica_merge_delta(Name, Delta) -> ok.
-callback replica_apply_full_sync(Name, Snapshot) -> ok.
-callback replica_full_sync_snapshot(Name) -> {sync, Snapshot} | empty.
-callback replica_remove_node(Name, node()) -> ok.
-callback replica_merge_custom(Name, Payload) -> ok. %% optional
-callback replica_anti_entropy() -> boolean(). %% optionalreplica_merge_delta/2— merge an incoming{Key, entry}delta into the owner's map and run its side effects.replica_apply_full_sync/2— apply a snapshot received from a peer on connect (or pulled on start). Same path as a delta for most consumers.replica_full_sync_snapshot/1— produce the snapshot to push to a newly connected peer, oremptywhen there is nothing to send.replica_remove_node/2— drop entries owned by a node that left or failed. A no-op is a valid choice (most built-ins keep their data; see theprune_on_peer_downdiscussion in replicated maps).replica_merge_custom/2— merge a feature-specific broadcast (see below). Optional; omit it if you do not usebroadcast_custom/2.replica_anti_entropy/0— returntrueto make periodic anti-entropy intrinsic to this module's instances (see below). Optional; omit it (the default) to leave it off.
Broadcasting
barrel_p2p_replica:broadcast_update(Name, {add, Key, Value}).
barrel_p2p_replica:broadcast_update(Name, {remove, Key}).
barrel_p2p_replica:broadcast_custom(Name, Payload).broadcast_update/2 gossips OR-Map add/remove deltas: an add carries a
fresh dot, a remove a tombstone, so the receiver's merge resolves against
any in-flight value by HLC. broadcast_custom/2 gossips an arbitrary
payload on the instance's tag, delivered to replica_merge_custom/2. Use
it for invariants the plain OR-Map cannot express: leader election
broadcasts {Name, Fence} this way to publish the fencing token alongside
the election (barrel_p2p_leader.erl:142, barrel_p2p_leader.erl:316).
Late start and recovery
An instance started after the cluster has already formed gets no peer_up
for already-connected peers, so on start it seeds its peer set from the
active view and pulls a full sync from those peers via
replica_full_sync_snapshot/1 / replica_apply_full_sync/2. A restarted
owner recovers state the same way. There is nothing to do in your
callbacks for this; it is built into the driver.
Anti-entropy
The full-sync above is one-shot (on start / peer_up). State learned via
full-sync is not re-broadcast, so after a partition heals a node whose link
survived the split gets no fresh peer_up and could stay behind. To close that
gap an instance can run periodic anti-entropy: it pulls a full sync from one
random peer every replica_anti_entropy_ms (default 30000), so it reconverges
on its own. Because the merge is idempotent and the snapshot is full-state,
repeated pulls are safe and state propagates transitively.
A callback module turns this on by exporting replica_anti_entropy/0 returning
true. It is a property of the module, not a per-instance or operator flag:
the only knob is the global interval. The built-in reminder and barrel_p2p_map
do so, so their convergence is intrinsic with no opt-out. Implement it only for
a store whose replica_full_sync_snapshot/1 returns the WHOLE state and whose
removals are tombstones, so a re-pull cannot resurrect a hard-deleted entry. The
registry (local-only snapshot, overlay-lookup fallback), leader, and shard
(heartbeat-driven self-healing) do not implement it, so they stay off.
Wire safety
Your callbacks receive entries straight off gossip. There are two distinct concerns:
- Wrapper safety. Feeding malformed dots/HLCs, an empty dot map, or a
non-map payload to
barrel_p2p_ormap:absorb_clock/mergecan crash the merge or the sharedbarrel_p2p_hlcserver. An implementer that merges deltas from sources it does not fully control SHOULD validate the wrapper before merging. - Leaf/payload validation (is this value well-formed for my app?) is entirely your own concern.
barrel_p2p_crdt_wire is the provided helper for the
first, with an optional leaf hook for the second. Using it is
recommended, not enforced: an implementer with full control of its
writers, or its own validation, may skip it. Be aware that of the
built-ins only the reminder validates today; the registry, leader, and
shard have purely internal writers, so they do not. Validate if your deltas
can come from a source you do not fully control.
barrel_p2p_crdt_wire
barrel_p2p_crdt_wire is the safe gossip-ingest surface. Stability:
supported.
%% Wrapper validity (and an optional leaf-value check).
barrel_p2p_crdt_wire:valid_entry(Entry).
barrel_p2p_crdt_wire:valid_entry(Entry, fun is_my_value/1).
%% Keep only the valid entries of a (possibly non-map) payload.
Accepted = barrel_p2p_crdt_wire:accept(Payload, LeafFun).
%% accept + absorb_clock + merge, in one step.
{Merged, Accepted} = barrel_p2p_crdt_wire:ingest(LocalMap, Incoming, LeafFun).accept/2 and ingest/3 guard the top-level argument: a non-map payload
(a malformed broadcast could deliver {delta, Node, garbage}) returns
#{} / leaves the local map unchanged, never crashing the caller on any
peer-supplied term. ingest/3 returns BOTH the merged OR-Map AND the
accepted (validated, filtered) sub-map, so you can reconcile and emit
events for exactly the keys that changed instead of rescanning. On a
full-sync snapshot, Accepted is the whole validated snapshot, which is
how a map populates on first sync.
Transport coupling
Gossip rides barrel_p2p_plumtree + barrel_p2p_hyparview_events over
barrel_p2p's distribution carrier, so a consumer must run on barrel_p2p's
distribution. A pluggable transport (for apps with their own membership)
is future work.
Related
- Replicated maps is the high-level map built on this behaviour; start there unless you need custom merge.
- Leader election is the canonical
custom-merge consumer (fencing via
broadcast_custom/2). - Service registry explains the OR-Map model the deltas carry.