Durable.Queue.Adapter behaviour (Durable v0.1.0-rc)
Behaviour for queue adapters.
The queue adapter is responsible for:
- Fetching and claiming jobs atomically
- Acknowledging completed jobs
- Handling failed jobs (nack)
- Rescheduling jobs for later execution
- Recovering stale locks from crashed workers
- Providing queue statistics
All callbacks receive the Durable config as the first argument to support dynamic repo and prefix configuration.
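Summary
Callbacks
- ack(config, job_id): Acknowledges successful job completion.
- fetch_jobs(config, queue, limit, node_id): Fetches and atomically claims jobs from a queue.
- get_stats(config, queue): Returns statistics for a queue.
- heartbeat(config, job_id): Updates the lock timestamp to indicate the worker is still alive.
- nack(config, job_id, reason): Negatively acknowledges a job (failure).
- recover_stale_locks(config, timeout_seconds): Recovers stale locks from crashed workers.
- recover_zombie_workflows(config, timeout_seconds): Recovers "zombie" workflows, that is, executions stuck in :waiting status with no pending inputs or events that could ever unblock them.
- reschedule(config, job_id, run_at): Reschedules a job for future execution.
- wake_sleeping_workflows(config, batch_size): Wakes workflows whose sleep/1 or schedule_at/1 wait has elapsed.
Functions
- default_adapter(): Returns the default adapter module.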
Types
@type job() :: %{
  id: job_id(),
  workflow_module: String.t(),
  workflow_name: String.t(),
  queue: queue_name(),
  priority: integer(),
  input: map(),
  context: map(),
  scheduled_at: DateTime.t() | nil,
  current_step: String.t() | nil,
  lock_token: String.t() | nil
}
@type job_id() :: String.t()
@type queue_name() :: String.t()
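To make the shape of job() concrete, here is a small sketch of a map that satisfies the type. Only the keys come from the type definition above; the module name JobShapeSketch and every value are invented for illustration.

```elixir
defmodule JobShapeSketch do
  # Keys taken from the job() type; the values in sample/0 are made up.
  @keys [
    :id, :workflow_module, :workflow_name, :queue, :priority,
    :input, :context, :scheduled_at, :current_step, :lock_token
  ]

  # A hypothetical job as an adapter might return it from fetch_jobs/4.
  def sample do
    %{
      id: "job-1",
      workflow_module: "MyApp.OrderWorkflow",
      workflow_name: "process_order",
      queue: "default",
      priority: 0,
      input: %{"order_id" => 42},
      context: %{},
      scheduled_at: nil,
      current_step: nil,
      lock_token: nil
    }
  end

  # True when the map has exactly the keys listed in job().
  def job_shape?(map) when is_map(map) do
    Enum.sort(Map.keys(map)) == Enum.sort(@keys)
  end
end
```

Note that job_id() and queue_name() are plain strings, so no special constructor is needed for either.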
Callbacks
@callback ack(config :: Durable.Config.t(), job_id :: job_id()) :: :ok | {:error, term()}
Acknowledges successful job completion.
Called when a workflow execution completes successfully. Clears the lock fields on the workflow execution.
@callback fetch_jobs(
  config :: Durable.Config.t(),
  queue :: queue_name(),
  limit :: pos_integer(),
  node_id :: String.t()
) :: [job()]
Fetches and atomically claims jobs from a queue.
Jobs are locked to prevent duplicate processing. Returns a list of claimed jobs up to the specified limit.
Parameters
- config: The Durable configuration
- queue: The queue name to fetch from
- limit: Maximum number of jobs to claim
- node_id: Unique identifier for this node (used for locking)
Returns
A list of job maps that have been claimed.
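The claim semantics can be illustrated without a database. The sketch below works on an in-memory list of rows and is not the library's implementation: the module FetchSketch, the row fields locked_by and locked_at, the :running status, and the "highest priority first" ordering are all assumptions made for the example. A real adapter would do the select and the update in one atomic statement.

```elixir
defmodule FetchSketch do
  # Returns {claimed_rows, all_rows_after_claim}.
  # A row is claimable when it is :pending in the given queue and is not
  # scheduled for a time later than `now`.
  def fetch_jobs(rows, queue, limit, node_id, now) do
    ids =
      rows
      |> Enum.filter(&claimable?(&1, queue, now))
      |> Enum.sort_by(& &1.priority, :desc)
      |> Enum.take(limit)
      |> MapSet.new(& &1.id)

    updated =
      Enum.map(rows, fn row ->
        if MapSet.member?(ids, row.id) do
          # Lock the row for this node (hypothetical lock fields).
          %{row | status: :running, locked_by: node_id, locked_at: now}
        else
          row
        end
      end)

    {Enum.filter(updated, &MapSet.member?(ids, &1.id)), updated}
  end

  defp claimable?(row, queue, now) do
    row.queue == queue and row.status == :pending and
      (is_nil(row.scheduled_at) or DateTime.compare(row.scheduled_at, now) != :gt)
  end
end
```

Calling fetch_jobs/5 a second time on the updated rows returns nothing for the same jobs, which is the property that prevents duplicate processing.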
@callback get_stats(config :: Durable.Config.t(), queue :: queue_name()) :: map()
Returns statistics for a queue.
Returns a map with counts by status and other metrics.
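As a rough illustration of "counts by status", the sketch below tallies an in-memory list of rows. The module name StatsSketch and the row shape are assumptions; the actual keys and extra metrics in the returned map depend on the adapter.

```elixir
defmodule StatsSketch do
  # Counts the rows of one queue, grouped by their :status field.
  def get_stats(rows, queue) do
    rows
    |> Enum.filter(&(&1.queue == queue))
    |> Enum.frequencies_by(& &1.status)
  end
end
```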
@callback heartbeat(config :: Durable.Config.t(), job_id :: job_id()) :: :ok | {:error, term()}
Updates the lock timestamp to indicate the worker is still alive.
Called periodically by workers to prevent stale lock recovery from releasing jobs that are still being processed.
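One way a worker could drive this callback is a small timer process. The sketch below is an assumption about how such a loop might look, not how Durable schedules heartbeats: HeartbeatSketch, the :beat option, and the :interval_ms option are hypothetical names. The :beat function would typically wrap a call such as adapter.heartbeat(config, job_id).

```elixir
defmodule HeartbeatSketch do
  use GenServer

  # opts: [beat: (-> any), interval_ms: pos_integer]
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      beat: Keyword.fetch!(opts, :beat),
      interval: Keyword.get(opts, :interval_ms, 10_000)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:beat, state) do
    # Refresh the lock, then arm the timer for the next beat.
    state.beat.()
    schedule(state.interval)
    {:noreply, state}
  end

  defp schedule(ms), do: Process.send_after(self(), :beat, ms)
end
```

The interval should be comfortably shorter than the timeout passed to recover_stale_locks/2, otherwise a healthy worker could lose its lock between beats.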
@callback nack(config :: Durable.Config.t(), job_id :: job_id(), reason :: term()) :: :ok | {:error, term()}
Negatively acknowledges a job (failure).
Called when a workflow execution fails after all retries. Marks the job as failed and clears the lock.
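The relationship between ack and nack can be sketched from the caller's side. The example below assumes the executor has already exhausted retries and holds a final {:ok, result} or {:error, reason} tuple; SettleSketch and RecordingAdapter are hypothetical names, and RecordingAdapter only records which callback ran instead of touching storage.

```elixir
defmodule RecordingAdapter do
  # Hypothetical stand-in for a real adapter: it reports calls to the caller's mailbox.
  def ack(_config, job_id) do
    send(self(), {:acked, job_id})
    :ok
  end

  def nack(_config, job_id, reason) do
    send(self(), {:nacked, job_id, reason})
    :ok
  end
end

defmodule SettleSketch do
  # Success is acknowledged; a final failure is negatively acknowledged with its reason.
  def settle(adapter, config, job, {:ok, _result}), do: adapter.ack(config, job.id)
  def settle(adapter, config, job, {:error, reason}), do: adapter.nack(config, job.id, reason)
end
```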
@callback recover_stale_locks(
  config :: Durable.Config.t(),
  timeout_seconds :: pos_integer()
) :: {:ok, non_neg_integer()} | {:error, term()}
Recovers stale locks from crashed workers.
Jobs locked longer than the timeout are released back to pending status. Returns the count of recovered jobs.
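The timeout rule can be sketched over in-memory rows. This is an illustration under assumptions, not the adapter's query: StaleLockSketch and the locked_by and locked_at fields are hypothetical, and the function returns the updated rows as a third tuple element only so the effect is visible (the callback itself returns {:ok, count}).

```elixir
defmodule StaleLockSketch do
  # Releases every row whose lock is older than `timeout_seconds` relative to `now`.
  def recover_stale_locks(rows, timeout_seconds, now) do
    cutoff = DateTime.add(now, -timeout_seconds, :second)

    {rows, count} =
      Enum.map_reduce(rows, 0, fn row, count ->
        if stale?(row, cutoff) do
          {%{row | status: :pending, locked_by: nil, locked_at: nil}, count + 1}
        else
          {row, count}
        end
      end)

    {:ok, count, rows}
  end

  # Unlocked rows are never stale; locked rows are stale when locked before the cutoff.
  defp stale?(%{locked_at: nil}, _cutoff), do: false
  defp stale?(%{locked_at: locked_at}, cutoff), do: DateTime.compare(locked_at, cutoff) == :lt
end
```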
@callback recover_zombie_workflows(
  config :: Durable.Config.t(),
  timeout_seconds :: pos_integer()
) :: {:ok, non_neg_integer()} | {:error, term()}
Recovers "zombie" workflows, that is, executions stuck in :waiting status with
no pending inputs or events that could ever unblock them.
This typically happens when a step crashes during a state transition and
the executor can't record a clean error (e.g. due to a secondary error
while serializing). The workflow remains in :waiting indefinitely even
though nothing is actually waiting on it.
Zombies older than the timeout are marked :failed with a diagnostic
error. Returns the count of workflows recovered.
This callback is optional so third-party adapters don't break on upgrade.
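Because the callback is optional, code that calls it has to check whether the adapter exports it. The sketch below shows one way to guard the call with Code.ensure_loaded?/1 and function_exported?/3. The modules OptionalCallbackSketch, LegacyAdapter, and ModernAdapter are hypothetical, and returning {:ok, 0} when the callback is missing is an assumption made for the example.

```elixir
defmodule OptionalCallbackSketch do
  # Calls recover_zombie_workflows/2 only when the adapter implements it.
  def recover_zombies(adapter, config, timeout_seconds) do
    if Code.ensure_loaded?(adapter) and
         function_exported?(adapter, :recover_zombie_workflows, 2) do
      adapter.recover_zombie_workflows(config, timeout_seconds)
    else
      # Assumed fallback: report that nothing was recovered.
      {:ok, 0}
    end
  end
end

defmodule LegacyAdapter do
  # An older third-party adapter that predates the callback.
end

defmodule ModernAdapter do
  # Pretends three zombie workflows were marked :failed.
  def recover_zombie_workflows(_config, _timeout_seconds), do: {:ok, 3}
end
```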
@callback reschedule(
  config :: Durable.Config.t(),
  job_id :: job_id(),
  run_at :: DateTime.t()
) :: :ok | {:error, term()}
Reschedules a job for future execution.
Used for sleep and wait primitives. Sets the job back to pending status with a new scheduled_at time.
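A minimal in-memory sketch of the state change described above, assuming rows are maps with :id, :status, and :scheduled_at. RescheduleSketch and the {:error, :not_found} result are hypothetical, and the updated rows are returned only so the effect is visible. Whether the lock is also cleared is not stated here, so the sketch leaves lock fields alone.

```elixir
defmodule RescheduleSketch do
  # Puts the job back to :pending with a new scheduled_at time.
  def reschedule(rows, job_id, %DateTime{} = run_at) do
    if Enum.any?(rows, &(&1.id == job_id)) do
      updated =
        Enum.map(rows, fn
          %{id: ^job_id} = row -> %{row | status: :pending, scheduled_at: run_at}
          row -> row
        end)

      {:ok, updated}
    else
      {:error, :not_found}
    end
  end
end
```

A caller implementing a one-hour sleep might compute run_at as DateTime.add(DateTime.utc_now(), 3600, :second).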
@callback wake_sleeping_workflows(
  config :: Durable.Config.t(),
  batch_size :: pos_integer()
) :: {:ok, non_neg_integer()} | {:error, term()}
Wakes workflows whose sleep/1 or schedule_at/1 wait has elapsed.
Atomically transitions rows where status = :waiting AND scheduled_at <= NOW()
back to :pending, clears the lock, and merges a __sleep_satisfied__
marker into context so the step body's next sleep/schedule_at call
returns immediately instead of re-throwing.
Returns the count of workflows woken. This callback is optional so older adapters keep working; when it is not implemented, the SleepWaker simply skips its sweep.
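The wake transition can be sketched over in-memory rows. This illustrates the rule above under assumptions: WakeSketch, the locked_by and locked_at fields, and storing the marker as the string key "__sleep_satisfied__" with the value true are all hypothetical. A real adapter would perform the update atomically in storage, and the updated rows are returned here only so the effect is visible.

```elixir
defmodule WakeSketch do
  @marker "__sleep_satisfied__"

  # Wakes at most `batch_size` rows that are :waiting with scheduled_at <= now.
  def wake_sleeping_workflows(rows, batch_size, now) do
    {rows, woken} =
      Enum.map_reduce(rows, 0, fn row, woken ->
        if woken < batch_size and due?(row, now) do
          awake = %{
            row
            | status: :pending,
              locked_by: nil,
              locked_at: nil,
              context: Map.put(row.context, @marker, true)
          }

          {awake, woken + 1}
        else
          {row, woken}
        end
      end)

    {:ok, woken, rows}
  end

  defp due?(%{status: :waiting, scheduled_at: %DateTime{} = at}, now),
    do: DateTime.compare(at, now) != :gt

  defp due?(_row, _now), do: false
end
```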
Functions
@spec default_adapter() :: module()
Returns the default adapter module.