Compiles flow definitions into SQL statements.
This module transforms PgFlow.Flow.Definition structs into executable SQL
that registers flows and steps in the pgflow database schema. The generated
SQL is intended to be run inside Ecto migrations.
Generated SQL
For each flow, the compiler generates:
- A SELECT pgflow.create_flow(...) call to create the flow record and PGMQ queue
- A SELECT pgflow.add_step(...) call for each step in the flow
Example
definition = MyApp.Flows.ArticleFlow.__pgflow_definition__()
sql_statements = PgFlow.FlowCompiler.compile(definition)
# Returns:
# [
# "SELECT pgflow.create_flow('article_flow', 3, 5, 120)",
# "SELECT pgflow.add_step('article_flow', 'fetch_article', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')",
# ...
# ]
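Since the generated SQL is meant to run inside Ecto migrations, the following is a minimal sketch of how the statement list might be applied in order. `MigrationSketch` and `apply_statements/2` are hypothetical names introduced for illustration and are not part of PgFlow; the executor is injected so the sketch runs without a database.

```elixir
defmodule MigrationSketch do
  # Hypothetical helper, not part of PgFlow: passes each statement to `run`
  # in list order and returns how many statements were handed over.
  def apply_statements(statements, run)
      when is_list(statements) and is_function(run, 1) do
    Enum.each(statements, run)
    length(statements)
  end
end

# Statements copied from the example above.
statements = [
  "SELECT pgflow.create_flow('article_flow', 3, 5, 120)",
  "SELECT pgflow.add_step('article_flow', 'fetch_article', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')"
]

# Stand-in executor that only prints. Inside a module that calls
# `use Ecto.Migration`, the executor would be `&execute/1`, for example:
#
#   def up do
#     MyApp.Flows.ArticleFlow.__pgflow_definition__()
#     |> PgFlow.FlowCompiler.compile()
#     |> Enum.each(&execute/1)
#   end
MigrationSketch.apply_statements(statements, &IO.puts/1)
```

Order matters here: the flow must exist before its steps are added, so the list should be executed front to back rather than concurrently.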
Functions
@spec add_step_sql(atom(), PgFlow.Flow.Step.t()) :: String.t()
Generates the SQL to add a step to a flow.
Parameters
- flow_slug - The flow slug atom
- step - A PgFlow.Flow.Step struct
Returns
- A SQL string for adding the step
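To make the shape of the generated statement concrete, here is a rough sketch that reproduces the add_step strings shown in the compile example on this page. `StepSqlSketch` and its functions are hypothetical and work on plain slugs rather than PgFlow.Flow.Step structs; the quoting rule and the ", " separator between multiple dependencies are assumptions, and this is not PgFlow's actual implementation.

```elixir
defmodule StepSqlSketch do
  # Hypothetical helpers, not PgFlow's implementation.

  # Renders a slug (atom or string) as a single-quoted SQL literal,
  # doubling any embedded single quotes.
  def quote_literal(value) do
    "'" <> String.replace(to_string(value), "'", "''") <> "'"
  end

  # Renders a dependency list as a text[] array expression.
  # The ", " separator for multiple dependencies is an assumption.
  def deps_array(deps) when is_list(deps) do
    "ARRAY[" <> Enum.map_join(deps, ", ", &quote_literal/1) <> "]::text[]"
  end

  # Assembles the full statement. The four NULLs and 'single' are copied
  # verbatim from the documented output; this sketch does not model them.
  def add_step(flow_slug, step_slug, deps \\ []) do
    "SELECT pgflow.add_step(" <>
      quote_literal(flow_slug) <> ", " <>
      quote_literal(step_slug) <> ", " <>
      deps_array(deps) <> ", NULL, NULL, NULL, NULL, 'single')"
  end
end

IO.puts(StepSqlSketch.add_step(:test_flow, :step_a))
IO.puts(StepSqlSketch.add_step(:test_flow, :step_b, [:step_a]))
```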
@spec compile(PgFlow.Flow.Definition.t()) :: [String.t()]
Compiles a flow definition into a list of SQL statements.
Returns a list of SQL strings that, when executed in order, will register the flow and all its steps in the database.
Parameters
- definition - A PgFlow.Flow.Definition struct
Returns
- A list of SQL statement strings
Example
iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   opts: [max_attempts: 3, base_delay: 5, timeout: 60],
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step_a},
...>     %PgFlow.Flow.Step{slug: :step_b, depends_on: [:step_a]}
...>   ]
...> }
iex> PgFlow.FlowCompiler.compile(definition)
[
  "SELECT pgflow.create_flow('test_flow', 3, 5, 60)",
  "SELECT pgflow.add_step('test_flow', 'step_a', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')",
  "SELECT pgflow.add_step('test_flow', 'step_b', ARRAY['step_a']::text[], NULL, NULL, NULL, NULL, 'single')"
]
@spec create_flow_sql(PgFlow.Flow.Definition.t()) :: String.t()
Generates the SQL to create a flow.
Parameters
- definition - A PgFlow.Flow.Definition struct
Returns
- A SQL string for creating the flow
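Judging from the compile example on this page, the options max_attempts, base_delay, and timeout appear as the positional arguments after the slug, in that order. The sketch below illustrates that mapping with a hypothetical `CreateFlowSketch` module that takes a slug and keyword list instead of a PgFlow.Flow.Definition struct. It is not PgFlow's implementation, and it raises on a missing option rather than guessing at defaults, which this page does not document.

```elixir
defmodule CreateFlowSketch do
  # Hypothetical helper, not PgFlow's implementation.
  # Keyword.fetch!/2 raises on a missing option instead of inventing a default.
  # The slug is interpolated unescaped, which is only reasonable for
  # trusted, compile-time atoms.
  def create_flow(slug, opts) when is_atom(slug) and is_list(opts) do
    max_attempts = Keyword.fetch!(opts, :max_attempts)
    base_delay = Keyword.fetch!(opts, :base_delay)
    timeout = Keyword.fetch!(opts, :timeout)

    "SELECT pgflow.create_flow('#{slug}', #{max_attempts}, #{base_delay}, #{timeout})"
  end
end

IO.puts(CreateFlowSketch.create_flow(:test_flow, max_attempts: 3, base_delay: 5, timeout: 60))
```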
Generates the SQL to schedule a flow/job with pg_cron.
Parameters
- slug - The flow/job slug atom
- expression - The cron expression string (e.g., "0 ")
- input - The input map to pass to the flow/job
Returns
- A SQL string for scheduling the cron job
Generates the SQL to unschedule a flow/job from pg_cron.
Parameters
- slug - The flow/job slug atom
Returns
- A SQL string for unscheduling the cron job
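For orientation, this is a sketch of what pg_cron scheduling and unscheduling statements can look like, built on pg_cron's named-job functions cron.schedule(job_name, schedule, command) and cron.unschedule(job_name). Everything else is an assumption: the `CronSqlSketch` module is hypothetical, using the slug as the job name is a guess, the scheduled command (a call to pgflow.start_flow with a jsonb input) is illustrative, and the input is passed as an already-encoded JSON string to avoid a JSON library dependency. The SQL PgFlow actually generates may differ.

```elixir
defmodule CronSqlSketch do
  # Hypothetical helpers; the SQL PgFlow actually generates may differ.

  # Single-quoted SQL literal with embedded quotes doubled.
  defp lit(value), do: "'" <> String.replace(to_string(value), "'", "''") <> "'"

  # `input_json` is an already-encoded JSON string (assumption).
  # The command is dollar-quoted so its inner single quotes need no escaping.
  def schedule(slug, expression, input_json) do
    command = "SELECT pgflow.start_flow(#{lit(slug)}, #{lit(input_json)}::jsonb)"
    "SELECT cron.schedule(#{lit(slug)}, #{lit(expression)}, $cmd$#{command}$cmd$)"
  end

  # Removes the named job again.
  def unschedule(slug), do: "SELECT cron.unschedule(#{lit(slug)})"
end

# Illustrative values only: an hourly schedule and a small JSON payload.
IO.puts(CronSqlSketch.schedule(:article_flow, "0 * * * *", ~s({"url":"https://example.com"})))
IO.puts(CronSqlSketch.unschedule(:article_flow))
```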
Checks if the flow module has a cron expression configured.
Parameters
- module - The flow module
Returns
- true if the module has a cron expression, false otherwise