Context struct passed to step handler functions.
The context provides metadata about the current execution environment and utilities for accessing flow data.
Fields
:run_id - UUID of the current flow run
:step_slug - Slug of the current step being executed
:task_index - Index of the task within the step (0 for single steps)
:attempt - Current attempt number (1-indexed)
:flow_input - Lazy-loaded flow input (use get_flow_input/1 to access)
:repo - Ecto repository module for database access
Usage
Step handlers receive the context as their second argument:
step :process, depends_on: [:fetch] do
  fn deps, ctx ->
    # Access context fields
    IO.puts("Running step #{ctx.step_slug} for run #{ctx.run_id}")
    IO.puts("This is attempt #{ctx.attempt}")

    # Get flow input if needed
    input = PgFlow.Context.get_flow_input(ctx)

    # Use dependencies from previous steps
    %{result: deps.fetch.data}
  end
end
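Because a handler is an ordinary two-argument function, it can also be called outside a flow, for example in a quick check. The following is a minimal sketch rather than PgFlow's own test tooling. It assumes a plain map can stand in for the %PgFlow.Context{} struct (the handler only reads fields with dot access), and the dependency data is made up for illustration.

```elixir
# Stand-in handler with the same shape as the :process step above.
handler = fn deps, ctx ->
  # Read metadata from the context
  label = "#{ctx.step_slug}/#{ctx.task_index} (attempt #{ctx.attempt})"

  # Combine it with the output of the :fetch dependency
  %{result: deps.fetch.data, label: label}
end

# Hypothetical values; in a real run the context is built by PgFlow.
ctx = %{
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :process,
  task_index: 0,
  attempt: 1
}

# Hypothetical output of the :fetch step.
deps = %{fetch: %{data: [1, 2, 3]}}

output = handler.(deps, ctx)
IO.inspect(output)
```

A handler that calls get_flow_input/1 needs a real context and repository, so this shortcut only suits handlers that read plain fields.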
Summary
Functions
get_flow_input/1 - Loads the flow input from the database.
new/1 - Creates a new context struct.
preload_flow_input/1 - Preloads the flow input into the context.
Types
@type t() :: %PgFlow.Context{
  attempt: pos_integer(),
  flow_input: map() | :not_loaded,
  repo: module(),
  run_id: Ecto.UUID.t(),
  step_slug: atom(),
  task_index: non_neg_integer()
}
Functions
get_flow_input/1
Loads the flow input from the database.
The flow input is lazily loaded to avoid unnecessary database queries when the input is not needed by the step handler.
Returns the flow input as a map, or raises if the run cannot be found.
Examples
input = PgFlow.Context.get_flow_input(ctx)
#=> %{"order_id" => 123, "customer_id" => 456}
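The lazy-loading behaviour described above can be pictured with a small stand-in. This sketch is not PgFlow's implementation: LazyContextSketch, its loader field, and the in-memory runs map are all hypothetical, chosen so the example runs without a database. It shows only the general pattern of a :not_loaded sentinel that is resolved on first access and raises when the run is missing.

```elixir
defmodule LazyContextSketch do
  # Hypothetical stand-in for PgFlow.Context; only the fields needed here.
  defstruct run_id: nil, flow_input: :not_loaded, loader: nil

  def new(attrs), do: struct!(__MODULE__, attrs)

  # Already loaded: return the cached map without calling the loader.
  def get_flow_input(%__MODULE__{flow_input: input}) when is_map(input), do: input

  # Not loaded yet: fetch by run_id, raising if the run is missing.
  def get_flow_input(%__MODULE__{flow_input: :not_loaded, run_id: run_id, loader: loader}) do
    case loader.(run_id) do
      {:ok, input} -> input
      :error -> raise ArgumentError, "run #{run_id} not found"
    end
  end
end

# In-memory map standing in for the database table of runs.
runs = %{"run-1" => %{"order_id" => 123, "customer_id" => 456}}
loader = fn run_id -> Map.fetch(runs, run_id) end

ctx = LazyContextSketch.new(run_id: "run-1", loader: loader)
input = LazyContextSketch.get_flow_input(ctx)
IO.inspect(input)
```

In this sketch a handler that never asks for the input never triggers the loader, which is the saving the lazy field is meant to provide.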
new/1
Creates a new context struct.
Examples
ctx = PgFlow.Context.new(
  run_id: "550e8400-e29b-41d4-a716-446655440000",
  step_slug: :process,
  task_index: 0,
  attempt: 1,
  repo: MyApp.Repo
)
preload_flow_input/1
Preloads the flow input into the context.
Use this to load the flow input eagerly, for example when a step runs multiple tasks that will all need the flow input.
Examples
ctx = PgFlow.Context.preload_flow_input(ctx)
# flow_input is now loaded and cached in the context
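To illustrate why eager loading helps when several tasks share one input, here is a hedged sketch with a stand-in struct. PreloadSketch and its two-argument preload_flow_input are hypothetical. The real function takes only the context, and this sketch passes a loader function explicitly so it can count loads without a database.

```elixir
defmodule PreloadSketch do
  # Hypothetical stand-in for PgFlow.Context, not the library's struct.
  defstruct run_id: nil, task_index: 0, flow_input: :not_loaded

  def new(attrs), do: struct!(__MODULE__, attrs)

  # Run the loader once and cache its result in the struct.
  def preload_flow_input(%__MODULE__{flow_input: :not_loaded} = ctx, loader) do
    %{ctx | flow_input: loader.(ctx.run_id)}
  end

  # Already loaded: return the context unchanged.
  def preload_flow_input(%__MODULE__{} = ctx, _loader), do: ctx
end

# Count loader calls with an Agent so the caching is observable.
{:ok, counter} = Agent.start_link(fn -> 0 end)

loader = fn _run_id ->
  Agent.update(counter, &(&1 + 1))
  %{"order_id" => 123}
end

base =
  PreloadSketch.new(run_id: "run-1")
  |> PreloadSketch.preload_flow_input(loader)

# Derive one context per task; each reuses the cached input.
task_ctxs = for i <- 0..2, do: %{base | task_index: i}

inputs = Enum.map(task_ctxs, & &1.flow_input)
loads = Agent.get(counter, & &1)
IO.inspect({loads, inputs})
```

The counter shows a single load serving all three task contexts, whereas loading lazily in each task would repeat the query.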