PgFlow.Job (PgFlow v0.1.0)

Copy Markdown View Source

A macro-based DSL for defining pgflow background jobs.

Jobs are single-step flows under the hood, providing a simpler API for one-off background processing. Use use PgFlow.Job in your job module.

Example

defmodule MyApp.Jobs.SendEmail do
  use PgFlow.Job

  @job queue: :send_email, max_attempts: 3, base_delay: 5, timeout: 60

  perform :send do
    fn input, _ctx ->
      Mailer.send(input["to"], input["subject"], input["body"])
      %{sent: true}
    end
  end
end

The optional name in perform :name do sets the step slug in the database. When omitted, the step slug defaults to the @job queue/slug value:

@job queue: :cleanup
perform do  # step slug will be :cleanup
  fn _input, _ctx -> :ok end
end

Job Options

The @job module attribute accepts the following options:

  • :queue - (required) atom identifier for the job queue (also accepts :slug as alias)
  • :max_attempts - maximum retry attempts for failed jobs (default: 1)
  • :base_delay - base delay in seconds for exponential backoff (default: 1)
  • :timeout - job execution timeout in seconds (default: 30)

Generated Functions

Using this module generates the following callback functions:

  • __pgflow_definition__/0 - returns a PgFlow.Flow.Definition struct with flow_type: :job
  • __pgflow_slug__/0 - returns the job queue slug atom
  • __pgflow_steps__/0 - returns the raw step definitions
  • __pgflow_handler__/0 - returns the handler function
  • __pgflow_handler__(:step_name) - returns the handler function by step name
  • perform/2 - convenience wrapper for testing: perform(input, ctx)

Summary

Functions

Defines the job's perform block with an optional step name.

Functions

perform(list)

(macro)

perform(name, list)

(macro)

Defines the job's perform block with an optional step name.

When called with a name (perform :step_name do), that name becomes the step slug in the database. When called without a name (perform do), the step slug defaults to the @job queue/slug value.

The block must return a 2-arity function that receives the job input and a PgFlow.Context struct.

Examples

# Explicit step name
perform :send_email do
  fn input, _ctx ->
    %{result: process(input["data"])}
  end
end

# Step name defaults to @job queue/slug
perform do
  fn input, _ctx ->
    %{result: process(input["data"])}
  end
end