Supabase.Realtime behaviour
(supabase_realtime v0.5.0)
Copy Markdown
Client for Supabase Realtime.
This module provides a behavior for building Realtime clients that connect to Supabase Realtime. It supports database change subscriptions, broadcast messages, and presence tracking.
Core Architecture
The client has three main components:
- Connection - WebSocket management, heartbeats, reconnection with exponential backoff, send/push buffers, and HTTP fallback for broadcasts.
- Channel Registry - Routes messages to handlers, tracks channel states, supports wildcard event matching.
- Channel Store - ETS-based storage for channel data.
Installation
Add supabase_realtime to your dependencies in mix.exs:
def deps do
[
{:supabase_realtime, "~> 0.4.0"}
]
endQuick Start
First, create a Supabase client with Supabase.init_client/3:
{:ok, client} = Supabase.init_client(
"https://your-project.supabase.co",
"your-api-key"
)Then define a module that uses Supabase.Realtime:
defmodule MyApp.Realtime do
use Supabase.Realtime
def start_link(opts \\ []) do
Supabase.Realtime.start_link(__MODULE__, opts)
end
@impl true
def handle_event({:postgres_changes, :insert, payload}) do
IO.inspect(payload, label: "New record")
:ok
end
@impl true
def handle_event({:broadcast, event_name, payload}) do
IO.inspect(payload, label: "Broadcast")
:ok
end
@impl true
def handle_event({:presence, event, payload}) do
IO.inspect(payload, label: "Presence")
:ok
end
endAdd it to your supervision tree:
{:ok, client} = Supabase.init_client(
System.fetch_env!("SUPABASE_URL"),
System.fetch_env!("SUPABASE_KEY")
)
children = [
{MyApp.Realtime, client: client}
]Subscribing to Database Changes
{:ok, channel} = MyApp.Realtime.channel("public:users")
# Listen to INSERT events on the users table
:ok = MyApp.Realtime.on(channel, "postgres_changes",
event: :insert,
schema: "public",
table: "users"
)
# Listen to all events on the users table
:ok = MyApp.Realtime.on(channel, "postgres_changes",
event: :all,
schema: "public",
table: "users"
)
# Listen with a filter
:ok = MyApp.Realtime.on(channel, "postgres_changes",
event: :update,
schema: "public",
table: "users",
filter: "id=eq.1"
)Broadcast Messages
:ok = MyApp.Realtime.send(channel, %{
type: "broadcast",
event: "new_message",
payload: %{text: "Hello!"}
})Wildcard Events
Use event: :all or event: "*" to listen to all broadcast events
on a channel:
:ok = MyApp.Realtime.on(channel, "broadcast", event: "*")Unsubscribe
:ok = MyApp.Realtime.unsubscribe(channel)
:ok = MyApp.Realtime.remove_all_channels()Options
These options can be passed to start_link/2:
:client(required) - A%Supabase.Client{}struct fromSupabase.init_client/3.:name- Process registration name.:timeout- Connection timeout in milliseconds (default: 10000).:heartbeat_interval- Milliseconds between heartbeats (default: 30000).:reconnect_after_ms- A function that receives the attempt count and returns a delay in milliseconds. Defaults to exponential backoff.:http_fallback- Whentrue, broadcast messages are sent over HTTP if the WebSocket connection is down (default: false).:access_token_fn- A zero-arity function or{mod, fun, args}tuple that returns{:ok, token}or{:error, reason}. Used to refresh tokens before each connection attempt.:params- A map of extra query params appended to the WebSocket URL.
Example with all options
{:ok, _pid} = MyApp.Realtime.start_link(
client: client,
heartbeat_interval: :timer.seconds(15),
reconnect_after_ms: fn tries -> min(:timer.seconds(10), :timer.seconds(tries)) end,
http_fallback: true,
access_token_fn: fn -> MyApp.Auth.get_token() end,
params: %{log_level: "debug"}
)Event Handling
The handle_event/1 callback receives events matching your subscriptions.
It must return :ok.
Event shapes:
- Database changes:
{:postgres_changes, operation, payload} - Broadcast messages:
{:broadcast, event_name, payload} - Presence updates:
{:presence, event, payload}
Summary
Types
Acknowledgment reference type used for tracking broadcast acknowledgments.
Acknowledgment response types.
Channel states for subscriptions.
Configuration options for realtime connection.
Connection states for the WebSocket client.
Error response for various operations.
Represents a channel subscription event.
Types of PostgreSQL database changes.
Filter for PostgreSQL database changes.
Types of presence events.
Types of events that can be received from the server.
Realtime message payload structure.
Message reference type used for tracking message delivery.
Subscription states for channels.
Callbacks
Callback invoked when a realtime event is received.
Functions
Sends a broadcast message to a channel.
Sends a broadcast message to a channel with acknowledgment support.
Creates a new channel for subscription.
Gets the connection status.
Fetches the channel registry for a client.
Fetches the connection PID for a client.
Lists all active channel subscriptions.
Subscribes to events on a channel with an event filter.
Removes all channel subscriptions.
Sends a message on a channel.
Updates the access token for all connections.
Updates the access token for a specific channel.
Starts a Realtime client process linked to the current process.
Tracks presence state on a channel.
Unsubscribes from a channel.
Untracks presence on a channel.
Wait for an acknowledgment for a broadcast message.
Types
@type ack_ref() :: String.t()
Acknowledgment reference type used for tracking broadcast acknowledgments.
@type ack_response() ::
{:ok, :acknowledged} | {:error, :timeout} | {:error, :not_supported}
Acknowledgment response types.
@type channel() :: Supabase.Realtime.Channel.t()
@type channel_opts() :: keyword()
@type channel_state() :: :closed | :errored | :joined | :joining | :leaving
Channel states for subscriptions.
:closed- Channel is not subscribed:errored- Channel encountered an error during subscription:joined- Channel is successfully subscribed:joining- Channel is in the process of subscribing:leaving- Channel is in the process of unsubscribing
@type client() :: Supabase.Client.t()
@type config_options() :: %{ optional(:broadcast) => %{ optional(:self) => boolean(), optional(:ack) => boolean() }, optional(:presence) => %{optional(:key) => String.t()}, optional(:postgres_changes) => [postgres_changes_filter()], optional(:private) => boolean() }
Configuration options for realtime connection.
@type connection_state() :: :connecting | :open | :closing | :closed
Connection states for the WebSocket client.
:connecting- Attempting to establish connection:open- Connection is established and operational:closing- Connection is in the process of closing:closed- Connection is closed
@type error_response() :: {:error, :subscription_error, String.t()} | {:error, :connection_error, String.t()} | {:error, :invalid_channel, String.t()} | {:error, :timeout, String.t()} | {:error, atom(), String.t()}
Error response for various operations.
@type event() :: postgres_change_event() | broadcast_event() | presence_event()
Represents a channel subscription event.
@type event_filter() :: Enumerable.t()
@type event_type() :: :postgres_changes | :broadcast | :presence
@type postgres_change_event() :: {:postgres_changes, postgres_changes_event_type(), postgres_changes_filter()}
@type postgres_changes_event_type() :: :all | :insert | :update | :delete
Types of PostgreSQL database changes.
:all- All event types (represented as*in filters):insert- Record insertion:update- Record update:delete- Record deletion
@type postgres_changes_filter() :: %{ :event => postgres_changes_event_type() | String.t(), :schema => String.t(), optional(:table) => String.t(), optional(:filter) => String.t() }
Filter for PostgreSQL database changes.
@type presence_event() :: {:presence, :join | :leave | :sync, map()}
Types of presence events.
:join- User has joined:leave- User has left:sync- Presence state synchronization
@type realtime_listen_type() :: :broadcast | :presence | :postgres_changes | :system
Types of events that can be received from the server.
:broadcast- User-generated broadcast messages:presence- Presence state updates:postgres_changes- Database change events:system- System events like heartbeats
@type realtime_message() :: %{ topic: String.t(), event: String.t(), payload: map(), ref: ref(), join_ref: ref() | nil }
Realtime message payload structure.
@type ref() :: String.t()
Message reference type used for tracking message delivery.
@type start_option() :: {:name, atom()} | {:client, Supabase.Client.t()} | {:timeout, pos_integer()}
@type subscribe_state() :: :subscribed | :timed_out | :closed | :channel_error
Subscription states for channels.
:subscribed- Successfully subscribed to events:timed_out- Subscription attempt timed out:closed- Subscription has been closed:channel_error- Error occurred during subscription
Callbacks
@callback handle_event(event()) :: :ok
Callback invoked when a realtime event is received.
This callback is called whenever an event matching the subscribed patterns is received from the server.
Parameters
event- The event data structured based on its type:{:postgres_changes, operation, payload}- Database change events{:broadcast, event_name, payload}- Broadcast messages{:presence, presence_event, payload}- Presence updates
Functions
Sends a broadcast message to a channel.
This is a helper function that simplifies sending broadcast messages by constructing the appropriate payload format.
Parameters
conn- The connection PID or namechannel- The channel structevent- The event namepayload- The message payload
Returns
:ok- Successfully sent the broadcast{:error, term()}- Failed to send the broadcast
@spec broadcast_with_ack(pid() | module(), channel(), String.t(), map()) :: {:ok, ack_ref()} | :ok | {:error, term()}
Sends a broadcast message to a channel with acknowledgment support.
This function sends a broadcast message and optionally returns an acknowledgment reference if acknowledgments are enabled on the channel.
Parameters
conn- The connection PID or namechannel- The channel structevent- The event namepayload- The message payload
Returns
{:ok, ack_ref}- Successfully sent with acknowledgment enabled:ok- Successfully sent without acknowledgment{:error, term()}- Failed to send the broadcast
Creates a new channel for subscription.
Parameters
client- The client module or PIDtopic- The topic to subscribe toopts- Channel options
Returns
{:ok, channel}- A channel struct
@spec connection_state(client() | t()) :: connection_state()
Gets the connection status.
Parameters
client- The client module or PID
Fetches the channel registry for a client.
Parameters
realtime- The client module or PID
Fetches the connection PID for a client.
Parameters
realtime- The client module or PID
Lists all active channel subscriptions.
Parameters
registry- The channel registry module or PID
Returns
- A list of
%Channel{}structs
@spec on(channel(), String.t(), event_filter()) :: :ok | {:error, term()}
Subscribes to events on a channel with an event filter.
Parameters
channel- The channel struct returned bychannel/3type- The event type (e.g., "postgres_changes", "broadcast", "presence")filter- Filter options like event type, schema, table, etc.
Examples
Realtime.on(channel, "postgres_changes", event: :insert, schema: "public", table: "users")
Realtime.on(channel, "broadcast", event: "new_message")
Realtime.on(channel, "presence", event: "join")
Removes all channel subscriptions.
Parameters
registry- The channel registry module or PID
Sends a message on a channel.
Parameters
conn- The connection PID or namechannel- The channel structpayload- The message payload
Examples
Realtime.send(channel, type: "broadcast", event: "new_message", payload: %{body: "Hello"})
Updates the access token for all connections.
This function allows refreshing the authentication token used with the Realtime connection without disconnecting. It updates the token for all channels.
Parameters
conn- The connection PID or nametoken- The new access token
Returns
:ok- Successfully sent the token update{:error, term()}- Failed to send the token update
Updates the access token for a specific channel.
This function allows refreshing the authentication token used with the Realtime connection for a specific channel without disconnecting.
Parameters
conn- The connection PID or namechannel- The channel structtoken- The new access token
Returns
:ok- Successfully sent the token update{:error, term()}- Failed to send the token update
@spec start_link(module(), [start_option()]) :: Supervisor.on_start()
Starts a Realtime client process linked to the current process.
Options
:client(required) - A%Supabase.Client{}struct fromSupabase.init_client/3.:name- Registers the process with the given name.:timeout- Connection timeout in milliseconds (default: 10000).:heartbeat_interval- Milliseconds between heartbeats (default: 30000).:reconnect_after_ms- Function that receives attempt count and returns delay in ms.:http_fallback- Send broadcasts over HTTP when WebSocket is down (default: false).:access_token_fn- Zero-arity function returning{:ok, token}for token refresh.:params- Map of extra query params for the WebSocket URL.
Tracks presence state on a channel.
This function sends a presence tracking message that allows other connected clients to see this client's state through presence events.
Parameters
conn- The connection PID or namechannel- The channel structpresence_state- Map of presence state to track
Examples
Realtime.track(conn, channel, %{user_id: 123, online_at: DateTime.utc_now()})Returns
:ok- Successfully sent track message{:error, term()}- Failed to send track message
Unsubscribes from a channel.
Parameters
channel- The channel to unsubscribe from
Untracks presence on a channel.
This function sends a presence untracking message that removes this client's state from the shared presence state visible to other clients.
Parameters
conn- The connection PID or namechannel- The channel struct
Examples
Realtime.untrack(conn, channel)Returns
:ok- Successfully sent untrack message{:error, term()}- Failed to send untrack message
@spec wait_for_ack( ack_ref(), keyword() ) :: ack_response()
Wait for an acknowledgment for a broadcast message.
This function blocks the calling process until an acknowledgment is received or a timeout occurs.
Parameters
ack_ref- The acknowledgment referenceopts- Options including:timeout(default: 5000ms)
Returns
{:ok, :acknowledged}- Message was acknowledged{:error, :timeout}- No acknowledgment received within timeout{:error, :not_found}- Acknowledgment reference not found