gaffer (gaffer v0.4.1)

View Source

Main API for managing job queues.

Summary

Types

An age in milliseconds.

Retry backoff strategy.

An interval in milliseconds.

A job.

A recorded execution error.

Filter options for listing jobs.

Unique job identifier.

Per-job options at insert time.

Possible states of a job.

Maximum age per job state, in milliseconds.

Maximum execution attempts for a job.

Maximum number of concurrent workers.

Job priority. Higher values are processed first.

Pruning configuration for a queue.

Queue identifier.

Queue configuration.

Information about a queue.

Grace period for worker shutdown in milliseconds.

Information about a job state.

Execution timeout in milliseconds.

Job timestamp.

Job Management

Cancels a job, preventing further execution.

Deletes a job.

Waits for the active workers to finish their jobs.

Waits for all jobs in the queue to finish.

Gets the definition of a job.

Inserts a job into a queue.

Lists all jobs in the given queue.

Lists jobs in the given queue matching the filter options.

Triggers an immediate prune of stale jobs in the given queue.

Queue Management

Creates a new queue.

Deletes a queue.

Deletes a queue using an explicit driver.

Creates a queue or updates it if it already exists.

Gets the configuration of a queue.

Returns current queue information.

Lists all queues.

Lists queues in storage that are not in the runtime configuration.

Pauses a queue.

Resumes a paused queue.

Updates the configuration of a queue.

Types

age()

-type age() :: non_neg_integer() | infinity.

An age in milliseconds.

backoff()

-type backoff() :: non_neg_integer() | [non_neg_integer()].

Retry backoff strategy.

interval()

-type interval() :: pos_integer() | infinity.

An interval in milliseconds.

job()

-type job() ::
          #{id := job_id(),
            queue := queue(),
            payload := term(),
            state := job_state(),
            attempt := non_neg_integer(),
            max_attempts := max_attempts(),
            priority := priority(),
            timeout := timeout_ms(),
            backoff := backoff(),
            shutdown_timeout := shutdown_timeout(),
            result => term(),
            scheduled_at => timestamp(),
            created_at := timestamp(),
            attempted_at => timestamp(),
            completed_at => timestamp(),
            cancelled_at => timestamp(),
            failed_at => timestamp(),
            errors := [job_error()]}.

A job.

job_error()

-type job_error() :: #{attempt := non_neg_integer(), error := term(), at := timestamp()}.

A recorded execution error.

job_filter()

-type job_filter() :: #{state => job_state()}.

Filter options for listing jobs.

job_id()

-type job_id() :: keysmith:uuid().

Unique job identifier.

job_opts()

-type job_opts() ::
          #{queue => queue(),
            max_attempts => max_attempts(),
            priority => priority(),
            timeout => timeout_ms(),
            backoff => backoff(),
            shutdown_timeout => shutdown_timeout(),
            scheduled_at => timestamp()}.

Per-job options at insert time.

job_state()

-type job_state() :: available | executing | completed | cancelled | failed.

Possible states of a job.

max_age()

-type max_age() :: #{job_state() | '_' => age()}.

Maximum age per job state, in milliseconds.

max_attempts()

-type max_attempts() :: pos_integer().

Maximum execution attempts for a job.

max_workers()

-type max_workers() :: pos_integer() | infinity.

Maximum number of concurrent workers.

priority()

-type priority() :: integer().

Job priority. Higher values are processed first.

prune_conf()

-type prune_conf() :: #{interval := interval(), max_age => max_age()}.

Pruning configuration for a queue.

When set, a per-queue pruner process periodically deletes jobs in terminal states older than the configured max_age (in milliseconds).

queue()

-type queue() :: atom().

Queue identifier.

queue_conf()

-type queue_conf() ::
          #{name := queue(),
            driver => gaffer_driver:driver(),
            worker := gaffer_worker:worker(),
            global_max_workers => max_workers(),
            max_workers => max_workers(),
            poll_interval => interval(),
            shutdown_timeout => shutdown_timeout(),
            max_attempts => max_attempts(),
            timeout => timeout_ms(),
            backoff => backoff(),
            priority => priority(),
            on_discard => queue(),
            hooks => [gaffer_hooks:hook()],
            prune => prune_conf()}.

