Concord.Sync (Concord v2.0.0)

Copy Markdown View Source

Public API for Concord's sync and watch protocol.

Pull model — changes/3

events = Concord.Sync.changes(1840, 1850)

Push model — watch/2

{:ok, ref} = Concord.Sync.watch({:prefix, "/tasks/"}, self())

receive do
  {:concord_event, ^ref, %Event{}} -> ...
end

:ok = Concord.Sync.unwatch(ref)

Stream wrapper — watch_stream/2

Concord.Sync.watch_stream({:prefix, "/tasks/"})
|> Stream.each(&IO.inspect/1)
|> Stream.run()

Summary

Functions

Returns events in the revision range [from, to] (inclusive).

Compacts the change log, removing entries before keep_revision.

Returns the earliest revision still in the change log.

Unsubscribes a watch by its reference.

Subscribes the given process to events matching the selector.

Returns a Stream that yields events matching the selector.

Returns the number of active watchers.

Functions

changes(from_revision, to_revision, opts \\ [])

Returns events in the revision range [from, to] (inclusive).

Options

  • :limit — max events to return (default: 1000)

compact(keep_revision)

@spec compact(non_neg_integer()) :: non_neg_integer()

Compacts the change log, removing entries before keep_revision.

earliest_revision()

@spec earliest_revision() :: non_neg_integer()

Returns the earliest revision still in the change log.

unwatch(watch_ref)

@spec unwatch(reference()) :: :ok

Unsubscribes a watch by its reference.

watch(selector, subscriber_pid \\ self(), opts \\ [])

@spec watch(Concord.KV.Selector.t(), pid(), keyword()) ::
  {:ok, reference()} | {:error, term()}

Subscribes the given process to events matching the selector.

Returns {:ok, watch_ref}.

Options

  • :max_queue — max pending events before backpressure (default: 1000)

watch_stream(selector, opts \\ [])

@spec watch_stream(
  Concord.KV.Selector.t(),
  keyword()
) :: Enumerable.t()

Returns a Stream that yields events matching the selector.

The stream blocks on receive and yields one event at a time. Terminates when the calling process receives :concord_watch_done.

watcher_count()

@spec watcher_count() :: non_neg_integer()

Returns the number of active watchers.