DustEcto.Phoenix (DustEcto v0.1.2)

Copy Markdown View Source

Phoenix.PubSub bridge for DustEcto.Repo subscriptions. Turns the four-step "supervise Dust, configure facade, subscribe in mount, don't block in the callback" recipe into a one-liner that's safe to call from a LiveView.

Why

DustEcto.Repo.subscribe/2 invokes its callback inside the SDK's per-store sync engine process. If the callback blocks — and any realistic LiveView callback might — the engine blocks, freezing every subscriber on that store. The standard workaround is to capture self() and send/2 from the callback, plus unsubscribe-on-terminate. That's a lot of boilerplate.

Phoenix.PubSub already solves "fan out a message to many subscribers without blocking the broadcaster." This module wires the two together: one shared broadcaster per topic translates Dust events into PubSub broadcasts, and LiveViews subscribe to the PubSub topic as they normally would.

Usage

defmodule MyAppWeb.LinksLive do
  use MyAppWeb, :live_view
  alias MyApp.Reading.Link

  def mount(_, _, socket) do
    if connected?(socket) do
      :ok = DustEcto.Phoenix.subscribe_to_pubsub(Link, MyApp.PubSub, "links")
    end

    {:ok, assign(socket, links: load_links())}
  end

  def handle_info({:dust_event, {:upserted, %Link{} = link}}, socket),
    do: {:noreply, update(socket, :links, &upsert_by_slug(&1, link))}

  def handle_info({:dust_event, {:deleted, slug}}, socket),
    do: {:noreply, update(socket, :links, &delete_by_slug(&1, slug))}
end

No terminate/2 cleanup needed — Phoenix.PubSub monitors subscribers and unsubscribes them automatically when they die. The broadcaster process stays alive for the application lifetime and is shared across every LiveView subscribed to the same topic.

Requirements

Needs the phoenix_pubsub package and an active SDK transport (i.e. Dust.Supervisor running in your supervision tree, or config :dustlayer_ecto, :dust_facade, MyApp.Dust). From HTTP mode the underlying Repo.subscribe/2 returns :not_supported, which surfaces here as a %DustEcto.Error{kind: :not_supported}.

Summary

Types

Message shape delivered to subscribers: {:dust_event, event} where event matches DustEcto.Repo.subscribe/2's callback contract.

Functions

Stops the broadcaster for {schema, pubsub, topic}. Existing PubSub subscribers stop receiving updates immediately. Idempotent — no-op if the broadcaster isn't running.

Ensures a broadcaster is running for {schema, pubsub, topic} and subscribes the calling process to pubsub on topic.

Unsubscribes the calling process from pubsub on topic. Does not stop the broadcaster — siblings on the same topic keep receiving events. To stop the broadcaster entirely (rare; saves one process), use stop_broadcaster/3.

Types

message()

@type message() :: {:dust_event, {:upserted, struct()} | {:deleted, String.t()}}

Message shape delivered to subscribers: {:dust_event, event} where event matches DustEcto.Repo.subscribe/2's callback contract.

Functions

stop_broadcaster(schema, pubsub, topic)

@spec stop_broadcaster(module(), atom(), String.t()) :: :ok

Stops the broadcaster for {schema, pubsub, topic}. Existing PubSub subscribers stop receiving updates immediately. Idempotent — no-op if the broadcaster isn't running.

subscribe_to_pubsub(schema, pubsub, topic)

@spec subscribe_to_pubsub(module(), atom(), String.t()) ::
  :ok | {:error, DustEcto.Error.t()}

Ensures a broadcaster is running for {schema, pubsub, topic} and subscribes the calling process to pubsub on topic.

Idempotent — calling this from many LiveViews with the same triple shares a single broadcaster.

Returns :ok on success, {:error, %DustEcto.Error{}} if phoenix_pubsub isn't loaded or the underlying Repo.subscribe/2 failed (most commonly: HTTP transport).

unsubscribe_from_pubsub(pubsub, topic)

@spec unsubscribe_from_pubsub(atom(), String.t()) :: :ok

Unsubscribes the calling process from pubsub on topic. Does not stop the broadcaster — siblings on the same topic keep receiving events. To stop the broadcaster entirely (rare; saves one process), use stop_broadcaster/3.