Vdr.TS.Watch (veidrodelis v0.1.8)


Watch storage for tracking key-based and prefix-based subscriptions with database scoping.

Allows multiple processes to register watches for specific keys and key prefixes in specific databases, associating each watch with a reference value.

Internal Structure

The watch storage uses these collections:

  1. key_to_pids: %{db => %{key => %{pid => ref}}} - maps database to key to pid-to-ref mappings
  2. pid_to_keys: %{pid => MapSet.t({db, key})} - maps pids to sets of database-scoped keys
  3. prefix_tree: Rust radix tree storing watcher indexes by database-scoped prefix
  4. prefix_to_pids: %{db => %{prefix => %{pid => ref}}} - maps database to prefix to pid-to-ref mappings
  5. pid_to_prefixes: %{pid => MapSet.t({db, prefix})} - maps pids to database-scoped prefixes
  6. pid_by_idx and idx_by_pid: maps between pids and integer indexes stored in the Rust tree
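
The pairing of a forward index (key_to_pids) with a reverse index (pid_to_keys) can be illustrated with plain maps. The following is a minimal sketch, not the library's implementation: the module name WatchIndexSketch and its functions are hypothetical, and only the exact-key collections are modelled.

```elixir
defmodule WatchIndexSketch do
  # Hypothetical stand-in for the exact-key collections described above.
  # Plain maps are used instead of the %Vdr.TS.Watch{} struct.
  def new, do: %{key_to_pids: %{}, pid_to_keys: %{}}

  # Registers pid for {db, key}; a second registration of the same
  # pid/db/key is rejected, mirroring the documented add/5 behaviour.
  def add(w, pid, db, key, ref) do
    pids = w.key_to_pids |> Map.get(db, %{}) |> Map.get(key, %{})

    if Map.has_key?(pids, pid) do
      {:error, :already_registered}
    else
      keys = w.key_to_pids |> Map.get(db, %{}) |> Map.put(key, Map.put(pids, pid, ref))
      key_to_pids = Map.put(w.key_to_pids, db, keys)

      pid_to_keys =
        Map.update(w.pid_to_keys, pid, MapSet.new([{db, key}]), &MapSet.put(&1, {db, key}))

      {:ok, %{w | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}}
    end
  end

  # Forward lookup: which processes watch this key in this database?
  def lookup(w, db, key) do
    w.key_to_pids
    |> Map.get(db, %{})
    |> Map.get(key, %{})
    |> Enum.map(fn {pid, ref} -> {ref, pid} end)
  end

  # The reverse index lets cleanup visit only this pid's own {db, key}
  # entries instead of scanning every database and key.
  # Emptied inner maps are left in place for simplicity.
  def delete_all(w, pid) do
    {db_keys, pid_to_keys} = Map.pop(w.pid_to_keys, pid, MapSet.new())

    key_to_pids =
      Enum.reduce(db_keys, w.key_to_pids, fn {db, key}, acc ->
        {_ref, acc} = pop_in(acc, [db, key, pid])
        acc
      end)

    %{w | key_to_pids: key_to_pids, pid_to_keys: pid_to_keys}
  end
end
```

The prefix collections (prefix_to_pids and pid_to_prefixes) follow the same shape, so the same two-way bookkeeping would presumably apply to them.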

Example

watch = Vdr.TS.Watch.create()

{:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "user:123", :my_ref)
{:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "team:42:", :prefix_ref)

[{:my_ref, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "user:123")
[{:prefix_ref, _pid}] = Vdr.TS.Watch.lookup_prefix(watch, 0, "team:42:user:1")

{:ok, watch, _remaining} = Vdr.TS.Watch.delete(watch, self(), 0, "user:123")
{:ok, watch, _remaining} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "team:42:")

watch = Vdr.TS.Watch.delete_all(watch, self())

Summary

Functions

add(watch, pid, db, key, ref)

Adds a watch entry for the given pid, database, key, and ref.

add_prefix(watch, pid, db, prefix, ref)

Adds a prefix watch entry for the given pid, database, prefix, and ref.

all_watchers(watch)

Returns all unique {pid, ref} pairs across all exact and prefix watches.

create()

Creates a new empty watch storage.

delete(watch, pid, db, key)

Deletes a single watch entry for the given pid, database, and key.

delete_all(watch, pid)

Deletes all watch entries for the given pid.

delete_prefix(watch, pid, db, prefix)

Deletes a single prefix watch entry for the given pid, database, and prefix.

lookup(watch, db, key)

Looks up all watches for the given database and exact key.

lookup_by_db(watch, db)

Returns all unique {ref, pid} pairs for exact and prefix watches in a specific database.

lookup_prefix(watch, db, key)

Looks up all prefix watches matching the given database and key.

Types

db_key()

@type db_key() :: {non_neg_integer(), String.t()}

t()

@type t() :: %Vdr.TS.Watch{
  idx_by_pid: %{required(pid()) => integer()},
  key_to_pids: %{
    required(non_neg_integer()) => %{
      required(String.t()) => %{required(pid()) => watcher_ref()}
    }
  },
  pid_by_idx: %{required(integer()) => pid()},
  pid_to_keys: %{required(pid()) => MapSet.t(db_key())},
  pid_to_prefixes: %{required(pid()) => MapSet.t(db_key())},
  prefix_to_pids: %{
    required(non_neg_integer()) => %{
      required(String.t()) => %{required(pid()) => watcher_ref()}
    }
  },
  prefix_tree: reference()
}

watcher_ref()

@type watcher_ref() :: term()

Functions

add(watch, pid, db, key, ref)

@spec add(t(), pid(), non_neg_integer(), String.t(), term()) ::
  {:ok, t()} | {:error, atom()}

Adds a watch entry for the given pid, database, key, and ref.

Returns {:ok, updated_watch} on success. Returns {:error, reason} if the key is already registered for this pid in this database; in the example below, reason is :already_registered.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • key - The key to watch (string)
  • ref - The reference value to associate with this watch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref2)

add_prefix(watch, pid, db, prefix, ref)

@spec add_prefix(t(), pid(), non_neg_integer(), String.t(), term()) ::
  {:ok, t()} | {:error, atom()}

Adds a prefix watch entry for the given pid, database, prefix, and ref.

Returns {:ok, updated_watch} on success. Returns {:error, reason} if the prefix is already registered for this pid in this database; in the example below, reason is :already_registered.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • prefix - The key prefix to watch (string)
  • ref - The reference value to associate with this watch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> {:error, :already_registered} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref2)

all_watchers(watch)

@spec all_watchers(t()) :: [{pid(), term()}]

Returns all unique {pid, ref} pairs across all exact and prefix watches.

This is useful for broadcasting messages to all watchers, such as sending Init messages when streaming mode starts.

Parameters

  • watch - The watch storage

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.all_watchers(watch)
iex> length(watchers) == 2
true
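
A broadcast over the returned list might look like the sketch below. The module name BroadcastSketch and the {ref, message} envelope are assumptions made for illustration; the source does not specify the message shape. Note that all_watchers/1 is documented to return {pid, ref} tuples, while the lookup functions return {ref, pid}.

```elixir
defmodule BroadcastSketch do
  # `watchers` is a list of {pid, ref} tuples, the shape all_watchers/1
  # is documented to return. Sends {ref, message} to each watcher and
  # returns the number of messages sent. The envelope is hypothetical.
  def broadcast(watchers, message) do
    Enum.each(watchers, fn {pid, ref} -> send(pid, {ref, message}) end)
    length(watchers)
  end
end

# Usage with a hand-built watcher list (no Vdr.TS.Watch call is made here).
BroadcastSketch.broadcast([{self(), :ref1}, {self(), :ref2}], :init)
```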

create()

@spec create() :: t()

Creates a new empty watch storage.

Examples

iex> watch = Vdr.TS.Watch.create()
iex> is_struct(watch, Vdr.TS.Watch)
true

delete(watch, pid, db, key)

@spec delete(t(), pid(), non_neg_integer(), String.t()) ::
  {:ok, t(), non_neg_integer()} | {:error, atom()}

Deletes a single watch entry for the given pid, database, and key.

Returns {:ok, updated_watch, remaining_count} on success, where remaining_count is the number of watches remaining for the pid. Returns {:error, reason} if the watch entry does not exist; in the example below, reason is :not_found.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • key - The key to unwatch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
iex> {:error, :not_found} = Vdr.TS.Watch.delete(watch, self(), 0, "key1")
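
One plausible use of remaining_count is to stop monitoring a process once its last watch is gone. The sketch below assumes the caller keeps a hypothetical %{pid => monitor_ref} map; the module name UnwatchSketch and that map are not part of the library.

```elixir
defmodule UnwatchSketch do
  # `result` is the tuple delete/4 or delete_prefix/4 is documented to
  # return. `monitors` is a hypothetical %{pid => monitor_ref} map owned
  # by the caller. Returns {:ok, watch, monitors} or passes the error on.
  def after_delete({:ok, watch, 0}, pid, monitors) do
    # No watches remain for this pid: drop its monitor, if there is one.
    {mon, monitors} = Map.pop(monitors, pid)
    if mon, do: Process.demonitor(mon, [:flush])
    {:ok, watch, monitors}
  end

  # Watches remain for this pid, so the monitor is kept.
  def after_delete({:ok, watch, _remaining}, _pid, monitors), do: {:ok, watch, monitors}

  # The entry did not exist; nothing changes.
  def after_delete({:error, _reason} = error, _pid, _monitors), do: error
end
```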

delete_all(watch, pid)

@spec delete_all(t(), pid()) :: t()

Deletes all watch entries for the given pid.

Returns the updated watch storage. This operation always succeeds, even if the pid has no watches. Deletes watches across all databases.

Parameters

  • watch - The watch storage
  • pid - The process identifier

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watch = Vdr.TS.Watch.delete_all(watch, self())
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "key1")

delete_prefix(watch, pid, db, prefix)

@spec delete_prefix(t(), pid(), non_neg_integer(), String.t()) ::
  {:ok, t(), non_neg_integer()} | {:error, atom()}

Deletes a single prefix watch entry for the given pid, database, and prefix.

Returns {:ok, updated_watch, remaining_count} on success, where remaining_count is the number of watches remaining for the pid. Returns {:error, reason} if the prefix watch entry does not exist; in the example below, reason is :not_found.

Parameters

  • watch - The watch storage
  • pid - The process identifier
  • db - The database number
  • prefix - The prefix to unwatch

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> {:ok, watch, 0} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "user:")
iex> {:error, :not_found} = Vdr.TS.Watch.delete_prefix(watch, self(), 0, "user:")

lookup(watch, db, key)

@spec lookup(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]

Looks up all watches for the given database and exact key.

Returns a list of {ref, pid} tuples for all processes watching the key in the specified database.

Parameters

  • watch - The watch storage
  • db - The database number
  • key - The key to look up

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup(watch, 0, "key1")
iex> [] = Vdr.TS.Watch.lookup(watch, 0, "nonexistent")

lookup_by_db(watch, db)

@spec lookup_by_db(t(), non_neg_integer()) :: [{term(), pid()}]

Returns all unique {ref, pid} pairs for exact and prefix watches in a specific database.

This is useful for database-wide operations like FLUSHDB.

Parameters

  • watch - The watch storage
  • db - The database number

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 0, "key1", :ref1)
iex> {:ok, watch} = Vdr.TS.Watch.add(watch, self(), 1, "key2", :ref2)
iex> watchers = Vdr.TS.Watch.lookup_by_db(watch, 0)
iex> length(watchers) == 1
true
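
For a database-wide event, the {ref, pid} pairs can be grouped so that each process receives a single message listing all of its refs. This is a sketch under assumptions: the module name FlushNotifySketch and the {:flushed, db, refs} message are hypothetical and do not come from the library.

```elixir
defmodule FlushNotifySketch do
  # `pairs` is a list of {ref, pid} tuples, the shape lookup_by_db/2 is
  # documented to return. Groups the refs by the process that owns them.
  def refs_by_pid(pairs) do
    Enum.group_by(pairs, fn {_ref, pid} -> pid end, fn {ref, _pid} -> ref end)
  end

  # Sends one {:flushed, db, refs} message per process.
  # The message shape is an assumption made for this sketch.
  def notify_flush(pairs, db) do
    pairs
    |> refs_by_pid()
    |> Enum.each(fn {pid, refs} -> send(pid, {:flushed, db, refs}) end)
  end
end
```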

lookup_prefix(watch, db, key)

@spec lookup_prefix(t(), non_neg_integer(), String.t()) :: [{term(), pid()}]

Looks up all prefix watches matching the given database and key.

Returns a list of {ref, pid} tuples for all processes watching a prefix that the key starts with in the specified database.

Parameters

  • watch - The watch storage
  • db - The database number
  • key - The key to look up

Examples

iex> watch = Vdr.TS.Watch.create()
iex> {:ok, watch} = Vdr.TS.Watch.add_prefix(watch, self(), 0, "user:", :ref1)
iex> [{:ref1, _pid}] = Vdr.TS.Watch.lookup_prefix(watch, 0, "user:123")
iex> [] = Vdr.TS.Watch.lookup_prefix(watch, 0, "other:123")
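
The matching rule (a watch fires when the key starts with the watched prefix) can be sketched with a linear scan over one database's prefix map. This stands in for the Rust radix tree, which is not modelled here; the module name PrefixMatchSketch is hypothetical, and the result order is unspecified.

```elixir
defmodule PrefixMatchSketch do
  # `prefixes` mirrors the documented prefix_to_pids shape for a single
  # database: %{prefix => %{pid => ref}}. Every watched prefix that the
  # key starts with contributes its {ref, pid} pairs, so overlapping
  # prefixes such as "user:" and "user:12" can both match one key.
  def lookup_prefix(prefixes, key) do
    for {prefix, pids} <- prefixes,
        String.starts_with?(key, prefix),
        {pid, ref} <- pids,
        do: {ref, pid}
  end
end
```

A scan like this costs time proportional to the number of registered prefixes, which is presumably why the module keeps a radix tree for this lookup instead.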