Watch storage for tracking key-based and prefix-based subscriptions with database scoping.
Allows multiple processes to register watches for specific keys and key prefixes in specific databases, associating each watch with a reference value.
Internal Structure
The watch storage uses these collections:
key_to_pids: %{db => %{key => %{pid => ref}}} - maps database to key to pid-to-ref mappings
pid_to_keys: %{pid => MapSet.t({db, key})} - maps pids to sets of database-scoped keys
prefix_tree: Rust radix tree storing watcher indexes by database-scoped prefix
prefix_to_pids: %{db => %{prefix => %{pid => ref}}} - maps database to prefix to pid-to-ref mappings
pid_to_prefixes: %{pid => MapSet.t({db, prefix})} - maps pids to database-scoped prefixes
pid_by_idx and idx_by_pid: maps between pids and integer indexes stored in the Rust tree
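The forward and reverse maps listed above can be modeled in plain Elixir. The following is a minimal sketch, not the Vdr.TS.Watch implementation: the module name WatchSketch and its functions are hypothetical, only the exact-key maps are modeled, and the Rust prefix tree and index maps are left out.

```elixir
defmodule WatchSketch do
  # Hypothetical, simplified model of the two exact-key maps only.
  defstruct key_to_pids: %{}, pid_to_keys: %{}

  def new, do: %__MODULE__{}

  # Registers {db, key} for pid and keeps both maps in sync.
  def add(%__MODULE__{} = w, pid, db, key, ref) do
    pids = w.key_to_pids |> Map.get(db, %{}) |> Map.get(key, %{})

    if Map.has_key?(pids, pid) do
      {:error, :already_registered}
    else
      # Forward map: db -> key -> pid -> ref
      key_to_pids =
        Map.update(w.key_to_pids, db, %{key => %{pid => ref}}, fn keys ->
          Map.update(keys, key, %{pid => ref}, &Map.put(&1, pid, ref))
        end)

      # Reverse map: pid -> set of {db, key}
      pid_to_keys =
        Map.update(w.pid_to_keys, pid, MapSet.new([{db, key}]), &MapSet.put(&1, {db, key}))

      {:ok, %{w | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}}
    end
  end

  # Returns {ref, pid} tuples for every pid watching key in db.
  def lookup(%__MODULE__{} = w, db, key) do
    w.key_to_pids
    |> Map.get(db, %{})
    |> Map.get(key, %{})
    |> Enum.map(fn {pid, ref} -> {ref, pid} end)
  end
end
```

Keeping a reverse map next to the forward map means all of a pid's watches can be found without scanning every database and key.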
Example
watch = Vdr.TS.Watch.create()
{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:123", :my_ref)
{:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "team:42:", :prefix_ref)
[{:my_ref, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "user:123")
[{:prefix_ref, _pid}] = Vdr.TS.Watch.lookup_prefix(watch, 0, "team:42:user:1")
{:ok, watch, _remaining} = Vdr.TS.Watch.delete(watch, self(), 0, "user:123")
{:ok, watch, _remaining} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "team:42:")
watch = Vdr.TS.Watch.delete_all(watch, self())
Types
@type db_key() :: {non_neg_integer(), String.t()}
@type t() :: %Vdr.TS.Watch{
  idx_by_pid: %{required(pid()) => integer()},
  key_to_pids: %{
    required(non_neg_integer()) => %{
      required(String.t()) => %{required(pid()) => watcher_ref()}
    }
  },
  pid_by_idx: %{required(integer()) => pid()},
  pid_to_keys: %{required(pid()) => MapSet.t(db_key())},
  pid_to_prefixes: %{required(pid()) => MapSet.t(db_key())},
  prefix_to_pids: %{
    required(non_neg_integer()) => %{
      required(String.t()) => %{required(pid()) => watcher_ref()}
    }
  },
  prefix_tree: reference()
}
@type watcher_ref() :: term()
Functions
Adds a watch entry for the given pid, database, key, and ref.
Returns {:ok, updated_watch} on success, or {:error, :already_registered} if the key
is already registered for this pid in this database.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
key - The key to watch (string)
ref - The reference value to associate with this watch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref2)
@spec add_prefix(t(), pid(), non_neg_integer(), String.t(), term()) :: {:ok, t()} | {:error, atom()}
Adds a prefix watch entry for the given pid, database, prefix, and ref.
Returns {:ok, updated_watch} on success, or {:error, :already_registered} if the prefix
is already registered for this pid in this database.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
prefix - The key prefix to watch (string)
ref - The reference value to associate with this watch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref2)
Returns all unique {pid, ref} pairs across all exact and prefix watches.
This is useful for broadcasting messages to all watchers, such as sending Init messages when streaming mode starts.
Parameters
watch - The watch storage
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.all_watchers(watch)
iex> length(watchers) == 2
true
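The broadcast use case mentioned above can be sketched as follows. This is an illustration under stated assumptions: the module BroadcastSketch is hypothetical, and the {:init, ref} message shape is invented for the example, since the format of the Init message is not given here. The watcher list is built by hand in the {pid, ref} shape described above.

```elixir
defmodule BroadcastSketch do
  # Sends {:init, ref} to every watcher in a list of {pid, ref} pairs.
  # The message shape is an assumption made for illustration only.
  def broadcast_init(watchers) do
    Enum.each(watchers, fn {pid, ref} -> send(pid, {:init, ref}) end)
  end
end

# Usage with a hand-built list of {pid, ref} pairs.
BroadcastSketch.broadcast_init([{self(), :ref1}, {self(), :ref2}])
```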
@spec create() :: t()
Creates a new empty watch storage.
Examples
iex> watch = Vdr.TS.Watch.create()
iex> is_struct(watch, Vdr.TS.Watch)
true
@spec delete(t(), pid(), non_neg_integer(), String.t()) :: {:ok, t(), non_neg_integer()} | {:error, atom()}
Deletes a single watch entry for the given pid, database, and key.
Returns {:ok, updated_watch, remaining_count} on success, where remaining_count
is the number of watches remaining for the pid. Returns {:error, :not_found} if the
watch entry does not exist.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
key - The key to unwatch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
iex> {:error, :not_found} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
Deletes all watch entries for the given pid.
Returns the updated watch storage. This operation always succeeds, even if the pid has no watches. Deletes watches across all databases.
Parameters
watch - The watch storage
pid - The process identifier
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watch = Vdr.TS.Watch.delete_all(watch, self())
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "key1")
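One way a per-pid cleanup across all databases could work is to walk the pid's reverse index and remove each entry from the forward map. The sketch below is a simplified model, not the library's code: the module DeleteAllSketch is hypothetical, it covers only the exact-key maps, and it leaves empty inner maps in place for brevity.

```elixir
defmodule DeleteAllSketch do
  # key_to_pids: %{db => %{key => %{pid => ref}}}
  # pid_to_keys: %{pid => MapSet.t({db, key})}
  def delete_all(key_to_pids, pid_to_keys, pid) do
    # An unknown pid yields an empty set, so the call always succeeds.
    {keys, pid_to_keys} = Map.pop(pid_to_keys, pid, MapSet.new())

    key_to_pids =
      Enum.reduce(keys, key_to_pids, fn {db, key}, acc ->
        # Drop this pid's entry under db -> key.
        {_ref, acc} = pop_in(acc, [db, key, pid])
        acc
      end)

    {key_to_pids, pid_to_keys}
  end
end
```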
@spec delete_prefix(t(), pid(), non_neg_integer(), String.t()) :: {:ok, t(), non_neg_integer()} | {:error, atom()}
Deletes a single prefix watch entry for the given pid, database, and prefix.
Returns {:ok, updated_watch, remaining_count} on success, where remaining_count
is the number of watches remaining for the pid. Returns {:error, :not_found} if the
prefix watch entry does not exist.
Parameters
watch - The watch storage
pid - The process identifier
db - The database number
prefix - The prefix to unwatch
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "user:")
iex> {:error, :not_found} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "user:")
@spec lookup(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]
Looks up all watches for the given database and exact key.
Returns a list of {ref, pid} tuples for all processes watching the key
in the specified database.
Parameters
watch - The watch storage
db - The database number
key - The key to look up
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "key1")
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "nonexistent")
@spec lookup_by_db(t(), non_neg_integer()) :: [{term(), pid()}]
Returns all unique {ref, pid} pairs for exact and prefix watches in a specific database.
This is useful for database-wide operations like FLUSHDB.
Parameters
watch - The watch storage
db - The database number
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.lookup_by_db(watch, 0)
iex> length(watchers) == 1
true
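Collecting unique pairs for one database can be sketched by merging the exact and prefix entries for that database and de-duplicating. This is a simplified model under assumptions: the module ByDbSketch is hypothetical and takes the two nested maps directly rather than a watch struct.

```elixir
defmodule ByDbSketch do
  # key_to_pids and prefix_to_pids both have the shape
  # %{db => %{name => %{pid => ref}}}.
  def lookup_by_db(key_to_pids, prefix_to_pids, db) do
    exact = Map.get(key_to_pids, db, %{})
    prefixes = Map.get(prefix_to_pids, db, %{})

    # uniq: true drops repeated {ref, pid} pairs across keys and prefixes.
    for {_name, pids} <- Enum.concat(exact, prefixes),
        {pid, ref} <- pids,
        uniq: true do
      {ref, pid}
    end
  end
end
```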
@spec lookup_prefix(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]
Looks up all prefix watches matching the given database and key.
Returns a list of {ref, pid} tuples for all processes watching a prefix
that the key starts with in the specified database.
Parameters
watch - The watch storage
db - The database number
key - The key to look up
Examples
iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup_prefix(watch, 0, "user:123")
iex> [] = Vdr.TS.Watch.lookup_prefix(watch, 0, "other:123")
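The matching rule (a watch matches when the key starts with the watched prefix) can be illustrated with a linear scan. This is only a stand-in: the documented storage uses a Rust radix tree, whereas the hypothetical PrefixSketch module below checks every registered prefix with String.starts_with?/2.

```elixir
defmodule PrefixSketch do
  # prefix_to_pids has the shape %{db => %{prefix => %{pid => ref}}}.
  # Returns {ref, pid} for every prefix in db that key starts with.
  def lookup_prefix(prefix_to_pids, db, key) do
    for {prefix, pids} <- Map.get(prefix_to_pids, db, %{}),
        String.starts_with?(key, prefix),
        {pid, ref} <- pids do
      {ref, pid}
    end
  end
end
```

A scan like this costs time proportional to the number of registered prefixes per lookup, which is the kind of cost a radix tree is meant to avoid.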