Supabase.Realtime.Connection (supabase_realtime v0.5.0)

WebSocket connection manager for Supabase Realtime.

Handles the full lifecycle of a WebSocket connection to the Supabase Realtime service.

Responsibilities

  • Establishing and maintaining the WebSocket connection.
  • Sending and receiving messages.
  • Reconnection with exponential backoff. The default strategy doubles the delay on each attempt, up to 10 seconds. You can supply your own function with the :reconnect_after_ms option.
  • Heartbeats to detect connection health.
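
As a rough illustration of the backoff described above, the sketch below doubles the delay per attempt and caps it at 10 seconds. The module name BackoffSketch, the 1-second base delay, and attempt counting from 0 are assumptions made for the example; the library's actual default function may differ.

```elixir
defmodule BackoffSketch do
  # Assumed values: the docs only state "doubles ... up to 10 seconds".
  @base_ms 1_000
  @max_ms 10_000

  # `attempt` is the number of reconnects tried so far (assumed to start at 0).
  def default_delay(attempt) when is_integer(attempt) and attempt >= 0 do
    # Clamp the exponent so the intermediate value stays small.
    min(@base_ms * Integer.pow(2, min(attempt, 10)), @max_ms)
  end

  # A custom strategy with the same shape: linear steps capped at 5 seconds.
  def linear_delay(attempt) when is_integer(attempt) and attempt >= 0 do
    min(500 * (attempt + 1), 5_000)
  end
end
```

A function of this shape could be passed as reconnect_after_ms: &BackoffSketch.linear_delay/1, since the option expects a one-argument function that maps the attempt count to a delay in milliseconds.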

Buffers

Messages sent while the connection is down are placed in a send buffer (up to 100 entries). Once the WebSocket upgrades, the buffer is flushed in order. Messages sent while a channel is still joining go into a per-topic push buffer that flushes when the channel reaches the :joined state.
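
The send buffer can be pictured as a bounded FIFO queue. The sketch below is not the library's implementation: SendBufferSketch is a hypothetical module, and dropping the oldest entry when the buffer is full is an assumption, since the overflow policy is not stated here.

```elixir
defmodule SendBufferSketch do
  # Limit taken from the text above (up to 100 entries).
  @max_size 100

  def new, do: %{queue: :queue.new(), size: 0}

  # Assumption: when the buffer is full, the oldest entry is dropped.
  def push(%{queue: queue, size: size}, msg) when size >= @max_size do
    {_dropped, rest} = :queue.out(queue)
    %{queue: :queue.in(msg, rest), size: size}
  end

  def push(%{queue: queue, size: size}, msg) do
    %{queue: :queue.in(msg, queue), size: size + 1}
  end

  # Returns the buffered messages oldest first, plus an empty buffer.
  def flush(%{queue: queue}), do: {:queue.to_list(queue), new()}
end
```

Tracking the size next to the queue mirrors the send_buffer and send_buffer_size fields of the state() type and avoids walking the queue to count entries on every push.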

HTTP Fallback

When :http_fallback is true and the WebSocket is not open, broadcast messages are delivered through the REST API instead of being buffered. Other message types are still buffered normally. See Supabase.Realtime.HTTP for details.
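
The routing rule above can be summarised as a three-way decision. In this sketch the :open status atom and the "event" => "broadcast" message shape are assumptions chosen for illustration, and DeliverySketch is a hypothetical module.

```elixir
defmodule DeliverySketch do
  # An open socket always wins, whatever the message type.
  def route(%{status: :open}, _msg), do: :websocket

  # Socket not open: only broadcasts may go over HTTP, and only when enabled.
  def route(%{http_fallback: true}, %{"event" => "broadcast"}), do: :http

  # Everything else waits in the send buffer.
  def route(_state, _msg), do: :buffer
end
```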

Token Resolution

The access token used for the WebSocket upgrade is resolved in this order:

  1. The :access_token_fn option, if it is provided and returns {:ok, token}.
  2. client.access_token, if set on the %Supabase.Client{} struct.
  3. client.apikey as a last resort.
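
The resolution order above could be expressed roughly as follows. TokenSketch is a hypothetical module, and a plain map with :access_token and :apikey keys stands in for the %Supabase.Client{} struct so the example is self-contained. The zero-arity function and {module, function, args} forms follow the access_token_fn field of the state() type.

```elixir
defmodule TokenSketch do
  # Step 1: try the callback; steps 2 and 3: fall back to the client fields.
  def resolve(client, access_token_fn \\ nil) do
    case call(access_token_fn) do
      {:ok, token} when is_binary(token) -> token
      _error_or_missing -> client_token(client)
    end
  end

  defp call(nil), do: nil
  defp call(fun) when is_function(fun, 0), do: fun.()
  defp call({mod, name, args}), do: apply(mod, name, args)

  # Step 2: prefer the client's access token when it is set.
  defp client_token(%{access_token: token}) when is_binary(token), do: token
  # Step 3: otherwise use the API key as a last resort.
  defp client_token(%{apikey: apikey}), do: apikey
end
```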

Custom Params

Any map passed as :params is merged into the WebSocket URL query string alongside the default apikey and vsn params.
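
A minimal sketch of that merge, using only the standard library. The UrlSketch module, the "1.0.0" value for vsn, and the example URL are placeholders, and letting custom params override the defaults on a key clash is an assumption; the library may resolve conflicts differently.

```elixir
defmodule UrlSketch do
  def build(base_url, apikey, custom_params \\ %{}) do
    # Placeholder vsn value, used for illustration only.
    defaults = %{"apikey" => apikey, "vsn" => "1.0.0"}

    # Assumption: custom params are merged last, so they win on conflict.
    query =
      defaults
      |> Map.merge(stringify(custom_params))
      |> URI.encode_query()

    base_url <> "?" <> query
  end

  # Normalise atom keys and non-string values before encoding.
  defp stringify(params) do
    Map.new(params, fn {key, value} -> {to_string(key), to_string(value)} end)
  end
end
```

For example, UrlSketch.build("wss://example.test/socket", "anon", %{log_level: "info"}) yields a URL whose query string carries apikey, vsn, and log_level.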

Summary

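Types

  • state() - Connection state.

Functions

  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • send_message(server, channel, payload) - Sends a message on a channel.
  • send_message_with_ack(server, channel, payload, ack_ref) - Sends a message on a channel with acknowledgment support.
  • set_auth(server, token) - Updates the authentication token for all channels.
  • start_link(opts) - Starts the connection manager.
  • state(server) - Gets the current connection state.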

Types

state()

@type state() :: %{
  client: Supabase.Client.t(),
  registry: atom() | pid(),
  store: atom() | pid(),
  socket: pid() | nil,
  status: Supabase.Realtime.connection_state(),
  stream_ref: reference() | nil,
  reconnect_timer: reference() | nil,
  heartbeat_timer: reference() | nil,
  heartbeat_interval: pos_integer(),
  pending_heartbeat_ref: Supabase.Realtime.ref() | nil,
  reconnect_attempts: non_neg_integer(),
  reconnect_after_ms: (non_neg_integer() -> pos_integer()),
  send_buffer: :queue.queue(),
  send_buffer_size: non_neg_integer(),
  push_buffers: %{required(String.t()) => :queue.queue()},
  http_fallback: boolean(),
  access_token_fn:
    (-> {:ok, String.t()} | {:error, term()}) | {module(), atom(), list()} | nil,
  custom_params: map()
}

Connection state.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

send_message(server, channel, payload)

@spec send_message(GenServer.server(), Supabase.Realtime.Channel.t(), map()) ::
  :ok | {:error, term()}

Sends a message on a channel.

Parameters

  • server - The server PID or name
  • channel - The channel struct
  • payload - The message payload

send_message_with_ack(server, channel, payload, ack_ref)

@spec send_message_with_ack(
  GenServer.server(),
  Supabase.Realtime.Channel.t(),
  map(),
  String.t()
) ::
  :ok | {:error, term()}

Sends a message on a channel with acknowledgment support.

Parameters

  • server - The server PID or name
  • channel - The channel struct
  • payload - The message payload
  • ack_ref - The acknowledgment reference

set_auth(server, token)

@spec set_auth(GenServer.server(), String.t()) :: :ok | {:error, term()}

Updates the authentication token for all channels.

Parameters

  • server - The server PID or name
  • token - The new authentication token

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the connection manager.

Options

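  • :name - Optional registration name
  • :registry - Registry process name
  • :client - A %Supabase.Client{} struct
  • :heartbeat_interval - Interval in milliseconds between heartbeats
  • :reconnect_after_ms - Function that takes the number of reconnect attempts and returns the delay in milliseconds before the next attempt
  • :http_fallback - When true, broadcast messages are delivered through the REST API while the WebSocket is not open (see HTTP Fallback)
  • :access_token_fn - Function or {module, function, args} tuple that returns {:ok, token} or {:error, reason} (see Token Resolution)
  • :params - Map of custom params merged into the WebSocket URL query string (see Custom Params)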
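
As a sketch of how these options might be assembled for a supervision tree: the names MyApp.RealtimeSketch, MyApp.RealtimeConnection, and MyApp.RealtimeRegistry are hypothetical, and the 30-second heartbeat and the linear reconnect function are illustrative values rather than library defaults. The client argument is assumed to be an already initialised %Supabase.Client{}.

```elixir
defmodule MyApp.RealtimeSketch do
  # Builds a {module, opts} child spec tuple for a Supervisor children list.
  def connection_child_spec(client) do
    opts = [
      # Hypothetical registration and registry names.
      name: MyApp.RealtimeConnection,
      registry: MyApp.RealtimeRegistry,
      client: client,
      # Illustrative value, not a documented default.
      heartbeat_interval: 30_000,
      # Custom strategy: 1 s steps, capped at 10 s.
      reconnect_after_ms: fn attempt -> min(1_000 * (attempt + 1), 10_000) end
    ]

    {Supabase.Realtime.Connection, opts}
  end
end
```

The returned tuple can be placed in the children list given to Supervisor.start_link/2, which starts the connection through child_spec/1 and start_link/1.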

state(server)

Gets the current connection state.

Parameters

  • server - The server PID or name