Tutorial: Coffee Shop Job Queue

View Source
Mix.install([
  {:bedrock, "~> 0.4"},
  {:bedrock_job_queue, "~> 0.1"},
  {:kino, "~> 0.16.0"}
])

Welcome to Bedrock Job Queue!

In this tutorial, we'll build a background job system for a busy coffee shop. Along the way, you'll learn how Bedrock's job queue handles async operations reliably - from order confirmations to delivery syncing.

This tutorial follows the same patterns as the Class Scheduling tutorial, but focuses on background job processing.

Setting Up Our Database

First, let's set up our Bedrock cluster. This gives us a reliable storage layer for our job queue:

# Create a temporary folder to persist data
working_dir = Path.join(System.tmp_dir!(), "coffee_shop_#{:rand.uniform(99999)}")
File.mkdir_p!(working_dir)

defmodule CoffeeShop.Cluster do
  use Bedrock.Cluster,
    otp_app: :coffee_shop,
    name: "coffee_shop",
    config: [
      capabilities: [:coordination, :log, :storage],
      trace: [],
      coordinator: [path: working_dir],
      storage: [path: working_dir],
      log: [path: working_dir]
    ]
end

defmodule CoffeeShop.Repo do
  use Bedrock.Repo, cluster: CoffeeShop.Cluster
end

Now let's start the cluster:

Kino.start_child!({CoffeeShop.Cluster, []})

Quick test to make sure everything works:

alias Bedrock.JobQueue.Job
alias CoffeeShop.{Repo, JobQueue}

Repo.transact(fn -> Repo.put("test", "coffee ready") end)

result = Repo.transact(fn -> Repo.get("test") end)
"Coffee status: #{result}"

Our Mission: A Busy Coffee Shop

Imagine you're building software for a coffee chain. When customers place orders, many things need to happen:

  • Order confirmation - Send a notification to the customer
  • Brewing updates - Track when the barista starts making the drink
  • Ready for pickup - Alert the customer when their order is ready
  • Delivery sync - Sync with third-party delivery apps
  • Cleanup tasks - Administrative tasks that can wait

These operations shouldn't block the order flow. That's where background jobs come in!

Defining Job Types

Each job type is a module with a perform/2 callback. The second argument is metadata containing topic, queue_id, item_id, and attempt. Let's define our coffee shop jobs:

defmodule CoffeeShop.Jobs.OrderConfirmation do
  use Job,
    topic: "order:confirm",
    priority: 100

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts("  [#{order_id}] Sending confirmation to #{customer}")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.BrewingStarted do
  use Job,
    topic: "order:brewing",
    priority: 50

  @impl true
  def perform(%{order_id: order_id, drink: drink}, _meta) do
    IO.puts("  [#{order_id}] Barista started: #{drink}")
    Process.sleep(100)
    {:ok, %{started_at: DateTime.utc_now()}}
  end
end

