Electric.Shapes.Consumer.Subqueries.MoveQueue (electric v1.6.6)


Multi-dependency move queue. Tracks move_in/move_out operations per dependency index, with deduplication and redundancy elimination scoped per dependency.

Move-outs from any dependency are drained before move-ins from any dependency.

Each per-dep batch also accumulates the upstream Postgres transaction ids that contributed to it, so move-in/move-out broadcasts can carry txids for client attribution (mirroring Electric.LogItems.from_change/4).

Types

batch()

@type batch() :: {batch_kind(), non_neg_integer(), [move_value()], [txid()]}

batch_kind()

@type batch_kind() :: :move_out | :move_in

entry()

@type entry() :: {[move_value()], MapSet.t(txid())}

move_value()

@type move_value() :: {term(), term()}

t()

@type t() :: %Electric.Shapes.Consumer.Subqueries.MoveQueue{
  move_in: %{required(non_neg_integer()) => entry()},
  move_out: %{required(non_neg_integer()) => entry()}
}

txid()

@type txid() :: pos_integer()
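To make the type shapes above concrete, here is a small sketch with literal values. The numbers and strings are placeholders: the types only say that a move_value() is a two-element tuple, not what the elements mean. The plain map stands in for the struct so the snippet runs without Electric loaded.

```elixir
# A move_value() is a two-element tuple; the contents here are placeholders.
values = [{1, "1"}, {2, "2"}]

# An entry() pairs the values with the set of txids that contributed to them.
entry = {values, MapSet.new([731, 732])}

# Plain-map stand-in for the struct fields: one map per kind, keyed by
# dependency index (a non-negative integer).
queue_fields = %{move_in: %{0 => entry}, move_out: %{}}

# A batch() is a flat 4-tuple; its txids are a list rather than a MapSet.
batch = {:move_in, 0, values, [731, 732]}
{kind, dep_index, batch_values, txids} = batch

IO.inspect({kind, dep_index, length(batch_values), txids})
IO.inspect(Map.keys(queue_fields.move_in))
```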

Functions

enqueue(queue, dep_index, payload, dep_view)

@spec enqueue(t(), non_neg_integer(), map() | keyword(), MapSet.t()) :: t()

Enqueue a materializer payload for a specific dependency. dep_view is the current view for this dependency, used for redundancy elimination.

The payload may include a :txids key listing the upstream transaction ids (xids) that produced the moves. Those txids are unioned with any already accumulated for this dependency.
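The txid accumulation described above can be sketched as a set union. This is an illustration only, not the module's implementation: `TxidUnionSketch` and `merge_txids/2` are hypothetical names, and treating a missing :txids key as "no new txids" is an assumption.

```elixir
defmodule TxidUnionSketch do
  # Hypothetical helper: merges the txids carried by one payload into the set
  # already accumulated for a dependency. Access syntax works for both maps
  # and keyword lists, matching the `map() | keyword()` payload spec.
  def merge_txids(accumulated, payload) do
    # Assumption: a payload without :txids contributes nothing.
    new_txids = payload[:txids] || []
    MapSet.union(accumulated, MapSet.new(new_txids))
  end
end

acc = MapSet.new([100])
acc = TxidUnionSketch.merge_txids(acc, %{txids: [100, 101]})
acc = TxidUnionSketch.merge_txids(acc, txids: [102])
acc = TxidUnionSketch.merge_txids(acc, %{})

IO.inspect(MapSet.to_list(acc) |> Enum.sort())
```

Because the accumulator is a set, a txid reported by several payloads for the same dependency is kept once.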

length(move_queue)

@spec length(t()) :: non_neg_integer()

new()

@spec new() :: t()

pop_next(queue)

@spec pop_next(t()) :: {batch(), t()} | nil

Pop the next batch of operations. Returns move-out batches (any dep) before move-in batches. Returns {batch, updated_queue} or nil if the queue is empty.
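The drain order can be illustrated with a self-contained stand-in. `MoveQueueSketch` and `DrainSketch` are hypothetical modules written for this example; they mirror the documented struct shape and return values but are not the library's code. Two details are assumptions of the sketch: the lowest dependency index is popped first within a kind, and txids are emitted sorted.

```elixir
defmodule MoveQueueSketch do
  # Stand-in for the documented struct: two maps keyed by dependency index,
  # each value an entry of {values, MapSet of txids}.
  defstruct move_in: %{}, move_out: %{}

  # Move-outs from any dependency are returned before any move-in.
  def pop_next(%__MODULE__{move_out: out} = q) when map_size(out) > 0 do
    {batch, rest} = take(:move_out, out)
    {batch, %{q | move_out: rest}}
  end

  def pop_next(%__MODULE__{move_in: pending} = q) when map_size(pending) > 0 do
    {batch, rest} = take(:move_in, pending)
    {batch, %{q | move_in: rest}}
  end

  # Nothing left in either map.
  def pop_next(%__MODULE__{}), do: nil

  defp take(kind, entries) do
    # Assumption: lowest dependency index first.
    dep_index = entries |> Map.keys() |> Enum.min()
    {{values, txids}, rest} = Map.pop(entries, dep_index)
    # Assumption: the batch carries its txids as a sorted list.
    {{kind, dep_index, values, txids |> MapSet.to_list() |> Enum.sort()}, rest}
  end
end

defmodule DrainSketch do
  # Calls pop_next until it returns nil, collecting batches in pop order.
  def drain(queue, pop_next) when is_function(pop_next, 1) do
    case pop_next.(queue) do
      nil -> []
      {batch, rest} -> [batch | drain(rest, pop_next)]
    end
  end
end

queue = %MoveQueueSketch{
  move_in: %{0 => {[{1, "1"}], MapSet.new([100])}},
  move_out: %{
    0 => {[{3, "3"}], MapSet.new([102, 101])},
    1 => {[{2, "2"}], MapSet.new([101])}
  }
}

batches = DrainSketch.drain(queue, &MoveQueueSketch.pop_next/1)
IO.inspect(batches)
```

In this sketch both move-out batches come out before the single move-in batch. Since `drain/2` only relies on the `{batch, updated_queue} | nil` contract, the same loop should work with the real queue by passing `&Electric.Shapes.Consumer.Subqueries.MoveQueue.pop_next/1`.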