Supabase.Realtime.Channel.Registry (supabase_realtime v0.5.0)


Registry for managing Realtime channel subscriptions.

Tracks active channels and their subscriptions, and routes incoming messages to the appropriate callback functions.

Message Routing

When a message arrives, the registry matches it against channel bindings:

  • Broadcast events are matched by event name. Bindings with event: "*" (wildcard) match every broadcast event on the channel.
  • Postgres changes are matched by server-assigned binding IDs. After a channel joins, the server reply includes IDs for each postgres_changes binding. The registry stores these IDs so that incoming database events are dispatched only to the correct bindings.
  • Presence events are dispatched to all channels subscribed to the topic.
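The three matching rules above can be sketched as a small, self-contained module. This is only an illustration: the binding maps, the message tuples, and the `RoutingSketch` module are hypothetical shapes chosen for the example, not the registry's internal data structures.

```elixir
defmodule RoutingSketch do
  # Hypothetical binding shape: %{type: ..., event: ..., id: ...}.
  # Hypothetical message shapes: {:broadcast, event}, {:postgres_changes, ids}, :presence.

  # Broadcast: a wildcard binding matches every broadcast event.
  def matches?(%{type: :broadcast, event: "*"}, {:broadcast, _event}), do: true

  # Broadcast: otherwise the event names must be equal.
  def matches?(%{type: :broadcast, event: event}, {:broadcast, event}), do: true

  # Postgres changes: match only when the binding's server-assigned ID
  # is among the IDs carried by the incoming message.
  def matches?(%{type: :postgres_changes, id: id}, {:postgres_changes, ids}), do: id in ids

  # Presence: every presence binding on the topic receives the event.
  def matches?(%{type: :presence}, :presence), do: true

  # Anything else does not match.
  def matches?(_binding, _message), do: false

  # Returns the bindings that should receive the message.
  def route(bindings, message), do: Enum.filter(bindings, &matches?(&1, message))
end
```

With one wildcard binding and one binding for "new_message", a broadcast of "new_message" reaches both, while a database event reaches only the binding whose ID the message carries.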

Connection State Notifications

When the WebSocket connection state changes (for example, from :open to :closed), the connection process notifies the registry. The registry then dispatches a {:connection, :state_change, %{old: old, new: new}} event to all registered callback modules.
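A callback module could pattern match on that event tuple. The sketch below assumes a callback named `handle_event/1`, which this page does not confirm; only the tuple shape is taken from the description above, and `ConnectionWatcherSketch` and its return values are hypothetical.

```elixir
defmodule ConnectionWatcherSketch do
  # Hypothetical callback module. The function name handle_event/1 is an
  # assumption; only the event tuple shape comes from the text above.

  # Connection dropped: report which state was left so a caller could react.
  def handle_event({:connection, :state_change, %{old: old, new: :closed}}) do
    {:disconnected, old}
  end

  # Any other transition, for example :closed to :open.
  def handle_event({:connection, :state_change, %{old: old, new: new}}) do
    {:transition, old, new}
  end

  # Events unrelated to the connection are left alone.
  def handle_event(_other), do: :ignored
end
```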

Postgres Type Transforms

Database change payloads include column metadata. The registry uses Supabase.Realtime.PostgresTypes to convert string values in record and old_record maps to native Elixir types before dispatching events.
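To make the idea concrete, here is a minimal sketch of a metadata-driven conversion. It is not the Supabase.Realtime.PostgresTypes implementation: the column metadata shape, the handful of type names covered, and the accepted boolean spellings are all assumptions made for illustration.

```elixir
defmodule TypeTransformSketch do
  # Assumed metadata shape: [%{"name" => column_name, "type" => postgres_type}].
  # Only a few types are handled; unknown types pass through unchanged.

  def transform(record, columns) do
    # Build a lookup from column name to Postgres type name.
    types = Map.new(columns, fn %{"name" => name, "type" => type} -> {name, type} end)

    # Cast every value according to its column's type, if known.
    Map.new(record, fn {key, value} -> {key, cast(Map.get(types, key), value)} end)
  end

  # NULLs stay nil regardless of type.
  defp cast(_type, nil), do: nil

  # Integer columns arrive as strings in this sketch.
  defp cast(type, value) when type in ["int2", "int4", "int8"] and is_binary(value) do
    String.to_integer(value)
  end

  # Booleans: accept both the short and long text spellings (an assumption).
  defp cast("bool", value) when value in ["t", "true"], do: true
  defp cast("bool", value) when value in ["f", "false"], do: false

  # Everything else is returned as-is.
  defp cast(_type, value), do: value
end
```

The same function would be applied to both record and old_record before an event is handed to callbacks.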

Summary

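Types

  • state() - Registry state holding all subscription information.

Functions

  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • create_channel(server, topic, opts \\ []) - Creates a new channel in the registry.
  • handle_message(server, message) - Handles a message from the server.
  • list_channels(server) - Lists all active channels.
  • remove_all_channels(server) - Removes all channel subscriptions.
  • start_link(opts) - Starts the channel registry process.
  • subscribe(server, channel, type, filter) - Subscribes to events on a channel.
  • unsubscribe(server, channel) - Unsubscribes from a channel.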

Types

state()

@type state() :: %{
  module: module(),
  pending_events: list(),
  presence_state: map(),
  store: pid() | module(),
  connection: pid() | module() | nil
}

Registry state holding all subscription information.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

create_channel(server, topic, opts \\ [])

@spec create_channel(GenServer.server(), String.t(), keyword()) ::
  {:ok, Supabase.Realtime.Channel.t()} | {:error, term()}

Creates a new channel in the registry.

Parameters

  • server - The server PID or name
  • topic - The topic to subscribe to
  • opts - Channel options

handle_message(server, message)

@spec handle_message(GenServer.server(), Supabase.Realtime.realtime_message()) :: :ok

Handles an incoming message from the Realtime server and routes it to the matching channel bindings.

Parameters

  • server - The registry process PID or name
  • message - The message to handle

is_database_event(event)

(macro)

list_channels(server)

@spec list_channels(GenServer.server()) :: [Supabase.Realtime.Channel.t()]

Lists all active channels.

Parameters

  • server - The server PID or name

remove_all_channels(server)

@spec remove_all_channels(GenServer.server()) :: :ok | {:error, term()}

Removes all channel subscriptions.

Parameters

  • server - The server PID or name

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the channel registry process.

Options

  • :name - Optional registration name
  • :module - The callback module for event handling
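Because the module exposes child_spec/1, it can presumably be listed as a `{module, opts}` child in a supervision tree. The sketch below only builds the child list with the two documented options; `MyApp.RealtimeRegistry` and `MyApp.RealtimeHandler` are hypothetical names, and whether the registry needs other processes started alongside it is not covered on this page.

```elixir
defmodule MyApp.RealtimeChildrenSketch do
  # MyApp.RealtimeRegistry (registration name) and MyApp.RealtimeHandler
  # (callback module) are hypothetical names used for illustration.
  def children do
    [
      {Supabase.Realtime.Channel.Registry,
       name: MyApp.RealtimeRegistry, module: MyApp.RealtimeHandler}
    ]
  end
end

# In an application's start/2 callback (not executed here):
# Supervisor.start_link(MyApp.RealtimeChildrenSketch.children(), strategy: :one_for_one)
```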

subscribe(server, channel, type, filter)

@spec subscribe(
  GenServer.server(),
  Supabase.Realtime.Channel.t(),
  String.t(),
  map() | keyword()
) ::
  :ok | {:error, term()}

Subscribes to events on a channel.

Parameters

  • server - The server PID or name
  • channel - The channel struct
  • type - The event type, given as a string
  • filter - The event filter, given as a map or keyword list

unsubscribe(server, channel)

@spec unsubscribe(GenServer.server(), Supabase.Realtime.Channel.t()) ::
  :ok | {:error, term()}

Unsubscribes from a channel.

Parameters

  • server - The server PID or name
  • channel - The channel to unsubscribe from
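Taken together, the functions on this page suggest a create, subscribe, list, unsubscribe flow. The following is a sketch of that flow using only the documented signatures. The topic "room:lobby", the type string "broadcast", and the `%{event: "*"}` filter are assumed values, and the registry module is passed as an argument purely so the flow can be exercised against a stand-in.

```elixir
defmodule ChannelLifecycleSketch do
  # `registry` defaults to the documented module. It is a parameter only so
  # that the flow can be tried against a stand-in module without a live server.
  def run(server, registry \\ Supabase.Realtime.Channel.Registry) do
    # Each step must succeed before the next runs; the first error is returned as-is.
    with {:ok, channel} <- registry.create_channel(server, "room:lobby", []),
         # "broadcast" and the wildcard filter are assumed example values.
         :ok <- registry.subscribe(server, channel, "broadcast", %{event: "*"}),
         active = registry.list_channels(server),
         :ok <- registry.unsubscribe(server, channel) do
      # Report how many channels were active while subscribed.
      {:ok, length(active)}
    end
  end
end
```

If any call returns `{:error, reason}`, `with` stops at that step and returns the error tuple unchanged, which matches the `{:error, term()}` results in the specs above.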