Amplified.PubSub (Amplified.PubSub v0.2.1)

Copy Markdown View Source

A protocol-based PubSub abstraction for Phoenix LiveView.

defmodule MyApp.Blog do
  alias Amplified.PubSub

  def create_post(attrs) do
    %Post{}
    |> Post.changeset(attrs)
    |> Repo.insert()
    |> PubSub.broadcast(:created)
  end
end

Amplified.PubSub wraps Phoenix.PubSub with a protocol layer so the same broadcast/2, subscribe/1, and handle_info/2 calls work whether you pass a struct, an {:ok, struct} tuple from a Repo operation, a list of structs, or a raw channel string. This lets you weave PubSub into your context functions as pipeline-friendly operations that chain naturally with Ecto.

Configuration

Configure the PubSub server name used for subscriptions and broadcasts:

# config/config.exs
config :amplified_pubsub, pubsub_server: :my_app

Setup

Schema modules opt in by adding use Amplified.PubSub:

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  use Amplified.PubSub

  schema "posts" do
    field :title, :string
    field :body, :string
    timestamps()
  end
end

This generates an Amplified.PubSub.Protocol implementation with sensible defaults:

  • channel/1 derives "post:<id>" from the module's last segment (snake_cased) and the struct's :id field
  • subscribe/1 and unsubscribe/1 subscribe via the configured PubSub server
  • broadcast/2 wraps atom/string events as {event, subject} and publishes to the subject's channel
  • handle_info/2,3,4 return {:cont, socket} (pass-through) so unhandled messages don't crash

Broadcasting from context functions

Since broadcast/2 returns its first argument — and passes through {:ok, _} and {:error, _} tuples — it drops right into Ecto pipelines:

defmodule MyApp.Blog do
  alias Amplified.PubSub

  def create_post(attrs) do
    %Post{}
    |> Post.changeset(attrs)
    |> Repo.insert()
    |> PubSub.broadcast(:created)
  end

  def update_post(%Post{} = post, attrs) do
    post
    |> Post.changeset(attrs)
    |> Repo.update()
    |> PubSub.broadcast(:updated)
  end

  def delete_post(%Post{} = post) do
    post
    |> Repo.delete()
    |> PubSub.broadcast(:deleted)
  end
end

On success, PubSub.broadcast({:ok, post}, :created) unwraps the tuple, broadcasts {:created, post} on the "post:<id>" channel, and returns {:ok, post}. On failure, {:error, changeset} passes through without broadcasting.

Subscribing in LiveViews

Subscribe a LiveView process to a subject's channel in mount/3. The subscription is idempotent — calling it twice won't produce duplicate messages:

def mount(%{"id" => id}, _session, socket) do
  post = Blog.get_post!(id)
  PubSub.subscribe(post)
  {:ok, assign(socket, post: post)}
end

Unsubscribe when you no longer want messages:

PubSub.unsubscribe(post)

You can also subscribe to a raw channel string:

PubSub.subscribe("posts:feed")

Handling messages

PubSub.handle_info/2 returns {:cont, socket} or {:halt, socket} — the same convention used by Phoenix.LiveView.attach_hook/4 — so you can wire it directly into the LiveView lifecycle as an on_mount hook. Every LiveView in the live session then gets PubSub handling automatically, with no per-view boilerplate.

Define a hooks module:

defmodule MyAppWeb.Hooks do
  import Phoenix.LiveView

  alias Amplified.PubSub

  defmodule Default do
    def on_mount(:default, _params, _session, socket) do
      {:cont, MyAppWeb.Hooks.attach_defaults(socket)}
    end
  end

  # This is the critical wiring step. attach_hook/4 registers
  # PubSub.handle_info/2 as a lifecycle hook that intercepts every
  # message the LiveView process receives. Without this, PubSub
  # messages will arrive but the protocol dispatch won't fire —
  # your schema handle_info/3 implementations won't be called.
  def attach_defaults(socket) do
    socket
    |> subscribe()
    |> attach_hook(:pubsub, :handle_info, &PubSub.handle_info/2)
  end

  # Subscribe based on whatever is assigned to the socket.
  # Unsubscribe first to prevent duplicates on reconnect.
  defp subscribe(socket) do
    if connected?(socket) do
      user = socket.assigns[:current_user]
      project = socket.assigns[:project]

      if user, do: PubSub.subscribe(user)
      if project, do: PubSub.subscribe(project)
    end

    socket
  end
end

Then attach the hook in the router:

live_session :authenticated, on_mount: MyAppWeb.Hooks.Default do
  live "/posts", PostLive.Index
  live "/posts/:id", PostLive.Show
