Generates an Ecto migration that registers a PgFlow flow in the database.
Usage
mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow
mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow --migrations-path priv/repo/migrationsOptions
--migrations-path- Path to the migrations directory. Defaults topriv/repo/migrations.
Generated SQL
The migration executes SQL statements that:
- Create the flow record and PGMQ queue
- Add each step with its dependencies and configuration
Example generated migration:
defmodule MyApp.Repo.Migrations.CompileArticleFlow do
use Ecto.Migration
def up do
execute "SELECT pgflow.create_flow('article_flow', 3, 5, 120)"
execute "SELECT pgflow.add_step('article_flow', 'fetch_article', ARRAY[]::text[], NULL, NULL, NULL, NULL, 'single')"
execute "SELECT pgflow.add_step('article_flow', 'summarize', ARRAY['fetch_article']::text[], NULL, NULL, NULL, NULL, 'single')"
end
def down do
execute "DELETE FROM pgflow.deps WHERE flow_slug = 'article_flow'"
execute "DELETE FROM pgflow.steps WHERE flow_slug = 'article_flow'"
execute "DELETE FROM pgflow.flows WHERE flow_slug = 'article_flow'"
execute "SELECT pgmq.drop_queue('article_flow')"
end
endRequirements
The flow module must:
- Use
PgFlow.Flow - Define a valid flow with
@flowand at least one step - Be compilable (no syntax errors)
Example
# Define a flow
defmodule MyApp.Flows.ArticleFlow do
use PgFlow.Flow
@flow slug: :article_flow, max_attempts: 3
step :fetch do
fn input, _ctx -> %{data: input} end
end
step :process, depends_on: [:fetch] do
fn deps, _ctx -> %{result: deps.fetch} end
end
end
# Generate the migration
$ mix pgflow.gen.flow_migration MyApp.Flows.ArticleFlow
# Run the migration
$ mix ecto.migrate