Durable.Queue.Poller (Durable v0.1.0-rc)


GenServer that polls a queue for jobs and starts workers to execute them.

Each poller handles a single queue and maintains:

  • A set of active job IDs being processed
  • Concurrency limits
  • Poll interval configuration

The poller monitors workers and handles their completion messages, calling ack/nack on the adapter as appropriate.
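The monitoring described above can be illustrated with a minimal sketch. It is not Durable's implementation: the module name MonitorSketch, the returned {:ack, ...} and {:nack, ...} tuples, and the rule "normal exit means ack, anything else means nack" are assumptions made for illustration. Only the ref-to-job-ID map mirrors the documented worker_refs field.

```elixir
defmodule MonitorSketch do
  # Illustrative only; names and the ack/nack rule are assumptions.

  # Starts `fun` in a monitored process and records monitor ref => job ID,
  # the same shape as the documented `worker_refs` field.
  def start_worker(worker_refs, job_id, fun) do
    {_pid, ref} = spawn_monitor(fun)
    Map.put(worker_refs, ref, job_id)
  end

  # Turns a :DOWN message into the decision a poller would pass to its adapter.
  def handle_down({:DOWN, ref, :process, _pid, reason}, worker_refs) do
    {job_id, remaining} = Map.pop(worker_refs, ref)

    outcome =
      if reason == :normal do
        {:ack, job_id}
      else
        {:nack, job_id, reason}
      end

    {outcome, remaining}
  end
end
```

In a real GenServer the :DOWN message would arrive in handle_info/2, and the outcome would be passed to the queue adapter instead of being returned.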

Summary

Functions

child_spec(init_arg)
  Returns a specification to start this module under a supervisor.

drain(server, timeout \\ 30000)
  Drains the poller, waiting for all active jobs to complete.

pause(server)
  Pauses the poller, stopping it from claiming new jobs.

resume(server)
  Resumes the poller after being paused.

start_link(opts)
  Starts the poller process.

status(server)
  Returns the current state of the poller for debugging.

Types

t()

@type t() :: %Durable.Queue.Poller{
  active_jobs: MapSet.t(String.t()),
  concurrency: pos_integer(),
  config: Durable.Config.t(),
  job_tokens: term(),
  node_id: String.t(),
  paused: boolean(),
  poll_interval: pos_integer(),
  queue_name: String.t(),
  timer_ref: reference() | nil,
  worker_refs: %{required(reference()) => String.t()},
  worker_supervisor: atom() | pid()
}
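As a rough illustration of how the concurrency, active_jobs, and paused fields could interact, the sketch below computes how many more jobs a poller-like state might claim. The module CapacitySketch and the calculation itself are assumptions; the source does not say how the poller derives its free capacity.

```elixir
defmodule CapacitySketch do
  # Hypothetical helper; works on any map with the fields used below.

  # A paused poller claims nothing.
  def available_slots(%{paused: true}), do: 0

  # Otherwise the free capacity is the limit minus the jobs already running.
  def available_slots(%{concurrency: limit, active_jobs: active}) do
    max(limit - MapSet.size(active), 0)
  end
end
```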

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

drain(server, timeout \\ 30000)

@spec drain(GenServer.server(), timeout()) :: :ok | {:error, :timeout}

Drains the poller, waiting for all active jobs to complete.

Returns :ok once all active jobs have completed, or {:error, :timeout} if they have not finished within timeout (in milliseconds, default 30000).
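A caller will usually want to handle both return values. The sketch below wraps drain/2 in a hypothetical helper, PollerShutdown.drain/3. The third argument is not part of Durable; it exists only so the helper can be exercised with a stub module in place of Durable.Queue.Poller.

```elixir
defmodule PollerShutdown do
  # Hypothetical wrapper around the documented drain/2.
  # `poller` defaults to the real module and can be replaced by a stub.
  def drain(server, timeout \\ 30_000, poller \\ Durable.Queue.Poller) do
    case poller.drain(server, timeout) do
      :ok ->
        # Every active job finished in time.
        :drained

      {:error, :timeout} ->
        # Some jobs were still running when the timeout elapsed.
        :timed_out
    end
  end
end
```

What to do on :timed_out (wait again, or shut down anyway) is an application decision that this page does not prescribe.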

pause(server)

@spec pause(GenServer.server()) :: :ok

Pauses the poller, stopping it from claiming new jobs.

resume(server)

@spec resume(GenServer.server()) :: :ok

Resumes the poller after being paused.
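pause/1 and resume/1 pair naturally around a maintenance step. The following is a sketch under assumptions: PollerMaintenance and with_paused/3 are hypothetical names, and the poller argument exists only so a stub can stand in for Durable.Queue.Poller.

```elixir
defmodule PollerMaintenance do
  # Hypothetical helper: runs `fun` while the poller claims no new jobs.
  def with_paused(server, fun, poller \\ Durable.Queue.Poller) do
    :ok = poller.pause(server)

    try do
      fun.()
    after
      # Resume even if `fun` raises, so the queue is not left paused.
      poller.resume(server)
    end
  end
end
```

Since pausing is documented only as stopping new claims, jobs that are already running are presumably left to finish. Combine with drain/2 if the maintenance step needs the queue to be idle.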

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the poller process.

Options

  • :config - The Durable configuration (required)
  • :queue_name - The name of the queue to poll (required)
  • :concurrency - Maximum concurrent workers (default: 10)
  • :poll_interval - Milliseconds between polls (default: 1000)
  • :worker_supervisor - The DynamicSupervisor for workers (required)
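The options above can be passed as a child tuple, which relies on child_spec/1 forwarding them to start_link/1 (the default behaviour of a GenServer-generated child spec, assumed here). In this sketch MyApp.QueueChildren and MyApp.WorkerSupervisor are hypothetical names, and config stands for a Durable.Config.t() value obtained elsewhere; this page does not show how to build one.

```elixir
defmodule MyApp.QueueChildren do
  # Hypothetical helper that builds supervisor children for one queue.
  def children(config, queue_name) do
    [
      # The worker supervisor is listed first so it is running before the poller.
      {DynamicSupervisor, name: MyApp.WorkerSupervisor, strategy: :one_for_one},
      {Durable.Queue.Poller,
       config: config,
       queue_name: queue_name,
       # Optional: overrides the documented default of 10.
       concurrency: 5,
       # Optional: overrides the documented default of 1000 ms.
       poll_interval: 2_000,
       worker_supervisor: MyApp.WorkerSupervisor}
    ]
  end
end
```

The resulting list could then be handed to Supervisor.start_link/2, for example with strategy: :rest_for_one so that the poller restarts if the worker supervisor does.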

status(server)

@spec status(GenServer.server()) :: map()

Returns the current state of the poller for debugging.