Public API for Concord's sync and watch protocol.
Pull model — changes/3
events = Concord.Sync.changes(1840, 1850)
Push model — watch/2
{:ok, ref} = Concord.Sync.watch({:prefix, "/tasks/"}, self())
receive do
  {:concord_event, ^ref, %Event{}} -> ...
end
:ok = Concord.Sync.unwatch(ref)
Stream wrapper — watch_stream/2
Concord.Sync.watch_stream({:prefix, "/tasks/"})
|> Stream.each(&IO.inspect/1)
|> Stream.run()
Summary
Functions
changes/3
Returns events in the revision range [from, to] (inclusive).
compact/1
Compacts the change log, removing entries before keep_revision.
earliest_revision/0
Returns the earliest revision still in the change log.
unwatch/1
Unsubscribes a watch by its reference.
watch/3
Subscribes the given process to events matching the selector.
watch_stream/2
Returns a Stream that yields events matching the selector.
watcher_count/0
Returns the number of active watchers.
Functions
@spec changes(non_neg_integer(), non_neg_integer(), keyword()) :: [Concord.Sync.Event.t()]
Returns events in the revision range [from, to] (inclusive).
Options
:limit — max events to return (default: 1000)
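Because :limit caps each call, a wide revision range may need several calls. The following is a minimal sketch of one way to page through a range, not part of Concord itself. `ChangePager` and `fake_fetch` are hypothetical names, and the sketch assumes that events come back in ascending revision order, that each event exposes a unique `revision` field, and that :limit truncates from the low end of the range. None of these is confirmed by the documentation above.

```elixir
defmodule ChangePager do
  @moduledoc false

  # Hypothetical helper, not part of Concord. `fetch` is injected so the
  # sketch runs without the library; it must behave like changes/3.
  # ASSUMPTIONS: ascending order, a unique `revision` field per event,
  # and :limit keeping the lowest revisions of the range.
  def all(from, to, _limit, _fetch) when from > to, do: []

  def all(from, to, limit, fetch) when limit > 0 do
    batch = fetch.(from, to, limit: limit)

    if length(batch) < limit do
      # A short batch means the range is exhausted.
      batch
    else
      # A full batch may have been truncated; resume after its last revision.
      last = List.last(batch)
      batch ++ all(last.revision + 1, to, limit, fetch)
    end
  end
end

# In-memory stand-in for the change log, used only to exercise the pager.
log = for rev <- 1..7, do: %{revision: rev}

fake_fetch = fn from, to, opts ->
  log
  |> Enum.filter(&(&1.revision in from..to))
  |> Enum.take(Keyword.get(opts, :limit, 1000))
end

paged = ChangePager.all(1, 7, 3, fake_fetch)
IO.inspect(Enum.map(paged, & &1.revision))
```

Against a real node the last argument would presumably be `&Concord.Sync.changes/3`. If several events can share one revision, resuming at `last.revision + 1` could skip events, so the unique-revision assumption should be checked first.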
@spec compact(non_neg_integer()) :: non_neg_integer()
Compacts the change log, removing entries before keep_revision.
@spec earliest_revision() :: non_neg_integer()
Returns the earliest revision still in the change log.
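Since compaction removes entries before keep_revision, a caller resuming from an old revision may want to check earliest_revision/0 before asking for changes. The sketch below shows one possible guard. `CompactionGuard` is a hypothetical helper, both functions are injected so the example runs without Concord, and how changes/3 itself reacts to a compacted range is not stated in this documentation.

```elixir
defmodule CompactionGuard do
  @moduledoc false

  # Hypothetical helper, not part of Concord.
  # `earliest` stands in for &Concord.Sync.earliest_revision/0 and
  # `fetch` for &Concord.Sync.changes/3.
  def changes_since(from, to, earliest, fetch) do
    case earliest.() do
      oldest when from < oldest ->
        # Part of the requested range is gone; the caller needs a full resync.
        {:error, {:compacted, oldest}}

      _oldest ->
        {:ok, fetch.(from, to, [])}
    end
  end
end

# Stand-ins used only to exercise the guard.
fake_earliest = fn -> 10 end
fake_fetch = fn from, to, _opts -> Enum.to_list(from..to) end

too_old = CompactionGuard.changes_since(5, 12, fake_earliest, fake_fetch)
in_range = CompactionGuard.changes_since(10, 12, fake_earliest, fake_fetch)
IO.inspect({too_old, in_range})
```

The check and the fetch are two separate calls, so a compaction that runs between them can still invalidate the range. The guard narrows that window but does not close it.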
@spec unwatch(reference()) :: :ok
Unsubscribes a watch by its reference.
@spec watch(Concord.KV.Selector.t(), pid(), keyword()) :: {:ok, reference()} | {:error, term()}
Subscribes the given process to events matching the selector.
Returns {:ok, watch_ref} on success or {:error, reason} on failure. Pass watch_ref to unwatch/1 to cancel the subscription.
Options
:max_queue — max pending events before backpressure (default: 1000)
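Long-lived subscribers are often GenServers rather than bare receive loops. As a rough sketch, a process could subscribe in init/1 and collect `{:concord_event, ref, event}` messages in handle_info/2. `TaskWatcher` is a hypothetical module, and the watch function is injected so the example runs without Concord. In real use it would presumably be something like `&Concord.Sync.watch/2`, or a wrapper that passes options such as `max_queue: 500`.

```elixir
defmodule TaskWatcher do
  @moduledoc false
  use GenServer

  # Hypothetical consumer, not part of Concord.
  # `watch_fun` must behave like Concord.Sync.watch(selector, pid).
  def start_link(watch_fun), do: GenServer.start_link(__MODULE__, watch_fun)

  def events(pid), do: GenServer.call(pid, :events)

  @impl true
  def init(watch_fun) do
    case watch_fun.({:prefix, "/tasks/"}, self()) do
      {:ok, ref} -> {:ok, %{ref: ref, events: []}}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call(:events, _from, state) do
    {:reply, Enum.reverse(state.events), state}
  end

  @impl true
  def handle_info({:concord_event, ref, event}, %{ref: ref} = state) do
    # Only events tagged with our own watch reference are recorded.
    {:noreply, %{state | events: [event | state.events]}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

# Stand-in for the subscription call, used only to exercise the server.
ref = make_ref()
{:ok, watcher} = TaskWatcher.start_link(fn {:prefix, "/tasks/"}, _pid -> {:ok, ref} end)

send(watcher, {:concord_event, ref, :created})
send(watcher, {:concord_event, make_ref(), :stale})
send(watcher, {:concord_event, ref, :updated})
IO.inspect(TaskWatcher.events(watcher))
```

Matching on the stored reference in the function head means messages from an earlier or unrelated watch fall through to the catch-all clause instead of being recorded.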
@spec watch_stream(Concord.KV.Selector.t(), keyword()) :: Enumerable.t()
Returns a Stream that yields events matching the selector.
The stream blocks on receive and yields one event at a time.
The stream terminates when the calling process receives :concord_watch_done.
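To make the blocking behaviour concrete, here is a sketch of how a receive-based stream of this shape can be built with Stream.resource/3. It is an illustration under stated assumptions, not Concord's actual implementation: `WatchStreamSketch` is a hypothetical module, and the messages are sent by hand instead of coming from a real watch.

```elixir
defmodule WatchStreamSketch do
  @moduledoc false

  # Hypothetical illustration, not Concord's implementation.
  # Turns messages tagged with `ref` in the caller's mailbox into a stream.
  def from_mailbox(ref) do
    Stream.resource(
      fn -> ref end,
      fn ref ->
        receive do
          # Yield exactly one event per step, then block on the next message.
          {:concord_event, ^ref, event} -> {[event], ref}
          :concord_watch_done -> {:halt, ref}
        end
      end,
      # A real wrapper would presumably unsubscribe here.
      fn _ref -> :ok end
    )
  end
end

# Hand-delivered messages stand in for a live watch.
ref = make_ref()
send(self(), {:concord_event, ref, :created})
send(self(), {:concord_event, ref, :updated})
send(self(), :concord_watch_done)

collected = ref |> WatchStreamSketch.from_mailbox() |> Enum.to_list()
IO.inspect(collected)
```

Because the receive runs in whichever process enumerates the stream, the stream should be consumed by the same process that holds the subscription. Without a done message the enumeration blocks indefinitely.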
@spec watcher_count() :: non_neg_integer()
Returns the number of active watchers.