Supabase.Realtime.Connection
(supabase_realtime v0.5.0)
Copy Markdown
WebSocket connection manager for Supabase Realtime.
Handles the full lifecycle of a WebSocket connection to the Supabase Realtime service.
Responsibilities
- Establishing and maintaining the WebSocket connection.
- Sending and receiving messages.
- Reconnection with exponential backoff. The default strategy doubles the
delay on each attempt, up to 10 seconds. You can supply your own function
with the
:reconnect_after_msoption. - Heartbeats to detect connection health.
Buffers
Messages sent while the connection is down are placed in a send buffer
(up to 100 entries). Once the WebSocket upgrades, the buffer is flushed in
order. Messages sent while a channel is still joining go into a per-topic
push buffer that flushes when the channel reaches the :joined state.
HTTP Fallback
When :http_fallback is true and the WebSocket is not open, broadcast
messages are delivered through the REST API instead of being buffered.
Other message types are still buffered normally. See Supabase.Realtime.HTTP
for details.
Token Resolution
The access token used for the WebSocket upgrade is resolved in this order:
- The
:access_token_fnoption, if provided and returns{:ok, token}. client.access_token, if set on the%Supabase.Client{}struct.client.apikeyas a last resort.
Custom Params
Any map passed as :params is merged into the WebSocket URL query string
alongside the default apikey and vsn params.
Summary
Functions
Returns a specification to start this module under a supervisor.
Sends a message on a channel.
Sends a message on a channel with acknowledgment support.
Updates the authentication token for all channels.
Starts the connection manager.
Gets the current connection state.
Types
@type state() :: %{ client: Supabase.Client.t(), registry: atom() | pid(), store: atom() | pid(), socket: pid() | nil, status: Supabase.Realtime.connection_state(), stream_ref: reference() | nil, reconnect_timer: reference() | nil, heartbeat_timer: reference() | nil, heartbeat_interval: pos_integer(), pending_heartbeat_ref: Supabase.Realtime.ref() | nil, reconnect_attempts: non_neg_integer(), reconnect_after_ms: (non_neg_integer() -> pos_integer()), send_buffer: :queue.queue(), send_buffer_size: non_neg_integer(), push_buffers: %{required(String.t()) => :queue.queue()}, http_fallback: boolean(), access_token_fn: (-> {:ok, String.t()} | {:error, term()}) | {module(), atom(), list()} | nil, custom_params: map() }
Connection state.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec send_message(GenServer.server(), Supabase.Realtime.Channel.t(), map()) :: :ok | {:error, term()}
Sends a message on a channel.
Parameters
server- The server PID or namechannel- The channel structpayload- The message payload
@spec send_message_with_ack( GenServer.server(), Supabase.Realtime.Channel.t(), map(), String.t() ) :: :ok | {:error, term()}
Sends a message on a channel with acknowledgment support.
Parameters
server- The server PID or namechannel- The channel structpayload- The message payloadack_ref- The acknowledgment reference
@spec set_auth(GenServer.server(), String.t()) :: :ok | {:error, term()}
Updates the authentication token for all channels.
Parameters
server- The server PID or nametoken- The new authentication token
@spec start_link(keyword()) :: GenServer.on_start()
Starts the connection manager.
Options
:name- Optional registration name:registry- Registry process name:client- A%Supabase.Client{}struct:heartbeat_interval- Interval in milliseconds between heartbeats:reconnect_after_ms- Function that returns reconnection delay based on attempts
@spec state(GenServer.server()) :: Supabase.Realtime.connection_state()
Gets the current connection state.
Parameters
server- The server PID or name