Bedrock.JobQueue.Store (bedrock_job_queue v0.2.0)

Core storage operations for the job queue.

This module provides the transactional primitives for queue operations, following the patterns of the QuiCK paper on top of Bedrock's ACID guarantees.

Keyspace Layout

job_queue/
  queues/{queue_id}/
    items/                         # {priority, vesting_time, id} -> Item
    leases/{item_id}               # -> Lease
    dead_letter/{timestamp}/{id}   # -> Item (failed jobs after max retries)
    stats/pending                  # atomic counter
    stats/processing               # atomic counter

  queue_leases/{queue_id}          # -> QueueLease (two-tier leasing)
  pointers/                        # {vesting_time, queue_id} -> <<>>

Key Encoding

Item keys use nested tuple encoding {priority, vesting_time, id} which:

  • Sorts by priority first (lower = higher priority)
  • Then by vesting_time (earlier = visible first)
  • Then by id for uniqueness

Pointer keys use {vesting_time, queue_id} for efficient scanning of queues with visible items.
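
The ordering rules above can be illustrated with plain Elixir tuples. This is only a sketch: it relies on Erlang term ordering of same-size tuples rather than Bedrock's actual key encoding, and the module name, the map-based items, and the `scan_order/1` helper are hypothetical.

```elixir
defmodule KeyOrderSketch do
  # Hypothetical helper: builds the {priority, vesting_time, id} tuple
  # described above from a plain map standing in for an Item.
  def item_key(%{priority: priority, vesting_time: vesting_time, id: id}) do
    {priority, vesting_time, id}
  end

  # Sorts items in the order a range scan over such keys would visit them.
  def scan_order(items), do: Enum.sort_by(items, &item_key/1)
end

items = [
  %{id: "c", priority: 5, vesting_time: 1_000},
  %{id: "a", priority: 1, vesting_time: 2_000},
  %{id: "d", priority: 1, vesting_time: 1_000},
  %{id: "b", priority: 1, vesting_time: 1_000}
]

# Priority 1 comes before priority 5; ties fall back to vesting_time, then id.
items |> KeyOrderSketch.scan_order() |> Enum.map(& &1.id) |> IO.inspect()
# prints ["b", "d", "a", "c"]
```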

Types

repo()

@type repo() :: module()

root_keyspace()

@type root_keyspace() :: Bedrock.Keyspace.t()

Functions

complete(repo, root, lease)

@spec complete(repo(), root_keyspace(), Bedrock.JobQueue.Lease.t()) ::
  :ok | {:error, :lease_not_found | :lease_mismatch}

Completes a leased job, removing it from the queue.

  1. Validates lease exists and matches
  2. Deletes item from queue using stored item_key (O(1) lookup)
  3. Deletes lease record
  4. Decrements processing_count

dequeue(repo, root, queue_id, holder, opts \\ [])

@spec dequeue(repo(), root_keyspace(), String.t(), binary(), keyword()) ::
  {:ok, [Bedrock.JobQueue.Lease.t()]}

Atomically dequeues items from a queue.

Per QuiCK paper: Combines peek + obtain_lease into a single atomic operation. This is more efficient than separate calls and avoids race conditions.

Options

  • :limit - Maximum items to dequeue (default: 10)
  • :lease_duration - Lease duration in ms (default: 30_000)
  • :now - Current time in ms (default: System.system_time(:millisecond))

Return Value

Returns {:ok, [Lease.t()]} with leases for successfully dequeued items.

Note: The returned list may be smaller than :limit if:

  • Fewer items are visible in the queue
  • Some items were leased by other consumers between peek and obtain_lease

Items that fail to lease are silently skipped rather than reported as errors.

An empty list {:ok, []} indicates no items were available or all visible items were already leased.

enqueue(repo, root, item, opts \\ [])

@spec enqueue(repo(), root_keyspace(), Bedrock.JobQueue.Item.t(), keyword()) :: :ok

Enqueues a job item atomically.

Within a transaction:

  1. Writes item to queue zone with key {priority, vesting_time, id}
  2. Updates pointer index with atomic min for vesting_time
  3. Increments pending_count via atomic add
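
The three steps can be modelled on an in-memory map to show what each one changes. This is a sketch under stated assumptions, not the module's implementation: the state shape, the module name, and the helper functions are hypothetical, and a plain `min/2` stands in for the atomic min on the pointer index.

```elixir
defmodule EnqueueSketch do
  # In-memory stand-in for one queue's state; not Bedrock's storage format.
  def new, do: %{items: %{}, pointer: nil, pending_count: 0}

  def enqueue(state, %{priority: p, vesting_time: v, id: id} = item) do
    state
    # 1. Write the item under its {priority, vesting_time, id} key.
    |> put_in([:items, {p, v, id}], item)
    # 2. Keep the pointer at the smallest vesting_time seen so far.
    |> Map.update!(:pointer, &min_vesting(&1, v))
    # 3. Count the new pending item.
    |> Map.update!(:pending_count, &(&1 + 1))
  end

  defp min_vesting(nil, v), do: v
  defp min_vesting(current, v), do: min(current, v)
end
```

Enqueuing items with vesting times 2_000, 1_000, and 3_000 in that order leaves the pointer at 1_000 and the pending count at 3.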

extend_lease(repo, root, lease, extension_ms, opts \\ [])

@spec extend_lease(
  repo(),
  root_keyspace(),
  Bedrock.JobQueue.Lease.t(),
  pos_integer(),
  keyword()
) ::
  {:ok, Bedrock.JobQueue.Lease.t()}
  | {:error,
     :lease_not_found | :lease_mismatch | :lease_expired | :item_not_found}

Extends a lease's expiration time.

Per QuiCK paper: Long-running jobs can extend their lease before expiry to prevent the item from becoming visible to other consumers.

  1. Validates lease exists and matches
  2. Updates item's vesting_time to new expiry
  3. Updates lease record with new expiry
  4. Updates pointer index

Error Cases

  • {:error, :lease_expired} - Lease already expired (checked before DB access)
  • {:error, :lease_not_found} - No lease record exists for this item
  • {:error, :lease_mismatch} - Lease ID doesn't match stored lease
  • {:error, :item_not_found} - Item no longer exists in queue
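
One way to read the error cases is as an ordered series of checks. The sketch below assumes plain maps for leases and items and a hypothetical `validate/4` helper. Only the position of the expiry check (before any storage access) comes from the description above; the order of the remaining checks is an assumption.

```elixir
defmodule ExtendLeaseSketch do
  # `stored_lease` and `item` are what a lookup would return, or nil when
  # the record is missing. All shapes here are hypothetical.
  def validate(lease, stored_lease, item, now) do
    cond do
      # Checked first, since it needs no storage access.
      lease.expires_at <= now -> {:error, :lease_expired}
      is_nil(stored_lease) -> {:error, :lease_not_found}
      stored_lease.id != lease.id -> {:error, :lease_mismatch}
      is_nil(item) -> {:error, :item_not_found}
      true -> :ok
    end
  end
end
```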

gc_stale_pointers(repo, root, opts \\ [])

@spec gc_stale_pointers(repo(), root_keyspace(), keyword()) :: non_neg_integer()

Garbage collects stale pointers.

Per QuiCK paper: Pointers become stale when their vesting_time has passed and the queue has no visible items. This function scans for such pointers and deletes them.

Options:

  • :limit - Maximum pointers to scan (default: 100)
  • :grace_period - Additional time in ms after vesting before GC (default: 60_000)
  • :now - Current time in ms (default: System.system_time(:millisecond))

Returns the count of deleted pointers.
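
The staleness rule and the options can be sketched over an in-memory pointer list. Everything here is illustrative: the module, the `visible_fun` callback, and the returned `{deleted_count, remaining}` tuple are hypothetical (the real function returns only the count), and treating a pointer as stale once `vesting_time + grace_period <= now` is an assumed boundary.

```elixir
defmodule GcSketch do
  # `pointers` is a list of {vesting_time, queue_id} tuples; `visible_fun`
  # is a hypothetical callback telling whether a queue has visible items.
  def gc(pointers, visible_fun, opts \\ []) do
    limit = Keyword.get(opts, :limit, 100)
    grace = Keyword.get(opts, :grace_period, 60_000)
    now = Keyword.get(opts, :now, System.system_time(:millisecond))

    # Only the first `limit` pointers in key order are examined.
    {scanned, unscanned} = pointers |> Enum.sort() |> Enum.split(limit)

    {stale, live} =
      Enum.split_with(scanned, fn {vesting_time, queue_id} ->
        vesting_time + grace <= now and not visible_fun.(queue_id)
      end)

    {length(stale), live ++ unscanned}
  end
end
```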

min_vesting_time(repo, root, queue_id, opts \\ [])

@spec min_vesting_time(repo(), root_keyspace(), String.t(), keyword()) ::
  non_neg_integer() | nil

Gets the minimum vesting_time from items in a queue.

Per QuiCK Algorithm 2: After dequeuing, read the minimum vesting_time to determine when to next scan this queue. Returns nil if the queue is empty.

Options:

  • :limit - Maximum items to scan (default: 1000)

obtain_lease(repo, root, item, holder, duration_ms, opts \\ [])

@spec obtain_lease(
  repo(),
  root_keyspace(),
  Bedrock.JobQueue.Item.t(),
  binary(),
  pos_integer(),
  keyword()
) :: {:ok, Bedrock.JobQueue.Lease.t()} | {:error, :already_leased | :not_found}

Obtains a lease on an item.

Per QuiCK paper: Leasing works by updating the vesting_time to make the item invisible to other consumers:

  1. Reads item, verifies it's available
  2. Creates lease record
  3. Updates item's vesting_time to lease expiry (makes it invisible)
  4. Updates pointer index with new min vesting_time
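
Because vesting_time is part of the item key, making an item invisible amounts to moving it to a new key. The in-memory sketch below illustrates steps 1 to 3. The module, the map shapes, and the three-element success tuple are hypothetical (the real function returns `{:ok, Lease.t()}`), and availability is reduced to a `lease_id` check as a simplifying assumption.

```elixir
defmodule LeaseSketch do
  # `items` maps {priority, vesting_time, id} keys to plain item maps.
  def obtain_lease(items, {priority, _vesting_time, id} = key, holder, duration_ms, now) do
    case Map.fetch(items, key) do
      :error ->
        {:error, :not_found}

      {:ok, %{lease_id: lease_id}} when not is_nil(lease_id) ->
        {:error, :already_leased}

      {:ok, item} ->
        expires_at = now + duration_ms
        lease = %{id: make_ref(), item_id: id, holder: holder, expires_at: expires_at}

        # Re-key the item so its vesting_time equals the lease expiry,
        # which hides it from scans until the lease runs out.
        leased = %{item | vesting_time: expires_at, lease_id: lease.id}
        items = items |> Map.delete(key) |> Map.put({priority, expires_at, id}, leased)

        {:ok, lease, items}
    end
  end
end
```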

obtain_queue_lease(repo, root, queue_id, holder, duration_ms, opts \\ [])

@spec obtain_queue_lease(
  repo(),
  root_keyspace(),
  String.t(),
  binary(),
  pos_integer(),
  keyword()
) :: {:ok, Bedrock.JobQueue.QueueLease.t()} | {:error, :queue_leased}

Obtains an exclusive lease on a queue for dequeuing.

Per QuiCK paper: Two-tier leasing prevents thundering herd. A consumer must first obtain a queue lease before it can dequeue items. Only one consumer can hold a queue lease at a time.

Returns:

  • {:ok, QueueLease.t()} - Lease obtained successfully
  • {:error, :queue_leased} - Queue already leased by another consumer
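
The exclusivity rule can be sketched with a map from queue_id to the current lease. The module, the map shape, and the three-element success tuple are hypothetical, and letting a new holder take over once the previous lease has expired is an assumption about behavior that the description above does not spell out.

```elixir
defmodule QueueLeaseSketch do
  # `leases` maps queue_id to %{holder: ..., expires_at: ...}.
  def obtain(leases, queue_id, holder, duration_ms, now) do
    case Map.get(leases, queue_id) do
      # A lease that has not yet expired blocks every other consumer.
      %{expires_at: expires_at} when expires_at > now ->
        {:error, :queue_leased}

      # No lease, or an expired one: the caller becomes the holder.
      _ ->
        lease = %{queue_id: queue_id, holder: holder, expires_at: now + duration_ms}
        {:ok, lease, Map.put(leases, queue_id, lease)}
    end
  end
end
```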

peek(repo, root, queue_id, opts \\ [])

@spec peek(repo(), root_keyspace(), String.t(), keyword()) :: [
  Bedrock.JobQueue.Item.t()
]

Peeks at visible items in a queue, ordered by priority then vesting time.

An item is visible when both of the following hold:

  • vesting_time <= now
  • lease_id is nil (not currently leased)

Options:

  • :limit - Maximum items to return (default: 10)
  • :now - Current time in ms (default: System.system_time(:millisecond))
  • :max_scan - Maximum items to scan before giving up (default: limit * 10)
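
The visibility rule and the three options interact as in the following sketch, which filters an in-memory list assumed to be in key order already. The module and the map-based items are hypothetical; only the option names and defaults come from the list above.

```elixir
defmodule PeekSketch do
  # `items` is assumed to be in key order; each is a plain map standing
  # in for an Item.
  def peek(items, opts \\ []) do
    limit = Keyword.get(opts, :limit, 10)
    now = Keyword.get(opts, :now, System.system_time(:millisecond))
    max_scan = Keyword.get(opts, :max_scan, limit * 10)

    items
    # Give up after examining max_scan items, visible or not.
    |> Enum.take(max_scan)
    |> Enum.filter(&visible?(&1, now))
    |> Enum.take(limit)
  end

  defp visible?(item, now), do: item.vesting_time <= now and is_nil(item.lease_id)
end
```

The :max_scan bound matters when many leading items are leased or scheduled for the future: without it, a scan for a handful of visible items could walk the whole queue.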

pointer_keyspace(root)

@spec pointer_keyspace(root_keyspace()) :: Bedrock.Keyspace.t()

Creates the pointer index keyspace.

queue_keyspaces(root, queue_id)

@spec queue_keyspaces(root_keyspace(), String.t()) :: %{
  items: Bedrock.Keyspace.t(),
  leases: Bedrock.Keyspace.t(),
  stats: Bedrock.Keyspace.t()
}

Creates keyspaces for a queue.

Returns a map with keyspaces for items, leases, and stats.

queue_lease_keyspace(root)

@spec queue_lease_keyspace(root_keyspace()) :: Bedrock.Keyspace.t()

Creates the queue leases keyspace for two-tier leasing.

release_queue_lease(repo, root, lease)

@spec release_queue_lease(repo(), root_keyspace(), Bedrock.JobQueue.QueueLease.t()) ::
  :ok | {:error, :lease_not_found | :lease_mismatch}

Releases a queue lease.

Should be called after finishing dequeue operations to allow other consumers to access the queue.

requeue(repo, root, lease, opts)

@spec requeue(repo(), root_keyspace(), Bedrock.JobQueue.Lease.t(), keyword()) ::
  {:ok, :requeued | :dead_lettered}
  | {:error, :lease_not_found | :lease_mismatch | :item_not_found}

Requeues a failed job with exponential backoff.

  1. Increments error_count
  2. If retries are exhausted, moves the item to the dead letter queue
  3. Otherwise, sets a new vesting_time with backoff
  4. Clears the lease

Options

  • :backoff_fn - Function (attempt) -> delay_ms for retry delay
  • :base_delay - Fixed base delay in ms (used by snooze, overrides backoff_fn)
  • :max_delay - Maximum delay in ms (default: 60_000)
  • :now - Current time in ms (default: System.system_time(:millisecond))

Error Cases

  • {:error, :lease_not_found} - No lease record exists for this item
  • {:error, :lease_mismatch} - Lease ID doesn't match stored lease
  • {:error, :item_not_found} - Item no longer exists in queue
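
How the options combine into a new vesting_time can be sketched as a pure function. Several details are assumptions made for illustration: the `:max_retries` option and its default, the doubling default backoff, the map-based item, and applying :max_delay as a cap on whichever delay is chosen. None of these are confirmed by the documentation above.

```elixir
defmodule RequeueSketch do
  def requeue(item, opts) do
    now = Keyword.get(opts, :now, System.system_time(:millisecond))
    max_delay = Keyword.get(opts, :max_delay, 60_000)
    # Hypothetical option: the real retry limit may live elsewhere.
    max_retries = Keyword.get(opts, :max_retries, 3)
    backoff_fn = Keyword.get(opts, :backoff_fn, &default_backoff/1)

    # Count the failure and clear the lease in either outcome.
    item = %{item | error_count: item.error_count + 1, lease_id: nil}

    if item.error_count > max_retries do
      {:dead_lettered, item}
    else
      # A fixed :base_delay, when given, takes precedence over backoff_fn.
      delay = Keyword.get(opts, :base_delay) || backoff_fn.(item.error_count)
      {:requeued, %{item | vesting_time: now + min(delay, max_delay)}}
    end
  end

  # Assumed default: 1s doubled per attempt.
  defp default_backoff(attempt), do: 1_000 * Integer.pow(2, attempt)
end
```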

scan_visible_queues(repo, root, opts \\ [])

@spec scan_visible_queues(repo(), root_keyspace(), keyword()) :: [String.t()]

Scans the pointer index for queues with visible items.

Returns the queue_ids of queues that have items with vesting_time <= now.
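
Because pointer keys lead with vesting_time, a scan in key order can stop at the first pointer that lies in the future. The sketch below shows that idea on an in-memory list; the module and the list representation are hypothetical and do not reflect how the pointer keyspace is read.

```elixir
defmodule ScanSketch do
  # `pointers` is a list of {vesting_time, queue_id} tuples.
  def scan_visible_queues(pointers, now) do
    pointers
    |> Enum.sort()
    # Key order puts due pointers first, so stop at the first future one.
    |> Enum.take_while(fn {vesting_time, _queue_id} -> vesting_time <= now end)
    |> Enum.map(fn {_vesting_time, queue_id} -> queue_id end)
    |> Enum.uniq()
  end
end
```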

stats(repo, root, queue_id)

@spec stats(repo(), root_keyspace(), String.t()) :: %{
  pending_count: non_neg_integer(),
  processing_count: non_neg_integer()
}

Gets queue statistics.

update_queue_pointer(repo, root, queue_id, vesting_time, opts \\ [])

@spec update_queue_pointer(
  repo(),
  root_keyspace(),
  String.t(),
  non_neg_integer(),
  keyword()
) :: :ok

Updates a queue's pointer with a new vesting_time.

Per QuiCK Algorithm 2: After processing, update the pointer to the minimum vesting_time of remaining items. This prevents rescanning queues that only have future-scheduled items.

When the new vesting_time is in the future, this also cleans up any stale pointers in the past to prevent the scanner from repeatedly finding them.
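
The pointer update and the stale-pointer cleanup can be sketched together over an in-memory set. The module, the MapSet representation, and the explicit `now` argument are hypothetical, and `min_vesting_time/1` here is a stand-in for the function of the same name documented above, not its implementation.

```elixir
defmodule PointerSketch do
  # Smallest vesting_time among the remaining items, or nil when empty.
  def min_vesting_time([]), do: nil
  def min_vesting_time(items), do: items |> Enum.map(& &1.vesting_time) |> Enum.min()

  # `pointers` is a MapSet of {vesting_time, queue_id} tuples.
  def update_queue_pointer(pointers, queue_id, vesting_time, now) do
    pointers
    |> drop_stale(queue_id, vesting_time, now)
    |> MapSet.put({vesting_time, queue_id})
  end

  # Past pointers for this queue are dropped only when the new pointer
  # lies in the future, so the scanner stops finding them.
  defp drop_stale(pointers, queue_id, vesting_time, now) when vesting_time > now do
    pointers
    |> Enum.reject(fn {vt, q} -> q == queue_id and vt <= now end)
    |> MapSet.new()
  end

  defp drop_stale(pointers, _queue_id, _vesting_time, _now), do: pointers
end
```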