PgFlow.Queries.Flows (PgFlow v0.1.0)


SQL query interface for pgflow flow operations.

Provides functions for starting flows, completing/failing tasks, reading messages, and managing flow lifecycle. All functions that accept JSON data expect Elixir terms that will be encoded with Jason.

Summary

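Types

  • prune_result() - Result of pruning old run data.

Functions

  • compile_flow(repo, slug, opts, steps) - Compiles and upserts a flow definition.
  • complete_task(repo, run_id, step_slug, task_index, output) - Marks a task as completed with output data.
  • delay_run(repo, flow_slug, run_id, delay_seconds) - Delays the first queued task for a flow run by moving its pgmq visibility time.
  • delete_flow(repo, slug) - Deletes a flow and all associated data (runs, step states, step tasks, queue).
  • delete_message(repo, queue_name, msg_id) - Deletes a message from a PGMQ queue.
  • fail_task(repo, run_id, step_slug, task_index, error_message) - Marks a task as failed with an error message.
  • flow_exists?(repo, flow_slug) - Checks if a flow exists in the database.
  • get_flow_input(repo, run_id) - Retrieves the input data for a flow run.
  • get_run(repo, run_id) - Retrieves a flow run's current state.
  • get_step_output(repo, run_id, step_slug) - Retrieves the output for a specific step in a flow run.
  • prune_data(repo, retention_hours, opts \\ []) - Prunes flow run data older than the specified retention period.
  • read(repo, queue_name, visibility_timeout, batch_size) - Reads messages from a queue without blocking.
  • recover_stalled_tasks(repo, stale_threshold) - Recovers stalled tasks by resetting step_tasks stuck in 'started' status.
  • start_flow(repo, flow_slug, input) - Starts a new flow execution.
  • start_tasks(repo, flow_slug, msg_ids, worker_id) - Starts multiple tasks by marking messages as in-progress.
  • upsert_flow(repo, slug, opts, steps) - Recompiles a flow definition from runtime options.
  • valid_slug?(repo, slug) - Checks if a slug is valid according to core pgflow rules.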
Types

prune_result()

@type prune_result() :: %{
  deleted_runs: non_neg_integer(),
  deleted_step_states: non_neg_integer(),
  deleted_step_tasks: non_neg_integer(),
  deleted_workers: non_neg_integer()
}

Result of pruning old run data.

Functions

compile_flow(repo, slug, opts, steps)

@spec compile_flow(Ecto.Repo.t(), String.t(), map(), [map()]) ::
  {:ok, term()} | {:error, term()}

Compiles and upserts a flow definition.

Parameters

  • repo - The Ecto repository
  • slug - The flow identifier slug
  • opts - Flow options map (e.g., %{max_retries: 3})
  • steps - List of step definitions as maps

complete_task(repo, run_id, step_slug, task_index, output)

@spec complete_task(
  Ecto.Repo.t(),
  String.t(),
  String.t(),
  non_neg_integer(),
  map() | list()
) ::
  {:ok, term()} | {:error, term()}

Marks a task as completed with output data.

Parameters

  • repo - The Ecto repository
  • run_id - The flow run UUID
  • step_slug - The step identifier slug
  • task_index - The task index (0-based)
  • output - Output data as a map or list (encoded to JSON and stored as JSONB)

Returns

  • {:ok, result} - Success result from the database
  • {:error, reason} - Error details if the operation fails

delay_run(repo, flow_slug, run_id, delay_seconds)

@spec delay_run(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer()) ::
  :ok | {:error, term()}

Delays the first queued task for a flow run by moving its pgmq visibility time.

This is a lower-level helper for public APIs such as PgFlow.enqueue_in/3 and PgFlow.enqueue_at/3. Call it in the same repository transaction as start_flow/3 when callers need to ensure workers cannot see the task before the delay is applied.
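
The transactional pattern described above might look like the following minimal sketch. The module DelayedStartSketch and its start_delayed/5 function are hypothetical names introduced for illustration, and the queries argument exists only so a stub can stand in for PgFlow.Queries.Flows in tests. It assumes repo is a standard Ecto repository exposing transaction/1 and rollback/1.

```elixir
defmodule DelayedStartSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  # Starts a run and delays its first task inside one transaction, so
  # workers cannot observe the task before the delay has been applied.
  def start_delayed(repo, flow_slug, input, delay_seconds, queries \\ PgFlow.Queries.Flows) do
    repo.transaction(fn ->
      with {:ok, run_id} <- queries.start_flow(repo, flow_slug, input),
           :ok <- queries.delay_run(repo, flow_slug, run_id, delay_seconds) do
        # Becomes {:ok, run_id} once the transaction commits.
        run_id
      else
        # Roll back so a failed delay does not leave a visible task behind.
        {:error, reason} -> repo.rollback(reason)
      end
    end)
  end
end
```

With a real repository this returns {:ok, run_id} when both calls succeed and {:error, reason} when either fails, following the usual Ecto.Repo.transaction/1 contract.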

delete_flow(repo, slug)

@spec delete_flow(Ecto.Repo.t(), String.t()) :: :ok | {:error, term()}

Deletes a flow and all associated data (runs, step states, step tasks, queue).

Parameters

  • repo - The Ecto repository
  • slug - The flow identifier slug

Returns

  • :ok on success (including when the flow does not exist)
  • {:error, term()} on failure

delete_message(repo, queue_name, msg_id)

@spec delete_message(Ecto.Repo.t(), String.t(), pos_integer()) ::
  {:ok, boolean()} | {:error, term()}

Deletes a message from a PGMQ queue.

fail_task(repo, run_id, step_slug, task_index, error_message)

@spec fail_task(Ecto.Repo.t(), String.t(), String.t(), non_neg_integer(), String.t()) ::
  {:ok, term()} | {:error, term()}

Marks a task as failed with an error message.

Parameters

  • repo - The Ecto repository
  • run_id - The flow run UUID
  • step_slug - The step identifier slug
  • task_index - The task index (0-based)
  • error_message - Error description string

Returns

  • {:ok, result} - Success result from the database
  • {:error, reason} - Error details if the operation fails
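
As an illustration of how complete_task/5 and fail_task/5 pair up, here is a minimal sketch of a worker-side helper that runs a handler and reports the outcome. TaskReportSketch, run_and_report/6, and the handler contract (a zero-arity function returning {:ok, output} or {:error, reason}) are assumptions made for this example, not part of PgFlow. The queries argument only allows a stub to replace PgFlow.Queries.Flows.

```elixir
defmodule TaskReportSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  # Runs the handler, then marks the task completed or failed.
  def run_and_report(repo, run_id, step_slug, task_index, handler, queries \\ PgFlow.Queries.Flows) do
    case safe_call(handler) do
      {:ok, output} ->
        queries.complete_task(repo, run_id, step_slug, task_index, output)

      {:error, reason} ->
        # fail_task/5 expects a string, so normalise the reason first.
        queries.fail_task(repo, run_id, step_slug, task_index, format_reason(reason))
    end
  end

  # Converts a raised exception into an {:error, exception} tuple.
  defp safe_call(handler) do
    handler.()
  rescue
    exception -> {:error, exception}
  end

  defp format_reason(reason) when is_binary(reason), do: reason
  defp format_reason(reason) when is_exception(reason), do: Exception.message(reason)
  defp format_reason(reason), do: inspect(reason)
end
```

Rescuing inside the helper means a crashing handler is still recorded as a failed task instead of leaving the task in progress.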

flow_exists?(repo, flow_slug)

@spec flow_exists?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}

Checks if a flow exists in the database.

get_flow_input(repo, run_id)

@spec get_flow_input(Ecto.Repo.t(), String.t()) ::
  {:ok, map() | list()} | {:error, term()}

Retrieves the input data for a flow run.

get_run(repo, run_id)

@spec get_run(Ecto.Repo.t(), String.t()) ::
  {:ok, %{status: String.t(), output: term()}} | {:error, :not_found | term()}

Retrieves a flow run's current state.

Parameters

  • repo - The Ecto repository
  • run_id - The flow run UUID

Returns

  • {:ok, %{status: String.t(), output: term()}} - Run state
  • {:error, :not_found} - Run does not exist
  • {:error, reason} - Error details if the operation fails
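
Because get_run/2 returns a snapshot of the run, callers that need to wait for a result have to poll. The sketch below shows one way to do that. RunPollSketch and await_run/5 are hypothetical, and the terminal status strings "completed" and "failed" are an assumption that this page does not confirm.

```elixir
defmodule RunPollSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  # Assumption: these status strings mark a finished run.
  @terminal ["completed", "failed"]

  def await_run(repo, run_id, attempts, interval_ms, queries \\ PgFlow.Queries.Flows)

  # No attempts left: give up without querying again.
  def await_run(_repo, _run_id, 0, _interval_ms, _queries), do: {:error, :timeout}

  def await_run(repo, run_id, attempts, interval_ms, queries) when attempts > 0 do
    case queries.get_run(repo, run_id) do
      {:ok, %{status: status} = run} when status in @terminal ->
        {:ok, run}

      {:ok, _still_running} ->
        Process.sleep(interval_ms)
        await_run(repo, run_id, attempts - 1, interval_ms, queries)

      # Covers {:error, :not_found} and any other failure.
      {:error, _reason} = error ->
        error
    end
  end
end
```

The attempt counter bounds the wait, so a run that never finishes yields {:error, :timeout} rather than blocking the caller indefinitely.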

get_step_output(repo, run_id, step_slug)

@spec get_step_output(Ecto.Repo.t(), String.t(), String.t()) ::
  {:ok, map() | nil} | {:error, term()}

Retrieves the output for a specific step in a flow run.

prune_data(repo, retention_hours, opts \\ [])

@spec prune_data(Ecto.Repo.t(), pos_integer(), keyword()) ::
  {:ok, prune_result()} | {:error, term()}

Prunes flow run data older than the specified retention period, given in hours.

Options

  • :flow_slugs - List of flow slugs to prune (default: all flows)
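
A minimal sketch of calling prune_data/3 with the :flow_slugs option and summarising the prune_result() map. PruneSketch and prune_and_count/4 are hypothetical names; the queries argument only lets a stub replace PgFlow.Queries.Flows.

```elixir
defmodule PruneSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  @count_keys [:deleted_runs, :deleted_step_states, :deleted_step_tasks, :deleted_workers]

  # Prunes the given flows and returns the total number of deleted rows
  # alongside the per-table breakdown.
  def prune_and_count(repo, retention_hours, flow_slugs, queries \\ PgFlow.Queries.Flows) do
    with {:ok, result} <- queries.prune_data(repo, retention_hours, flow_slugs: flow_slugs) do
      total =
        result
        |> Map.take(@count_keys)
        |> Map.values()
        |> Enum.sum()

      {:ok, total, result}
    end
  end
end
```

An {:error, reason} from prune_data/3 falls through the with expression unchanged, so callers can match on it directly.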

read(repo, queue_name, visibility_timeout, batch_size)

@spec read(Ecto.Repo.t(), String.t(), pos_integer(), pos_integer()) ::
  {:ok, [list()]} | {:error, term()}

Reads messages from a queue without blocking.

Uses pgmq.read() to fetch available messages. Returns immediately whether or not messages are available. Messages are made invisible for the visibility timeout period to prevent duplicate processing.

Queue poll SQL logging is disabled by default because workers call this frequently. Set config :pgflow, :log_queue_polls, true to enable Ecto query logging for these reads while debugging.

Parameters

  • repo - The Ecto repository
  • queue_name - The name of the queue to read from (matches flow_slug)
  • visibility_timeout - Time in seconds messages remain invisible
  • batch_size - Maximum number of messages to retrieve

Returns

  • {:ok, messages} - List of message records from pgmq (may be empty)
  • {:error, reason} - Error details if the operation fails

recover_stalled_tasks(repo, stale_threshold)

@spec recover_stalled_tasks(Ecto.Repo.t(), pos_integer()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Recovers stalled tasks by resetting step_tasks stuck in 'started' status.

Tasks that have been in 'started' status longer than the stale threshold are reset to 'queued' so they can be re-processed by workers.

start_flow(repo, flow_slug, input)

@spec start_flow(Ecto.Repo.t(), String.t(), map() | list()) ::
  {:ok, String.t()} | {:error, term()}

Starts a new flow execution.

Parameters

  • repo - The Ecto repository
  • flow_slug - The flow identifier slug
  • input - Input data as an Elixir term (will be encoded as JSONB)

Returns

  • {:ok, run_id} - The UUID of the created flow run
  • {:error, reason} - Error details if the operation fails

start_tasks(repo, flow_slug, msg_ids, worker_id)

@spec start_tasks(Ecto.Repo.t(), String.t(), [pos_integer()], String.t()) ::
  {:ok, [list()]} | {:error, term()}

Starts multiple tasks by marking messages as in-progress.

Parameters

  • repo - The Ecto repository
  • flow_slug - The flow identifier slug
  • msg_ids - List of message IDs from pgmq
  • worker_id - The worker UUID string

Returns

  • {:ok, task_details} - List of task detail records
  • {:error, reason} - Error details if the operation fails
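
To show how read/4 and start_tasks/4 fit together, here is a minimal sketch of a single poll cycle. PollOnceSketch, poll_once/5, and the default timeout and batch size are hypothetical. It also assumes that each message row begins with the message ID, as in pgmq's message record column order; check the actual row shape before relying on this.

```elixir
defmodule PollOnceSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  # Reads one batch from the flow's queue and marks those tasks as started.
  def poll_once(repo, flow_slug, worker_id, opts \\ [], queries \\ PgFlow.Queries.Flows) do
    visibility_timeout = Keyword.get(opts, :visibility_timeout, 30)
    batch_size = Keyword.get(opts, :batch_size, 10)

    # The queue name matches the flow slug.
    with {:ok, messages} <- queries.read(repo, flow_slug, visibility_timeout, batch_size) do
      # Assumption: msg_id is the first element of every row.
      case Enum.map(messages, &hd/1) do
        [] -> {:ok, []}
        msg_ids -> queries.start_tasks(repo, flow_slug, msg_ids, worker_id)
      end
    end
  end
end
```

Skipping start_tasks/4 when the batch is empty avoids a second database round trip on idle polls.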

upsert_flow(repo, slug, opts, steps)

@spec upsert_flow(Ecto.Repo.t(), String.t(), map(), [map()]) ::
  {:ok, map()} | {:error, term()}

Recompiles a flow definition from runtime options.

Registers the flow through the low-level SQL functions create_flow and add_step. If the flow already exists, it is dropped and re-created so that the stored definition matches the one supplied.

This operation is destructive for existing flows: all historical run and task data for the slug is deleted before recompiling.

Parameters

  • repo - The Ecto repository
  • slug - The flow identifier slug
  • opts - Flow-level options map with keys: "max_attempts", "base_delay", "timeout"
  • steps - List of step definition maps with keys: "slug", "deps", "step_type", and optional "max_attempts", "base_delay", "timeout", "start_delay"

Returns

  • {:ok, %{"status" => status}} where status is "compiled" or "recompiled"
  • {:error, term()} on failure
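
Since upsert_flow/4 deletes history when the flow already exists, a caller may prefer to register a flow only when it is missing. The sketch below combines valid_slug?/2, flow_exists?/2, and upsert_flow/4 for that purpose. EnsureFlowSketch, ensure_flow/5, and the :already_registered and :invalid_slug atoms are hypothetical names chosen for this example.

```elixir
defmodule EnsureFlowSketch do
  @moduledoc "Hypothetical helper, not part of PgFlow."

  # Registers the flow only if the slug is valid and no flow exists yet,
  # so existing run and task history is never dropped.
  def ensure_flow(repo, slug, opts, steps, queries \\ PgFlow.Queries.Flows) do
    with {:ok, true} <- queries.valid_slug?(repo, slug),
         {:ok, false} <- queries.flow_exists?(repo, slug) do
      queries.upsert_flow(repo, slug, opts, steps)
    else
      # valid_slug?/2 returned false.
      {:ok, false} -> {:error, :invalid_slug}
      # flow_exists?/2 returned true: leave the stored definition alone.
      {:ok, true} -> {:ok, :already_registered}
      {:error, _reason} = error -> error
    end
  end
end
```

A call might pass opts such as %{"max_attempts" => 3, "base_delay" => 5, "timeout" => 60} and steps such as [%{"slug" => "fetch", "deps" => [], "step_type" => "single"}]. The numeric values and the "single" step type are assumed for illustration.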

valid_slug?(repo, slug)

@spec valid_slug?(Ecto.Repo.t(), String.t()) :: {:ok, boolean()} | {:error, term()}

Checks if a slug is valid according to core pgflow rules.