A macro-based DSL for defining pgflow background jobs.
Jobs are single-step flows under the hood, providing a simpler API for
one-off background processing. Use use PgFlow.Job in your job module.
Example
defmodule MyApp.Jobs.SendEmail do
use PgFlow.Job
@job queue: :send_email, max_attempts: 3, base_delay: 5, timeout: 60
perform :send do
fn input, _ctx ->
Mailer.send(input["to"], input["subject"], input["body"])
%{sent: true}
end
end
endThe optional name in perform :name do sets the step slug in the database.
When omitted, the step slug defaults to the @job queue/slug value:
@job queue: :cleanup
perform do # step slug will be :cleanup
fn _input, _ctx -> :ok end
endJob Options
The @job module attribute accepts the following options:
:queue- (required) atom identifier for the job queue (also accepts:slugas alias):max_attempts- maximum retry attempts for failed jobs (default: 1):base_delay- base delay in seconds for exponential backoff (default: 1):timeout- job execution timeout in seconds (default: 30)
Generated Functions
Using this module generates the following callback functions:
__pgflow_definition__/0- returns aPgFlow.Flow.Definitionstruct withflow_type: :job__pgflow_slug__/0- returns the job queue slug atom__pgflow_steps__/0- returns the raw step definitions__pgflow_handler__/0- returns the handler function__pgflow_handler__(:step_name)- returns the handler function by step nameperform/2- convenience wrapper for testing:perform(input, ctx)
Summary
Functions
Defines the job's perform block with an optional step name.
Functions
Defines the job's perform block with an optional step name.
When called with a name (perform :step_name do), that name becomes
the step slug in the database. When called without a name (perform do),
the step slug defaults to the @job queue/slug value.
The block must return a 2-arity function that receives the job input
and a PgFlow.Context struct.
Examples
# Explicit step name
perform :send_email do
fn input, _ctx ->
%{result: process(input["data"])}
end
end
# Step name defaults to @job queue/slug
perform do
fn input, _ctx ->
%{result: process(input["data"])}
end
end