A macro-based DSL for defining pgflow workflows.
This module provides a declarative way to define workflow steps with dependencies,
retries, timeouts, and array processing capabilities. Use it by calling use PgFlow.Flow
in your flow module.
Example
defmodule MyApp.Flows.Example do
use PgFlow.Flow
@flow queue: :example, max_attempts: 3, base_delay: 5, timeout: 60
step :first do
fn input, ctx ->
%{result: input["value"] * 2}
end
end
step :second, depends_on: [:first] do
fn deps, ctx ->
%{doubled: deps.first["result"]}
end
end
map :process_items, array: :second do
fn item, ctx ->
%{processed: item}
end
end
endFlow Options
The @flow module attribute accepts the following options:
:queue- (required) atom identifier for the flow queue (also accepts:slugas alias):max_attempts- maximum retry attempts for failed steps (default: 1):base_delay- base delay in seconds for exponential backoff (default: 1):timeout- step execution timeout in seconds (default: 30):cron- (optional) schedule this flow via pg_cron with sub-options::schedule- (required) cron schedule string (e.g., "@hourly", "0 9 *"):input- (optional) static input map passed to each scheduled run
Step Options
Steps defined with step/2 or step/3 accept these options:
:depends_on- list of step atoms this step depends on:handler- module implementing PgFlow.StepHandler (alternative to block):max_attempts- override flow-level max_attempts:base_delay- override flow-level base_delay:timeout- override flow-level timeout:start_delay- seconds to delay before starting this step
Map Options
Map steps defined with map/2 or map/3 accept step options plus:
:array- step slug whose output array to process (for dependent maps)
Generated Functions
Using this module generates the following callback functions:
__pgflow_definition__/0- returns aPgFlow.Flow.Definitionstruct__pgflow_slug__/0- returns the flow slug atom__pgflow_steps__/0- returns the raw step definitions__pgflow_handler__/1- pattern-matched functions for each step
Summary
Functions
Defines an array processing step that executes a handler for each item.
Defines a single execution step in the workflow.
Functions
Defines an array processing step that executes a handler for each item.
Examples
# Map over inline array
map :process_users do
fn user, ctx ->
%{processed: process_user(user)}
end
end
# Map over output from another step
map :enrich_items, array: :fetch_items do
fn item, ctx ->
%{enriched: enrich(item)}
end
end
# Map with module handler
map :validate_each, array: :items, handler: MyApp.ValidateItemHandler
# Map with custom settings
map :slow_processing, array: :items, timeout: 120 do
fn item, ctx ->
%{result: slow_process(item)}
end
end
Defines a single execution step in the workflow.
Examples
# Basic step with inline handler
step :fetch_data do
fn input, ctx ->
%{data: fetch_from_api(input["url"])}
end
end
# Step with dependencies
step :transform, depends_on: [:fetch_data] do
fn deps, ctx ->
%{transformed: transform(deps.fetch_data["data"])}
end
end
# Step with module handler
step :validate, handler: MyApp.ValidateHandler
# Step with custom retry settings
step :flaky_operation, max_attempts: 5, base_delay: 10 do
fn input, ctx ->
perform_operation()
end
end