A protocol-based PubSub abstraction for Phoenix LiveView.
defmodule MyApp.Blog do
alias Amplified.PubSub
def create_post(attrs) do
%Post{}
|> Post.changeset(attrs)
|> Repo.insert()
|> PubSub.broadcast(:created)
end
endAmplified.PubSub wraps Phoenix.PubSub with a protocol layer so the
same broadcast/2, subscribe/1, and handle_info/2 calls work whether
you pass a struct, an {:ok, struct} tuple from a Repo operation, a list
of structs, or a raw channel string. This lets you weave PubSub into your
context functions as pipeline-friendly operations that chain naturally
with Ecto.
Configuration
Configure the PubSub server name used for subscriptions and broadcasts:
# config/config.exs
config :amplified_pubsub, pubsub_server: :my_appSetup
Schema modules opt in by adding use Amplified.PubSub:
defmodule MyApp.Blog.Post do
use Ecto.Schema
use Amplified.PubSub
schema "posts" do
field :title, :string
field :body, :string
timestamps()
end
endThis generates an Amplified.PubSub.Protocol implementation with sensible
defaults:
channel/1derives"post:<id>"from the module's last segment (snake_cased) and the struct's:idfieldsubscribe/1andunsubscribe/1subscribe via the configured PubSub serverbroadcast/2wraps atom/string events as{event, subject}and publishes to the subject's channelhandle_info/2,3,4return{:cont, socket}(pass-through) so unhandled messages don't crash
Broadcasting from context functions
Since broadcast/2 returns its first argument — and passes through
{:ok, _} and {:error, _} tuples — it drops right into Ecto pipelines:
defmodule MyApp.Blog do
alias Amplified.PubSub
def create_post(attrs) do
%Post{}
|> Post.changeset(attrs)
|> Repo.insert()
|> PubSub.broadcast(:created)
end
def update_post(%Post{} = post, attrs) do
post
|> Post.changeset(attrs)
|> Repo.update()
|> PubSub.broadcast(:updated)
end
def delete_post(%Post{} = post) do
post
|> Repo.delete()
|> PubSub.broadcast(:deleted)
end
endOn success, PubSub.broadcast({:ok, post}, :created) unwraps the tuple,
broadcasts {:created, post} on the "post:<id>" channel, and returns
{:ok, post}. On failure, {:error, changeset} passes through without
broadcasting.
Subscribing in LiveViews
Subscribe a LiveView process to a subject's channel in mount/3. The
subscription is idempotent — calling it twice won't produce duplicate
messages:
def mount(%{"id" => id}, _session, socket) do
post = Blog.get_post!(id)
PubSub.subscribe(post)
{:ok, assign(socket, post: post)}
endUnsubscribe when you no longer want messages:
PubSub.unsubscribe(post)You can also subscribe to a raw channel string:
PubSub.subscribe("posts:feed")Handling messages
PubSub.handle_info/2 returns {:cont, socket} or {:halt, socket} —
the same convention used by Phoenix.LiveView.attach_hook/4 — so you
can wire it directly into the LiveView lifecycle as an on_mount hook.
Every LiveView in the live session then gets PubSub handling
automatically, with no per-view boilerplate.
Define a hooks module:
defmodule MyAppWeb.Hooks do
import Phoenix.LiveView
alias Amplified.PubSub
defmodule Default do
def on_mount(:default, _params, _session, socket) do
{:cont, MyAppWeb.Hooks.attach_defaults(socket)}
end
end
# This is the critical wiring step. attach_hook/4 registers
# PubSub.handle_info/2 as a lifecycle hook that intercepts every
# message the LiveView process receives. Without this, PubSub
# messages will arrive but the protocol dispatch won't fire —
# your schema handle_info/3 implementations won't be called.
def attach_defaults(socket) do
socket
|> subscribe()
|> attach_hook(:pubsub, :handle_info, &PubSub.handle_info/2)
end
# Subscribe based on whatever is assigned to the socket.
# Unsubscribe first to prevent duplicates on reconnect.
defp subscribe(socket) do
if connected?(socket) do
user = socket.assigns[:current_user]
project = socket.assigns[:project]
if user, do: PubSub.subscribe(user)
if project, do: PubSub.subscribe(project)
end
socket
end
endThen attach the hook in the router:
live_session :authenticated, on_mount: MyAppWeb.Hooks.Default do
live "/posts", PostLive.Index
live "/posts/:id", PostLive.Show
endWith this in place, the hook subscribes every LiveView to the current
user and project channels and dispatches all {action, subject} messages
through the protocol. Individual LiveViews can still subscribe to
additional channels in their own mount/3 — the hook returns
{:cont, socket} for anything it doesn't handle, so the message
continues to the view's handle_info/2.
Event handling in schemas
When a {action, subject} message arrives, the Tuple dispatcher looks
up the subject's protocol implementation and calls its handle_info/3.
This lets you define handlers in the schema's use Amplified.PubSub
block.
When to use schema-level handlers
Reserve schema-level handlers for events that would otherwise require the same callback to be duplicated across multiple LiveViews. Handlers specific to a single view belong in that view, not in the schema.
A good example is keeping the current user up to date across all LiveViews — every view needs the same logic, so colocating it with the schema avoids repetition:
defmodule MyApp.Accounts.User do
use Ecto.Schema
use Amplified.PubSub do
# Match the broadcast user's ID against the scope's user ID
# to ensure we only update when the broadcast is for *this*
# session's authenticated user.
def handle_info(
%User{id: id} = user,
:updated,
%{assigns: %{current_scope: %{user: %{id: id}} = scope}} = socket
) do
{:cont, assign(socket, current_scope: %{scope | user: user})}
end
def handle_info(
%User{id: id},
:deleted,
%{assigns: %{current_scope: %{user: %{id: id}}}} = socket
) do
{:halt, redirect(socket, to: ~p"/sign-out")}
end
end
schema "users" do
field :email, :string
field :name, :string
timestamps()
end
end
defmodule MyApp.Blog.Post do
use Ecto.Schema
use Amplified.PubSub do
def handle_info(%Post{id: id} = post, :updated, %{assigns: %{post: %{id: id}}} = socket) do
{:cont, assign(socket, post: post)}
end
def handle_info(%Post{id: id}, :deleted, %{assigns: %{post: %{id: id}}} = socket) do
{:halt, push_navigate(socket, to: ~p"/posts")}
end
end
schema "posts" do
field :title, :string
field :body, :string
timestamps()
end
endWith these implementations in place and a global attach_hook
dispatching through PubSub.handle_info/2, a :updated broadcast
for the current %User{} will automatically update the scope's user on
every connected LiveView that belongs to that user — with no
per-view code at all. Broadcasts for other users fall through as
{:cont, socket} and are ignored.
The convention is to return {:halt, socket} when you've handled the
message and you don't want other lifecycle hooks to run, and
{:cont, socket} when you do. The defaults always return
{:cont, socket}, so unmatched messages fall through safely.
Flash messages
The Tuple implementation recognises {:flash, level, message} tuples and
calls Phoenix.LiveView.put_flash/3 automatically:
PubSub.broadcast("room:lobby", {:flash, :info, "Someone joined!"})Custom channels
Override the channel derivation by passing a block to use:
use Amplified.PubSub do
def channel(%Post{slug: slug}, _ns), do: "post:#{slug}"
endOr implement the protocol externally with defimpl:
defimpl Amplified.PubSub.Protocol, for: MyApp.Blog.Post do
use Amplified.PubSub, impl: true
def channel(%{slug: slug}, _ns), do: "post:#{slug}"
endWhen using impl: true, you get all the default function bodies injected
into your defimpl block, so you only need to override the functions you
want to customise.
Namespaced channels
All channel/2 functions accept an optional namespace for scoping. This is
useful when different LiveViews care about different aspects of the same
resource:
PubSub.channel(post) # => "post:abc-123"
PubSub.channel(post, :comments) # => "post:abc-123:comments"Lists and streams
Broadcasting or subscribing to a list operates on each item individually:
PubSub.subscribe(posts) # subscribes to each post's channel
PubSub.broadcast(posts, :archived) # broadcasts for each postWhen broadcasting a list with more than one item, items are grouped by
channel and sent as a single [{item, event}, ...] message per channel
for efficiency. Streams are materialised to lists before operating.
Protocol implementations
Built-in protocol implementations handle the following types:
BitString— treats the string as a literal channel name; broadcasts, subscribes, and unsubscribes viaPhoenix.PubSubAtom— converts to a string channel (e.g.:users→"users"); broadcast is a no-op that returns the messageTuple— unwraps{:ok, subject}for broadcast/subscribe; passes{:error, _}through unchanged; dispatches{action, subject}messages inhandle_infoList— maps the operation across each element, grouping multi-item broadcasts by channelStream— materialises to a list, then delegates to the List implementationPhoenix.LiveView.Socket— derives a channel from the socket's session ID ("socket:<id>")- Structs via
use Amplified.PubSub— derives channels from the module's last name segment and the struct's:idfield
Telemetry
The following telemetry events are emitted:
[:amplified, :pubsub, :broadcast]— fired on every broadcast. Measurements are empty (%{}). Metadata contains:topicand:message.
Attach a handler in your application to log broadcasts, collect metrics, or perform any other observation:
:telemetry.attach("my-app-pubsub-log", [:amplified, :pubsub, :broadcast], fn
_event, _measurements, %{topic: topic, message: message}, _config ->
Logger.debug("broadcast(\#{inspect(topic)}, \#{inspect(message)})")
end, nil)
Summary
Functions
Injects an Amplified.PubSub.Protocol implementation into the calling module.
Broadcasts a message for the given subject.
Broadcasts a message with additional attributes for the given subject.
Returns the PubSub channel name for the given subject.
Dispatches an incoming PubSub message through the protocol.
Dispatches a message for a specific subject and socket.
Dispatches a message for a specific subject with attributes and socket.
Returns the protocol implementation module for the given data, or nil.
Like impl_for/1, but raises Protocol.UndefinedError if no
implementation exists.
Returns the configured PubSub server name.
Subscribes the current process to the subject's PubSub channel.
Unsubscribes the current process from the subject's PubSub channel.
Functions
Injects an Amplified.PubSub.Protocol implementation into the calling module.
When called without options, a full defimpl Amplified.PubSub.Protocol
block is generated for the calling module's struct with default
implementations of all protocol functions. Any function defined in an
optional :do block overrides the corresponding default.
Options
:impl— whentrue, injects the default function bodies without wrapping them in adefimplblock. Use this when writing an explicitdefimpland you want the defaults as a starting point.:doblock — functions defined here override the corresponding defaults. This is the primary way to customise channel derivation or message handling.
Examples
Basic usage generates a full protocol implementation:
defmodule MyApp.Blog.Post do
use Ecto.Schema
use Amplified.PubSub
schema "posts" do
field :title, :string
end
endWith overrides — custom channel and event handling:
defmodule MyApp.Blog.Post do
use Ecto.Schema
use Amplified.PubSub do
def channel(%Post{slug: slug}, _ns), do: "post:#{slug}"
def handle_info(%Post{} = post, :updated, socket) do
{:halt, assign(socket, post: post)}
end
end
schema "posts" do
field :title, :string
field :slug, :string
end
endInside an explicit defimpl:
defimpl Amplified.PubSub.Protocol, for: MyApp.Blog.Post do
use Amplified.PubSub, impl: true
def channel(%{slug: slug}, _ns), do: "post:#{slug}"
end
Broadcasts a message for the given subject.
The behaviour depends on the subject's type, dispatched through
Amplified.PubSub.Protocol:
Struct — derives the channel from the struct, wraps atom/string events as
{event, subject}, broadcasts viaPhoenix.PubSub, and returns the struct for pipeline chaining.{:ok, struct}— unwraps the tuple, broadcasts for the struct, and returns{:ok, struct}.{:error, reason}— passes through without broadcasting.String — treats it as a literal channel name and broadcasts the message directly. Returns the message.
List — broadcasts for each item (grouped by channel when there are multiple items). Returns the list.
Atom — no-op; returns the message unchanged.
Examples
Broadcast an event for a struct:
PubSub.broadcast(post, :created)
# => broadcasts {:created, post} on "post:<id>", returns postPipeline with Ecto Repo operations — the {:ok, _} / {:error, _}
tuple is handled transparently:
%Post{}
|> Post.changeset(attrs)
|> Repo.insert()
|> PubSub.broadcast(:created)
# => {:ok, post} on success, {:error, changeset} on failureBroadcast to a raw channel string:
PubSub.broadcast("notifications:global", {:alert, "System update"})
# => broadcasts {:alert, "System update"}, returns the messageBroadcast for a list of subjects:
PubSub.broadcast(posts, :archived)
# => broadcasts :archived for each post, returns posts
Broadcasts a message with additional attributes for the given subject.
Like broadcast/2, but includes an attributes map in the payload. For
atom/string events on structs, the broadcast payload becomes
{event, subject, attrs}.
Examples
PubSub.broadcast(post, :updated, %{changed_fields: [:title]})
# => broadcasts {:updated, post, %{changed_fields: [:title]}}In a pipeline:
post
|> Post.changeset(attrs)
|> Repo.update()
|> PubSub.broadcast(:updated, %{changed_fields: Map.keys(attrs)})
Returns the PubSub channel name for the given subject.
The channel format depends on the subject type:
Struct (via
use Amplified.PubSub) —"<snake_cased_module>:<id>", e.g."blog_post:abc-123"for%MyApp.Blog.BlogPost{id: "abc-123"}.String — returned as-is.
Atom — converted to string, e.g.
:users→"users".List — returns a list of channels, one per element.
Stream — materialised to a list, then returns channels.
Socket —
"socket:<session_id>".
An optional namespace is appended with a : separator.
Examples
PubSub.channel(%Post{id: "abc-123"})
#=> "post:abc-123"
PubSub.channel(%Post{id: "abc-123"}, :comments)
#=> "post:abc-123:comments"
PubSub.channel("my:channel")
#=> "my:channel"
PubSub.channel("my:channel", "drafts")
#=> "my:channel:drafts"
PubSub.channel(:users, :admin)
#=> "users:admin"
PubSub.channel([%Post{id: "1"}, %Post{id: "2"}])
#=> ["post:1", "post:2"]
Dispatches an incoming PubSub message through the protocol.
This is the primary entry point, typically called from a LiveView's
handle_info/2 callback. The Tuple protocol implementation unpacks
{action, subject} messages and delegates to the subject's
handle_info/3, which lets you define event handlers in your schema's
PubSub block.
Returns {:cont, socket} for unhandled messages or {:halt, socket}
for handled ones.
Flash messages are handled automatically — when {:flash, level, msg}
is received, Phoenix.LiveView.put_flash/3 is called and
{:halt, socket} is returned.
Examples
# In your LiveView
def handle_info(message, socket) do
case PubSub.handle_info(message, socket) do
{:cont, socket} -> {:noreply, socket}
{:halt, socket} -> {:noreply, socket}
end
end
Dispatches a message for a specific subject and socket.
Called internally by the Tuple implementation's handle_info/2 after
unpacking {action, subject}. Override this in your schema's
use Amplified.PubSub block to handle specific events:
use Amplified.PubSub do
def handle_info(%Post{} = post, :updated, socket) do
{:halt, assign(socket, post: post)}
end
end
Dispatches a message for a specific subject with attributes and socket.
Like handle_info/3 but receives the additional attributes map that was
passed to broadcast/3:
use Amplified.PubSub do
def handle_info(%Post{} = post, :updated, %{changed_fields: fields}, socket) do
{:halt, assign(socket, post: post, changed: fields)}
end
end
Returns the protocol implementation module for the given data, or nil.
Useful for checking whether a value has a PubSub implementation before attempting to dispatch.
Examples
Amplified.PubSub.impl_for("a string")
#=> Amplified.PubSub.Protocol.BitString
Amplified.PubSub.impl_for(:an_atom)
#=> Amplified.PubSub.Protocol.Atom
Amplified.PubSub.impl_for(42)
#=> nil
Like impl_for/1, but raises Protocol.UndefinedError if no
implementation exists.
Examples
Amplified.PubSub.impl_for!("a string")
#=> Amplified.PubSub.Protocol.BitString
Returns the configured PubSub server name.
The server name is looked up from application config at runtime via
Application.fetch_env!/2. Raises ArgumentError if :pubsub_server
is not configured for :amplified_pubsub.
Examples
Amplified.PubSub.pubsub_server()
#=> :my_app
Subscribes the current process to the subject's PubSub channel.
For structs, the subject is returned for pipeline chaining. The implementation first unsubscribes to prevent duplicate subscriptions, making the call idempotent.
Examples
Subscribe in a LiveView's mount/3:
def mount(%{"id" => id}, _session, socket) do
post = Blog.get_post!(id)
PubSub.subscribe(post)
{:ok, assign(socket, post: post)}
endSubscribe to a raw channel:
PubSub.subscribe("posts:feed")Subscribe to all items in a list:
PubSub.subscribe(posts)
Unsubscribes the current process from the subject's PubSub channel.
Examples
PubSub.unsubscribe(post)
PubSub.unsubscribe("posts:feed")