PgFlow.Client (PgFlow v0.1.0)


Client API for interacting with PgFlow runs.

Provides functions to start flows, query run status, and wait for completion.

Usage

# Start a flow asynchronously
{:ok, run_id} = PgFlow.Client.start_flow(:my_flow, %{"order_id" => 123})

# Start a flow and wait for completion
{:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123}, timeout: 30_000)

# Get run details
{:ok, run} = PgFlow.Client.get_run(run_id)

# Get run with step states preloaded
{:ok, run} = PgFlow.Client.get_run_with_states(run_id)

Summary

Functions

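delete_flow(slug)

Deletes a flow and all associated data (runs, tasks, queue).

enqueue(job_module, input)

Enqueues a background job immediately.

enqueue(job_module, input, opts)

Enqueues a background job with options.

enqueue_at(job_module, input, scheduled_at)

Enqueues a background job that becomes visible at scheduled_at.

enqueue_in(job_module, input, delay_seconds)

Enqueues a background job that becomes visible after delay_seconds.

flow_exists?(slug)

Checks if a flow exists in the database.

get_run(run_id)

Gets a run by ID.

get_run_with_states(run_id)

Gets a run with all step states preloaded.

start_flow(flow_module_or_slug, input)

Starts a flow run with the given input.

start_flow_sync(flow_module_or_slug, input, opts \\ [])

Starts a flow and waits for completion.

upsert_flow(slug, opts)

Recompiles a flow definition at runtime.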

Functions

delete_flow(slug)

@spec delete_flow(String.t()) :: :ok | {:error, term()}

Deletes a flow and all associated data (runs, tasks, queue).

This permanently removes the flow definition and all historical run data. Intended for cleaning up retired flow versions.

Examples

PgFlow.Client.delete_flow("acct_123_hubspot_sync_v1")
# => :ok
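
When several retired versions need cleaning up, the documented return values (:ok or {:error, term()}) can drive a simple loop. The following is a minimal sketch, not part of PgFlow: MyApp.FlowCleanup and delete_all/1 are hypothetical names, and the {slug, reason} error shape is this helper's own convention.

```elixir
defmodule MyApp.FlowCleanup do
  # Hypothetical helper (not part of PgFlow): deletes each slug in order
  # and stops at the first failure, reporting which slug failed.
  def delete_all(slugs) when is_list(slugs) do
    Enum.reduce_while(slugs, :ok, fn slug, :ok ->
      case PgFlow.Client.delete_flow(slug) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {slug, reason}}}
      end
    end)
  end
end
```

Because deletion is permanent, slugs removed before a failure stay removed; the helper does not attempt any rollback.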

enqueue(job_module, input)

@spec enqueue(module(), map()) :: {:ok, String.t()} | {:error, term()}

Enqueues a background job immediately.

Jobs are single-step flows, so this delegates to start_flow/2.

enqueue(job_module, input, opts)

@spec enqueue(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}

Enqueues a background job with options.

Supported options:

  • :delay_seconds - non-negative integer seconds before the job is visible
  • :scheduled_at - DateTime when the job should become visible
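
The two options above can be sketched as follows. MyApp.Reminders and MyApp.Jobs.SendReminder are hypothetical names used only for illustration; neither is part of PgFlow. Whether both options may be passed together is not stated here, so the sketch passes one at a time.

```elixir
defmodule MyApp.Reminders do
  # Hypothetical wrapper; MyApp.Jobs.SendReminder is an assumed job module.
  @job MyApp.Jobs.SendReminder

  # Make the job visible after `seconds` seconds (non-negative integer).
  def remind_in(user_id, seconds) when is_integer(seconds) and seconds >= 0 do
    PgFlow.Client.enqueue(@job, %{"user_id" => user_id}, delay_seconds: seconds)
  end

  # Make the job visible at a specific DateTime.
  def remind_at(user_id, %DateTime{} = at) do
    PgFlow.Client.enqueue(@job, %{"user_id" => user_id}, scheduled_at: at)
  end
end
```

Both functions return whatever enqueue/3 returns, that is {:ok, String.t()} or {:error, term()} according to the spec.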

enqueue_at(job_module, input, scheduled_at)

@spec enqueue_at(module(), map(), DateTime.t()) ::
  {:ok, String.t()} | {:error, term()}

Enqueues a background job that becomes visible at scheduled_at.

Timestamps in the past are enqueued for immediate execution.
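
As an illustration of computing scheduled_at, the sketch below schedules a job for the next midnight UTC. MyApp.Nightly and MyApp.Jobs.NightlyReport are hypothetical names, and the helper assumes the `now` argument is a UTC DateTime.

```elixir
defmodule MyApp.Nightly do
  # Hypothetical helper; MyApp.Jobs.NightlyReport is an assumed job module.

  # Returns 00:00:00 UTC on the day after `now` (assumes `now` is in UTC).
  def next_midnight_utc(%DateTime{} = now) do
    now
    |> DateTime.to_date()
    |> Date.add(1)
    |> DateTime.new!(~T[00:00:00], "Etc/UTC")
  end

  # Enqueues the job so that it becomes visible at the next midnight UTC.
  def schedule(now \\ DateTime.utc_now()) do
    PgFlow.Client.enqueue_at(MyApp.Jobs.NightlyReport, %{}, next_midnight_utc(now))
  end
end
```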

enqueue_in(job_module, input, delay_seconds)

@spec enqueue_in(module(), map(), non_neg_integer()) ::
  {:ok, String.t()} | {:error, term()}

Enqueues a background job that becomes visible after delay_seconds.

flow_exists?(slug)

@spec flow_exists?(String.t()) :: {:ok, boolean()} | {:error, term()}

Checks if a flow exists in the database.

Examples

PgFlow.Client.flow_exists?("acct_123_hubspot_sync_v1")
# => {:ok, true}

get_run(run_id)

@spec get_run(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}

Gets a run by ID.

Returns {:ok, run} if found, or {:error, :not_found} if the run does not exist.

Examples

{:ok, run} = PgFlow.Client.get_run(run_id)
{:error, :not_found} = PgFlow.Client.get_run("00000000-0000-0000-0000-000000000000")

get_run_with_states(run_id)

@spec get_run_with_states(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}

Gets a run with all step states preloaded.

Returns {:ok, run} with the step_states association loaded, or {:error, :not_found} if the run does not exist.

Examples

{:ok, run} = PgFlow.Client.get_run_with_states(run_id)
Enum.each(run.step_states, fn state ->
  IO.puts("#{state.step_slug}: #{state.status}")
end)
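
For a quick overview of a run, the preloaded states can also be tallied by status. This is a minimal sketch with a hypothetical helper module (MyApp.RunSummary); it relies only on the step_states and status fields used in the example above and makes no assumption about which status values exist.

```elixir
defmodule MyApp.RunSummary do
  # Hypothetical helper (not part of PgFlow): counts step states per status.
  # Accepts any value with a `step_states` list whose items have a `status` field.
  def status_counts(%{step_states: states}) when is_list(states) do
    Enum.frequencies_by(states, & &1.status)
  end
end

# Usage, given a real run ID:
#   {:ok, run} = PgFlow.Client.get_run_with_states(run_id)
#   MyApp.RunSummary.status_counts(run)
```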

start_flow(flow_module_or_slug, input)

@spec start_flow(module() | atom() | String.t(), map()) ::
  {:ok, String.t()} | {:error, term()}

Starts a flow run with the given input.

Calls the pgflow.start_flow SQL function, which handles all initialization:

  • Creates run and step_states records
  • Handles map step initial_tasks
  • Broadcasts run:started event
  • Enqueues ready steps to pgmq
  • Handles empty cascades

The flow can be specified by module name or slug atom/string. Returns {:ok, run_id} on success or {:error, reason} on failure.

Examples

{:ok, run_id} = PgFlow.Client.start_flow(:process_order, %{"order_id" => 123})
{:ok, run_id} = PgFlow.Client.start_flow("process_order", %{"order_id" => 123})
{:ok, run_id} = PgFlow.Client.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

start_flow_sync(flow_module_or_slug, input, opts \\ [])

@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) ::
  {:ok, PgFlow.Schema.Run.t()}
  | {:error, PgFlow.Schema.Run.t()}
  | {:error, :timeout}
  | {:error, term()}

Starts a flow and waits for completion.

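Blocks until the flow completes or the timeout is reached. Returns {:ok, run} if the run completes successfully, {:error, run} if the run fails, or {:error, :timeout} if the timeout elapses first. Any other failure is returned as {:error, reason}.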

Options

  • :timeout - Maximum time to wait in milliseconds (default: 60_000)
  • :poll_interval - How often to check status in milliseconds (default: 500)

Examples

{:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123})
{:error, run} = PgFlow.Client.start_flow_sync(:failing_flow, %{})
{:error, :timeout} = PgFlow.Client.start_flow_sync(:slow_flow, %{}, timeout: 1000)
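
Since {:error, _} can carry a failed run, :timeout, or another reason, callers may want to tell the cases apart explicitly. The sketch below is one way to do that; MyApp.SyncResult is a hypothetical helper, and treating any struct in the error position as the failed run is an assumption based on the spec above.

```elixir
defmodule MyApp.SyncResult do
  # Hypothetical helper (not part of PgFlow) that normalizes the
  # results of PgFlow.Client.start_flow_sync/3.
  def classify({:ok, run}), do: {:completed, run}
  def classify({:error, :timeout}), do: :timed_out
  # Assumption: a struct in the error position is the failed run.
  def classify({:error, run}) when is_struct(run), do: {:failed, run}
  def classify({:error, reason}), do: {:error, reason}
end

# Usage:
#   :my_flow
#   |> PgFlow.Client.start_flow_sync(%{"order_id" => 123}, timeout: 30_000)
#   |> MyApp.SyncResult.classify()
```

The clause order matters: :timeout is matched before the struct guard, and the generic reason clause comes last.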

upsert_flow(slug, opts)

@spec upsert_flow(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Recompiles a flow definition at runtime.

This is the primary API for runtime flow management. Unlike the compile-time DSL (use PgFlow.Flow), this function creates flow definitions from plain data - ideal for per-tenant automations where flows are defined dynamically.

If the flow already exists, this operation is destructive: the existing definition and historical run/task data for the slug are deleted before recompiling.

Options

  • :max_attempts - Maximum retry attempts (default: 3)
  • :base_delay - Base delay between retries in seconds (default: 1)
  • :timeout - Step timeout in seconds (default: 60)
  • :steps - Required. List of step definition maps, each with:
    • :slug - Step identifier (required)
    • :deps - List of dependency step slugs (default: [])
    • :step_type - Step type, e.g. "single" (default: "single")
    • :max_attempts - Step-level retry override (optional)
    • :base_delay - Step-level delay override (optional)
    • :timeout - Step-level timeout override (optional)
    • :start_delay - Delay before step starts in seconds (optional)

Examples

PgFlow.Client.upsert_flow("acct_123_hubspot_sync_v1",
  max_attempts: 3,
  steps: [
    %{slug: "reshape", deps: []},
    %{slug: "create_contact", deps: ["reshape"]}
  ]
)
# => {:ok, %{"status" => "compiled", "differences" => []}}
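
Because upserting an existing slug deletes its historical run and task data, it can be worth checking flow_exists?/1 first. The following is a minimal sketch under that assumption; MyApp.FlowAdmin, create_if_missing/2, and the {:error, :already_exists} result are hypothetical and not part of PgFlow.

```elixir
defmodule MyApp.FlowAdmin do
  # Hypothetical helper (not part of PgFlow): compiles the flow only when
  # the slug is unused, so existing run history is never deleted.
  def create_if_missing(slug, opts) when is_binary(slug) and is_list(opts) do
    case PgFlow.Client.flow_exists?(slug) do
      {:ok, false} -> PgFlow.Client.upsert_flow(slug, opts)
      # :already_exists is this helper's own convention.
      {:ok, true} -> {:error, :already_exists}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

The check and the upsert are two separate calls, so this does not protect against two processes creating the same slug at the same moment.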