end

With this in place, the hook subscribes every LiveView to the current user and project channels and dispatches all {action, subject} messages through the protocol. Individual LiveViews can still subscribe to additional channels in their own mount/3 — the hook returns {:cont, socket} for anything it doesn't handle, so the message continues to the view's handle_info/2.

Event handling in schemas

When a {action, subject} message arrives, the Tuple dispatcher looks up the subject's protocol implementation and calls its handle_info/3. This lets you define handlers in the schema's use Amplified.PubSub block.

When to use schema-level handlers

Reserve schema-level handlers for events that would otherwise require the same callback to be duplicated across multiple LiveViews. Handlers specific to a single view belong in that view, not in the schema.

A good example is keeping the current user up to date across all LiveViews — every view needs the same logic, so colocating it with the schema avoids repetition:

defmodule MyApp.Accounts.User do
  use Ecto.Schema
  use Amplified.PubSub do
    # Match the broadcast user's ID against the scope's user ID
    # to ensure we only update when the broadcast is for *this*
    # session's authenticated user.
    def handle_info(
          %User{id: id} = user,
          :updated,
          %{assigns: %{current_scope: %{user: %{id: id}} = scope}} = socket
        ) do
      {:cont, assign(socket, current_scope: %{scope | user: user})}
    end

    def handle_info(
          %User{id: id},
          :deleted,
          %{assigns: %{current_scope: %{user: %{id: id}}}} = socket
        ) do
      {:halt, redirect(socket, to: ~p"/sign-out")}
    end
  end

  schema "users" do
    field :email, :string
    field :name, :string
    timestamps()
  end
end

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  use Amplified.PubSub do
    def handle_info(%Post{id: id} = post, :updated, %{assigns: %{post: %{id: id}}} = socket) do
      {:cont, assign(socket, post: post)}
    end

    def handle_info(%Post{id: id}, :deleted, %{assigns: %{post: %{id: id}}} = socket) do
      {:halt, push_navigate(socket, to: ~p"/posts")}
    end
  end

  schema "posts" do
    field :title, :string
    field :body, :string
    timestamps()
  end
end

With these implementations in place and a global attach_hook dispatching through PubSub.handle_info/2, a :updated broadcast for the current %User{} will automatically update the scope's user on every connected LiveView that belongs to that user — with no per-view code at all. Broadcasts for other users fall through as {:cont, socket} and are ignored.

The convention is to return {:halt, socket} when you've handled the message and you don't want other lifecycle hooks to run, and {:cont, socket} when you do. The defaults always return {:cont, socket}, so unmatched messages fall through safely.

Flash messages

The Tuple implementation recognises {:flash, level, message} tuples and calls Phoenix.LiveView.put_flash/3 automatically:

PubSub.broadcast("room:lobby", {:flash, :info, "Someone joined!"})

Custom channels

Override the channel derivation by passing a block to use:

use Amplified.PubSub do
  def channel(%Post{slug: slug}, _ns), do: "post:#{slug}"
end

Or implement the protocol externally with defimpl:

defimpl Amplified.PubSub.Protocol, for: MyApp.Blog.Post do
  use Amplified.PubSub, impl: true
  def channel(%{slug: slug}, _ns), do: "post:#{slug}"
end

When using impl: true, you get all the default function bodies injected into your defimpl block, so you only need to override the functions you want to customise.

Namespaced channels

All channel/2 functions accept an optional namespace for scoping. This is useful when different LiveViews care about different aspects of the same resource:

PubSub.channel(post)             # => "post:abc-123"
PubSub.channel(post, :comments)  # => "post:abc-123:comments"

Lists and streams

Broadcasting or subscribing to a list operates on each item individually:

PubSub.subscribe(posts)           # subscribes to each post's channel
PubSub.broadcast(posts, :archived) # broadcasts for each post

When broadcasting a list with more than one item, items are grouped by channel and sent as a single [{item, event}, ...] message per channel for efficiency. Streams are materialised to lists before operating.

Protocol implementations

Built-in protocol implementations handle the following types:

  • BitString — treats the string as a literal channel name; broadcasts, subscribes, and unsubscribes via Phoenix.PubSub
  • Atom — converts to a string channel (e.g. :users"users"); broadcast is a no-op that returns the message
  • Tuple — unwraps {:ok, subject} for broadcast/subscribe; passes {:error, _} through unchanged; dispatches {action, subject} messages in handle_info
  • List — maps the operation across each element, grouping multi-item broadcasts by channel
  • Stream — materialises to a list, then delegates to the List implementation
  • Phoenix.LiveView.Socket — derives a channel from the socket's session ID ("socket:<id>")
  • Structs via use Amplified.PubSub — derives channels from the module's last name segment and the struct's :id field

