Durable.Queue.Adapter behaviour (Durable v0.1.0-rc)

Behaviour for queue adapters.

The queue adapter is responsible for:

  • Fetching and claiming jobs atomically
  • Acknowledging completed jobs
  • Handling failed jobs (nack)
  • Rescheduling jobs for later execution
  • Recovering stale locks from crashed workers
  • Providing queue statistics

All callbacks receive the Durable config as the first argument to support dynamic repo and prefix configuration.

Summary

Callbacks

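  • ack(config, job_id) - Acknowledges successful job completion.
  • fetch_jobs(config, queue, limit, node_id) - Fetches and atomically claims jobs from a queue.
  • get_stats(config, queue) - Returns statistics for a queue.
  • heartbeat(config, job_id) - Updates the lock timestamp to indicate the worker is still alive.
  • nack(config, job_id, reason) - Negatively acknowledges a job (failure).
  • recover_stale_locks(config, timeout_seconds) - Recovers stale locks from crashed workers.
  • recover_zombie_workflows(config, timeout_seconds) - Recovers "zombie" workflows: executions stuck in :waiting status with no pending inputs or events that could ever unblock them.
  • reschedule(config, job_id, run_at) - Reschedules a job for future execution.
  • wake_sleeping_workflows(config, batch_size) - Wakes workflows whose sleep/1 or schedule_at/1 wait has elapsed.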

Functions

  • default_adapter() - Returns the default adapter module.

Types

job()

@type job() :: %{
  id: job_id(),
  workflow_module: String.t(),
  workflow_name: String.t(),
  queue: queue_name(),
  priority: integer(),
  input: map(),
  context: map(),
  scheduled_at: DateTime.t() | nil,
  current_step: String.t() | nil,
  lock_token: String.t() | nil
}
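
As an illustration of the shape this type describes, the following sketch builds a job map by hand and destructures it. Every value here (the ID, module name, workflow name, and input) is made up for the example and does not come from Durable itself.

```elixir
# Illustrative values only; none of these are taken from the Durable source.
job = %{
  id: "9f1c2d3e-0000-4000-8000-000000000001",
  workflow_module: "MyApp.OrderWorkflow",
  workflow_name: "process_order",
  queue: "default",
  priority: 0,
  input: %{"order_id" => 42},
  context: %{},
  scheduled_at: nil,
  current_step: nil,
  lock_token: nil
}

# Pattern-match on the fields a worker would typically need.
%{id: job_id, queue: queue, workflow_module: module_name} = job
IO.puts("job #{job_id} on queue #{queue} runs #{module_name}")
```

Note that workflow_module is typed as a string rather than a module atom, so a consumer would have to resolve it to a module before invoking it.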

job_id()

@type job_id() :: String.t()

queue_name()

@type queue_name() :: String.t()

Callbacks

ack(config, job_id)

@callback ack(config :: Durable.Config.t(), job_id :: job_id()) :: :ok | {:error, term()}

Acknowledges successful job completion.

Called when a workflow execution completes successfully. Clears the lock fields on the workflow execution.

fetch_jobs(config, queue, limit, node_id)

@callback fetch_jobs(
  config :: Durable.Config.t(),
  queue :: queue_name(),
  limit :: pos_integer(),
  node_id :: String.t()
) :: [job()]

Fetches and atomically claims jobs from a queue.

Jobs are locked to prevent duplicate processing. Returns a list of claimed jobs up to the specified limit.

Parameters

  • config - The Durable configuration
  • queue - The queue name to fetch from
  • limit - Maximum number of jobs to claim
  • node_id - Unique identifier for this node (used for locking)

Returns

A list of job maps that have been claimed.
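
The claiming contract can be sketched with an in-memory store. This is not how any bundled adapter works; it only illustrates the property that selecting and locking happen in a single atomic step. The InMemoryQueue module, the Agent pid standing in for config, and the lock token format are all assumptions made for the example.

```elixir
defmodule InMemoryQueue do
  # Hypothetical in-memory store, used only to illustrate atomic claiming.
  # A real adapter would declare `@behaviour Durable.Queue.Adapter` and
  # receive a Durable.Config struct instead of an Agent pid.

  def start_link(jobs), do: Agent.start_link(fn -> jobs end)

  # Mirrors the shape of fetch_jobs/4.
  def fetch_jobs(store, queue, limit, node_id) do
    Agent.get_and_update(store, fn jobs ->
      # Select up to `limit` unlocked jobs from the requested queue.
      claimed_ids =
        jobs
        |> Enum.filter(&(&1.queue == queue and is_nil(&1.lock_token)))
        |> Enum.take(limit)
        |> MapSet.new(& &1.id)

      # Assumed token format; the real format is not documented here.
      token = "#{node_id}:#{System.unique_integer([:positive])}"

      # Stamp the lock inside the same state update, so no other caller
      # can observe the selected jobs as unlocked.
      updated =
        Enum.map(jobs, fn job ->
          if MapSet.member?(claimed_ids, job.id) do
            %{job | lock_token: token}
          else
            job
          end
        end)

      claimed = Enum.filter(updated, &MapSet.member?(claimed_ids, &1.id))
      {claimed, updated}
    end)
  end
end

jobs = [
  %{id: "a", queue: "default", lock_token: nil},
  %{id: "b", queue: "default", lock_token: nil},
  %{id: "c", queue: "emails", lock_token: nil}
]

{:ok, store} = InMemoryQueue.start_link(jobs)
first = InMemoryQueue.fetch_jobs(store, "default", 1, "node-1")
second = InMemoryQueue.fetch_jobs(store, "default", 5, "node-2")
```

Because both the selection and the lock update run inside one Agent.get_and_update/2 call, the second caller can never receive a job the first caller already claimed. A database-backed adapter would need an equivalent guarantee from its storage layer.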

get_stats(config, queue)

@callback get_stats(config :: Durable.Config.t(), queue :: queue_name()) :: map()

Returns statistics for a queue.

Returns a map with counts by status and other metrics.
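
The exact keys of the returned map are not specified above. As a rough sketch of the "counts by status" part only, assuming each execution carries a status field, the counts could be derived like this:

```elixir
# Hypothetical rows; only the :pending, :waiting and :failed statuses
# are mentioned elsewhere on this page.
executions = [
  %{id: "a", status: :pending},
  %{id: "b", status: :pending},
  %{id: "c", status: :waiting},
  %{id: "d", status: :failed}
]

# Build a map from status to the number of executions in that status.
stats = Enum.frequencies_by(executions, & &1.status)
IO.inspect(stats)
```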

heartbeat(config, job_id)

@callback heartbeat(config :: Durable.Config.t(), job_id :: job_id()) ::
  :ok | {:error, term()}

Updates the lock timestamp to indicate the worker is still alive.

Called periodically by workers to prevent stale lock recovery from releasing jobs that are still being processed.
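
One way a worker might keep a lock fresh is to run a small helper process alongside the job. The sketch below is an assumption about worker design, not Durable's actual worker. HeartbeatSketch and its beat function are hypothetical; in a real worker, beat would wrap a call such as adapter.heartbeat(config, job_id).

```elixir
defmodule HeartbeatSketch do
  # Runs `work` while a linked helper process calls `beat` every
  # `interval_ms` milliseconds.
  def with_heartbeat(beat, interval_ms, work) do
    pid = spawn_link(fn -> loop(beat, interval_ms) end)

    try do
      work.()
    after
      # Stop heartbeating as soon as the work returns or raises.
      Process.unlink(pid)
      Process.exit(pid, :kill)
    end
  end

  defp loop(beat, interval_ms) do
    beat.()
    Process.sleep(interval_ms)
    loop(beat, interval_ms)
  end
end

# Count heartbeats with an Agent instead of touching a real adapter.
{:ok, counter} = Agent.start_link(fn -> 0 end)
beat = fn -> Agent.update(counter, &(&1 + 1)) end

result =
  HeartbeatSketch.with_heartbeat(beat, 10, fn ->
    Process.sleep(55)
    :done
  end)

beats = Agent.get(counter, & &1)
```

The heartbeat interval should be comfortably shorter than the timeout passed to recover_stale_locks/2, otherwise a healthy but slow job could be treated as stale.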

nack(config, job_id, reason)

@callback nack(config :: Durable.Config.t(), job_id :: job_id(), reason :: term()) ::
  :ok | {:error, term()}

Negatively acknowledges a job (failure).

Called when a workflow execution fails after all retries are exhausted. Marks the job as failed and clears the lock.
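
The relationship between ack/2 and nack/3 can be sketched as a single dispatch on the outcome of a job. AckNackSketch, RecordingAdapter, and the run function are hypothetical names for this example. The recording adapter only reports calls to the current process so that the flow can be observed without a database.

```elixir
defmodule AckNackSketch do
  # `adapter` is any module exposing ack/2 and nack/3 with the shapes above.
  # `run` is a hypothetical function that executes the workflow, retries
  # included, and returns {:ok, result} or {:error, reason}.
  def finish(adapter, config, job, run) do
    case run.(job) do
      {:ok, _result} -> adapter.ack(config, job.id)
      {:error, reason} -> adapter.nack(config, job.id, reason)
    end
  end
end

defmodule RecordingAdapter do
  # Stand-in adapter: sends a message to the caller instead of persisting.
  def ack(_config, job_id) do
    send(self(), {:acked, job_id})
    :ok
  end

  def nack(_config, job_id, reason) do
    send(self(), {:nacked, job_id, reason})
    :ok
  end
end

job = %{id: "job-1"}
ok_result = AckNackSketch.finish(RecordingAdapter, %{}, job, fn _job -> {:ok, :done} end)
err_result = AckNackSketch.finish(RecordingAdapter, %{}, job, fn _job -> {:error, :boom} end)
```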

recover_stale_locks(config, timeout_seconds)

@callback recover_stale_locks(
  config :: Durable.Config.t(),
  timeout_seconds :: pos_integer()
) ::
  {:ok, non_neg_integer()} | {:error, term()}

Recovers stale locks from crashed workers.

Jobs locked longer than the timeout are released back to pending status. Returns the count of recovered jobs.
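
The staleness rule can be sketched as a pure function over a list of job maps. The locked_at and status field names, and the :running status, are assumptions for this example. A real adapter would apply the same comparison in its storage layer.

```elixir
defmodule StaleLockSketch do
  # Releases jobs whose lock is older than `timeout_seconds`.
  # Returns the callback-shaped result alongside the updated list.
  def recover(jobs, timeout_seconds, now) do
    cutoff = DateTime.add(now, -timeout_seconds, :second)

    {updated, count} =
      Enum.map_reduce(jobs, 0, fn job, acc ->
        if stale?(job, cutoff) do
          {%{job | status: :pending, lock_token: nil, locked_at: nil}, acc + 1}
        else
          {job, acc}
        end
      end)

    {{:ok, count}, updated}
  end

  # A job without a lock timestamp is never stale.
  defp stale?(%{locked_at: nil}, _cutoff), do: false

  defp stale?(%{locked_at: locked_at}, cutoff) do
    DateTime.compare(locked_at, cutoff) == :lt
  end
end

now = ~U[2024-01-01 12:00:00Z]

jobs = [
  # Locked 10 minutes ago: older than the 300 second timeout.
  %{id: "a", status: :running, lock_token: "t1", locked_at: ~U[2024-01-01 11:50:00Z]},
  # Locked 30 seconds ago: still considered alive.
  %{id: "b", status: :running, lock_token: "t2", locked_at: ~U[2024-01-01 11:59:30Z]},
  # Never locked.
  %{id: "c", status: :pending, lock_token: nil, locked_at: nil}
]

{{:ok, count}, updated} = StaleLockSketch.recover(jobs, 300, now)
```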

recover_zombie_workflows(config, timeout_seconds)

(optional)
@callback recover_zombie_workflows(
  config :: Durable.Config.t(),
  timeout_seconds :: pos_integer()
) ::
  {:ok, non_neg_integer()} | {:error, term()}

Recovers "zombie" workflows — executions stuck in :waiting status with no pending inputs or events that could ever unblock them.

This typically happens when a step crashes during a state transition and the executor can't record a clean error (e.g. due to a secondary error while serializing). The workflow remains in :waiting indefinitely even though nothing is actually waiting on it.

Zombies older than the timeout are marked :failed with a diagnostic error. Returns the count of workflows recovered.

This callback is optional so third-party adapters don't break on upgrade.
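
A caller can guard against adapters that omit an optional callback by checking for the function before invoking it. The sketch below uses hypothetical module names, and returning {:ok, 0} when the callback is absent is an assumption about reasonable fallback behaviour, not something stated above.

```elixir
defmodule OptionalCallbackSketch do
  # Calls recover_zombie_workflows/2 only when the adapter implements it.
  def recover_zombies(adapter, config, timeout_seconds) do
    # function_exported?/3 does not load modules, so load the adapter first.
    if Code.ensure_loaded?(adapter) and
         function_exported?(adapter, :recover_zombie_workflows, 2) do
      adapter.recover_zombie_workflows(config, timeout_seconds)
    else
      # Assumed fallback: report that nothing was recovered.
      {:ok, 0}
    end
  end
end

defmodule LegacyAdapter do
  # Hypothetical adapter written before the optional callback existed.
  def ack(_config, _job_id), do: :ok
end

defmodule ModernAdapter do
  # Hypothetical adapter that implements the optional callback.
  def recover_zombie_workflows(_config, _timeout_seconds), do: {:ok, 3}
end

legacy = OptionalCallbackSketch.recover_zombies(LegacyAdapter, %{}, 600)
modern = OptionalCallbackSketch.recover_zombies(ModernAdapter, %{}, 600)
```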

reschedule(config, job_id, run_at)

@callback reschedule(
  config :: Durable.Config.t(),
  job_id :: job_id(),
  run_at :: DateTime.t()
) ::
  :ok | {:error, term()}

Reschedules a job for future execution.

Used for sleep and wait primitives. Sets the job back to pending status with a new scheduled_at time.
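
As a sketch of how a sleep primitive might use this callback, the helper below turns a duration into an absolute run_at and passes it to the adapter. RescheduleSketch and EchoAdapter are hypothetical, and the echo adapter only reports the call back to the current process.

```elixir
defmodule RescheduleSketch do
  # Converts a relative delay into the absolute run_at that reschedule/3 expects.
  def sleep_for(adapter, config, job_id, seconds, now) do
    run_at = DateTime.add(now, seconds, :second)
    adapter.reschedule(config, job_id, run_at)
  end
end

defmodule EchoAdapter do
  # Stand-in adapter: sends the arguments to the caller instead of persisting.
  def reschedule(_config, job_id, run_at) do
    send(self(), {:rescheduled, job_id, run_at})
    :ok
  end
end

now = ~U[2024-01-01 12:00:00Z]
result = RescheduleSketch.sleep_for(EchoAdapter, %{}, "job-1", 300, now)
```

Passing now explicitly keeps the helper deterministic; production code would more likely call DateTime.utc_now/0 at the point of rescheduling.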

wake_sleeping_workflows(config, batch_size)

(optional)
@callback wake_sleeping_workflows(
  config :: Durable.Config.t(),
  batch_size :: pos_integer()
) ::
  {:ok, non_neg_integer()} | {:error, term()}

Wakes workflows whose sleep/1 or schedule_at/1 wait has elapsed.

Atomically transitions rows where status = :waiting AND scheduled_at <= NOW() back to :pending and clears the lock. It also merges a __sleep_satisfied__ marker into context, so that the step body's next sleep/schedule_at call returns immediately instead of re-throwing.

Returns the count of workflows woken. Optional so older adapters keep working — when not implemented, the SleepWaker simply skips its sweep.
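
The wake transition can be sketched as a pure function over in-memory rows. The string key and the value true for the __sleep_satisfied__ marker are assumptions, as is the WakeSketch module itself. A real adapter would perform the same update atomically in its storage layer.

```elixir
defmodule WakeSketch do
  # Wakes up to `batch_size` rows whose scheduled wait has elapsed.
  # Returns the callback-shaped result alongside the updated rows.
  def wake(rows, batch_size, now) do
    {updated, woken} =
      Enum.map_reduce(rows, 0, fn row, count ->
        if count < batch_size and due?(row, now) do
          woken_row = %{
            row
            | status: :pending,
              lock_token: nil,
              # Assumed marker shape: a string key set to true.
              context: Map.put(row.context, "__sleep_satisfied__", true)
          }

          {woken_row, count + 1}
        else
          {row, count}
        end
      end)

    {{:ok, woken}, updated}
  end

  # Only :waiting rows with a scheduled_at at or before `now` are due.
  defp due?(%{status: :waiting, scheduled_at: %DateTime{} = at}, now) do
    DateTime.compare(at, now) != :gt
  end

  defp due?(_row, _now), do: false
end

now = ~U[2024-01-01 12:00:00Z]

rows = [
  # Sleep elapsed one minute ago.
  %{id: "a", status: :waiting, scheduled_at: ~U[2024-01-01 11:59:00Z], lock_token: "t1", context: %{}},
  # Sleep ends in the future.
  %{id: "b", status: :waiting, scheduled_at: ~U[2024-01-01 12:30:00Z], lock_token: nil, context: %{}},
  # Waiting on something other than a timer.
  %{id: "c", status: :waiting, scheduled_at: nil, lock_token: nil, context: %{}},
  # Already pending.
  %{id: "d", status: :pending, scheduled_at: ~U[2024-01-01 11:00:00Z], lock_token: nil, context: %{}}
]

{{:ok, woken}, updated} = WakeSketch.wake(rows, 10, now)
```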

Functions

default_adapter()

@spec default_adapter() :: module()

Returns the default adapter module.