PgFlow (PgFlow v0.1.0)

Copy Markdown View Source

PgFlow is an Elixir implementation of the pgflow workflow engine.

It provides a macro-based DSL for defining workflow DAGs that execute on PostgreSQL using pgmq for task coordination.

Compatibility

This Elixir implementation is compatible with pgflow core version 0.5.0. It uses the same database schema and SQL functions as the TypeScript/Deno implementation, allowing both to run side-by-side against the same database.

Quick Start

defmodule MyApp.Flows.ProcessOrder do
  use PgFlow.Flow

  @flow slug: :process_order, max_attempts: 3

  step :validate do
    fn input, _ctx ->
      # Validate order data
      %{valid: true, order_id: input["order_id"]}
    end
  end

  step :charge, depends_on: [:validate] do
    fn deps, _ctx ->
      # Charge the customer
      %{charged: true, amount: 100}
    end
  end

  step :fulfill, depends_on: [:charge] do
    fn deps, _ctx ->
      # Fulfill the order
      %{fulfilled: true}
    end
  end
end

Starting a Flow

{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

Configuration

Add PgFlow to your supervision tree:

children = [
  {PgFlow, repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder]}
]

See PgFlow.Config for configuration options.

Summary

Functions

Returns a child specification for starting PgFlow under a supervisor.

Returns the compatible pgflow core version.

Deletes a flow and all associated data (runs, tasks, queue).

Enqueues a background job with the given input.

Enqueues a background job with options.

Enqueues a background job that becomes available at scheduled_at.

Enqueues a background job that becomes available after delay_seconds.

Checks if a flow exists in the database.

Gets the definition for a flow by module or slug.

Gets a run by ID.

Gets a run with all step states preloaded.

Returns health check information.

Lists all registered flows.

Starts a flow run with the given input.

Starts a flow and waits for completion.

Starts the PgFlow supervision tree.

Starts a worker for the given flow.

Stops a worker for the given flow.

Creates or replaces a flow definition at runtime.

Functions

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for starting PgFlow under a supervisor.

Options

See PgFlow.Config for available options.

core_version()

@spec core_version() :: String.t()

Returns the compatible pgflow core version.

This version indicates which pgflow database schema and SQL functions this Elixir implementation is compatible with.

Examples

PgFlow.core_version()
#=> "0.5.0"

delete_flow(slug)

@spec delete_flow(String.t()) :: :ok | {:error, term()}

Deletes a flow and all associated data (runs, tasks, queue).

Examples

PgFlow.delete_flow("acct_123_hubspot_sync_v1")

enqueue(job_module, input)

@spec enqueue(module(), map()) :: {:ok, String.t()} | {:error, term()}

Enqueues a background job with the given input.

This is the primary API for dispatching jobs. Under the hood, jobs are single-step flows, so this delegates to start_flow/2.

Examples

{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})

enqueue(job_module, input, opts)

@spec enqueue(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}

Enqueues a background job with options.

Supported options:

  • :delay_seconds - non-negative integer seconds before the job is available
  • :scheduled_at - DateTime when the job should become available

Examples

{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, delay_seconds: 60)
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, scheduled_at: ~U[2026-05-08 12:00:00Z])

enqueue_at(job_module, input, scheduled_at)

@spec enqueue_at(module(), map(), DateTime.t()) ::
  {:ok, String.t()} | {:error, term()}

Enqueues a background job that becomes available at scheduled_at.

Timestamps in the past enqueue the job for immediate execution. Any DateTime time zone is accepted; PgFlow compares the scheduled timestamp as an instant.

Examples

{:ok, run_id} = PgFlow.enqueue_at(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, ~U[2026-05-08 12:00:00Z])

enqueue_in(job_module, input, delay_seconds)

@spec enqueue_in(module(), map(), non_neg_integer()) ::
  {:ok, String.t()} | {:error, term()}

Enqueues a background job that becomes available after delay_seconds.

PgFlow persists the run immediately, then delays the initial pgmq task visibility so workers cannot execute it until the delay elapses. Values of 0 enqueue the job for immediate execution.

Examples