Queue configuration.

queue_info()

-type queue_info() ::
          #{status := active | paused,
            jobs :=
                #{available := state_info(),
                  executing := state_info(),
                  completed := state_info(),
                  cancelled := state_info(),
                  failed := state_info()},
            workers :=
                #{active := non_neg_integer(),
                  max := #{local := max_workers(), global := max_workers()}}}.

Information about a queue.

shutdown_timeout()

-type shutdown_timeout() :: pos_integer().

Grace period for worker shutdown in milliseconds.

state_info()

-type state_info() :: #{count := non_neg_integer(), oldest => timestamp(), newest => timestamp()}.

Information about a job state.

timeout_ms()

-type timeout_ms() :: pos_integer().

Execution timeout in milliseconds.

timestamp()

-type timestamp() :: integer() | {erlang:time_unit(), integer()}.

Job timestamp.

An erlang:system_time/0 integer or a {Unit, Value} pair.

Job Management

cancel(Queue, ID)

-spec cancel(queue(), job_id()) -> {ok, job()} | {error, {invalid_transition, term()}}.

Cancels a job, preventing further execution.

delete(Queue, ID)

-spec delete(queue(), job_id()) -> ok.

Deletes a job.

drain(Queue)

-spec drain(queue()) -> ok.

Equivalent to drain(Queue, 5000).

drain(Queue, Timeout)

-spec drain(queue(), timeout()) -> ok.

Waits for the active workers to finish their jobs.

flush(Queue)

-spec flush(queue()) -> ok.

Equivalent to flush(Queue, infinity).

flush(Queue, Timeout)

-spec flush(queue(), timeout()) -> ok.

Waits for all jobs in the queue to finish.

get(Queue, ID)

-spec get(queue(), job_id()) -> job().

Gets the definition of a job.

insert(Queue, Payload)

-spec insert(queue(), term()) -> job().

Equivalent to insert(Queue, Payload, #{}).

insert(Queue, Payload, Opts)

-spec insert(queue(), term(), job_opts()) -> job().

Inserts a job into a queue.

list(Queue)

-spec list(queue()) -> [job()].

Lists all jobs in the given queue.

list(Queue, Filters)

-spec list(queue(), job_filter()) -> [job()].

Lists jobs in the given queue matching the filter options.

prune(Queue)

-spec prune(queue()) -> [job_id()].

Triggers an immediate prune of stale jobs in the given queue.

Queue Management

create_queue(Conf)

-spec create_queue(queue_conf()) -> ok | {error, already_exists}.

Creates a new queue.

delete_queue(Name)

-spec delete_queue(queue()) -> ok.

Deletes a queue.

delete_queue(Name, Driver)

-spec delete_queue(queue(), gaffer_driver:driver()) -> ok.

Deletes a queue using an explicit driver.

Also works for orphaned queues not initialized at runtime, as returned by orphaned_queues/1.

ensure_queue(Conf)

-spec ensure_queue(queue_conf()) -> ok.

Creates a queue or updates it if it already exists.

get_queue(Name)

-spec get_queue(queue()) -> queue_conf().

Gets the configuration of a queue.

info(Queue)

-spec info(queue()) -> queue_info().

Returns current queue information.

list_queues()

-spec list_queues() -> [{queue(), queue_conf()}].

Lists all queues.

orphaned_queues(Driver)

-spec orphaned_queues(gaffer_driver:driver()) -> [queue()].

Lists queues in storage that are not in the runtime configuration.

pause(Queue)

-spec pause(queue()) -> ok | {error, already_paused}.

Pauses a queue.

Running jobs continue to completion. While paused, the queue does not claim new jobs and pruning is suspended until resume/1 is called. Explicit calls to prune/1 still run while the queue is paused.

Returns {error, already_paused} if the queue is already paused.

resume(Queue)

-spec resume(queue()) -> ok | {error, already_active}.

Resumes a paused queue.

Job claiming and pruning resume normally.

Returns {error, already_active} if the queue is not currently paused.

update_queue(Name, Updates)

-spec update_queue(queue(), map()) -> ok.

Updates the configuration of a queue.