PgFlow is an Elixir implementation of the pgflow workflow engine.
It provides a macro-based DSL for defining workflow DAGs that execute on PostgreSQL using pgmq for task coordination.
Compatibility
This Elixir implementation is compatible with pgflow core version 0.5.0. It uses the same database schema and SQL functions as the TypeScript/Deno implementation, allowing both to run side-by-side against the same database.
Quick Start
defmodule MyApp.Flows.ProcessOrder do
use PgFlow.Flow
@flow slug: :process_order, max_attempts: 3
step :validate do
fn input, _ctx ->
# Validate order data
%{valid: true, order_id: input["order_id"]}
end
end
step :charge, depends_on: [:validate] do
fn deps, _ctx ->
# Charge the customer
%{charged: true, amount: 100}
end
end
step :fulfill, depends_on: [:charge] do
fn deps, _ctx ->
# Fulfill the order
%{fulfilled: true}
end
end
endStarting a Flow
{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})Configuration
Add PgFlow to your supervision tree:
children = [
{PgFlow, repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder]}
]See PgFlow.Config for configuration options.
Summary
Functions
Returns a child specification for starting PgFlow under a supervisor.
Returns the compatible pgflow core version.
Deletes a flow and all associated data (runs, tasks, queue).
Enqueues a background job with the given input.
Enqueues a background job with options.
Enqueues a background job that becomes available at scheduled_at.
Enqueues a background job that becomes available after delay_seconds.
Checks if a flow exists in the database.
Gets the definition for a flow by module or slug.
Gets a run by ID.
Gets a run with all step states preloaded.
Returns health check information.
Lists all registered flows.
Starts a flow run with the given input.
Starts a flow and waits for completion.
Starts the PgFlow supervision tree.
Starts a worker for the given flow.
Stops a worker for the given flow.
Recompiles a flow definition at runtime.
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child specification for starting PgFlow under a supervisor.
Options
See PgFlow.Config for available options.
@spec core_version() :: String.t()
Returns the compatible pgflow core version.
This version indicates which pgflow database schema and SQL functions this Elixir implementation is compatible with.
Examples
PgFlow.core_version()
#=> "0.5.0"
Deletes a flow and all associated data (runs, tasks, queue).
Examples
PgFlow.delete_flow("acct_123_hubspot_sync_v1")
Enqueues a background job with the given input.
This is the primary API for dispatching jobs. Under the hood, jobs are
single-step flows, so this delegates to start_flow/2.
Examples
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})
Enqueues a background job with options.
Supported options:
:delay_seconds- non-negative integer seconds before the job is available:scheduled_at-DateTimewhen the job should become available
Examples
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, delay_seconds: 60)
{:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, scheduled_at: ~U[2026-05-08 12:00:00Z])
@spec enqueue_at(module(), map(), DateTime.t()) :: {:ok, String.t()} | {:error, term()}
Enqueues a background job that becomes available at scheduled_at.
Timestamps in the past enqueue the job for immediate execution. Any DateTime
time zone is accepted; PgFlow compares the scheduled timestamp as an instant.
Examples
{:ok, run_id} = PgFlow.enqueue_at(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, ~U[2026-05-08 12:00:00Z])
@spec enqueue_in(module(), map(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()}
Enqueues a background job that becomes available after delay_seconds.
PgFlow persists the run immediately, then delays the initial pgmq task
visibility so workers cannot execute it until the delay elapses. Values of
0 enqueue the job for immediate execution.
Examples
{:ok, run_id} = PgFlow.enqueue_in(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, 60)
Checks if a flow exists in the database.
Examples
PgFlow.flow_exists?("my_flow")
# => {:ok, true}
Gets the definition for a flow by module or slug.
Examples
{:ok, flow_def} = PgFlow.get_flow(MyApp.Flows.ProcessOrder)
{:ok, flow_def} = PgFlow.get_flow(:process_order)
{:error, :not_found} = PgFlow.get_flow(:unknown)
@spec get_run(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
Gets a run by ID.
Examples
{:ok, run} = PgFlow.get_run("550e8400-e29b-41d4-a716-446655440000")
{:error, :not_found} = PgFlow.get_run("nonexistent-id")
@spec get_run_with_states(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
Gets a run with all step states preloaded.
Examples
{:ok, run} = PgFlow.get_run_with_states("550e8400-e29b-41d4-a716-446655440000")
run.step_states # => [%StepState{}, ...]
Returns health check information.
Examples
PgFlow.health_check()
#=> %{status: :ok, workers: [...], flows: [...]}
@spec list_flows() :: [map()]
Lists all registered flows.
Examples
flows = PgFlow.list_flows()
#=> [%{module: MyApp.Flows.ProcessOrder, slug: :process_order, ...}, ...]
Starts a flow run with the given input.
The flow can be specified by module name, slug atom, or slug string.
Examples
{:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
{:ok, run_id} = PgFlow.start_flow(:process_order, %{"order_id" => 123})
@spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, PgFlow.Schema.Run.t()} | {:error, :timeout} | {:error, term()}
Starts a flow and waits for completion.
Blocks until the flow completes or the timeout is reached.
Options
:timeout- Maximum time to wait in milliseconds (default: 60_000):poll_interval- How often to check status in milliseconds (default: 500)
Examples
{:ok, run} = PgFlow.start_flow_sync(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
{:error, run} = PgFlow.start_flow_sync(MyApp.Flows.FailingFlow, %{})
@spec start_link(keyword()) :: Supervisor.on_start()
Starts the PgFlow supervision tree.
Options
See PgFlow.Config for available options.
Examples
PgFlow.start_link(repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder])
Starts a worker for the given flow.
Options
:poll_interval- How often to poll for messages (default: 1000ms):visibility_timeout- How long to hold messages (default: 30s)
Examples
{:ok, pid} = PgFlow.start_worker(MyApp.Flows.ProcessOrder)
@spec stop_worker(module()) :: :ok | {:error, :not_found}
Stops a worker for the given flow.
Examples
:ok = PgFlow.stop_worker(MyApp.Flows.ProcessOrder)
Recompiles a flow definition at runtime.
Unlike the compile-time DSL (use PgFlow.Flow), this creates flow
definitions from plain data - for per-tenant automations and dynamic workflows.
If the flow already exists, this operation is destructive: existing definition and historical run/task data for the slug are deleted first.
Examples
PgFlow.upsert_flow("acct_123_hubspot_sync_v1",
max_attempts: 3,
steps: [
%{slug: "reshape", deps: []},
%{slug: "create_contact", deps: ["reshape"]}
]
)