Usage
How It Works
The Realtime client has three internal components:
Connection manages the WebSocket lifecycle: connecting, heartbeats, reconnection, and message transmission. When disconnected, messages go into a send buffer (up to 100 entries) and are flushed when the socket reopens.
Channel Registry receives messages from the Connection, matches them against your subscriptions, and calls your handle_event/1 callback.
Channel Store keeps channel data in an ETS table so it survives across the application.
Events flow like this: WebSocket -> Connection -> Registry -> your callback.
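The send buffer described above can be pictured with a small sketch. This is not the library's implementation: the module name `SendBufferSketch` is hypothetical, and the choice to drop the oldest entry once 100 messages are queued is an assumption, since the text only states the capacity.

```elixir
defmodule SendBufferSketch do
  @moduledoc "Hypothetical bounded send buffer; not the library's implementation."

  @max_entries 100

  defstruct queue: :queue.new(), size: 0

  # Build an empty buffer.
  def new, do: %__MODULE__{}

  # When the buffer is full, drop the oldest entry before adding (assumed policy).
  def push(%__MODULE__{size: size} = buffer, message) when size >= @max_entries do
    {_dropped, queue} = :queue.out(buffer.queue)
    %{buffer | queue: :queue.in(message, queue)}
  end

  # Otherwise append the message and grow the size counter.
  def push(%__MODULE__{} = buffer, message) do
    %{buffer | queue: :queue.in(message, buffer.queue), size: buffer.size + 1}
  end

  # Pass every buffered message to send_fun in insertion order,
  # then return an empty buffer.
  def flush(%__MODULE__{} = buffer, send_fun) when is_function(send_fun, 1) do
    buffer.queue |> :queue.to_list() |> Enum.each(send_fun)
    new()
  end
end
```

In this sketch, `flush/2` would be called with the socket's send function once the connection reports that it is open again.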
Channels
Channels scope your subscriptions to a topic. Create one with:
{:ok, channel} = MyApp.Realtime.channel("room:lobby")
Topics are automatically prefixed with realtime: if not already present.
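The prefixing rule can be illustrated with a minimal sketch. `TopicSketch.normalize/1` is a hypothetical helper written for this example, not a function the library is known to expose.

```elixir
defmodule TopicSketch do
  @prefix "realtime:"

  # Prepend the prefix only when the topic does not already start with it.
  def normalize(topic) when is_binary(topic) do
    if String.starts_with?(topic, @prefix), do: topic, else: @prefix <> topic
  end
end

TopicSketch.normalize("room:lobby")
# => "realtime:room:lobby"
TopicSketch.normalize("realtime:room:lobby")
# => "realtime:room:lobby"
```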
Broadcast
Sending
MyApp.Realtime.broadcast(channel, "new_message", %{body: "hello"})
Or using the lower-level send/2:
MyApp.Realtime.send(channel, %{
  type: "broadcast",
  event: "new_message",
  payload: %{body: "hello"}
})
Receiving
Subscribe to broadcast events on a channel:
:ok = MyApp.Realtime.on(channel, "broadcast", event: "new_message")
Then handle them in your callback:
@impl true
def handle_event({:broadcast, "new_message", payload}) do
  IO.inspect(payload)
  :ok
end
Broadcast Self
By default, you do not receive your own broadcast messages. Enable it with:
{:ok, channel} = MyApp.Realtime.channel("room:lobby", broadcast: [self: true])
Broadcast Acknowledgment
Enable delivery confirmation with:
{:ok, channel} = MyApp.Realtime.channel("room:lobby", broadcast: [ack: true])
Then use broadcast_with_ack/3 and wait_for_ack/2:
{:ok, ack_ref} = MyApp.Realtime.broadcast_with_ack(channel, "event", %{data: 1})
case MyApp.Realtime.wait_for_ack(ack_ref, timeout: 5000) do
  {:ok, :acknowledged} -> :ok
  {:error, :timeout} -> :retry
end
Wildcard Events
Listen to all broadcast events on a channel:
:ok = MyApp.Realtime.on(channel, "broadcast", event: "*")
# or equivalently
:ok = MyApp.Realtime.on(channel, "broadcast", event: :all)
Presence
Track your own state so other clients can see it:
MyApp.Realtime.track(channel, %{user_id: 123, online_at: DateTime.utc_now()})
Stop tracking:
MyApp.Realtime.untrack(channel)
Handle presence events:
@impl true
def handle_event({:presence, :join, joins}) do
  IO.inspect(joins, label: "Users joined")
  :ok
end
@impl true
def handle_event({:presence, :leave, leaves}) do
  IO.inspect(leaves, label: "Users left")
  :ok
end
@impl true
def handle_event({:presence, :sync, state}) do
  IO.inspect(state, label: "Full presence state")
  :ok
end
Set a custom presence key:
{:ok, channel} = MyApp.Realtime.channel("room:lobby", presence: [key: "user_123"])
Postgres Changes
Subscribe to database changes:
{:ok, channel} = MyApp.Realtime.channel("db-changes")
# All changes on a table
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :all, schema: "public", table: "messages"
)
# Only inserts
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :insert, schema: "public", table: "messages"
)
# With a filter
:ok = MyApp.Realtime.on(channel, "postgres_changes",
  event: :update, schema: "public", table: "messages",
  filter: "room_id=eq.42"
)
Handle the events:
@impl true
def handle_event({:postgres_changes, :insert, payload}) do
  # payload includes "record", "old_record", "columns", etc.
  # Column values are automatically transformed to Elixir types
  # (integers, booleans, dates, JSON, etc.)
  :ok
end
See Supabase.Realtime.PostgresTypes for the full list of supported type transforms.
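As an illustration of working with such a payload, the sketch below compares "record" and "old_record" to list the columns an update changed. `ChangeSketch` is a hypothetical helper, and it assumes both keys hold string-keyed maps and that "old_record" carries the full previous row, which the text above does not confirm.

```elixir
defmodule ChangeSketch do
  # Return the names of columns whose values differ between the old and new row.
  # Assumes string-keyed maps under "record" and "old_record".
  def changed_columns(%{"record" => new, "old_record" => old})
      when is_map(new) and is_map(old) do
    for {column, value} <- new, Map.get(old, column) != value, do: column
  end

  # Payloads without both rows (for example an insert) report no changed columns.
  def changed_columns(_payload), do: []
end

payload = %{
  "record" => %{"id" => 1, "body" => "hello!", "room_id" => 42},
  "old_record" => %{"id" => 1, "body" => "hello", "room_id" => 42}
}

ChangeSketch.changed_columns(payload)
# => ["body"]
```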
Connection State
Check the current connection state:
MyApp.Realtime.connection_state()
# Returns :connecting | :open | :closing | :closed
You also receive connection state changes in your callback:
@impl true
def handle_event({:connection, :state_change, %{old: old, new: new}}) do
  IO.puts("Connection changed from #{old} to #{new}")
  :ok
end
Token Refresh
If your access token expires, provide an :access_token_fn when starting:
{MyApp.Realtime,
 client: client,
 access_token_fn: fn -> MyApp.Auth.get_fresh_token() end}
This function is called before each WebSocket upgrade. If it returns {:error, _}, the client falls back to client.access_token or client.apikey.
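The fallback order can be sketched as follows. `TokenSketch` is hypothetical, the client is modeled as a plain map, and the success return shapes (`{:ok, token}` or a bare string) are assumptions, since the text only specifies the `{:error, _}` case.

```elixir
defmodule TokenSketch do
  # Resolve the token to use for a connection attempt.
  # Assumes the callback returns {:ok, token} or a bare string on success.
  def resolve(access_token_fn, client) when is_function(access_token_fn, 0) do
    case access_token_fn.() do
      {:ok, token} when is_binary(token) -> token
      token when is_binary(token) -> token
      {:error, _reason} -> fallback(client)
    end
  end

  # No callback configured: go straight to the client's own credentials.
  def resolve(nil, client), do: fallback(client)

  # Prefer the client's access token, then its API key.
  defp fallback(client), do: Map.get(client, :access_token) || Map.get(client, :apikey)
end
```

With a client map such as `%{access_token: nil, apikey: "anon-key"}`, a callback that returns `{:error, :expired}` would resolve to `"anon-key"` in this sketch.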
You can also update the token at runtime for all channels:
MyApp.Realtime.set_auth("new-jwt-token")
Or for a specific channel:
MyApp.Realtime.set_auth(channel, "new-jwt-token")
Error Handling
Realtime errors use Supabase.Realtime.Error:
error = Supabase.Realtime.Error.new(:timeout, "Channel join timed out", %{topic: "realtime:room"})
Convert to the shared supabase-ex error format:
supabase_error = Supabase.Realtime.Error.to_supabase_error(error)
# => %Supabase.Error{code: :timeout, service: :realtime, ...}