Amplified.PubSub.Protocol protocol (Amplified.PubSub v0.2.1)

Copy Markdown View Source

The protocol that powers Amplified.PubSub.

Each function in this protocol is dispatched based on the type of its first argument. Built-in implementations exist for BitString, Atom, Tuple, List, Stream, and Phoenix.LiveView.Socket. Struct implementations are generated automatically when you use Amplified.PubSub in a module.

You generally don't need to call these functions directly — use the delegating functions in Amplified.PubSub instead. This module is relevant when you need to implement the protocol manually with defimpl.

Implementing for a struct

The easiest way is use Amplified.PubSub in your schema module, which generates a full implementation with sensible defaults. If you need more control, write a defimpl block and pass impl: true to get the defaults as a starting point:

defimpl Amplified.PubSub.Protocol, for: MyApp.Blog.Post do
  use Amplified.PubSub, impl: true

  # Override just the channel derivation
  def channel(%{slug: slug}, _ns), do: "post:#{slug}"
end

Return conventions

  • broadcast/2,3 — return the subject (struct implementations) or the message (string implementation), enabling pipeline chaining.
  • subscribe/1, unsubscribe/1 — return the subject for structs, or :ok for strings.
  • handle_info/2,3,4 — return {:cont, socket} when the message was not handled, or {:halt, socket} when it was.

Summary

Types

t()

All the types that implement this protocol.

Functions

Broadcasts a message to the channel for the given subject.

Broadcasts a message with additional attributes to the channel for the given subject.

Returns the PubSub channel name for the given subject.

Returns the PubSub channel name for the given subject, scoped to namespace.

Dispatches an incoming PubSub message through the protocol.

Dispatches a message for a specific subject.

Dispatches a message for a specific subject with additional attributes.

Subscribes the current process to the PubSub channel for the given subject.

Unsubscribes the current process from the PubSub channel for the given subject.

Types

t()

@type t() :: term()

All the types that implement this protocol.

Functions

broadcast(subject, message)

@spec broadcast(
  subject :: subject,
  message :: term()
) :: subject
when subject: nil | binary() | struct() | {:ok, struct()} | {:error, term()}

Broadcasts a message to the channel for the given subject.

For struct implementations generated by use Amplified.PubSub, atom and binary events are automatically wrapped as {event, subject} before broadcasting. The subject is returned for pipeline chaining.

For the BitString implementation, the first argument is treated as a literal channel name and the message is broadcast as-is via Phoenix.PubSub.broadcast/3.

broadcast(subject, message, attrs)

@spec broadcast(
  subject :: subject,
  message :: term(),
  attrs :: map()
) :: subject
when subject: nil | binary() | struct() | {:ok, struct()} | {:error, term()}

Broadcasts a message with additional attributes to the channel for the given subject.

Like broadcast/2, but the broadcast payload includes the attributes. For atom/binary events on structs, the payload becomes {event, subject, attrs}.

channel(subject)

@spec channel(subject :: struct()) :: binary()

Returns the PubSub channel name for the given subject.

For structs, the default implementation derives the channel from the module's last name segment (snake_cased) and the struct's :id field, e.g. "post:abc-123". The optional namespace is appended with a : separator.

channel(subject, namespace)

@spec channel(
  subject :: struct(),
  namespace :: String.t() | atom() | nil
) :: binary()

Returns the PubSub channel name for the given subject, scoped to namespace.

handle_info(message, socket)

@spec handle_info(
  message :: term(),
  socket :: Phoenix.LiveView.Socket.t()
) :: {:cont | :halt, Phoenix.LiveView.Socket.t()}

Dispatches an incoming PubSub message through the protocol.

The Tuple implementation is the primary dispatcher — it pattern-matches on {action, subject} and {:flash, level, message} tuples, delegating to the subject's handle_info/3 when a protocol implementation exists for the subject.

Returns {:cont, socket} for unhandled messages, {:halt, socket} for handled ones.

handle_info(subject, message, socket)

@spec handle_info(
  subject :: struct(),
  message :: term(),
  socket :: Phoenix.LiveView.Socket.t()
) :: {:cont | :halt, Phoenix.LiveView.Socket.t()}

Dispatches a message for a specific subject.

Called by the Tuple implementation after unpacking {action, subject}. Override this in your use Amplified.PubSub block to handle events for your struct type.

handle_info(subject, message, changeset, socket)

@spec handle_info(
  subject :: struct(),
  message :: term(),
  changeset :: map(),
  socket :: Phoenix.LiveView.Socket.t()
) :: {:cont | :halt, Phoenix.LiveView.Socket.t()}

Dispatches a message for a specific subject with additional attributes.

Called by the Tuple implementation after unpacking {action, subject, attrs}. Override this in your use Amplified.PubSub block to handle events that include a changeset or attributes map.

subscribe(channel)

@spec subscribe(binary() | struct()) :: :ok | struct() | {:error, term()}

Subscribes the current process to the PubSub channel for the given subject.

The BitString implementation subscribes via the configured PubSub server. Struct implementations derive the channel first, then delegate to the string implementation.

unsubscribe(channel)

@spec unsubscribe(binary() | struct()) :: :ok | struct() | {:error, term()}

Unsubscribes the current process from the PubSub channel for the given subject.