Supabase.Realtime behaviour (supabase_realtime v0.5.0)

Client for Supabase Realtime.

This module provides a behaviour for building clients that connect to Supabase Realtime. It supports database change subscriptions, broadcast messages, and presence tracking.

Core Architecture

The client has three main components:

  1. Connection - WebSocket management, heartbeats, reconnection with exponential backoff, send/push buffers, and HTTP fallback for broadcasts.
  2. Channel Registry - Routes messages to handlers, tracks channel states, supports wildcard event matching.
  3. Channel Store - ETS-based storage for channel data.

Installation

Add supabase_realtime to your dependencies in mix.exs:

def deps do
  [
    {:supabase_realtime, "~> 0.5.0"}
  ]
end

Quick Start

First, create a Supabase client with Supabase.init_client/3:

{:ok, client} = Supabase.init_client(
  "https://your-project.supabase.co",
  "your-api-key"
)

Then define a module that uses Supabase.Realtime:

defmodule MyApp.Realtime do
  use Supabase.Realtime

  def start_link(opts \\ []) do
    Supabase.Realtime.start_link(__MODULE__, opts)
  end

  @impl true
  def handle_event({:postgres_changes, :insert, payload}) do
    IO.inspect(payload, label: "New record")
    :ok
  end

  @impl true
  def handle_event({:broadcast, _event_name, payload}) do
    IO.inspect(payload, label: "Broadcast")
    :ok
  end

  @impl true
  def handle_event({:presence, _event, payload}) do
    IO.inspect(payload, label: "Presence")
    :ok
  end
end

Add it to your supervision tree:

{:ok, client} = Supabase.init_client(
  System.fetch_env!("SUPABASE_URL"),
  System.fetch_env!("SUPABASE_KEY")
)

children = [
  {MyApp.Realtime, client: client}
]

Supervisor.start_link(children, strategy: :one_for_one)

Subscribing to Database Changes

{:ok, channel} = MyApp.Realtime.channel("public:users")

# Listen to INSERT events on the users table
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :insert,
  schema: "public",
  table: "users"
)

# Listen to all events on the users table
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :all,
  schema: "public",
  table: "users"
)

# Listen with a filter
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :update,
  schema: "public",
  table: "users",
  filter: "id=eq.1"
)

Broadcast Messages

:ok = MyApp.Realtime.send(channel, %{
  type: "broadcast",
  event: "new_message",
  payload: %{text: "Hello!"}
})

Wildcard Events

Use event: :all or event: "*" to listen to all broadcast events on a channel:

:ok = MyApp.Realtime.on(channel, "broadcast", event: "*")

Unsubscribe

:ok = MyApp.Realtime.unsubscribe(channel)
:ok = MyApp.Realtime.remove_all_channels()

Options

These options can be passed to start_link/2:

  • :client (required) - A %Supabase.Client{} struct from Supabase.init_client/3.
  • :name - Process registration name.
  • :timeout - Connection timeout in milliseconds (default: 10000).
  • :heartbeat_interval - Milliseconds between heartbeats (default: 30000).
  • :reconnect_after_ms - A function that receives the attempt count and returns a delay in milliseconds. Defaults to exponential backoff.
  • :http_fallback - When true, broadcast messages are sent over HTTP if the WebSocket connection is down (default: false).
  • :access_token_fn - A zero-arity function or {mod, fun, args} tuple that returns {:ok, token} or {:error, reason}. Used to refresh tokens before each connection attempt.
  • :params - A map of extra query params appended to the WebSocket URL.
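
The :access_token_fn option accepts either a zero-arity function or a {mod, fun, args} tuple. As a minimal sketch of how those two shapes could be resolved to the same {:ok, token} | {:error, reason} result, the module below uses only the standard library. MyApp.TokenSource and its fetch/1 function are hypothetical names for illustration and are not part of supabase_realtime; how the library itself invokes the option is not shown here.

```elixir
defmodule MyApp.TokenSource do
  # Hypothetical helper for illustration; not part of supabase_realtime.

  # A zero-arity function is simply called.
  def fetch(fun) when is_function(fun, 0), do: normalize(fun.())

  # A {module, function, args} tuple is applied with its arguments.
  def fetch({mod, fun, args}) when is_atom(mod) and is_atom(fun) and is_list(args) do
    normalize(apply(mod, fun, args))
  end

  # Only the two documented result shapes are passed through unchanged.
  defp normalize({:ok, token}) when is_binary(token), do: {:ok, token}
  defp normalize({:error, _reason} = error), do: error
  # Anything else is treated as an error (an assumption of this sketch).
  defp normalize(other), do: {:error, {:invalid_token_response, other}}
end
```

Wrapping unexpected return values in an error tuple is a design choice of this sketch, so that a misbehaving token function fails loudly rather than being used as a token.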

Example with several options

{:ok, _pid} = MyApp.Realtime.start_link(
  client: client,
  heartbeat_interval: :timer.seconds(15),
  reconnect_after_ms: fn tries -> min(:timer.seconds(10), :timer.seconds(tries)) end,
  http_fallback: true,
  access_token_fn: fn -> MyApp.Auth.get_token() end,
  params: %{log_level: "debug"}
)
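
The :reconnect_after_ms function in the example above grows linearly and is capped at ten seconds. A capped exponential variant could be sketched as follows. MyApp.Backoff, the one-second base, and the thirty-second cap are hypothetical choices for illustration, and the sketch assumes the attempt count passed by the client starts at 1; none of this is confirmed as the library's default behaviour.

```elixir
defmodule MyApp.Backoff do
  # Hypothetical helper for illustration; not part of supabase_realtime.
  @base_ms 1_000
  @max_ms 30_000

  # Returns the delay in milliseconds for the given attempt count.
  # Assumes attempts are counted from 1; 0 is tolerated and treated like 1.
  def delay(tries) when is_integer(tries) and tries >= 0 do
    # Clamp the exponent so the power stays small for large attempt counts.
    exponent = tries |> Kernel.-(1) |> max(0) |> min(10)
    min(@base_ms * Integer.pow(2, exponent), @max_ms)
  end
end
```

Under those assumptions it could be passed as reconnect_after_ms: &MyApp.Backoff.delay/1.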

Event Handling

The handle_event/1 callback receives events matching your subscriptions. It must return :ok.

Event shapes:

  • Database changes: {:postgres_changes, operation, payload}
  • Broadcast messages: {:broadcast, event_name, payload}
  • Presence updates: {:presence, event, payload}
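
The three event shapes above lend themselves to one function clause per shape. The following is a sketch of that dispatch written as a standalone module so it runs without the dependency; in a real client the clauses would live in the module that calls use Supabase.Realtime. MyApp.EventRouter and the catch-all clause are assumptions of this sketch, and the guards follow the postgres_changes_event_type() and presence_event() types documented on this page.

```elixir
defmodule MyApp.EventRouter do
  # Hypothetical standalone module for illustration only.
  require Logger

  # Database changes: operation is one of the documented change types.
  def handle_event({:postgres_changes, operation, payload})
      when operation in [:insert, :update, :delete] do
    Logger.info("database #{operation}: #{inspect(payload)}")
    :ok
  end

  # Broadcast messages: event_name is the name given by the sender.
  def handle_event({:broadcast, event_name, payload}) when is_binary(event_name) do
    Logger.info("broadcast #{event_name}: #{inspect(payload)}")
    :ok
  end

  # Presence updates: join, leave, or a full state sync.
  def handle_event({:presence, event, payload}) when event in [:join, :leave, :sync] do
    Logger.info("presence #{event}: #{inspect(payload)}")
    :ok
  end

  # Catch-all (an assumption of this sketch) so an unexpected shape is
  # logged instead of raising a FunctionClauseError.
  def handle_event(other) do
    Logger.warning("unhandled realtime event: #{inspect(other)}")
    :ok
  end
end
```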

Summary

Types

  • ack_ref() - Acknowledgment reference type used for tracking broadcast acknowledgments.
  • ack_response() - Acknowledgment response types.
  • channel_state() - Channel states for subscriptions.
  • config_options() - Configuration options for realtime connection.
  • connection_state() - Connection states for the WebSocket client.
  • error_response() - Error response for various operations.
  • event() - Represents a channel subscription event.
  • postgres_changes_event_type() - Types of PostgreSQL database changes.
  • postgres_changes_filter() - Filter for PostgreSQL database changes.
  • presence_event() - Types of presence events.
  • realtime_listen_type() - Types of events that can be received from the server.
  • realtime_message() - Realtime message payload structure.
  • ref() - Message reference type used for tracking message delivery.
  • subscribe_state() - Subscription states for channels.
  • t()

Callbacks

  • handle_event(event) - Callback invoked when a realtime event is received.

Functions

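  • broadcast(conn, channel, event, payload) - Sends a broadcast message to a channel.
  • broadcast_with_ack(conn, channel, event, payload) - Sends a broadcast message to a channel with acknowledgment support.
  • channel(client, topic, opts \\ []) - Creates a new channel for subscription.
  • connection_state(client) - Gets the connection status.
  • fetch_channel_registry(realtime) - Fetches the channel registry for a client.
  • fetch_connection(realtime) - Fetches the connection PID for a client.
  • list_channels(registry) - Lists all active channel subscriptions.
  • on(channel, type, filter) - Subscribes to events on a channel with an event filter.
  • remove_all_channels(registry) - Removes all channel subscriptions.
  • send(conn, channel, payload) - Sends a message on a channel.
  • set_auth(conn, token) - Updates the access token for all connections.
  • set_auth(conn, channel, token) - Updates the access token for a specific channel.
  • start_link(module, opts \\ []) - Starts a Realtime client process linked to the current process.
  • track(conn, channel, presence_state) - Tracks presence state on a channel.
  • unsubscribe(channel) - Unsubscribes from a channel.
  • untrack(conn, channel) - Untracks presence on a channel.
  • wait_for_ack(ack_ref, opts \\ []) - Waits for an acknowledgment for a broadcast message.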

Types

ack_ref()

@type ack_ref() :: String.t()

Acknowledgment reference type used for tracking broadcast acknowledgments.

ack_response()

@type ack_response() ::
  {:ok, :acknowledged} | {:error, :timeout} | {:error, :not_supported}

Acknowledgment response types.

broadcast_event()

@type broadcast_event() :: {:broadcast, String.t(), map()}

channel()

@type channel() :: Supabase.Realtime.Channel.t()

channel_opts()

@type channel_opts() :: keyword()

channel_state()

@type channel_state() :: :closed | :errored | :joined | :joining | :leaving

Channel states for subscriptions.

  • :closed - Channel is not subscribed
  • :errored - Channel encountered an error during subscription
  • :joined - Channel is successfully subscribed
  • :joining - Channel is in the process of subscribing
  • :leaving - Channel is in the process of unsubscribing

client()

@type client() :: Supabase.Client.t()

config_options()

@type config_options() :: %{
  optional(:broadcast) => %{
    optional(:self) => boolean(),
    optional(:ack) => boolean()
  },
  optional(:presence) => %{optional(:key) => String.t()},
  optional(:postgres_changes) => [postgres_changes_filter()],
  optional(:private) => boolean()
}

Configuration options for realtime connection.

connection_state()

@type connection_state() :: :connecting | :open | :closing | :closed

Connection states for the WebSocket client.

  • :connecting - Attempting to establish connection
  • :open - Connection is established and operational
  • :closing - Connection is in the process of closing
  • :closed - Connection is closed

error_response()

@type error_response() ::
  {:error, :subscription_error, String.t()}
  | {:error, :connection_error, String.t()}
  | {:error, :invalid_channel, String.t()}
  | {:error, :timeout, String.t()}
  | {:error, atom(), String.t()}

Error response for various operations.
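
Because every variant of error_response() is a three-element tuple with an atom in the second position, callers can branch on that atom. The sketch below shows one way to decide whether an operation is worth retrying. MyApp.ErrorSketch and the choice of which kinds count as retryable are assumptions made for illustration; the library does not prescribe a retry policy on this page.

```elixir
defmodule MyApp.ErrorSketch do
  # Hypothetical helper for illustration; not part of supabase_realtime.

  # Transient failures: retrying later may succeed (assumed policy).
  def retry?({:error, :timeout, _message}), do: true
  def retry?({:error, :connection_error, _message}), do: true

  # Failures that a retry is unlikely to fix (assumed policy).
  def retry?({:error, :subscription_error, _message}), do: false
  def retry?({:error, :invalid_channel, _message}), do: false

  # Any other error kind covered by {:error, atom(), String.t()}.
  def retry?({:error, kind, _message}) when is_atom(kind), do: false

  # Builds a one-line description suitable for logging.
  def describe({:error, kind, message}) when is_atom(kind) and is_binary(message) do
    "#{kind}: #{message}"
  end
end
```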

event()

Represents a channel subscription event.

event_filter()

@type event_filter() :: Enumerable.t()

event_type()

@type event_type() :: :postgres_changes | :broadcast | :presence

postgres_change_event()

@type postgres_change_event() ::
  {:postgres_changes, postgres_changes_event_type(), postgres_changes_filter()}

postgres_changes_event_type()

@type postgres_changes_event_type() :: :all | :insert | :update | :delete

Types of PostgreSQL database changes.

  • :all - All event types (represented as * in filters)
  • :insert - Record insertion
  • :update - Record update
  • :delete - Record deletion

postgres_changes_filter()

@type postgres_changes_filter() :: %{
  :event => postgres_changes_event_type() | String.t(),
  :schema => String.t(),
  optional(:table) => String.t(),
  optional(:filter) => String.t()
}

Filter for PostgreSQL database changes.
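
As a rough illustration of how keyword options such as those passed to on/3 could be turned into a postgres_changes_filter() map, the sketch below requires :event and :schema, copies :table and :filter only when present, and rewrites :all to "*" as described for postgres_changes_event_type(). MyApp.FilterSketch and build/1 are hypothetical names; this is not the library's own conversion code.

```elixir
defmodule MyApp.FilterSketch do
  # Hypothetical helper for illustration; not part of supabase_realtime.

  # Builds a filter map from keyword options.
  # Raises KeyError if :event or :schema is missing, since both are required.
  def build(opts) when is_list(opts) do
    base = %{
      event: normalize_event(Keyword.fetch!(opts, :event)),
      schema: Keyword.fetch!(opts, :schema)
    }

    # :table and :filter are optional, so only copy the ones that were given.
    opts
    |> Keyword.take([:table, :filter])
    |> Enum.into(base)
  end

  # :all is represented as "*" in filters; other values pass through.
  defp normalize_event(:all), do: "*"
  defp normalize_event(event) when event in [:insert, :update, :delete], do: event
  defp normalize_event(event) when is_binary(event), do: event
end
```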

presence_event()

@type presence_event() :: {:presence, :join | :leave | :sync, map()}

Types of presence events.

  • :join - User has joined
  • :leave - User has left
  • :sync - Presence state synchronization

realtime_listen_type()

@type realtime_listen_type() :: :broadcast | :presence | :postgres_changes | :system

Types of events that can be received from the server.

  • :broadcast - User-generated broadcast messages
  • :presence - Presence state updates
  • :postgres_changes - Database change events
  • :system - System events like heartbeats

realtime_message()

@type realtime_message() :: %{
  topic: String.t(),
  event: String.t(),
  payload: map(),
  ref: ref(),
  join_ref: ref() | nil
}

Realtime message payload structure.

ref()

@type ref() :: String.t()

Message reference type used for tracking message delivery.

start_option()

@type start_option() ::
  {:name, atom()} | {:client, Supabase.Client.t()} | {:timeout, pos_integer()}

subscribe_state()

@type subscribe_state() :: :subscribed | :timed_out | :closed | :channel_error

Subscription states for channels.

  • :subscribed - Successfully subscribed to events
  • :timed_out - Subscription attempt timed out
  • :closed - Subscription has been closed
  • :channel_error - Error occurred during subscription

t()

@type t() :: pid() | atom()

Callbacks

handle_event(event)

@callback handle_event(event()) :: :ok

Callback invoked when a realtime event is received.

This callback is called whenever an event matching the subscribed patterns is received from the server.

Parameters

  • event - The event data structured based on its type:
    • {:postgres_changes, operation, payload} - Database change events
    • {:broadcast, event_name, payload} - Broadcast messages
    • {:presence, presence_event, payload} - Presence updates

Functions

broadcast(conn, channel, event, payload)

@spec broadcast(pid() | module(), channel(), String.t(), map()) ::
  :ok | {:error, term()}

Sends a broadcast message to a channel.

This is a helper function that simplifies sending broadcast messages by constructing the appropriate payload format.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct
  • event - The event name
  • payload - The message payload

Returns

  • :ok - Successfully sent the broadcast
  • {:error, term()} - Failed to send the broadcast

broadcast_with_ack(conn, channel, event, payload)

@spec broadcast_with_ack(pid() | module(), channel(), String.t(), map()) ::
  {:ok, ack_ref()} | :ok | {:error, term()}

Sends a broadcast message to a channel with acknowledgment support.

This function sends a broadcast message and optionally returns an acknowledgment reference if acknowledgments are enabled on the channel.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct
  • event - The event name
  • payload - The message payload

Returns

  • {:ok, ack_ref} - Successfully sent with acknowledgment enabled
  • :ok - Successfully sent without acknowledgment
  • {:error, term()} - Failed to send the broadcast

channel(client, topic, opts \\ [])

@spec channel(client() | t(), String.t(), channel_opts()) ::
  {:ok, channel()} | {:error, term()}

Creates a new channel for subscription.

Parameters

  • client - The client module or PID
  • topic - The topic to subscribe to
  • opts - Channel options

Returns

  • {:ok, channel} - A channel struct
  • {:error, term()} - Failed to create the channel

connection_state(client)

@spec connection_state(client() | t()) :: connection_state()

Gets the connection status.

Parameters

  • client - The client module or PID

fetch_channel_registry(realtime)

Fetches the channel registry for a client.

Parameters

  • realtime - The client module or PID

fetch_connection(realtime)

Fetches the connection PID for a client.

Parameters

  • realtime - The client module or PID

list_channels(registry)

@spec list_channels(client() | t()) :: [channel()] | {:error, term()}

Lists all active channel subscriptions.

Parameters

  • registry - The channel registry module or PID

Returns

  • A list of %Channel{} structs

on(channel, type, filter)

@spec on(channel(), String.t(), event_filter()) :: :ok | {:error, term()}

Subscribes to events on a channel with an event filter.

Parameters

  • channel - The channel struct returned by channel/3
  • type - The event type (e.g., "postgres_changes", "broadcast", "presence")
  • filter - Filter options like event type, schema, table, etc.

Examples

Realtime.on(channel, "postgres_changes", event: :insert, schema: "public", table: "users")
Realtime.on(channel, "broadcast", event: "new_message")
Realtime.on(channel, "presence", event: "join")

remove_all_channels(registry)

@spec remove_all_channels(client() | t()) :: :ok | {:error, term()}

Removes all channel subscriptions.

Parameters

  • registry - The channel registry module or PID

send(conn, channel, payload)

@spec send(pid() | module(), channel(), map()) :: :ok | {:error, term()}

Sends a message on a channel.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct
  • payload - The message payload

Examples

Realtime.send(conn, channel, %{type: "broadcast", event: "new_message", payload: %{body: "Hello"}})

set_auth(conn, token)

@spec set_auth(pid() | module(), String.t()) :: :ok | {:error, term()}

Updates the access token for all connections.

This function allows refreshing the authentication token used with the Realtime connection without disconnecting. It updates the token for all channels.

Parameters

  • conn - The connection PID or name
  • token - The new access token

Returns

  • :ok - Successfully sent the token update
  • {:error, term()} - Failed to send the token update

set_auth(conn, channel, token)

@spec set_auth(pid() | module(), channel(), String.t()) :: :ok | {:error, term()}

Updates the access token for a specific channel.

This function allows refreshing the authentication token used with the Realtime connection for a specific channel without disconnecting.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct
  • token - The new access token

Returns

  • :ok - Successfully sent the token update
  • {:error, term()} - Failed to send the token update

start_link(module, opts \\ [])

@spec start_link(module(), [start_option()]) :: Supervisor.on_start()

Starts a Realtime client process linked to the current process.

Options

  • :client (required) - A %Supabase.Client{} struct from Supabase.init_client/3.
  • :name - Registers the process with the given name.
  • :timeout - Connection timeout in milliseconds (default: 10000).
  • :heartbeat_interval - Milliseconds between heartbeats (default: 30000).
  • :reconnect_after_ms - Function that receives attempt count and returns delay in ms.
  • :http_fallback - Send broadcasts over HTTP when WebSocket is down (default: false).
  • :access_token_fn - Zero-arity function or {mod, fun, args} tuple returning {:ok, token} or {:error, reason}, used to refresh the token before each connection attempt.
  • :params - Map of extra query params for the WebSocket URL.

track(conn, channel, presence_state)

@spec track(pid() | module(), channel(), map()) :: :ok | {:error, term()}

Tracks presence state on a channel.

This function sends a presence tracking message that allows other connected clients to see this client's state through presence events.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct
  • presence_state - Map of presence state to track

Examples

Realtime.track(conn, channel, %{user_id: 123, online_at: DateTime.utc_now()})

Returns

  • :ok - Successfully sent track message
  • {:error, term()} - Failed to send track message

unsubscribe(channel)

@spec unsubscribe(channel()) :: :ok | {:error, term()}

Unsubscribes from a channel.

Parameters

  • channel - The channel to unsubscribe from

untrack(conn, channel)

@spec untrack(pid() | module(), channel()) :: :ok | {:error, term()}

Untracks presence on a channel.

This function sends a presence untracking message that removes this client's state from the shared presence state visible to other clients.

Parameters

  • conn - The connection PID or name
  • channel - The channel struct

Examples

Realtime.untrack(conn, channel)

Returns

  • :ok - Successfully sent untrack message
  • {:error, term()} - Failed to send untrack message

wait_for_ack(ack_ref, opts \\ [])

@spec wait_for_ack(
  ack_ref(),
  keyword()
) :: ack_response()

Waits for an acknowledgment for a broadcast message.

This function blocks the calling process until an acknowledgment is received or a timeout occurs.

Parameters

  • ack_ref - The acknowledgment reference
  • opts - Options, including :timeout in milliseconds (default: 5000)

Returns

  • {:ok, :acknowledged} - Message was acknowledged
  • {:error, :timeout} - No acknowledgment received within timeout
  • {:error, :not_found} - Acknowledgment reference not found
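
The blocking behaviour described above (wait until an acknowledgment arrives or a timeout elapses) is the standard receive ... after pattern. The sketch below illustrates that pattern only and is not the library's implementation: MyApp.AckSketch and the {:ack, ack_ref} message shape are assumptions, and the lookup that could produce a not-found error is omitted.

```elixir
defmodule MyApp.AckSketch do
  # Hypothetical module for illustration; not part of supabase_realtime.

  # Blocks the calling process until a matching {:ack, ack_ref} message
  # (an assumed message shape) arrives in its mailbox or the timeout elapses.
  def wait(ack_ref, opts \\ []) when is_binary(ack_ref) do
    timeout = Keyword.get(opts, :timeout, 5_000)

    receive do
      # The pin operator matches only the acknowledgment for this reference.
      {:ack, ^ack_ref} -> {:ok, :acknowledged}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Because receive blocks the caller, a wait like this is best kept out of processes that must stay responsive, for example by running it in a Task.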