{:ok, run_id} = PgFlow.enqueue_in(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, 60)

flow_exists?(slug)

@spec flow_exists?(String.t()) :: {:ok, boolean()} | {:error, term()}

Checks if a flow exists in the database.

Examples

PgFlow.flow_exists?("my_flow")
# => {:ok, true}

get_flow(flow_module_or_slug)

@spec get_flow(module() | atom()) :: {:ok, map()} | {:error, :not_found}

Gets the definition for a flow by module or slug.

Examples

{:ok, flow_def} = PgFlow.get_flow(MyApp.Flows.ProcessOrder)
{:ok, flow_def} = PgFlow.get_flow(:process_order)
{:error, :not_found} = PgFlow.get_flow(:unknown)

get_run(run_id)

@spec get_run(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}

Gets a run by ID.

Examples

{:ok, run} = PgFlow.get_run("550e8400-e29b-41d4-a716-446655440000")
{:error, :not_found} = PgFlow.get_run("nonexistent-id")

get_run_with_states(run_id)

@spec get_run_with_states(String.t()) ::
  {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}

Gets a run with all step states preloaded.

Examples

{:ok, run} = PgFlow.get_run_with_states("550e8400-e29b-41d4-a716-446655440000")
run.step_states  # => [%StepState{}, ...]

health_check()

@spec health_check() :: %{status: :ok, workers: [map()], flows: [map()]}

Returns health check information.

Examples

PgFlow.health_check()
#=> %{status: :ok, workers: [...], flows: [...]}

list_flows()

@spec list_flows() :: [map()]

Lists all registered flows.

Examples

flows = PgFlow.list_flows()
#=> [%{module: MyApp.Flows.ProcessOrder, slug: :process_order, ...}, ...]

start_flow(flow_module_or_slug, input)

@spec start_flow(module() | atom() | String.t(), map()) ::
  {:ok, String.t()} | {:error, term()}

Starts a flow run with the given input.

The flow can be specified by module name, slug atom, or slug string.

Examples

{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
{:ok, run_id} = PgFlow.start_flow(:process_order, %{"order_id" => 123})

start_flow_sync(flow_module_or_slug, input, opts \\ [])

@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) ::
  {:ok, PgFlow.Schema.Run.t()}
  | {:error, PgFlow.Schema.Run.t()}
  | {:error, :timeout}
  | {:error, term()}

Starts a flow and waits for completion.

Blocks until the flow completes or the timeout is reached.

Options

  • :timeout - Maximum time to wait in milliseconds (default: 60_000)
  • :poll_interval - How often to check status in milliseconds (default: 500)

Examples

{:ok, run} = PgFlow.start_flow_sync(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
{:error, run} = PgFlow.start_flow_sync(MyApp.Flows.FailingFlow, %{})

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts the PgFlow supervision tree.

Options

See PgFlow.Config for available options.

Examples

PgFlow.start_link(repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder])

start_worker(flow_module, opts \\ [])

@spec start_worker(
  module(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Starts a worker for the given flow.

Options

  • :poll_interval - How often to poll for messages (default: 1000ms)
  • :visibility_timeout - How long to hold messages (default: 30s)

Examples

{:ok, pid} = PgFlow.start_worker(MyApp.Flows.ProcessOrder)

stop_worker(flow_module)

@spec stop_worker(module()) :: :ok | {:error, :not_found}

Stops a worker for the given flow.

Examples

:ok = PgFlow.stop_worker(MyApp.Flows.ProcessOrder)

upsert_flow(slug, opts)

@spec upsert_flow(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Creates or replaces a flow definition at runtime.

Unlike the compile-time DSL (use PgFlow.Flow), this creates flow definitions from plain data - for per-tenant automations and dynamic workflows.

If the flow already exists, this operation is destructive: the existing definition and all historical run/task data for the slug are deleted before the new definition is created.

Examples

 PgFlow.upsert_flow("acct_123_hubspot_sync_v1",
   max_attempts: 3,
   steps: [
     %{slug: "reshape", deps: []},
     %{slug: "create_contact", deps: ["reshape"]}
   ]
 )