defmodule CoffeeShop.Jobs.ReadyForPickup do
  use Job,
    topic: "order:ready",
    priority: 10

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts("  [#{order_id}] READY! Paging #{customer}!")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.EspressoShot do
  use Job,
    topic: "brew:espresso",
    max_retries: 3,
    priority: 40

  @impl true
  def perform(%{order_id: order_id, shots: shots}, _meta) do
    IO.puts("  [#{order_id}] Pulling #{shots} shot(s)...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.DeliverySync do
  use Job,
    topic: "delivery:sync",
    max_retries: 5

  @impl true
  def perform(%{order_id: order_id, platform: platform}, _meta) do
    IO.puts("  [#{order_id}] Syncing with #{platform}...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.AdminCleanup do
  use Job,
    topic: "admin:cleanup",
    priority: 200

  @impl true
  def perform(%{task: task}, _meta) do
    IO.puts("  Running cleanup: #{task}")
    Process.sleep(100)
    :ok
  end
end

"6 job modules defined!"

Setting Up the Job Queue

Now let's define our JobQueue module. This is where we configure the repo and wire up all our workers:

defmodule CoffeeShop.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :coffee_shop,
    repo: CoffeeShop.Repo,
    workers: %{
      "order:confirm" => CoffeeShop.Jobs.OrderConfirmation,
      "order:brewing" => CoffeeShop.Jobs.BrewingStarted,
      "order:ready" => CoffeeShop.Jobs.ReadyForPickup,
      "brew:espresso" => CoffeeShop.Jobs.EspressoShot,
      "delivery:sync" => CoffeeShop.Jobs.DeliverySync,
      "admin:cleanup" => CoffeeShop.Jobs.AdminCleanup
    }
end

"JobQueue module defined!"

Start the consumer:

Kino.start_child!({JobQueue, concurrency: 2, batch_size: 5})

# Give it a moment to initialize
Process.sleep(500)
"Consumer started!"

Taking Orders: Basic Enqueueing

Let's enqueue our first job - an order confirmation:

{:ok, job} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-001", customer: "Alice"})

IO.puts("Enqueued job: #{Base.encode16(job.id, case: :lower) |> binary_part(0, 8)}...")

# Wait for processing
Process.sleep(500)
"Check the output above!"

Rush Hour: Priorities

Not all jobs are equal! A "ready for pickup" notification should jump ahead of cleanup tasks.

Priority is a number where lower = higher priority:

  • Priority 10: Urgent (pickup notifications)
  • Priority 50: Normal (brewing updates)
  • Priority 200: Low (cleanup tasks)

Let's see priorities in action:

# Enqueue all jobs in a single transaction so they're visible atomically.
# This ensures priority ordering is demonstrated correctly - otherwise
# jobs get processed as they arrive (race between enqueue and consumer).

Repo.transact(fn ->
  JobQueue.enqueue("main_shop", "admin:cleanup",
    %{task: "clear_old_orders"}, priority: 200)
  JobQueue.enqueue("main_shop", "order:brewing",
    %{order_id: "ORD-002", drink: "Latte"}, priority: 50)
  JobQueue.enqueue("main_shop", "order:ready",
    %{order_id: "ORD-003", customer: "Bob"}, priority: 10)
end)

IO.puts("Enqueued atomically: cleanup (200), brewing (50), ready (10)")
IO.puts("\nWatching execution order (should be: ready → brewing → cleanup):")
Process.sleep(2000)
"Jobs processed! Check the priority order above."

Happy Hour: Scheduled Jobs

Sometimes you want a job to run at a specific time:

# Schedule a job for 2 seconds from now
scheduled_time = DateTime.utc_now() |> DateTime.add(2, :second)

{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-SCHEDULED", customer: "Charlie"},
  at: scheduled_time)

IO.puts("Scheduled job for #{DateTime.to_iso8601(scheduled_time)}")
IO.puts("Waiting for it to execute...")
Process.sleep(3500)
"Check above - Charlie's confirmation should have processed!"

Or use a delay in milliseconds:

# Enqueue with a 1 second delay
{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-DELAYED", customer: "Diana"},
  in: 1000)

IO.puts("Enqueued with 1 second delay...")
Process.sleep(2000)
"Diana's confirmation should have processed!"

Multiple Shops: Tenant Isolation

Each coffee shop location can have its own queue, completely isolated:

# Different shops, different queues
{:ok, _} = JobQueue.enqueue("downtown_shop", "order:confirm",
  %{order_id: "DT-001", customer: "Eve"})

{:ok, _} = JobQueue.enqueue("airport_kiosk", "order:confirm",
  %{order_id: "AP-001", customer: "Frank"})

IO.puts("Enqueued to downtown_shop and airport_kiosk")
Process.sleep(1000)
"Jobs processed in their respective queues!"

Checking Queue Stats

You can inspect queue status at any time:

for queue_id <- ["main_shop", "downtown_shop", "airport_kiosk"] do
  stats = JobQueue.stats(queue_id)
  IO.puts("#{queue_id}: pending=#{stats.pending_count}, processing=#{stats.processing_count}")
end

"Stats shown above!"

What We've Learned

Congratulations! You've built a complete job queue system. Here's what we covered:

ConceptWhat You Learned
Job ModulesUse use Bedrock.JobQueue.Job with topic and priority
Return Values:ok, {:ok, result}, {:error, reason}, {:snooze, ms}, {:discard, reason}
Enqueueingenqueue/4 with :at, :in, :priority opts
PrioritiesLower number = higher priority (10 > 50 > 200)
TopicsString keys routing to job modules via workers map
TenantsUse queue_id to isolate jobs per tenant

Keep Exploring

  • Check out Bedrock.JobQueue.Job for all job options
  • Experiment with {:snooze, delay_ms} for rate limiting
  • Try {:error, reason} to see automatic retries with backoff

The job queue is inspired by the QuiCK paper for high-performance distributed queues.

Happy brewing!