Telemetry

The following telemetry events are emitted:

  • [:amplified, :pubsub, :broadcast] — fired on every broadcast. Measurements are empty (%{}). Metadata contains :topic and :message.

Attach a handler in your application to log broadcasts, collect metrics, or perform any other observation:

:telemetry.attach("my-app-pubsub-log", [:amplified, :pubsub, :broadcast], fn
  _event, _measurements, %{topic: topic, message: message}, _config ->
    Logger.debug("broadcast(\#{inspect(topic)}, \#{inspect(message)})")
end, nil)

Summary

Functions

Injects an Amplified.PubSub.Protocol implementation into the calling module.

Broadcasts a message for the given subject.

Broadcasts a message with additional attributes for the given subject.

Returns the PubSub channel name for the given subject.

Dispatches an incoming PubSub message through the protocol.

Dispatches a message for a specific subject and socket.

Dispatches a message for a specific subject with attributes and socket.

Returns the protocol implementation module for the given data, or nil.

Like impl_for/1, but raises Protocol.UndefinedError if no implementation exists.

Returns the configured PubSub server name.

Subscribes the current process to the subject's PubSub channel.

Unsubscribes the current process from the subject's PubSub channel.

Functions

__using__(opts \\ [])

(macro)

Injects an Amplified.PubSub.Protocol implementation into the calling module.

When called without options, a full defimpl Amplified.PubSub.Protocol block is generated for the calling module's struct with default implementations of all protocol functions. Any function defined in an optional :do block overrides the corresponding default.

Options

  • :impl — when true, injects the default function bodies without wrapping them in a defimpl block. Use this when writing an explicit defimpl and you want the defaults as a starting point.

  • :do block — functions defined here override the corresponding defaults. This is the primary way to customise channel derivation or message handling.

Examples

Basic usage generates a full protocol implementation:

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  use Amplified.PubSub

  schema "posts" do
    field :title, :string
  end
end

With overrides — custom channel and event handling:

defmodule MyApp.Blog.Post do
  use Ecto.Schema
  use Amplified.PubSub do
    def channel(%Post{slug: slug}, _ns), do: "post:#{slug}"

    def handle_info(%Post{} = post, :updated, socket) do
      {:halt, assign(socket, post: post)}
    end
  end

  schema "posts" do
    field :title, :string
    field :slug, :string
  end
end

Inside an explicit defimpl:

defimpl Amplified.PubSub.Protocol, for: MyApp.Blog.Post do
  use Amplified.PubSub, impl: true

  def channel(%{slug: slug}, _ns), do: "post:#{slug}"
end

broadcast(subject, message)

Broadcasts a message for the given subject.

The behaviour depends on the subject's type, dispatched through Amplified.PubSub.Protocol:

  • Struct — derives the channel from the struct, wraps atom/string events as {event, subject}, broadcasts via Phoenix.PubSub, and returns the struct for pipeline chaining.

  • {:ok, struct} — unwraps the tuple, broadcasts for the struct, and returns {:ok, struct}.

  • {:error, reason} — passes through without broadcasting.

  • String — treats it as a literal channel name and broadcasts the message directly. Returns the message.

  • List — broadcasts for each item (grouped by channel when there are multiple items). Returns the list.

  • Atom — no-op; returns the message unchanged.

Examples

Broadcast an event for a struct:

PubSub.broadcast(post, :created)
# => broadcasts {:created, post} on "post:<id>", returns post

Pipeline with Ecto Repo operations — the {:ok, _} / {:error, _} tuple is handled transparently:

%Post{}
|> Post.changeset(attrs)
|> Repo.insert()
|> PubSub.broadcast(:created)
# => {:ok, post} on success, {:error, changeset} on failure

Broadcast to a raw channel string:

PubSub.broadcast("notifications:global", {:alert, "System update"})
# => broadcasts {:alert, "System update"}, returns the message

Broadcast for a list of subjects:

PubSub.broadcast(posts, :archived)
# => broadcasts :archived for each post, returns posts

broadcast(subject, message, attrs)

Broadcasts a message with additional attributes for the given subject.

Like broadcast/2, but includes an attributes map in the payload. For atom/string events on structs, the broadcast payload becomes {event, subject, attrs}.

Examples

PubSub.broadcast(post, :updated, %{changed_fields: [:title]})
# => broadcasts {:updated, post, %{changed_fields: [:title]}}

