Tutorial: Coffee Shop Job Queue
Mix.install([
  {:bedrock, "~> 0.4"},
  {:bedrock_job_queue, "~> 0.1"},
  {:kino, "~> 0.16.0"}
])

Welcome to Bedrock Job Queue!
In this tutorial, we'll build a background job system for a busy coffee shop. Along the way, you'll learn how Bedrock's job queue handles async operations reliably - from order confirmations to delivery syncing.
This tutorial follows the same patterns as the Class Scheduling tutorial, but focuses on background job processing.
Setting Up Our Database
First, let's set up our Bedrock cluster. This gives us a reliable storage layer for our job queue:
# Create a temporary folder to persist data
working_dir = Path.join(System.tmp_dir!(), "coffee_shop_#{:rand.uniform(99999)}")
File.mkdir_p!(working_dir)

defmodule CoffeeShop.Cluster do
  use Bedrock.Cluster,
    otp_app: :coffee_shop,
    name: "coffee_shop",
    config: [
      capabilities: [:coordination, :log, :storage],
      trace: [],
      coordinator: [path: working_dir],
      storage: [path: working_dir],
      log: [path: working_dir]
    ]
end

defmodule CoffeeShop.Repo do
  use Bedrock.Repo, cluster: CoffeeShop.Cluster
end

Now let's start the cluster:

Kino.start_child!({CoffeeShop.Cluster, []})

Quick test to make sure everything works:
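# Job is aliased here so the job modules later on can write "use Job".
# CoffeeShop.JobQueue is defined further down; aliasing it early is fine.
alias Bedrock.JobQueue.Job
alias CoffeeShop.{Repo, JobQueue}

Repo.transact(fn -> Repo.put("test", "coffee ready") end)
result = Repo.transact(fn -> Repo.get("test") end)
"Coffee status: #{result}"

Our Mission: A Busy Coffee Shop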
Imagine you're building software for a coffee chain. When customers place orders, many things need to happen:
- Order confirmation - Send a notification to the customer
- Brewing updates - Track when the barista starts making the drink
- Ready for pickup - Alert the customer when their order is ready
- Delivery sync - Sync with third-party delivery apps
- Cleanup tasks - Administrative tasks that can wait
These operations shouldn't block the order flow. That's where background jobs come in!
Defining Job Types
Each job type is a module with a perform/2 callback. The second argument is metadata containing topic, queue_id, item_id, and attempt. Let's define our coffee shop jobs:
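Before the real modules, here is a minimal sketch of the perform/2 shape on its own. It deliberately does not use the library: PerformSketch is a hypothetical plain module, and the metadata is assumed to be a map with the four keys named above (the library may pass a struct or a different shape).

```elixir
defmodule PerformSketch do
  # Hypothetical stand-in for a job module. It does NOT call Bedrock's
  # "use Job", so it runs without the library installed.

  # First argument: the job payload. Second argument: the metadata.
  # Assumption: metadata is a map with :topic, :queue_id, :item_id, :attempt.
  def perform(%{order_id: order_id}, %{topic: topic, attempt: attempt}) do
    {:ok, "#{topic} for #{order_id} (attempt #{attempt})"}
  end
end

meta = %{topic: "order:confirm", queue_id: "main_shop", item_id: "item-1", attempt: 1}
{:ok, summary} = PerformSketch.perform(%{order_id: "ORD-001", customer: "Alice"}, meta)
IO.puts(summary)
```

The coffee shop modules that follow have the same two-argument shape, but they ignore the metadata (bound to _meta) and pull in the library's behaviour with use Job.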
defmodule CoffeeShop.Jobs.OrderConfirmation do
  use Job,
    topic: "order:confirm",
    priority: 100

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts(" [#{order_id}] Sending confirmation to #{customer}")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.BrewingStarted do
  use Job,
    topic: "order:brewing",
    priority: 50

  @impl true
  def perform(%{order_id: order_id, drink: drink}, _meta) do
    IO.puts(" [#{order_id}] Barista started: #{drink}")
    Process.sleep(100)
    {:ok, %{started_at: DateTime.utc_now()}}
  end
end

defmodule CoffeeShop.Jobs.ReadyForPickup do
  use Job,
    topic: "order:ready",
    priority: 10

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts(" [#{order_id}] READY! Paging #{customer}!")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.EspressoShot do
  use Job,
    topic: "brew:espresso",
    max_retries: 3,
    priority: 40

  @impl true
  def perform(%{order_id: order_id, shots: shots}, _meta) do
    IO.puts(" [#{order_id}] Pulling #{shots} shot(s)...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.DeliverySync do
  use Job,
    topic: "delivery:sync",
    max_retries: 5

  @impl true
  def perform(%{order_id: order_id, platform: platform}, _meta) do
    IO.puts(" [#{order_id}] Syncing with #{platform}...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.AdminCleanup do
  use Job,
    topic: "admin:cleanup",
    priority: 200

  @impl true
  def perform(%{task: task}, _meta) do
    IO.puts(" Running cleanup: #{task}")
    Process.sleep(100)
    :ok
  end
end

"6 job modules defined!"

Setting Up the Job Queue
Now let's define our JobQueue module. This is where we configure the repo and wire up all our workers:
defmodule CoffeeShop.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :coffee_shop,
    repo: CoffeeShop.Repo,
    workers: %{
      "order:confirm" => CoffeeShop.Jobs.OrderConfirmation,
      "order:brewing" => CoffeeShop.Jobs.BrewingStarted,
      "order:ready" => CoffeeShop.Jobs.ReadyForPickup,
      "brew:espresso" => CoffeeShop.Jobs.EspressoShot,
      "delivery:sync" => CoffeeShop.Jobs.DeliverySync,
      "admin:cleanup" => CoffeeShop.Jobs.AdminCleanup
    }
end

"JobQueue module defined!"

Start the consumer:

Kino.start_child!({JobQueue, concurrency: 2, batch_size: 5})
# Give it a moment to initialize
Process.sleep(500)
"Consumer started!"

Taking Orders: Basic Enqueueing
Let's enqueue our first job - an order confirmation:
{:ok, job} =
  JobQueue.enqueue("main_shop", "order:confirm", %{order_id: "ORD-001", customer: "Alice"})

IO.puts("Enqueued job: #{Base.encode16(job.id, case: :lower) |> binary_part(0, 8)}...")
# Wait for processing
Process.sleep(500)
"Check the output above!"

Rush Hour: Priorities
Not all jobs are equal! A "ready for pickup" notification should jump ahead of cleanup tasks.
Priority is a number where lower = higher priority:
- Priority 10: Urgent (pickup notifications)
- Priority 50: Normal (brewing updates)
- Priority 200: Low (cleanup tasks)
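The ordering rule on its own can be sketched with the standard library. This is only an illustration of "lower number runs first"; PrioritySketch and the job maps are hypothetical, and nothing here reflects how the library actually stores or selects jobs.

```elixir
defmodule PrioritySketch do
  # Lower number = higher priority, so an ascending sort gives the run order.
  def run_order(jobs) do
    jobs
    |> Enum.sort_by(& &1.priority)
    |> Enum.map(& &1.topic)
  end
end

# Hypothetical pending jobs, listed in the order they were enqueued.
pending = [
  %{topic: "admin:cleanup", priority: 200},
  %{topic: "order:brewing", priority: 50},
  %{topic: "order:ready", priority: 10}
]

IO.inspect(PrioritySketch.run_order(pending))
```

Even though cleanup was enqueued first, the pickup notification sorts to the front.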
Let's see priorities in action:
# Enqueue all jobs in a single transaction so they're visible atomically.
# This ensures priority ordering is demonstrated correctly - otherwise
# jobs get processed as they arrive (race between enqueue and consumer).
Repo.transact(fn ->
  JobQueue.enqueue("main_shop", "admin:cleanup",
    %{task: "clear_old_orders"}, priority: 200)

  JobQueue.enqueue("main_shop", "order:brewing",
    %{order_id: "ORD-002", drink: "Latte"}, priority: 50)

  JobQueue.enqueue("main_shop", "order:ready",
    %{order_id: "ORD-003", customer: "Bob"}, priority: 10)
end)

IO.puts("Enqueued atomically: cleanup (200), brewing (50), ready (10)")
IO.puts("\nWatching execution order (should be: ready → brewing → cleanup):")
Process.sleep(2000)
"Jobs processed! Check the priority order above."

Happy Hour: Scheduled Jobs
Sometimes you want a job to run at a specific time:
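This section uses two scheduling styles: an absolute time and a relative delay in milliseconds. As a rough sketch of how the two relate, the conversion can be done with DateTime alone. ScheduleSketch is a hypothetical helper, not part of the library, and clamping past times to zero is an assumption made for the illustration.

```elixir
defmodule ScheduleSketch do
  # Turn a relative delay in milliseconds into an absolute UTC time.
  def delay_to_time(now, delay_ms), do: DateTime.add(now, delay_ms, :millisecond)

  # Turn an absolute time into a delay in milliseconds.
  # Assumption: a time in the past is treated as "run now" (delay 0).
  def time_to_delay(now, at), do: max(DateTime.diff(at, now, :millisecond), 0)
end

now = ~U[2024-01-01 12:00:00.000Z]
at = ScheduleSketch.delay_to_time(now, 2_000)

IO.inspect(at)
IO.inspect(ScheduleSketch.time_to_delay(now, at))
```

The tutorial's own examples follow: the first passes an absolute time with at:, the second a delay with in:.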
# Schedule a job for 2 seconds from now
scheduled_time = DateTime.utc_now() |> DateTime.add(2, :second)

{:ok, _} =
  JobQueue.enqueue("main_shop", "order:confirm",
    %{order_id: "ORD-SCHEDULED", customer: "Charlie"},
    at: scheduled_time)

IO.puts("Scheduled job for #{DateTime.to_iso8601(scheduled_time)}")
IO.puts("Waiting for it to execute...")
Process.sleep(3500)
"Check above - Charlie's confirmation should have processed!"

Or use a delay in milliseconds:
# Enqueue with a 1 second delay
{:ok, _} =
  JobQueue.enqueue("main_shop", "order:confirm",
    %{order_id: "ORD-DELAYED", customer: "Diana"},
    in: 1000)

IO.puts("Enqueued with 1 second delay...")
Process.sleep(2000)
"Diana's confirmation should have processed!"

Multiple Shops: Tenant Isolation
Each coffee shop location can have its own queue, completely isolated:
# Different shops, different queues
{:ok, _} =
  JobQueue.enqueue("downtown_shop", "order:confirm", %{order_id: "DT-001", customer: "Eve"})

{:ok, _} =
  JobQueue.enqueue("airport_kiosk", "order:confirm", %{order_id: "AP-001", customer: "Frank"})

IO.puts("Enqueued to downtown_shop and airport_kiosk")
Process.sleep(1000)
"Jobs processed in their respective queues!"

Checking Queue Stats
You can inspect queue status at any time:
for queue_id <- ["main_shop", "downtown_shop", "airport_kiosk"] do
  stats = JobQueue.stats(queue_id)
  IO.puts("#{queue_id}: pending=#{stats.pending_count}, processing=#{stats.processing_count}")
end

"Stats shown above!"

What We've Learned
Congratulations! You've built a complete job queue system. Here's what we covered:
| Concept | What You Learned |
|---|---|
| Job Modules | Add use Bedrock.JobQueue.Job to a module, with topic, priority, and max_retries options |
| Return Values | :ok, {:ok, result}, {:error, reason}, {:snooze, ms}, {:discard, reason} |
| Enqueueing | enqueue/4 with :at, :in, :priority opts |
| Priorities | Lower number = higher priority (10 runs before 50, which runs before 200) |
| Topics | String keys routing to job modules via workers map |
| Tenants | Use queue_id to isolate jobs per tenant |
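To make the Return Values row concrete, here is a small sketch that sorts the five return shapes into what a consumer might do next. ReturnValueSketch and its atoms are hypothetical. The retry and snooze readings follow the descriptions in this tutorial, while treating {:discard, reason} as "drop without retrying" is an assumption based on the name.

```elixir
defmodule ReturnValueSketch do
  # Hypothetical classifier, not the library's code: maps each return
  # shape a job can produce to a possible follow-up action.
  def next_step(:ok), do: :complete
  def next_step({:ok, _result}), do: :complete
  # An error is retried (the tutorial mentions automatic retries with backoff).
  def next_step({:error, _reason}), do: :retry
  # A snooze asks for the job to run again after the given delay.
  def next_step({:snooze, ms}) when is_integer(ms) and ms >= 0, do: {:run_again_in, ms}
  # Assumption: a discard drops the job without further attempts.
  def next_step({:discard, _reason}), do: :drop
end

results = [:ok, {:ok, %{started_at: "now"}}, {:error, :timeout}, {:snooze, 500}, {:discard, :invalid}]
IO.inspect(Enum.map(results, &ReturnValueSketch.next_step/1))
```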
Keep Exploring
- Check out Bedrock.JobQueue.Job for all job options
- Experiment with {:snooze, delay_ms} for rate limiting
- Try {:error, reason} to see automatic retries with backoff
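As a starting point for the snooze experiment, the decision logic might look like the sketch below. Everything in it is hypothetical: SnoozeSketch, the limit of 3 calls, the 1000 ms delay, and the idea of passing the current call count in as an argument (a real job would read it from wherever the counter lives). Only the {:snooze, delay_ms} return shape comes from the library.

```elixir
defmodule SnoozeSketch do
  # Hypothetical limit: at most @limit calls per window to a delivery platform.
  @limit 3
  @snooze_ms 1_000

  # Returns {:snooze, ms} when the window is full, otherwise does the work.
  # calls_in_window is an explicit argument to keep the sketch pure.
  def sync_or_snooze(%{order_id: order_id}, calls_in_window) do
    if calls_in_window >= @limit do
      {:snooze, @snooze_ms}
    else
      IO.puts("[#{order_id}] syncing")
      :ok
    end
  end
end

IO.inspect(SnoozeSketch.sync_or_snooze(%{order_id: "ORD-010"}, 1))
IO.inspect(SnoozeSketch.sync_or_snooze(%{order_id: "ORD-011"}, 3))
```

Inside a real job module, the same branch would sit in perform/2 and return the snooze tuple from there.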
The job queue is inspired by the QuiCK paper for high-performance distributed queues.
Happy brewing!