gaffer (gaffer v0.4.0)
Main API for managing job queues.
Types
-type age() :: non_neg_integer() | infinity.
An age in milliseconds.
-type backoff() :: non_neg_integer() | [non_neg_integer()].
Retry backoff strategy.
-type interval() :: pos_integer() | infinity.
An interval in milliseconds.
-type job() :: #{
    id := job_id(),
    queue := queue(),
    payload := term(),
    state := job_state(),
    attempt := non_neg_integer(),
    max_attempts := max_attempts(),
    priority := priority(),
    timeout := timeout_ms(),
    backoff := backoff(),
    shutdown_timeout := shutdown_timeout(),
    result => term(),
    scheduled_at => timestamp(),
    created_at := timestamp(),
    attempted_at => timestamp(),
    completed_at => timestamp(),
    cancelled_at => timestamp(),
    failed_at => timestamp(),
    errors := [job_error()]
}.
A job.
-type job_error() :: #{attempt := non_neg_integer(), error := term(), at := timestamp()}.
A recorded execution error.
-type job_filter() :: #{state => job_state()}.
Filter options for listing jobs.
-type job_id() :: keysmith:uuid().
Unique job identifier.
-type job_opts() :: #{
    queue => queue(),
    max_attempts => max_attempts(),
    priority => priority(),
    timeout => timeout_ms(),
    backoff => backoff(),
    shutdown_timeout => shutdown_timeout(),
    scheduled_at => timestamp()
}.
Per-job options at insert time.
-type job_state() :: available | executing | completed | cancelled | failed.
Possible states of a job.
Maximum age per job state, in milliseconds.
-type max_attempts() :: pos_integer().
Maximum execution attempts for a job.
-type max_workers() :: pos_integer() | infinity.
Maximum number of concurrent workers.
-type priority() :: integer().
Job priority. Higher values are processed first.
Pruning configuration for a queue.
When set, a per-queue pruner process periodically deletes jobs in terminal
states older than the configured max_age (in milliseconds).
-type queue() :: atom().
Queue identifier.
-type queue_conf() :: #{
    name := queue(),
    driver => gaffer_driver:driver(),
    worker := gaffer_worker:worker(),
    global_max_workers => max_workers(),
    max_workers => max_workers(),
    poll_interval => interval(),
    shutdown_timeout => shutdown_timeout(),
    max_attempts => max_attempts(),
    timeout => timeout_ms(),
    backoff => backoff(),
    priority => priority(),
    on_discard => queue(),
    hooks => [gaffer_hooks:hook()],
    prune => prune_conf()
}.
Queue configuration.
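As an illustration of the queue_conf() type, the sketch below builds a configuration map for a hypothetical emails queue. The queue name, the email_worker atom, and all numeric values are assumptions made for this example. In particular, the shape of gaffer_worker:worker() is not shown here, so treating it as a module atom is a guess. Keys whose types are not spelled out in this page (driver, hooks, prune) are left out.

```erlang
-module(queue_conf_example).
-export([emails_conf/0]).

%% Builds a queue_conf() map for a hypothetical `emails` queue.
%% Only `name` and `worker` are mandatory according to the type.
emails_conf() ->
    #{name => emails,
      %% ASSUMPTION: email_worker is a hypothetical value; it must be
      %% whatever gaffer_worker:worker() actually requires.
      worker => email_worker,
      max_workers => 5,               % pos_integer() | infinity
      poll_interval => 1000,          % interval(), milliseconds
      max_attempts => 3,              % max_attempts()
      timeout => 30000,               % timeout_ms(), milliseconds
      backoff => [1000, 5000, 30000], % backoff(), list form
      priority => 0,                  % priority()
      shutdown_timeout => 5000}.      % shutdown_timeout(), milliseconds
```

A map like this could be passed to create_queue/1 or ensure_queue/1.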
-type queue_info() :: #{
    status := active | paused,
    jobs := #{
        available := state_info(),
        executing := state_info(),
        completed := state_info(),
        cancelled := state_info(),
        failed := state_info()
    },
    workers := #{
        active := non_neg_integer(),
        max := #{local := max_workers(), global := max_workers()}
    }
}.
Information about a queue.
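To show how the nested queue_info() map can be consumed, here is a minimal sketch of two helpers that work on the map alone. The module and function names are hypothetical, and "backlog" is defined here as available plus executing jobs purely for the example.

```erlang
-module(queue_info_example).
-export([backlog/1, total_jobs/1]).

%% Number of jobs that are waiting or currently running.
backlog(#{jobs := #{available := #{count := Available},
                    executing := #{count := Executing}}}) ->
    Available + Executing.

%% Sum of the job counts across every state in the info map.
total_jobs(#{jobs := Jobs}) ->
    maps:fold(fun(_State, #{count := Count}, Acc) -> Acc + Count end, 0, Jobs).
```

In use, the argument would typically be the result of info/1, for example queue_info_example:backlog(gaffer:info(emails)).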
-type shutdown_timeout() :: pos_integer().
Grace period for worker shutdown in milliseconds.
-type state_info() :: #{count := non_neg_integer(), oldest => timestamp(), newest => timestamp()}.
Information about a job state.
-type timeout_ms() :: pos_integer().
Execution timeout in milliseconds.
-type timestamp() :: integer() | {erlang:time_unit(), integer()}.
Job timestamp.
An erlang:system_time/0 integer or a {Unit, Value} pair.
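Because timestamp() has two shapes, code that compares or prints timestamps may want to normalise them first. The following sketch converts either shape to milliseconds with erlang:convert_time_unit/3. It assumes that a bare integer is in the native time unit, which is what erlang:system_time/0 returns. The module name is hypothetical.

```erlang
-module(timestamp_example).
-export([to_milliseconds/1]).

%% Converts a timestamp() value to milliseconds.
-spec to_milliseconds(integer() | {erlang:time_unit(), integer()}) -> integer().
to_milliseconds({Unit, Value}) when is_integer(Value) ->
    %% Explicit {Unit, Value} pair: convert from the stated unit.
    erlang:convert_time_unit(Value, Unit, millisecond);
to_milliseconds(Native) when is_integer(Native) ->
    %% ASSUMPTION: bare integers are in the native unit,
    %% as produced by erlang:system_time/0.
    erlang:convert_time_unit(Native, native, millisecond).
```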
Job Management
Cancels a job, preventing further execution.
Deletes a job.
-spec drain(queue()) -> ok.
Equivalent to drain(Queue, 5000).
Waits for the active workers to finish their jobs.
-spec flush(queue()) -> ok.
Equivalent to flush(Queue, infinity).
Waits for all jobs in the queue to finish.
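One way drain/1 might be combined with pause/1 is to quiesce a queue before a deployment: pausing stops the queue from claiming new jobs, and draining then waits for the active workers. This is only a sketch under that reading of the two functions; the module and function names are hypothetical.

```erlang
-module(quiesce_example).
-export([quiesce/1]).

%% Stops a queue from claiming new jobs, then waits for running jobs.
quiesce(Queue) ->
    case gaffer:pause(Queue) of
        ok -> ok;
        %% The queue was already paused; nothing more to do here.
        {error, already_paused} -> ok
    end,
    %% drain/1 is documented as equivalent to drain(Queue, 5000).
    gaffer:drain(Queue).
```

flush/1 would be the alternative when every job in the queue, not only the running ones, should finish first. It would not be combined with pause/1, because a paused queue does not claim new jobs.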
Gets the definition of a job.
Equivalent to insert(Queue, Payload, #{}).
Inserts a job into a queue.
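The sketch below shows how per-job options might be passed to insert/3. The option keys come from job_opts(); the concrete values, the module name, and the helper names are assumptions. It also assumes that scheduled_at given as {second, Value} denotes an absolute system time in seconds, which this page does not confirm. The return value of insert/3 is not documented here, so the example passes it through untouched.

```erlang
-module(insert_example).
-export([delayed_opts/2, enqueue_later/3]).

%% Builds job_opts() for a job that should run DelaySeconds from NowSeconds.
delayed_opts(NowSeconds, DelaySeconds) when DelaySeconds >= 0 ->
    #{priority => 10,
      max_attempts => 5,
      timeout => 15000,
      %% ASSUMPTION: an absolute system time expressed in seconds.
      scheduled_at => {second, NowSeconds + DelaySeconds}}.

%% Inserts Payload into Queue, scheduled DelaySeconds in the future.
enqueue_later(Queue, Payload, DelaySeconds) ->
    Opts = delayed_opts(erlang:system_time(second), DelaySeconds),
    gaffer:insert(Queue, Payload, Opts).
```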
Lists all jobs in the given queue.
-spec list(queue(), job_filter()) -> [job()].
Lists jobs in the given queue matching the filter options.
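As a usage sketch for list/2, the module below fetches the failed jobs of a queue and pairs each job id with its most recent recorded error. It relies only on the id and errors keys of job() and the attempt and error keys of job_error(). The module and function names are hypothetical.

```erlang
-module(failed_jobs_example).
-export([failed_errors/1, last_errors/1]).

%% Lists failed jobs in Queue as {JobId, LastError | none} pairs.
failed_errors(Queue) ->
    last_errors(gaffer:list(Queue, #{state => failed})).

%% Pure helper over a list of job() maps.
last_errors(Jobs) ->
    [{Id, last_error(Errors)} || #{id := Id, errors := Errors} <- Jobs].

last_error([]) ->
    none;
last_error([First | _] = Errors) ->
    %% Pick the entry with the highest attempt number instead of
    %% relying on the order in which errors are stored.
    Pick = fun(#{attempt := A} = E, #{attempt := B}) when A > B -> E;
              (_, Acc) -> Acc
           end,
    #{error := Error} = lists:foldl(Pick, First, Errors),
    Error.
```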
Triggers an immediate prune of stale jobs in the given queue.
Queue Management
-spec create_queue(queue_conf()) -> ok | {error, already_exists}.
Creates a new queue.
-spec delete_queue(queue()) -> ok.
Deletes a queue.
-spec delete_queue(queue(), gaffer_driver:driver()) -> ok.
Deletes a queue using an explicit driver.
Also works for orphaned queues not initialized at runtime, as returned by
orphaned_queues/1.
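Combining orphaned_queues/1 with delete_queue/2 gives a simple cleanup routine. The sketch below is an assumption about how the two fit together; the module and function names are hypothetical, and Driver stands for whatever gaffer_driver:driver() value the application already uses. Deleting a queue is destructive, so a real cleanup would likely log or confirm first.

```erlang
-module(orphan_cleanup_example).
-export([delete_orphans/1]).

%% Deletes every queue that exists in storage but not in the runtime
%% configuration, and returns the names that were deleted.
delete_orphans(Driver) ->
    Orphans = gaffer:orphaned_queues(Driver),
    lists:foreach(
        fun(Queue) -> ok = gaffer:delete_queue(Queue, Driver) end,
        Orphans),
    Orphans.
```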
-spec ensure_queue(queue_conf()) -> ok.
Creates a queue or updates it if it already exists.
-spec get_queue(queue()) -> queue_conf().
Gets the configuration of a queue.
-spec info(queue()) -> queue_info().
Returns current queue information.
-spec list_queues() -> [{queue(), queue_conf()}].
Lists all queues.
-spec orphaned_queues(gaffer_driver:driver()) -> [queue()].
Lists queues in storage that are not in the runtime configuration.
-spec pause(queue()) -> ok | {error, already_paused}.
Pauses a queue.
Running jobs continue to completion. While paused, the queue does not
claim new jobs and pruning is suspended until resume/1 is called.
Explicit calls to prune/1 still run while the queue is paused.
Returns {error, already_paused} if the queue is already paused.
-spec resume(queue()) -> ok | {error, already_active}.
Resumes a paused queue.
Job claiming and pruning resume normally.
Returns {error, already_active} if the queue is not currently paused.
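The error tuples returned by pause/1 and resume/1 make it possible to wrap a maintenance task so that the queue ends up in the state it started in. The following is a minimal sketch of that idea; the module name and with_paused/2 are hypothetical and not part of the library.

```erlang
-module(pause_example).
-export([with_paused/2]).

%% Runs Fun while Queue is paused and restores the previous state.
with_paused(Queue, Fun) when is_function(Fun, 0) ->
    case gaffer:pause(Queue) of
        ok ->
            try
                Fun()
            after
                %% Ignore {error, already_active} in case another
                %% caller resumed the queue in the meantime.
                _ = gaffer:resume(Queue)
            end;
        {error, already_paused} ->
            %% Paused by someone else: run Fun and leave it paused.
            Fun()
    end.
```

For example, pause_example:with_paused(emails, fun() -> gaffer:prune(emails) end) would run an explicit prune while no new jobs are being claimed.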
Updates the configuration of a queue.