PgFlow.Flow.Definition (PgFlow v0.1.0)


Represents a compiled flow definition.

A flow definition contains all the information needed to execute a workflow:

  • A unique slug identifier
  • The module that defined it
  • Flow-level configuration options
  • An ordered list of steps with their dependencies

Flow definitions are validated at compile time to ensure:

  • No circular dependencies exist
  • All step dependencies reference existing steps
  • Map steps have at most one dependency

Examples

%PgFlow.Flow.Definition{
  slug: :user_onboarding,
  module: MyApp.Flows.UserOnboarding,
  opts: [max_attempts: 3, base_delay: 1000, timeout: 30_000],
  steps: [
    %PgFlow.Flow.Step{slug: :create_account},
    %PgFlow.Flow.Step{slug: :send_welcome_email, depends_on: [:create_account]}
  ]
}

Summary

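Functions

build_steps(step_tuples)

Converts raw step tuples from the DSL into Step structs.

slug_to_string(slug)

Converts a flow slug (atom) to a string for database storage.

validate(definition)

Validates a flow definition.

validate_dependencies_exist(definition)

Validates that all step dependencies reference existing steps.

validate_map_step_constraints(definition)

Validates that map steps have at most one dependency.

validate_no_cycles(definition)

Validates that the flow has no circular dependencies.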

Types

flow_type()

@type flow_type() :: :flow | :job

t()

@type t() :: %PgFlow.Flow.Definition{
  flow_type: flow_type(),
  module: module(),
  opts: keyword(),
  slug: atom(),
  steps: [PgFlow.Flow.Step.t()]
}

Functions

build_steps(step_tuples)

@spec build_steps([atom() | {atom(), keyword()}]) :: [PgFlow.Flow.Step.t()]

Converts raw step tuples from the DSL into Step structs.
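As a rough illustration of this normalization, the sketch below turns bare atoms and `{slug, opts}` tuples into plain maps. The `StepSketch` module, the use of maps in place of `PgFlow.Flow.Step` structs, and the `:step_type` option key are assumptions made for illustration, not the library's implementation. The defaults mirror the fields shown in the example output.

```elixir
defmodule StepSketch do
  # Hypothetical stand-in for build_steps/1; returns plain maps,
  # not PgFlow.Flow.Step structs.
  def build_steps(step_tuples) do
    Enum.map(step_tuples, &build_step/1)
  end

  # A bare atom is treated as a step with no options.
  defp build_step(slug) when is_atom(slug), do: build_step({slug, []})

  defp build_step({slug, opts}) when is_atom(slug) and is_list(opts) do
    %{
      slug: slug,
      # Assumed option key; defaults to :single as in the example output.
      step_type: Keyword.get(opts, :step_type, :single),
      depends_on: Keyword.get(opts, :depends_on, [])
    }
  end
end
```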

Examples

iex> PgFlow.Flow.Definition.build_steps([:fetch_user, {:send_email, depends_on: [:fetch_user]}])
[
  %PgFlow.Flow.Step{slug: :fetch_user, step_type: :single, depends_on: []},
  %PgFlow.Flow.Step{slug: :send_email, step_type: :single, depends_on: [:fetch_user]}
]

slug_to_string(slug)

@spec slug_to_string(t() | atom()) :: String.t()

Converts a flow slug (atom) to a string for database storage. As the spec indicates, the argument may be either the slug atom or a flow definition.

Examples

iex> PgFlow.Flow.Definition.slug_to_string(:user_onboarding)
"user_onboarding"

iex> PgFlow.Flow.Definition.slug_to_string(:payment_processing)
"payment_processing"

validate(definition)

@spec validate(t()) :: {:ok, t()} | {:error, String.t()}

Validates a flow definition.

Performs the following checks:

  1. No circular dependencies exist in the step graph
  2. All step dependencies reference existing steps
  3. Map steps have at most one dependency

Returns {:ok, definition} if the definition is valid, or {:error, reason} if it is not, where reason is a string describing the failed check.
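One way to combine checks that each return `:ok` or `{:error, reason}` into a single `{:ok, definition} | {:error, reason}` result is sketched below. It assumes the checks run in the listed order and stop at the first failure, which the documentation above does not state. `ValidateSketch` and the practice of passing the checks in as functions are hypothetical, chosen only to keep the example self-contained.

```elixir
defmodule ValidateSketch do
  # Hypothetical: `checks` is a list of one-argument functions, each
  # returning :ok or {:error, reason}.
  def validate(definition, checks) do
    Enum.reduce_while(checks, {:ok, definition}, fn check, acc ->
      case check.(definition) do
        # Check passed: keep {:ok, definition} and try the next one.
        :ok -> {:cont, acc}
        # Check failed: stop and return the error unchanged.
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end
```

With real validators, the same shape could also be written as a `with` chain over the three `validate_*` functions documented on this page.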

Examples

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1},
...>     %PgFlow.Flow.Step{slug: :step2, depends_on: [:step1]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate(definition)
{:ok, definition}

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1, depends_on: [:step2]},
...>     %PgFlow.Flow.Step{slug: :step2, depends_on: [:step1]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate(definition)
{:error, "Circular dependency detected in flow :test_flow"}

validate_dependencies_exist(definition)

@spec validate_dependencies_exist(t()) :: :ok | {:error, String.t()}

Validates that all step dependencies reference existing steps.
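A minimal sketch of such a check, assuming steps are plain maps with `:slug` and `:depends_on` keys: collect the known slugs into a set, then report the first dependency that is not in it. `DepsSketch` is a hypothetical module, not the library's code. The error text is modelled on the message in the example for this function.

```elixir
defmodule DepsSketch do
  # Hypothetical check over plain maps, not PgFlow structs.
  def validate_dependencies_exist(%{slug: flow_slug, steps: steps}) do
    known = MapSet.new(steps, & &1.slug)

    # Find the first {step, dependency} pair whose dependency is unknown.
    missing =
      Enum.find_value(steps, fn step ->
        case Enum.find(step.depends_on, &(not MapSet.member?(known, &1))) do
          nil -> nil
          dep -> {step.slug, dep}
        end
      end)

    case missing do
      nil ->
        :ok

      {step_slug, dep} ->
        {:error,
         "Step #{inspect(step_slug)} in flow #{inspect(flow_slug)} " <>
           "depends on non-existent step #{inspect(dep)}"}
    end
  end
end
```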

Examples

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :step1, depends_on: [:nonexistent]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_dependencies_exist(definition)
{:error, "Step :step1 in flow :test_flow depends on non-existent step :nonexistent"}

validate_map_step_constraints(definition)

@spec validate_map_step_constraints(t()) :: :ok | {:error, String.t()}

Validates that map steps have at most one dependency.

A map step processes the array produced by a single source, so it can depend on at most one step.
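The constraint can be sketched as a scan for the first `:map` step that lists more than one dependency. `MapStepSketch` and the plain-map step shape are assumptions for illustration. Unlike the library, which returns a string reason, this sketch returns a tagged tuple so that it does not guess at the exact message.

```elixir
defmodule MapStepSketch do
  # Hypothetical check: a :map step may list at most one dependency.
  def validate_map_step_constraints(%{steps: steps}) do
    offender =
      Enum.find(steps, fn step ->
        step.step_type == :map and length(step.depends_on) > 1
      end)

    case offender do
      nil -> :ok
      step -> {:error, {:too_many_dependencies, step.slug, length(step.depends_on)}}
    end
  end
end
```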

Examples

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{
...>       slug: :process_items,
...>       step_type: :map,
...>       depends_on: [:step1, :step2]
...>     }
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_map_step_constraints(definition)
{:error, "Map step :process_items in flow :test_flow must have exactly one dependency (the array source), but has 2"}

validate_no_cycles(definition)

@spec validate_no_cycles(t()) :: :ok | {:error, String.t()}

Validates that the flow has no circular dependencies.

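Uses Kahn's algorithm for topological sorting to detect cycles. The algorithm repeatedly removes steps that have no unresolved dependencies. If some steps remain when none can be removed, the sort cannot be completed and a cycle exists.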
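For readers unfamiliar with the technique, here is a minimal sketch of Kahn's algorithm used as a cycle check. It assumes steps are plain maps with `:slug` and `:depends_on` keys and that every dependency refers to an existing step. `KahnSketch` and `cyclic?/1` are hypothetical names, and the library's actual implementation may differ.

```elixir
defmodule KahnSketch do
  # Hypothetical cycle check; returns true when the graph contains a cycle.
  # Assumes every dependency names an existing step.
  def cyclic?(steps) do
    # In-degree of a step = number of distinct dependencies still pending.
    in_degree =
      Map.new(steps, fn s -> {s.slug, length(Enum.uniq(s.depends_on))} end)

    # Steps with no dependencies can be sorted immediately.
    ready = for {slug, 0} <- in_degree, do: slug

    # If fewer steps were sorted than exist, the rest are stuck in a cycle.
    process(ready, in_degree, steps, 0) < length(steps)
  end

  # Nothing left to remove: return how many steps were sorted.
  defp process([], _in_degree, _steps, sorted), do: sorted

  defp process([slug | rest], in_degree, steps, sorted) do
    # Removing `slug` resolves one pending dependency of each dependent.
    dependents = for s <- steps, slug in s.depends_on, do: s.slug

    {in_degree, newly_ready} =
      Enum.reduce(dependents, {in_degree, []}, fn dep, {deg, acc} ->
        deg = Map.update!(deg, dep, &(&1 - 1))
        if deg[dep] == 0, do: {deg, [dep | acc]}, else: {deg, acc}
      end)

    process(rest ++ newly_ready, in_degree, steps, sorted + 1)
  end
end
```

The sketch only counts how many steps can be sorted rather than building the sorted list, since the count is enough to answer whether a cycle exists.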

Examples

iex> definition = %PgFlow.Flow.Definition{
...>   slug: :test_flow,
...>   module: TestFlow,
...>   steps: [
...>     %PgFlow.Flow.Step{slug: :a, depends_on: [:b]},
...>     %PgFlow.Flow.Step{slug: :b, depends_on: [:a]}
...>   ]
...> }
iex> PgFlow.Flow.Definition.validate_no_cycles(definition)
{:error, "Circular dependency detected in flow :test_flow"}