In a pipeline:

post
|> Post.changeset(attrs)
|> Repo.update()
|> PubSub.broadcast(:updated, %{changed_fields: Map.keys(attrs)})

channel(subject, namespace \\ nil)

Returns the PubSub channel name for the given subject.

The channel format depends on the subject type:

  • Struct (via use Amplified.PubSub) — "<snake_cased_module>:<id>", e.g. "blog_post:abc-123" for %MyApp.Blog.BlogPost{id: "abc-123"}.

  • String — returned as-is.

  • Atom — converted to string, e.g. :users"users".

  • List — returns a list of channels, one per element.

  • Stream — materialised to a list, then returns channels.

  • Socket"socket:<session_id>".

An optional namespace is appended with a : separator.

Examples

PubSub.channel(%Post{id: "abc-123"})
#=> "post:abc-123"

PubSub.channel(%Post{id: "abc-123"}, :comments)
#=> "post:abc-123:comments"

PubSub.channel("my:channel")
#=> "my:channel"

PubSub.channel("my:channel", "drafts")
#=> "my:channel:drafts"

PubSub.channel(:users, :admin)
#=> "users:admin"

PubSub.channel([%Post{id: "1"}, %Post{id: "2"}])
#=> ["post:1", "post:2"]

handle_info(message, socket)

Dispatches an incoming PubSub message through the protocol.

This is the primary entry point, typically called from a LiveView's handle_info/2 callback. The Tuple protocol implementation unpacks {action, subject} messages and delegates to the subject's handle_info/3, which lets you define event handlers in your schema's PubSub block.

Returns {:cont, socket} for unhandled messages or {:halt, socket} for handled ones.

Flash messages are handled automatically — when {:flash, level, msg} is received, Phoenix.LiveView.put_flash/3 is called and {:halt, socket} is returned.

Examples

# In your LiveView
def handle_info(message, socket) do
  case PubSub.handle_info(message, socket) do
    {:cont, socket} -> {:noreply, socket}
    {:halt, socket} -> {:noreply, socket}
  end
end

handle_info(subject, message, socket)

Dispatches a message for a specific subject and socket.

Called internally by the Tuple implementation's handle_info/2 after unpacking {action, subject}. Override this in your schema's use Amplified.PubSub block to handle specific events:

use Amplified.PubSub do
  def handle_info(%Post{} = post, :updated, socket) do
    {:halt, assign(socket, post: post)}
  end
end

handle_info(subject, message, changeset, socket)

Dispatches a message for a specific subject with attributes and socket.

Like handle_info/3 but receives the additional attributes map that was passed to broadcast/3:

use Amplified.PubSub do
  def handle_info(%Post{} = post, :updated, %{changed_fields: fields}, socket) do
    {:halt, assign(socket, post: post, changed: fields)}
  end
end

impl_for(data)

Returns the protocol implementation module for the given data, or nil.

Useful for checking whether a value has a PubSub implementation before attempting to dispatch.

Examples

Amplified.PubSub.impl_for("a string")
#=> Amplified.PubSub.Protocol.BitString

Amplified.PubSub.impl_for(:an_atom)
#=> Amplified.PubSub.Protocol.Atom

Amplified.PubSub.impl_for(42)
#=> nil

impl_for!(data)

Like impl_for/1, but raises Protocol.UndefinedError if no implementation exists.

Examples

Amplified.PubSub.impl_for!("a string")
#=> Amplified.PubSub.Protocol.BitString

pubsub_server()

Returns the configured PubSub server name.

The server name is looked up from application config at runtime via Application.fetch_env!/2. Raises ArgumentError if :pubsub_server is not configured for :amplified_pubsub.

Examples

Amplified.PubSub.pubsub_server()
#=> :my_app

subscribe(channel)

Subscribes the current process to the subject's PubSub channel.

For structs, the subject is returned for pipeline chaining. The implementation first unsubscribes to prevent duplicate subscriptions, making the call idempotent.

Examples

Subscribe in a LiveView's mount/3:

def mount(%{"id" => id}, _session, socket) do
  post = Blog.get_post!(id)
  PubSub.subscribe(post)
  {:ok, assign(socket, post: post)}
end

Subscribe to a raw channel:

PubSub.subscribe("posts:feed")

Subscribe to all items in a list:

PubSub.subscribe(posts)

unsubscribe(channel)

Unsubscribes the current process from the subject's PubSub channel.

Examples

PubSub.unsubscribe(post)
PubSub.unsubscribe("posts:feed")