PgFlow.LiveClient (PgFlow v0.1.0)

Copy Markdown View Source

LiveView-native client for tracking flow and job runs in real-time.

Provides functions to start flows, subscribe to existing runs, and handle PubSub updates — all operating on %PgFlow.Schema.Run{} structs stored in socket assigns.

Usage

defmodule MyAppWeb.FlowLive do
  use MyAppWeb, :live_view

  alias PgFlow.LiveClient

  def mount(_params, _session, socket) do
    {:ok, LiveClient.init(socket, pubsub: MyApp.PubSub)}
  end

  def handle_event("start", %{"url" => url}, socket) do
    case LiveClient.start_flow(socket, :article_flow, %{"url" => url}) do
      {:ok, socket} -> {:noreply, socket}
      {:error, _reason, socket} -> {:noreply, socket}
    end
  end

  def handle_info({:pgflow, _, _} = msg, socket) do
    {:noreply, LiveClient.handle_info(msg, socket)}
  end
end

Multiple Runs

Track multiple runs simultaneously using the :as option:

socket = LiveClient.start_flow(socket, :flow_a, input, as: :run_a)
socket = LiveClient.subscribe(socket, some_run_id, as: :run_b)

Then access @run_a and @run_b independently in templates.

Jobs

Jobs work identically — use enqueue/4 for naming parity with PgFlow.enqueue/2:

LiveClient.enqueue(socket, MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})

Summary

Functions

Starts a job and subscribes to real-time updates.

Handles a PgFlow PubSub message, updating the tracked run in assigns.

Initializes the socket for flow tracking.

Starts a flow and subscribes to real-time updates.

Subscribes to an existing run's real-time updates.

Unsubscribes from a run's updates and resets the assign to nil.

Functions

enqueue(socket, job_module, input, opts \\ [])

Starts a job and subscribes to real-time updates.

Convenience wrapper providing naming parity with PgFlow.enqueue/2. Behaves identically to start_flow/4.

Options

  • :as — assign key (default: :flow_run)

handle_info(arg, socket)

@spec handle_info({:pgflow, String.t(), tuple()}, Phoenix.LiveView.Socket.t()) ::
  Phoenix.LiveView.Socket.t()

Handles a PgFlow PubSub message, updating the tracked run in assigns.

Call this from your LiveView's handle_info/2:

def handle_info({:pgflow, _, _} = msg, socket) do
  {:noreply, LiveClient.handle_info(msg, socket)}
end

Returns the socket unchanged if the message is for an untracked run.

init(socket, opts)

Initializes the socket for flow tracking.

Sets the default assign to nil and stores PubSub config in socket private. Safe to call during both connected and disconnected mount phases.

Options

  • :pubsub — (required) the Phoenix.PubSub module
  • :as — assign key for the run (default: :flow_run)
  • :repo — Ecto repo for snapshot loading (default: uses configured PgFlow repo)

start_flow(socket, flow_module_or_slug, input, opts \\ [])

@spec start_flow(
  Phoenix.LiveView.Socket.t(),
  module() | atom() | String.t(),
  map(),
  keyword()
) ::
  {:ok, Phoenix.LiveView.Socket.t()}
  | {:error, term(), Phoenix.LiveView.Socket.t()}

Starts a flow and subscribes to real-time updates.

Creates the flow run via PgFlow.Client.start_flow/2, subscribes to the run's PubSub topic, loads the current state from the database, and assigns the %Run{} struct.

Returns {:ok, socket} on success or {:error, reason, socket} on failure.

Options

  • :as — assign key (default: :flow_run)

subscribe(socket, run_id, opts \\ [])

Subscribes to an existing run's real-time updates.

Loads the current state from the database and subscribes to PubSub. Useful for observing runs started elsewhere (e.g., run detail pages).

Options

  • :as — assign key (default: :flow_run)

unsubscribe(socket, opts \\ [])

Unsubscribes from a run's updates and resets the assign to nil.

Options

  • :as — assign key to unsubscribe (default: :flow_run)