Bedrock.JobQueue.Store (bedrock_job_queue v0.2.0)
Core storage operations for the job queue.
This module provides the transactional primitives for queue operations, following QuiCK paper patterns with Bedrock's ACID guarantees.
Keyspace Layout
job_queue/
  queues/{queue_id}/
    items/                        # {priority, vesting_time, id} -> Item
    leases/{item_id}              # -> Lease
    dead_letter/{timestamp}/{id}  # -> Item (failed jobs after max retries)
    stats/pending                 # atomic counter
    stats/processing              # atomic counter
  queue_leases/{queue_id}         # -> QueueLease (two-tier leasing)
  pointers/                       # {vesting_time, queue_id} -> <<>>
Key Encoding
Item keys use nested tuple encoding {priority, vesting_time, id} which:
- Sorts by priority first (lower = higher priority)
- Then by vesting_time (earlier = visible first)
- Then by id for uniqueness
Pointer keys use {vesting_time, queue_id} for efficient scanning of
queues with visible items.
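The sort order described above can be illustrated with plain Elixir tuples. This is only a sketch: it relies on Elixir's built-in term ordering for same-size tuples as a stand-in for Bedrock's actual key encoding, and the module name and sample ids are made up for illustration.

```elixir
defmodule KeyOrderSketch do
  # Same-size tuples compare element by element, left to right, so sorting
  # {priority, vesting_time, id} tuples orders by priority first, then by
  # vesting_time, then by id.
  def sort_keys(keys), do: Enum.sort(keys)
end

# Hypothetical item keys in {priority, vesting_time, id} form.
keys = [
  {1, 2_000, "job-b"},
  {0, 5_000, "job-a"},
  {1, 1_000, "job-c"}
]

IO.inspect(KeyOrderSketch.sort_keys(keys))
```

The priority 0 key sorts first even though its vesting_time is the latest, because priority is the leading tuple element.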
Types
@type repo() :: module()
@type root_keyspace() :: Bedrock.Keyspace.t()
Functions
@spec complete(repo(), root_keyspace(), Bedrock.JobQueue.Lease.t()) :: :ok | {:error, :lease_not_found | :lease_mismatch}
Completes a leased job, removing it from the queue.
- Validates lease exists and matches
- Deletes item from queue using stored item_key (O(1) lookup)
- Deletes lease record
- Decrements processing_count
@spec dequeue(repo(), root_keyspace(), String.t(), binary(), keyword()) :: {:ok, [Bedrock.JobQueue.Lease.t()]}
Atomically dequeues items from a queue.
Per QuiCK paper: Combines peek + obtain_lease into a single atomic operation. This is more efficient than separate calls and avoids race conditions.
Options
- :limit - Maximum items to dequeue (default: 10)
- :lease_duration - Lease duration in ms (default: 30_000)
- :now - Current time in ms (default: System.system_time(:millisecond))
Return Value
Returns {:ok, [Lease.t()]} with leases for successfully dequeued items.
Note: The returned list may be smaller than :limit if:
- Fewer items are visible in the queue
- Some items were leased by other consumers between peek and obtain_lease
- The function silently skips items that fail to lease rather than erroring
An empty list {:ok, []} indicates no items were available or all visible
items were already leased.
@spec enqueue(repo(), root_keyspace(), Bedrock.JobQueue.Item.t(), keyword()) :: :ok
Enqueues a job item atomically.
Within a transaction:
- Writes item to queue zone with key {priority, vesting_time, id}
- Updates pointer index with atomic min for vesting_time
- Increments pending_count via atomic add
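The three writes listed above can be modelled in memory as follows. This is a minimal sketch, not the library's implementation: `EnqueueSketch` and its map-based state are hypothetical, the pointer index is simplified to one minimum vesting_time per queue, and none of it runs inside a Bedrock transaction.

```elixir
defmodule EnqueueSketch do
  # In-memory stand-in for the queue state (assumption for illustration).
  def new, do: %{items: %{}, pointers: %{}, pending: 0}

  def enqueue(state, queue_id, %{priority: p, vesting_time: vt, id: id} = item) do
    # 1. Write the item under its {priority, vesting_time, id} key.
    items = Map.put(state.items, {p, vt, id}, item)

    # 2. Keep the smallest vesting_time seen for this queue (the "atomic min").
    pointers = Map.update(state.pointers, queue_id, vt, &min(&1, vt))

    # 3. Bump the pending counter (the "atomic add").
    %{state | items: items, pointers: pointers, pending: state.pending + 1}
  end
end
```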
@spec extend_lease(repo(), root_keyspace(), Bedrock.JobQueue.Lease.t(), pos_integer(), keyword()) :: {:ok, Bedrock.JobQueue.Lease.t()} | {:error, :lease_not_found | :lease_mismatch | :lease_expired | :item_not_found}
Extends a lease's expiration time.
Per QuiCK paper: Long-running jobs can extend their lease before expiry to prevent the item from becoming visible to other consumers.
- Validates lease exists and matches
- Updates item's vesting_time to new expiry
- Updates lease record with new expiry
- Updates pointer index
Error Cases
- {:error, :lease_expired} - Lease already expired (checked before DB access)
- {:error, :lease_not_found} - No lease record exists for this item
- {:error, :lease_mismatch} - Lease ID doesn't match stored lease
- {:error, :item_not_found} - Item no longer exists in queue
@spec gc_stale_pointers(repo(), root_keyspace(), keyword()) :: non_neg_integer()
Garbage collects stale pointers.
Per QuiCK paper: Pointers become stale when their vesting_time has passed and the queue has no visible items. This function scans for such pointers and deletes them.
Options:
- :limit - Maximum pointers to scan (default: 100)
- :grace_period - Additional time in ms after vesting before GC (default: 60_000)
- :now - Current time in ms (default: System.system_time(:millisecond))
Returns the count of deleted pointers.
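One way to read the staleness rule is sketched below over an in-memory list of {vesting_time, queue_id} pointers. The module name, the `has_visible` callback, and the choice to return the remaining pointers alongside the count are assumptions made for illustration; the real function works against the store and returns only the count.

```elixir
defmodule PointerGcSketch do
  # `pointers` is a list of {vesting_time, queue_id} tuples.
  # `has_visible` is a caller-supplied function (hypothetical) that reports
  # whether a queue still has visible items.
  def gc(pointers, has_visible, opts \\ []) do
    limit = Keyword.get(opts, :limit, 100)
    grace = Keyword.get(opts, :grace_period, 60_000)
    now = Keyword.get_lazy(opts, :now, fn -> System.system_time(:millisecond) end)

    # Scan at most `limit` pointers, oldest vesting_time first.
    scanned = pointers |> Enum.sort() |> Enum.take(limit)

    # Stale: vested longer ago than the grace period, and nothing visible.
    stale =
      Enum.filter(scanned, fn {vesting_time, queue_id} ->
        vesting_time + grace <= now and not has_visible.(queue_id)
      end)

    {pointers -- stale, length(stale)}
  end
end
```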
@spec min_vesting_time(repo(), root_keyspace(), String.t(), keyword()) :: non_neg_integer() | nil
Gets the minimum vesting_time from items in a queue.
Per QuiCK Algorithm 2: After dequeuing, read the minimum vesting_time to determine when to next scan this queue. Returns nil if queue is empty.
Options:
- :limit - Maximum items to scan (default: 1000)
@spec obtain_lease(repo(), root_keyspace(), Bedrock.JobQueue.Item.t(), binary(), pos_integer(), keyword()) :: {:ok, Bedrock.JobQueue.Lease.t()} | {:error, :already_leased | :not_found}
Obtains a lease on an item.
Per QuiCK paper - leasing works by updating the vesting_time to make the item invisible to other consumers:
- Reads item, verifies it's available
- Creates lease record
- Updates item's vesting_time to lease expiry (makes it invisible)
- Updates pointer index with new min vesting_time
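The leasing steps above can be sketched over a plain map. This is an illustrative model only: the field names, the use of the `binary()` argument as a holder id that doubles as the lease id, and the rule that any non-nil lease_id means "already leased" are assumptions, and the pointer index update is left out.

```elixir
defmodule LeaseSketch do
  # `items` maps id => %{vesting_time: ms, lease_id: binary | nil}.
  def obtain_lease(items, id, holder, duration_ms, now) do
    case Map.fetch(items, id) do
      :error ->
        {:error, :not_found}

      {:ok, %{lease_id: lease_id}} when not is_nil(lease_id) ->
        {:error, :already_leased}

      {:ok, item} ->
        expires_at = now + duration_ms
        lease = %{item_id: id, holder: holder, expires_at: expires_at}

        # Moving vesting_time to the lease expiry hides the item until then.
        leased = %{item | vesting_time: expires_at, lease_id: holder}
        {:ok, lease, Map.put(items, id, leased)}
    end
  end
end
```

Because visibility depends on vesting_time, an abandoned lease needs no separate cleanup pass in this model: the item's vesting_time simply passes again.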
@spec obtain_queue_lease(repo(), root_keyspace(), String.t(), binary(), pos_integer(), keyword()) :: {:ok, Bedrock.JobQueue.QueueLease.t()} | {:error, :queue_leased}
Obtains an exclusive lease on a queue for dequeuing.
Per QuiCK paper: Two-tier leasing prevents thundering herd. A consumer must first obtain a queue lease before it can dequeue items. Only one consumer can hold a queue lease at a time.
Returns:
- {:ok, QueueLease.t()} - Lease obtained successfully
- {:error, :queue_leased} - Queue already leased by another consumer
@spec peek(repo(), root_keyspace(), String.t(), keyword()) :: [Bedrock.JobQueue.Item.t()]
Peeks at visible items in a queue, ordered by priority then vesting time.
Items are visible when:
- vesting_time <= now
- lease_id is nil (not currently leased)
Options:
- :limit - Maximum items to return (default: 10)
- :now - Current time in ms (default: System.system_time(:millisecond))
- :max_scan - Maximum items to scan before giving up (default: limit * 10)
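The visibility rule and the three options can be modelled over an in-memory list, as in the sketch below. The module name and the map-shaped items are hypothetical; the real function reads a key range from the store rather than sorting a list.

```elixir
defmodule PeekSketch do
  # `items` is a list of maps with :priority, :vesting_time, :id, :lease_id.
  def peek(items, opts \\ []) do
    limit = Keyword.get(opts, :limit, 10)
    now = Keyword.get_lazy(opts, :now, fn -> System.system_time(:millisecond) end)
    max_scan = Keyword.get(opts, :max_scan, limit * 10)

    items
    # Key order: priority, then vesting_time, then id.
    |> Enum.sort_by(fn item -> {item.priority, item.vesting_time, item.id} end)
    # Give up after max_scan candidates, whether or not they were visible.
    |> Enum.take(max_scan)
    # Visible: already vested and not currently leased.
    |> Enum.filter(fn item -> item.vesting_time <= now and is_nil(item.lease_id) end)
    |> Enum.take(limit)
  end
end
```

Note that in this model a small :max_scan can return fewer items than :limit even when visible items exist further along the key range.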
@spec pointer_keyspace(root_keyspace()) :: Bedrock.Keyspace.t()
Creates the pointer index keyspace.
@spec queue_keyspaces(root_keyspace(), String.t()) :: %{items: Bedrock.Keyspace.t(), leases: Bedrock.Keyspace.t(), stats: Bedrock.Keyspace.t()}
Creates keyspaces for a queue.
Returns a map with keyspaces for items, leases, and stats.
@spec queue_lease_keyspace(root_keyspace()) :: Bedrock.Keyspace.t()
Creates the queue leases keyspace for two-tier leasing.
@spec release_queue_lease(repo(), root_keyspace(), Bedrock.JobQueue.QueueLease.t()) :: :ok | {:error, :lease_not_found | :lease_mismatch}
Releases a queue lease.
Should be called after finishing dequeue operations to allow other consumers to access the queue.
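The obtain and release cycle for a queue lease might be modelled as below. This is a sketch under stated assumptions: the map-based lease table, the holder field, and the rule that an expired lease may be taken over are all hypothetical and not confirmed by this page.

```elixir
defmodule QueueLeaseSketch do
  # `leases` maps queue_id => %{queue_id: _, holder: _, expires_at: ms}.
  def obtain(leases, queue_id, holder, duration_ms, now) do
    case leases do
      # An unexpired lease blocks any new lease on the same queue.
      %{^queue_id => %{expires_at: expires_at}} when expires_at > now ->
        {:error, :queue_leased}

      _ ->
        lease = %{queue_id: queue_id, holder: holder, expires_at: now + duration_ms}
        {:ok, lease, Map.put(leases, queue_id, lease)}
    end
  end

  def release(leases, %{queue_id: queue_id, holder: holder}) do
    case leases do
      %{^queue_id => %{holder: ^holder}} -> {:ok, Map.delete(leases, queue_id)}
      %{^queue_id => _other} -> {:error, :lease_mismatch}
      _ -> {:error, :lease_not_found}
    end
  end
end
```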
@spec requeue(repo(), root_keyspace(), Bedrock.JobQueue.Lease.t(), keyword()) :: {:ok, :requeued | :dead_lettered} | {:error, :lease_not_found | :lease_mismatch | :item_not_found}
Requeues a failed job with exponential backoff.
- Increments error_count
- If exhausted, moves to dead letter queue
- Otherwise, sets new vesting_time with backoff
- Clears lease
Options
- :backoff_fn - Function (attempt) -> delay_ms for retry delay
- :base_delay - Fixed base delay in ms (used by snooze, overrides backoff_fn)
- :max_delay - Maximum delay in ms (default: 60_000)
- :now - Current time in ms (default: System.system_time(:millisecond))
Error Cases
- {:error, :lease_not_found} - No lease record exists for this item
- {:error, :lease_mismatch} - Lease ID doesn't match stored lease
- {:error, :item_not_found} - Item no longer exists in queue
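How the delay options might combine into a new vesting_time is sketched below. The doubling default in `default_backoff/1` is an assumption (this page does not state the library's default backoff function), as is applying :max_delay to a :base_delay value; the module name is hypothetical.

```elixir
defmodule BackoffSketch do
  # Hypothetical default: 1 second, doubled for each prior attempt.
  def default_backoff(attempt), do: 1_000 * Integer.pow(2, attempt)

  def next_vesting_time(error_count, opts \\ []) do
    now = Keyword.get_lazy(opts, :now, fn -> System.system_time(:millisecond) end)
    max_delay = Keyword.get(opts, :max_delay, 60_000)
    backoff_fn = Keyword.get(opts, :backoff_fn, &default_backoff/1)

    # :base_delay, when given, takes precedence over :backoff_fn.
    delay = Keyword.get_lazy(opts, :base_delay, fn -> backoff_fn.(error_count) end)

    # Cap the delay so repeated failures never push the retry out indefinitely.
    now + min(delay, max_delay)
  end
end
```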
@spec scan_visible_queues(repo(), root_keyspace(), keyword()) :: [String.t()]
Scans the pointer index for queues with visible items.
Returns queue_ids that have items with vesting_time <= now.
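Because pointer keys lead with vesting_time, a scan can stop at the first pointer in the future. The sketch below shows that idea over an in-memory list; the module name is hypothetical, and deduplicating queue ids is an assumption about how repeated pointers for one queue would be handled.

```elixir
defmodule PointerScanSketch do
  # `pointers` is a list of {vesting_time, queue_id} keys.
  def scan_visible_queues(pointers, now) do
    pointers
    |> Enum.sort()
    # Keys are ordered by vesting_time, so stop at the first future pointer.
    |> Enum.take_while(fn {vesting_time, _queue_id} -> vesting_time <= now end)
    |> Enum.map(fn {_vesting_time, queue_id} -> queue_id end)
    |> Enum.uniq()
  end
end
```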
@spec stats(repo(), root_keyspace(), String.t()) :: %{pending_count: non_neg_integer(), processing_count: non_neg_integer()}
Gets queue statistics.
@spec update_queue_pointer(repo(), root_keyspace(), String.t(), non_neg_integer(), keyword()) :: :ok
Updates a queue's pointer with a new vesting_time.
Per QuiCK Algorithm 2: After processing, update the pointer to the minimum vesting_time of remaining items. This prevents rescanning queues that only have future-scheduled items.
When the new vesting_time is in the future, this also cleans up any stale pointers in the past to prevent the scanner from repeatedly finding them.
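The pointer update and the cleanup of past pointers can be sketched over an in-memory list of {vesting_time, queue_id} tuples. The module name and the exact cleanup rule (drop only this queue's pointers at or before now) are assumptions for illustration.

```elixir
defmodule PointerUpdateSketch do
  # `pointers` is a list of {vesting_time, queue_id} tuples.
  def update(pointers, queue_id, new_vesting_time, now) do
    kept =
      if new_vesting_time > now do
        # The queue has nothing visible, so drop its pointers in the past.
        Enum.reject(pointers, fn {vt, q} -> q == queue_id and vt <= now end)
      else
        pointers
      end

    [{new_vesting_time, queue_id} | kept]
    |> Enum.uniq()
    |> Enum.sort()
  end
end
```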