Bedrock.JobQueue.Job behaviour (bedrock_job_queue v0.2.0)


Behaviour for job modules.

Usage

defmodule MyApp.EmailJob do
  use Bedrock.JobQueue.Job,
    max_retries: 5,
    priority: 10

  @impl true
  def perform(%{to: to, subject: subject, body: body}, meta) do
    # meta.topic, meta.queue_id, meta.item_id, meta.attempt available
    # Send email
    :ok
  end
end

Then configure the worker mapping in your JobQueue module:

defmodule MyApp.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :my_app,
    repo: MyApp.Repo,
    workers: %{
      "email:send" => MyApp.EmailJob
    }
end

Options

  • :topic - Topic string for routing (optional, can also be passed at enqueue time)
  • :max_retries - Maximum retry attempts (default: 3)
  • :priority - Default priority for jobs created by this module (default: 100)
  • :timeout - Job execution timeout in milliseconds (default: 30_000)
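As a rough sketch of how the options listed above combine, the module below sets :topic and :timeout, which the Usage example leaves at their defaults. The module name MyApp.ReportJob, the topic "report:generate", and the payload key :report_id are hypothetical; only the option names come from this page.

```elixir
defmodule MyApp.ReportJob do
  # Hypothetical job module; option names are taken from the Options list.
  use Bedrock.JobQueue.Job,
    topic: "report:generate", # assumed topic string, used for routing
    max_retries: 3,           # same as the documented default
    priority: 100,            # same as the documented default
    timeout: 60_000           # 60 seconds instead of the 30_000 ms default

  @impl true
  def perform(%{report_id: _report_id}, _meta) do
    # Build the report here.
    :ok
  end
end
```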

Return Values

The perform/2 callback should return:

  • :ok - Job completed successfully
  • {:ok, result} - Job completed with a result (logged but otherwise ignored)
  • {:error, reason} - Job failed, will be retried if attempts remain
  • {:discard, reason} - Job failed permanently, won't be retried
  • {:snooze, delay_ms} - Reschedule the job to run again after delay_ms milliseconds; the snooze is counted in retry accounting
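The return values above can be illustrated with a small sketch that produces one of each. The module ReturnValuesSketch and its :action payload key are hypothetical, and the module deliberately omits `use Bedrock.JobQueue.Job` so it compiles on its own; a real job module would add that line and mark perform/2 with `@impl true`.

```elixir
defmodule ReturnValuesSketch do
  # Hypothetical module: plain functions that mirror the shape of perform/2.

  # Completed successfully, nothing to report.
  def perform(%{action: "noop"}, _meta), do: :ok

  # Completed with a result (per the list above, logged but otherwise ignored).
  def perform(%{action: "count", items: items}, _meta) when is_list(items),
    do: {:ok, length(items)}

  # Ask to be rescheduled 5_000 ms from now.
  def perform(%{action: "rate_limited"}, _meta), do: {:snooze, 5_000}

  # Failure that may be retried if attempts remain.
  def perform(%{action: "flaky"}, _meta), do: {:error, :upstream_unavailable}

  # Anything else is treated as a permanent failure and is not retried.
  def perform(payload, _meta), do: {:discard, {:unsupported_payload, payload}}
end
```

Putting the {:discard, reason} clause last makes unrecognised payloads fail permanently rather than raise a FunctionClauseError; whether that suits a given job is a design choice, not something this page prescribes.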

Meta

The second argument to perform/2 is a map containing:

  • :topic - The topic string that was enqueued
  • :queue_id - The queue ID (the sharding key used for fairness)
  • :item_id - Unique identifier for this job item
  • :attempt - Current attempt number (1-based)
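A minimal sketch of reading the metadata fields listed above by pattern matching on the second argument. MetaSketch, the :url payload key, and the fetch/1 helper are hypothetical stand-ins; as in a standalone illustration, the module does not `use Bedrock.JobQueue.Job`, which a real job module would.

```elixir
defmodule MetaSketch do
  require Logger

  # Destructure the documented meta keys directly in the function head.
  def perform(%{url: url}, %{topic: topic, item_id: item_id, attempt: attempt}) do
    # item_id is a binary and may not be printable, so inspect it.
    Logger.info("topic=#{topic} item=#{inspect(item_id)} attempt=#{attempt} url=#{url}")

    case fetch(url) do
      :ok -> :ok
      # Include the 1-based attempt number in the failure reason.
      {:error, reason} -> {:error, {reason, attempt}}
    end
  end

  # Hypothetical stand-in for real work: accept only https URLs.
  defp fetch("https://" <> _rest), do: :ok
  defp fetch(_other), do: {:error, :insecure_url}
end
```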

Summary

Callbacks

perform(payload, meta)

Performs the job with the given payload and metadata.

timeout()

Returns the job execution timeout in milliseconds.

Types

meta()

@type meta() :: %{
  topic: String.t(),
  queue_id: term(),
  item_id: binary(),
  attempt: pos_integer()
}

result()

@type result() ::
  :ok
  | {:ok, term()}
  | {:error, term()}
  | {:discard, term()}
  | {:snooze, non_neg_integer()}

Callbacks

perform(payload, meta)

@callback perform(payload :: map(), meta :: meta()) :: result()

Performs the job with the given payload and metadata.

timeout()

(optional)
@callback timeout() :: pos_integer()

Returns the job execution timeout in milliseconds.