Bedrock.JobQueue (bedrock_job_queue v0.2.0)

A durable job queue system for Elixir, built on Bedrock.

Modeled after Apple's QuiCK paper, this system provides:

  • Topic-based routing to worker modules
  • Two-level sharding (per-queue zones + pointer index)
  • Priority ordering and scheduled/delayed jobs
  • Fault-tolerant leasing via vesting time
  • Automatic lease extension for long-running jobs
  • Scanner/Manager/Worker consumer architecture

Jobs are delivered with at-least-once semantics. Consumers lease jobs before execution and periodically extend active leases while workers are running. If a worker stops before completing the job, lease extension stops and the job eventually becomes visible for another consumer to claim.
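
The leasing behaviour described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions, not Bedrock's implementation: the module name `LeaseSketch`, its fields, and the use of integer millisecond timestamps are all hypothetical. It shows how a single "vesting time" per item can express claiming, extension, and recovery after a worker stops.

```elixir
defmodule LeaseSketch do
  @moduledoc "Toy model of vesting-time leasing. Hypothetical, not Bedrock's code."

  # Times are plain integer milliseconds to keep the example deterministic.
  defstruct [:id, vesting_time: 0, holder: nil]

  # An item is visible (claimable) once its vesting time has passed.
  def visible?(%__MODULE__{vesting_time: vesting_time}, now), do: vesting_time <= now

  # Claiming pushes the vesting time into the future, which hides the item
  # from other consumers until the lease runs out.
  def lease(%__MODULE__{} = item, holder, now, lease_ms) do
    if visible?(item, now) do
      {:ok, %{item | vesting_time: now + lease_ms, holder: holder}}
    else
      {:error, :leased}
    end
  end

  # Only the current holder may extend, and only while the lease is still live.
  def extend(%__MODULE__{holder: holder} = item, holder, now, lease_ms) do
    if visible?(item, now) do
      {:error, :expired}
    else
      {:ok, %{item | vesting_time: now + lease_ms}}
    end
  end

  def extend(%__MODULE__{}, _other_holder, _now, _lease_ms), do: {:error, :not_holder}
end
```

In this model, a worker that stops calling `extend/4` simply lets the vesting time pass, after which `lease/4` succeeds for another consumer. Because the same job can therefore run more than once, job code is usually written to be idempotent.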

Quick Start

Define a JobQueue module for your application:

defmodule MyApp.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :my_app,
    repo: MyApp.Repo,
    workers: %{
      "user:created" => MyApp.Jobs.UserCreated,
      "email:send" => MyApp.Jobs.SendEmail
    }
end

Define job modules:

defmodule MyApp.Jobs.UserCreated do
  use Bedrock.JobQueue.Job, topic: "user:created"

  @impl true
  def perform(%{user_id: user_id}, meta) do
    # meta exposes :topic, :queue_id, :item_id, and :attempt
    :ok
  end
end

Add to your supervision tree:

children = [
  MyApp.Cluster,
  {MyApp.JobQueue, concurrency: 10, batch_size: 5}
]

Enqueue jobs:

# Immediate processing
MyApp.JobQueue.enqueue("tenant_1", "user:created", %{user_id: 123})

# Schedule for a specific time
MyApp.JobQueue.enqueue("tenant_1", "email:send", payload, at: ~U[2024-01-15 10:00:00Z])

# Schedule with a delay in milliseconds
MyApp.JobQueue.enqueue("tenant_1", "cleanup", payload, in: :timer.hours(1))

# With priority (lower number = higher priority)
MyApp.JobQueue.enqueue("tenant_1", "urgent", payload, priority: 0)

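The relationship between the `:at` and `:in` options can be sketched in plain Elixir. The helper below is hypothetical (the name `ScheduleSketch.run_at/2` is not part of Bedrock), and it assumes that `:in` is a delay in milliseconds, which is what `:timer.hours/1` returns. It resolves either option to an absolute run time relative to a given `now`.

```elixir
defmodule ScheduleSketch do
  @moduledoc "Hypothetical helper showing how :at and :in could map to a run time."

  # Returns the time at which a job with these options would become eligible.
  # `now` is passed in explicitly so the result is deterministic.
  def run_at(opts, %DateTime{} = now) when is_list(opts) do
    case {Keyword.fetch(opts, :at), Keyword.fetch(opts, :in)} do
      # An absolute time is used as-is.
      {{:ok, %DateTime{} = at}, _} ->
        at

      # A delay is assumed to be in milliseconds and is added to `now`.
      {:error, {:ok, delay_ms}} when is_integer(delay_ms) and delay_ms >= 0 ->
        DateTime.add(now, delay_ms, :millisecond)

      # Neither option given: eligible immediately.
      {:error, :error} ->
        now
    end
  end
end
```

Under that assumption, `in: :timer.hours(1)` issued at 09:00:00 UTC describes the same instant as `at:` set to 10:00:00 UTC on the same day.
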
Types

config()

@type config() :: %{
  otp_app: atom(),
  repo: module(),
  workers: %{required(String.t()) => module()}
}

enqueue_opts()

@type enqueue_opts() :: [
  at: DateTime.t(),
  in: non_neg_integer(),
  priority: integer(),
  max_retries: non_neg_integer(),
  id: binary()
]
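
As an illustration of what the type above permits, the following sketch checks a keyword list against the same five keys and value types. It is not part of the library: `EnqueueOptsSketch` and its error shape are hypothetical, and whether the real `enqueue` function validates options this way is not stated in the source.

```elixir
defmodule EnqueueOptsSketch do
  @moduledoc "Hypothetical validator mirroring the enqueue_opts() type."

  # Returns :ok, or an error naming the first key that does not fit the type.
  def validate(opts) when is_list(opts) do
    Enum.reduce_while(opts, :ok, fn {key, value}, :ok ->
      if valid?(key, value) do
        {:cont, :ok}
      else
        {:halt, {:error, {:invalid_option, key}}}
      end
    end)
  end

  # One clause per key in the type; anything else is rejected.
  defp valid?(:at, %DateTime{}), do: true
  defp valid?(:in, value), do: is_integer(value) and value >= 0
  defp valid?(:priority, value), do: is_integer(value)
  defp valid?(:max_retries, value), do: is_integer(value) and value >= 0
  defp valid?(:id, value), do: is_binary(value)
  defp valid?(_key, _value), do: false
end
```

For example, `EnqueueOptsSketch.validate(priority: 0, id: "abc")` passes, while a negative `:in` or an unknown key is reported by name.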

payload()

@type payload() :: map() | binary()

queue_id()

@type queue_id() :: term()

topic()

@type topic() :: String.t()

Functions

__using__(opts) (macro)

Defines a JobQueue module.

Options

  • :otp_app - The OTP application name (required)
  • :repo - The Bedrock Repo module (required)
  • :workers - Map of topic strings to job modules (default: %{})

Example

defmodule MyApp.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :my_app,
    repo: MyApp.Repo,
    workers: %{
      "email:send" => MyApp.Jobs.SendEmail
    }
end
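
To make the role of the `:workers` map concrete, here is a minimal sketch of topic-based dispatch. It is an assumption about the general shape of the lookup, not the library's internal code: `RoutingSketch`, `RoutingSketch.EchoJob`, and the `{:error, {:no_worker, topic}}` result are all hypothetical. The only detail taken from the documentation is that a job module exposes `perform/2`, which receives the payload and a metadata map.

```elixir
defmodule RoutingSketch do
  @moduledoc "Hypothetical dispatcher: looks up a topic and calls perform/2."

  def dispatch(workers, topic, payload, meta) when is_map(workers) do
    case Map.fetch(workers, topic) do
      # A registered topic is routed to its job module.
      {:ok, module} -> module.perform(payload, meta)
      # How the real system treats unregistered topics is not documented here.
      :error -> {:error, {:no_worker, topic}}
    end
  end
end

defmodule RoutingSketch.EchoJob do
  @moduledoc "Stand-in job module that returns what it was given."

  def perform(payload, meta), do: {:ok, {meta.topic, payload}}
end
```

With `%{"email:send" => RoutingSketch.EchoJob}` as the workers map, dispatching the topic `"email:send"` calls `EchoJob.perform/2`, while any other topic yields the hypothetical `:no_worker` error.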