SQL query interface for pgflow flow operations.
Provides functions for starting flows, completing/failing tasks, reading messages, and managing flow lifecycle. All functions that accept JSON data expect Elixir terms that will be encoded with Jason.
Types
@type prune_result() :: %{deleted_runs: non_neg_integer(), deleted_step_states: non_neg_integer(), deleted_step_tasks: non_neg_integer(), deleted_workers: non_neg_integer()}
Result of pruning old run data.
Functions
Compiles and upserts a flow definition.
Parameters
repo - The Ecto repository
slug - The flow identifier slug
opts - Flow options map (e.g., %{max_retries: 3})
steps - List of step definitions as maps
@spec complete_task(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer(), map() | list()) :: {:ok, term()} | {:error, term()}
Marks a task as completed with output data.
Parameters
repo - The Ecto repository
run_id - The flow run UUID
step_slug - The step identifier slug
task_index - The task index (0-based)
output - Output data as an Elixir term (will be encoded as JSONB)
Returns
{:ok, result} - Success result from the database
{:error, reason} - Error details if the operation fails
@spec delay_run(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer()) :: :ok | {:error, term()}
Delays the first queued task for a flow run by moving its pgmq visibility time.
This is a lower-level helper for public APIs such as PgFlow.enqueue_in/3
and PgFlow.enqueue_at/3. Call it in the same repository transaction as
start_flow/3 when callers need to ensure workers cannot see the task before
the delay is applied.
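The transactional pairing described above can be sketched as follows. This is only an illustration under stated assumptions: `queries` stands for the module documented on this page (its name does not appear in this text), `DelayedStartSketch` is a hypothetical helper, and the argument order `(repo, flow_slug, run_id, delay)` for delay_run/4 is a guess from the type spec, which gives neither argument names nor the delay unit.

```elixir
defmodule DelayedStartSketch do
  # Hypothetical helper, not part of pgflow.
  # `queries` is the module documented on this page, passed in explicitly.
  # Assumption: delay_run/4 takes (repo, flow_slug, run_id, delay); the spec
  # only lists the types, so check the real argument order and delay unit.
  def start_delayed(queries, repo, flow_slug, input, delay) do
    # Both calls run inside one Ecto transaction, so workers cannot see the
    # queued task before the delay has been applied.
    repo.transaction(fn ->
      with {:ok, run_id} <- queries.start_flow(repo, flow_slug, input),
           :ok <- queries.delay_run(repo, flow_slug, run_id, delay) do
        run_id
      else
        # Abort the transaction so a started-but-undelayed run is not committed.
        {:error, reason} -> repo.rollback(reason)
      end
    end)
  end
end
```

With a real Ecto repo this returns `{:ok, run_id}` when both steps succeed and `{:error, reason}` when either fails, because `transaction/1` wraps the function result and `rollback/1` aborts it.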
@spec delete_flow(Ecto.Repo.t(), String.t()) :: :ok | {:error, term()}
Deletes a flow and all associated data (runs, step states, step tasks, queue).
Parameters
repo - The Ecto repository
slug - The flow identifier slug
Returns
:ok on success (including when the flow doesn't exist)
{:error, term()} on failure
@spec delete_message(Ecto.Repo.t(), String.t(), pos_integer()) :: {:ok, boolean()} | {:error, term()}
Deletes a message from a PGMQ queue.
@spec fail_task(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer(), String.t()) :: {:ok, term()} | {:error, term()}
Marks a task as failed with an error message.
Parameters
repo - The Ecto repository
run_id - The flow run UUID
step_slug - The step identifier slug
task_index - The task index (0-based)
error_message - Error description string
Returns
{:ok, result} - Success result from the database
{:error, reason} - Error details if the operation fails
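To illustrate how complete_task/5 and fail_task/5 mirror each other, here is a minimal sketch of reporting a handler's outcome. `TaskReportSketch` and the `handler` function are hypothetical, and `queries` stands for the module documented on this page, passed in because its name does not appear in this text.

```elixir
defmodule TaskReportSketch do
  # Hypothetical helper, not part of pgflow.
  # Runs `handler` (a zero-arity function expected to return {:ok, output} or
  # {:error, message}) and reports the outcome for one task.
  def run_and_report(queries, repo, run_id, step_slug, task_index, handler) do
    result =
      try do
        handler.()
      rescue
        # Turn a raised exception into the string that fail_task/5 expects.
        exception -> {:error, Exception.message(exception)}
      end

    case result do
      {:ok, output} when is_map(output) or is_list(output) ->
        queries.complete_task(repo, run_id, step_slug, task_index, output)

      {:error, message} when is_binary(message) ->
        queries.fail_task(repo, run_id, step_slug, task_index, message)

      other ->
        # Anything else is reported as a failure with a descriptive message.
        message = "unexpected handler result: " <> inspect(other)
        queries.fail_task(repo, run_id, step_slug, task_index, message)
    end
  end
end
```

The guards follow the specs: the output must be a map or list, and the error message must be a string.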
@spec flow_exists?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}
Checks if a flow exists in the database.
@spec get_flow_input(Ecto.Repo.t(), String.t()) :: {:ok, map() | list()} | {:error, term()}
Retrieves the input data for a flow run.
@spec get_run(Ecto.Repo.t(), String.t()) :: {:ok, %{status: String.t(), output: term()}} | {:error, :not_found | term()}
Retrieves a flow run's current state.
Parameters
repo - The Ecto repository
run_id - The flow run UUID
Returns
{:ok, %{status: String.t(), output: term()}} - Run state
{:error, :not_found} - Run does not exist
{:error, reason} - Error details if the operation fails
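The three return shapes above lend themselves to a small polling loop. The sketch below is an assumption-laden example, not part of the library: `AwaitRunSketch` is hypothetical, `queries` stands for the module documented on this page, and the terminal status strings "completed" and "failed" are assumed, since this page does not list the possible status values.

```elixir
defmodule AwaitRunSketch do
  # Hypothetical helper, not part of pgflow.
  # Assumption: these are the statuses that mean a run has finished.
  @terminal ["completed", "failed"]

  def await(queries, repo, run_id, attempts \\ 10, interval_ms \\ 500)

  # Out of attempts: give up without touching the database again.
  def await(_queries, _repo, _run_id, 0, _interval_ms), do: {:error, :timeout}

  def await(queries, repo, run_id, attempts, interval_ms) do
    case queries.get_run(repo, run_id) do
      {:ok, %{status: status} = run} when status in @terminal ->
        {:ok, run}

      {:ok, _still_running} ->
        Process.sleep(interval_ms)
        await(queries, repo, run_id, attempts - 1, interval_ms)

      # Covers {:error, :not_found} as well as other database errors.
      {:error, _reason} = error ->
        error
    end
  end
end
```

Polling is the simplest approach for scripts and tests; a long-running application would more likely react to task completion than loop on get_run/2.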
@spec get_step_output(Ecto.Repo.t(), String.t(), String.t()) :: {:ok, map() | nil} | {:error, term()}
Retrieves the output for a specific step in a flow run.
@spec prune_data(Ecto.Repo.t(), pos_integer(), keyword()) :: {:ok, prune_result()} | {:error, term()}
Prunes flow run data older than the specified retention period.
Options
:flow_slugs - List of flow slugs to prune (default: all flows)
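As an illustration of consuming the prune_result() map, the following sketch turns the return value of prune_data/3 into a log line. `PruneReport` is a hypothetical helper; it relies only on the four counters listed in the type above.

```elixir
defmodule PruneReport do
  # Hypothetical helper, not part of pgflow.
  # Sums the four counters documented in prune_result().
  def total_deleted(%{
        deleted_runs: runs,
        deleted_step_states: step_states,
        deleted_step_tasks: step_tasks,
        deleted_workers: workers
      }) do
    runs + step_states + step_tasks + workers
  end

  # Accepts the documented return value of prune_data/3.
  def summarize({:ok, result}) do
    "pruned #{total_deleted(result)} rows in total (runs: #{result.deleted_runs})"
  end

  def summarize({:error, reason}), do: "prune failed: #{inspect(reason)}"
end
```

A call would pipe the result straight in, for example the return value of prune_data/3 with `flow_slugs: ["my_flow"]`. The unit of the retention period is not stated on this page, so confirm it before choosing a value.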
@spec read(Ecto.Repo.t(), String.t(), pos_integer(), pos_integer()) :: {:ok, [list()]} | {:error, term()}
Reads messages from a queue without blocking (non-blocking read).
Uses pgmq.read() to fetch available messages. Returns immediately whether or not messages are available. Messages are made invisible for the visibility timeout period to prevent duplicate processing.
Queue poll SQL logging is disabled by default because workers call this
frequently. Set config :pgflow, :log_queue_polls, true to enable Ecto query
logging for these reads while debugging.
Parameters
repo - The Ecto repository
queue_name - The name of the queue to read from (matches flow_slug)
visibility_timeout - Time in seconds messages remain invisible
batch_size - Maximum number of messages to retrieve
Returns
{:ok, messages} - List of message records from pgmq (may be empty)
{:error, reason} - Error details if the operation fails
@spec recover_stalled_tasks(Ecto.Repo.t(), pos_integer()) :: {:ok, non_neg_integer()} | {:error, term()}
Recovers stalled tasks by resetting step_tasks stuck in 'started' status.
Tasks that have been in 'started' status longer than the stale threshold are reset to 'queued' so they can be re-processed by workers.
Starts a new flow execution.
Parameters
repo - The Ecto repository
flow_slug - The flow identifier slug
input - Input data as an Elixir term (will be encoded as JSONB)
Returns
{:ok, run_id} - The UUID of the created flow run
{:error, reason} - Error details if the operation fails
@spec start_tasks(Ecto.Repo.t(), String.t(), [pos_integer()], String.t()) :: {:ok, [list()]} | {:error, term()}
Starts multiple tasks by marking messages as in-progress.
Parameters
repo - The Ecto repository
flow_slug - The flow identifier slug
msg_ids - List of message IDs from pgmq
worker_id - The worker UUID string
Returns
{:ok, task_details} - List of task detail records
{:error, reason} - Error details if the operation fails
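A single worker poll typically chains read/4 into start_tasks/4. The sketch below shows one way to do that under two assumptions that this page does not confirm: `queries` stands for the module documented here, and each message record is a row list whose first element is the pgmq message ID. `PollSketch` is hypothetical.

```elixir
defmodule PollSketch do
  # Hypothetical helper, not part of pgflow.
  # The queue name is the flow slug, as noted for read/4.
  def poll_once(queries, repo, flow_slug, worker_id, visibility_timeout \\ 30, batch_size \\ 10) do
    # A failed read falls through `with` unchanged as {:error, reason}.
    with {:ok, messages} <- queries.read(repo, flow_slug, visibility_timeout, batch_size) do
      # Assumption: the message ID is the first column of each row.
      case Enum.map(messages, &List.first/1) do
        # Nothing visible in the queue: skip the second round trip.
        [] -> {:ok, []}
        msg_ids -> queries.start_tasks(repo, flow_slug, msg_ids, worker_id)
      end
    end
  end
end
```

Because read/4 returns immediately, a caller would normally sleep between polls when the result is `{:ok, []}`.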
Recompiles a flow definition from runtime options.
Uses create_flow + add_step (the proven low-level SQL functions) to
register a flow. If the flow already exists, it is dropped and re-created
to ensure the definition matches.
This operation is destructive for existing flows: all historical run and task data for the slug is deleted before recompiling.
Parameters
repo - The Ecto repository
slug - The flow identifier slug
opts - Flow-level options map with keys: "max_attempts", "base_delay", "timeout"
steps - List of step definition maps with keys: "slug", "deps", "step_type", and optional "max_attempts", "base_delay", "timeout", "start_delay"
Returns
{:ok, %{"status" => status}} where status is "compiled" or "recompiled"
{:error, term()} on failure
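The shapes of the opts and steps arguments can be sketched as plain maps with string keys. Everything concrete here is an assumption for illustration: `FlowDefinitionSketch` is hypothetical, the numeric values and the "single" step type are placeholders, and the ordering check assumes that a dependency must be declared before the step that uses it.

```elixir
defmodule FlowDefinitionSketch do
  # Hypothetical helper, not part of pgflow.
  # Flow-level options using the three documented keys (values are placeholders).
  def opts, do: %{"max_attempts" => 3, "base_delay" => 5, "timeout" => 60}

  # Two steps using the documented keys; "single" is an assumed step type.
  def steps do
    [
      %{"slug" => "fetch", "deps" => [], "step_type" => "single"},
      %{"slug" => "summarize", "deps" => ["fetch"], "step_type" => "single", "max_attempts" => 5}
    ]
  end

  # Returns dependency slugs that no earlier step in the list declares.
  def missing_deps(steps) do
    {_seen, missing} =
      Enum.reduce(steps, {MapSet.new(), []}, fn step, {seen, missing} ->
        unknown = Enum.reject(step["deps"], &MapSet.member?(seen, &1))
        {MapSet.put(seen, step["slug"]), missing ++ unknown}
      end)

    missing
  end

  # Interprets the documented return shapes.
  def describe({:ok, %{"status" => "compiled"}}), do: :created
  def describe({:ok, %{"status" => "recompiled"}}), do: :replaced_existing
  def describe({:error, reason}), do: {:failed, reason}
end
```

Checking `missing_deps/1` before recompiling is cheap insurance, given that recompiling an existing flow deletes its historical run and task data.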
@spec valid_slug?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}
Checks if a slug is valid according to core pgflow rules.