Bedrock.JobQueue (bedrock_job_queue v0.2.0)
A durable job queue system for Elixir, built on Bedrock.
Modeled after Apple's QuiCK paper, this system provides:
- Topic-based routing to worker modules
- Two-level sharding (per-queue zones + pointer index)
- Priority ordering and scheduled/delayed jobs
- Fault-tolerant leasing via vesting time
- Automatic lease extension for long-running jobs
- Scanner/Manager/Worker consumer architecture
Jobs are delivered with at-least-once semantics. Consumers lease jobs before execution and periodically extend active leases while workers are running. If a worker stops before completing the job, lease extension stops and the job eventually becomes visible for another consumer to claim.
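The leasing behaviour described above can be modelled in a few lines. This is a minimal sketch, not Bedrock's implementation: the `LeaseSketch` module, the `vesting_time` and `attempt` map keys, and the millisecond timestamps are all assumptions made for illustration. It shows a job hidden while its lease is active, kept hidden by an extension, and claimable again once extensions stop.

```elixir
defmodule LeaseSketch do
  @moduledoc "Illustrative model of vesting-time leasing (hypothetical, not the library's code)."

  # A job is visible to consumers once the clock reaches its vesting time (ms).
  def visible?(%{vesting_time: vesting_time}, now), do: now >= vesting_time

  # Claiming a visible job pushes its vesting time into the future,
  # which hides it from other consumers for the lease duration.
  # Counting attempts on each claim is an assumption of this sketch.
  def lease(job, now, lease_ms) do
    if visible?(job, now) do
      {:ok, %{job | vesting_time: now + lease_ms, attempt: job.attempt + 1}}
    else
      {:error, :not_visible}
    end
  end

  # While the worker is running, the consumer renews the lease.
  def extend(job, now, lease_ms), do: %{job | vesting_time: now + lease_ms}
end

job = %{id: "j1", vesting_time: 0, attempt: 0}

# First consumer claims the job at t = 1s with a 30s lease.
{:ok, leased} = LeaseSketch.lease(job, 1_000, 30_000)

# A second consumer cannot claim it while the lease is active.
{:error, :not_visible} = LeaseSketch.lease(leased, 10_000, 30_000)

# The lease is extended at t = 20s, so it now expires at t = 50s.
extended = LeaseSketch.extend(leased, 20_000, 30_000)

# The worker stops and no further extension happens; at t = 50s
# the job is visible again and another consumer can claim it.
{:ok, retried} = LeaseSketch.lease(extended, 50_000, 30_000)
```

Because a job can be claimed again after a lease lapses, `perform/2` may run more than once for the same job, so job modules are safest when written to be idempotent.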
Quick Start
Define a JobQueue module for your application:
defmodule MyApp.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :my_app,
    repo: MyApp.Repo,
    workers: %{
      "user:created" => MyApp.Jobs.UserCreated,
      "email:send" => MyApp.Jobs.SendEmail
    }
end

Define job modules:
defmodule MyApp.Jobs.UserCreated do
  use Bedrock.JobQueue.Job, topic: "user:created"

  @impl true
  def perform(%{user_id: user_id}, meta) do
    # meta.topic, meta.queue_id, meta.item_id, meta.attempt available
    :ok
  end
end

Add to your supervision tree:
children = [
  MyApp.Cluster,
  {MyApp.JobQueue, concurrency: 10, batch_size: 5}
]

Enqueue jobs:
# Immediate processing
MyApp.JobQueue.enqueue("tenant_1", "user:created", %{user_id: 123})
# Schedule for a specific time
MyApp.JobQueue.enqueue("tenant_1", "email:send", payload, at: ~U[2024-01-15 10:00:00Z])
# Schedule with a delay
MyApp.JobQueue.enqueue("tenant_1", "cleanup", payload, in: :timer.hours(1))
# With priority (lower = higher priority)
MyApp.JobQueue.enqueue("tenant_1", "urgent", payload, priority: 0)
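One way to picture how the `at:`, `in:`, and `priority:` options interact is sketched below. The `ScheduleSketch` module, its function names, and the `priority` and `visible_at` map keys are hypothetical and are not part of Bedrock.JobQueue. The sketch assumes that `in:` is a delay in milliseconds (as `:timer.hours/1` returns) and that ready jobs are ordered by priority first and visibility time second, which the documentation does not state.

```elixir
defmodule ScheduleSketch do
  # Resolve the scheduling options into a single "visible at" timestamp.
  # An absolute :at wins over a relative :in; with neither, the job is
  # visible immediately.
  def visible_at(opts, now \\ DateTime.utc_now()) do
    cond do
      at = opts[:at] -> at
      delay_ms = opts[:in] -> DateTime.add(now, delay_ms, :millisecond)
      true -> now
    end
  end

  # Order jobs so that a lower priority number comes first, with ties
  # broken by the earlier visibility time (an assumption of this sketch).
  def order(jobs) do
    Enum.sort_by(jobs, fn job ->
      {job.priority, DateTime.to_unix(job.visible_at, :millisecond)}
    end)
  end
end

now = ~U[2024-01-15 09:00:00Z]

jobs = [
  %{topic: "cleanup", priority: 100, visible_at: ScheduleSketch.visible_at([in: :timer.hours(1)], now)},
  %{topic: "urgent", priority: 0, visible_at: ScheduleSketch.visible_at([], now)},
  %{topic: "email:send", priority: 100, visible_at: ScheduleSketch.visible_at([at: ~U[2024-01-15 09:30:00Z]], now)}
]

ordered_topics = jobs |> ScheduleSketch.order() |> Enum.map(& &1.topic)
```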
Types
@type enqueue_opts() :: [
  at: DateTime.t(),
  in: non_neg_integer(),
  priority: integer(),
  max_retries: non_neg_integer(),
  id: binary()
]
@type queue_id() :: term()
@type topic() :: String.t()
Functions
Defines a JobQueue module.
Options
- :otp_app - The OTP application name (required)
- :repo - The Bedrock Repo module (required)
- :workers - Map of topic strings to job modules (default: %{})
Example
defmodule MyApp.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :my_app,
    repo: MyApp.Repo,
    workers: %{
      "email:send" => MyApp.Jobs.SendEmail
    }
end
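The `:workers` map can be read as a dispatch table from topic string to job module. The sketch below illustrates that idea only. `RoutingSketch`, `EchoJob`, and the `{:error, {:no_worker, topic}}` return value are hypothetical, and how Bedrock.JobQueue actually handles a topic with no registered worker is not documented here.

```elixir
defmodule RoutingSketch do
  # Look up the job module registered for a topic and call its perform/2.
  # The error tuple for an unknown topic is an assumption of this sketch.
  def dispatch(workers, topic, payload, meta) do
    case Map.fetch(workers, topic) do
      {:ok, module} -> module.perform(payload, meta)
      :error -> {:error, {:no_worker, topic}}
    end
  end
end

defmodule EchoJob do
  # Stand-in for a real job module; it only checks the payload shape.
  def perform(%{to: _address}, _meta), do: :ok
end

workers = %{"email:send" => EchoJob}
meta = %{topic: "email:send", attempt: 1}

routed = RoutingSketch.dispatch(workers, "email:send", %{to: "a@example.com"}, meta)
unrouted = RoutingSketch.dispatch(workers, "cleanup", %{}, %{topic: "cleanup", attempt: 1})
```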