Client API for interacting with PgFlow runs.
Provides functions to start flows, query run status, and wait for completion.
Usage
# Start a flow asynchronously
{:ok, run_id} = PgFlow.Client.start_flow(:my_flow, %{"order_id" => 123})
# Start a flow and wait for completion
{:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123}, timeout: 30_000)
# Get run details
{:ok, run} = PgFlow.Client.get_run(run_id)
# Get run with step states preloaded
{:ok, run} = PgFlow.Client.get_run_with_states(run_id)
Summary
Functions
Deletes a flow and all associated data (runs, tasks, queue).
Enqueues a background job immediately.
Enqueues a background job with options.
Enqueues a background job that becomes visible at scheduled_at.
Enqueues a background job that becomes visible after delay_seconds.
Checks if a flow exists in the database.
Gets a run by ID.
Gets a run with all step states preloaded.
Starts a flow run with the given input.
Starts a flow and waits for completion.
Recompiles a flow definition at runtime.
Functions
Deletes a flow and all associated data (runs, tasks, queue).
This permanently removes the flow definition and all historical run data. Intended for cleaning up retired flow versions.
Examples
PgFlow.Client.delete_flow("acct_123_hubspot_sync_v1")
# => :ok
Enqueues a background job immediately.
Jobs are single-step flows, so this delegates to start_flow/2.
Enqueues a background job with options.
Supported options:
- :delay_seconds - non-negative integer seconds before the job is visible
- :scheduled_at - DateTime when the job should become visible
@spec enqueue_at(module(), map(), DateTime.t()) :: {:ok, String.t()} | {:error, term()}
Enqueues a background job that becomes visible at scheduled_at.
Timestamps in the past are enqueued for immediate execution.
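One way to picture the rule above is as a conversion from an absolute timestamp to a non-negative delay. The following is a minimal sketch in plain Elixir, not PgFlow's actual implementation; the module `DelaySketch` and the function `delay_from/2` are hypothetical names used only for illustration.

```elixir
defmodule DelaySketch do
  # Hypothetical helper (not part of PgFlow): turn an absolute timestamp into
  # the number of whole seconds to wait, never going below zero.
  @spec delay_from(DateTime.t(), DateTime.t()) :: non_neg_integer()
  def delay_from(%DateTime{} = scheduled_at, %DateTime{} = now) do
    scheduled_at
    |> DateTime.diff(now, :second)
    |> max(0)
  end
end

now = ~U[2024-01-01 12:00:00Z]

# A timestamp 90 seconds in the future yields a 90-second delay.
IO.inspect(DelaySketch.delay_from(~U[2024-01-01 12:01:30Z], now))

# A timestamp in the past is clamped to 0, i.e. visible immediately.
IO.inspect(DelaySketch.delay_from(~U[2024-01-01 11:00:00Z], now))
```

Passing `now` explicitly keeps the helper deterministic; a caller would typically supply `DateTime.utc_now()`.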
@spec enqueue_in(module(), map(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()}
Enqueues a background job that becomes visible after delay_seconds.
Checks if a flow exists in the database.
Examples
PgFlow.Client.flow_exists?("acct_123_hubspot_sync_v1")
# => {:ok, true}
@spec get_run(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}
Gets a run by ID.
Returns {:ok, run} if found, or {:error, :not_found} if the run does not exist.
Examples
{:ok, run} = PgFlow.Client.get_run(run_id)
{:error, :not_found} = PgFlow.Client.get_run("00000000-0000-0000-0000-000000000000")
@spec get_run_with_states(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found} | {:error, term()}
Gets a run with all step states preloaded.
Returns {:ok, run} with step_states association loaded, or {:error, :not_found}
if the run does not exist.
Examples
{:ok, run} = PgFlow.Client.get_run_with_states(run_id)
Enum.each(run.step_states, fn state ->
  IO.puts("#{state.step_slug}: #{state.status}")
end)
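Once the step states are loaded, they can be summarised with ordinary `Enum` functions. The sketch below uses stand-in maps instead of a real run so that it is runnable on its own; the status strings ("completed", "started") and the step slugs are assumptions chosen for illustration and may not match the values PgFlow stores.

```elixir
# Stand-in data shaped like run.step_states (values are hypothetical).
step_states = [
  %{step_slug: "reshape", status: "completed"},
  %{step_slug: "validate", status: "completed"},
  %{step_slug: "create_contact", status: "started"}
]

# Count how many steps are in each status.
counts = Enum.frequencies_by(step_states, & &1.status)
IO.inspect(counts)

# Collect the slugs of steps that have not completed yet.
unfinished =
  for %{step_slug: slug, status: status} <- step_states,
      status != "completed",
      do: slug

IO.inspect(unfinished)
```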
Starts a flow run with the given input.
Calls the pgflow.start_flow SQL function which handles all initialization:
- Creates run and step_states records
- Handles map step initial_tasks
- Broadcasts run:started event
- Enqueues ready steps to pgmq
- Handles empty cascades
The flow can be specified by module name or slug atom/string.
Returns {:ok, run_id} on success or {:error, reason} on failure.
Examples
{:ok, run_id} = PgFlow.Client.start_flow(:process_order, %{"order_id" => 123})
{:ok, run_id} = PgFlow.Client.start_flow("process_order", %{"order_id" => 123})
{:ok, run_id} = PgFlow.Client.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, PgFlow.Schema.Run.t()} | {:error, :timeout} | {:error, term()}
Starts a flow and waits for completion.
Blocks until the flow completes, fails, or the timeout is reached. Returns {:ok, run} when the run completes, {:error, run} when the run fails, and {:error, :timeout} if the timeout elapses first.
Options
- :timeout - Maximum time to wait in milliseconds (default: 60_000)
- :poll_interval - How often to check status in milliseconds (default: 500)
Examples
{:ok, run} = PgFlow.Client.start_flow_sync(:my_flow, %{"order_id" => 123})
{:error, run} = PgFlow.Client.start_flow_sync(:failing_flow, %{})
{:error, :timeout} = PgFlow.Client.start_flow_sync(:slow_flow, %{}, timeout: 1000)
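The blocking behaviour of start_flow_sync/3 can be approximated by a polling loop that honours :timeout and :poll_interval. This is a sketch under stated assumptions rather than PgFlow's implementation: `PollSketch`, `wait/2`, and the `fetch` callback are hypothetical, and the status strings "completed" and "failed" are assumed values.

```elixir
defmodule PollSketch do
  # Hypothetical polling loop (not PgFlow's implementation). `fetch` is any
  # zero-arity function returning {:ok, %{status: status}} or {:error, reason}.
  def wait(fetch, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 60_000)
    poll_interval = Keyword.get(opts, :poll_interval, 500)
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(fetch, poll_interval, deadline)
  end

  defp poll(fetch, poll_interval, deadline) do
    case fetch.() do
      # Assumed terminal statuses: "completed" and "failed".
      {:ok, %{status: "completed"} = run} ->
        {:ok, run}

      {:ok, %{status: "failed"} = run} ->
        {:error, run}

      {:ok, _still_running} ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(poll_interval)
          poll(fetch, poll_interval, deadline)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Stub that reports "started" twice, then "completed".
{:ok, counter} = Agent.start_link(fn -> 0 end)

fetch = fn ->
  n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
  {:ok, %{status: if(n < 2, do: "started", else: "completed")}}
end

IO.inspect(PollSketch.wait(fetch, timeout: 1_000, poll_interval: 10))
```

The deadline is computed from the monotonic clock so that wall-clock adjustments do not shorten or extend the wait. In real use, `fetch` would wrap a call such as `PgFlow.Client.get_run(run_id)`.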
Recompiles a flow definition at runtime.
This is the primary API for runtime flow management. Unlike the compile-time
DSL (use PgFlow.Flow), this function creates flow definitions from plain
data - ideal for per-tenant automations where flows are defined dynamically.
If the flow already exists, this operation is destructive: the existing definition and historical run/task data for the slug are deleted before recompiling.
Options
- :max_attempts - Maximum retry attempts (default: 3)
- :base_delay - Base delay between retries in seconds (default: 1)
- :timeout - Step timeout in seconds (default: 60)
- :steps - Required. List of step definition maps, each with:
  - :slug - Step identifier (required)
  - :deps - List of dependency step slugs (default: [])
  - :step_type - Step type, e.g. "single" (default: "single")
  - :max_attempts - Step-level retry override (optional)
  - :base_delay - Step-level delay override (optional)
  - :timeout - Step-level timeout override (optional)
  - :start_delay - Delay before step starts in seconds (optional)
Examples
PgFlow.Client.upsert_flow("acct_123_hubspot_sync_v1",
  max_attempts: 3,
  steps: [
    %{slug: "reshape", deps: []},
    %{slug: "create_contact", deps: ["reshape"]}
  ]
)
# => {:ok, %{"status" => "compiled", "differences" => []}}
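Because step definitions arrive as plain data, it can be useful to check them before handing them to upsert_flow. The sketch below is a hypothetical pre-flight check, not part of PgFlow; `StepsSketch.validate/1` is an invented name. It assumes every slug in :deps should refer to a step declared earlier in the list; whether PgFlow itself enforces that ordering is not stated here.

```elixir
defmodule StepsSketch do
  # Hypothetical pre-flight check (not part of PgFlow): every entry in :deps
  # must name a step that appears earlier in the list.
  def validate(steps) do
    steps
    |> Enum.reduce_while({:ok, MapSet.new()}, fn step, {:ok, seen} ->
      # :deps defaults to [] when omitted, matching the documented default.
      deps = Map.get(step, :deps, [])

      case Enum.reject(deps, &MapSet.member?(seen, &1)) do
        [] -> {:cont, {:ok, MapSet.put(seen, step.slug)}}
        missing -> {:halt, {:error, {:unknown_deps, step.slug, missing}}}
      end
    end)
    |> case do
      {:ok, _seen} -> :ok
      error -> error
    end
  end
end

steps = [
  %{slug: "reshape", deps: []},
  %{slug: "create_contact", deps: ["reshape"]}
]

# Well-formed definitions pass the check.
IO.inspect(StepsSketch.validate(steps))

# A dependency on an undeclared step is reported with the offending slug.
IO.inspect(StepsSketch.validate([%{slug: "create_contact", deps: ["reshape"]}]))
```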