gabsurd/worker

OTP Worker actor for the Absurd durable workflow system. Polls a queue for tasks, dispatches to registered handlers, and completes/fails tasks based on handler results.

Distributed systems behaviour

Types

Worker configuration. Create with new() and customize with with_* functions.

pub type Config {
  Config(
    db: client.Db,
    queue_name: String,
    worker_id: String,
    poll_interval: Int,
    claim_timeout: Int,
    batch_size: Int,
    max_backoff: Int,
    handlers: List(Handler),
  )
}

Constructors

  • Config(
      db: client.Db,
      queue_name: String,
      worker_id: String,
      poll_interval: Int,
      claim_timeout: Int,
      batch_size: Int,
      max_backoff: Int,
      handlers: List(Handler),
    )

A handler for a specific task type.

Define one of these for each kind of task your workers should process. The worker dispatches claimed tasks to the handler matching their task_name.

Example

let email_handler = worker.Handler(
  task_name: "send_email",
  execute: fn(ctx) {
    // ... send email using ctx.params(ctx) ...
    Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)
pub type Handler {
  Handler(
    task_name: String,
    execute: fn(context.Context) -> HandlerResult,
    on_error: option.Option(fn(context.Context, json.Json) -> Nil),
  )
}

Constructors

  • Handler(
      task_name: String,
      execute: fn(context.Context) -> HandlerResult,
      on_error: option.Option(fn(context.Context, json.Json) -> Nil),
    )

    Arguments

    task_name

    The task_name this handler responds to.

    execute

    Execute the task. Receives a Context with the claim details, database connection, and helper methods. Return Complete, Fail, or Suspend.

    on_error

    Optional hook called when execute returns Fail. Receives the context and the error json. Use for logging/metrics.

The result of a handler’s execute function.

Return Complete(result) to mark the task as successfully done. Return Fail(reason) to mark the task as failed. Return Suspend when the task is waiting for an event — the worker will skip the complete/fail call so the run stays in sleeping state until emit_event wakes it up.

pub type HandlerResult {
  Complete(json.Json)
  Fail(json.Json)
  Suspend
}

Constructors

Messages the worker actor accepts.

pub type Message {
  Poll
  Shutdown
}

Constructors

  • Poll
  • Shutdown

A running worker actor handle.

pub type Worker {
  Worker(subject: process.Subject(Message))
}

Constructors

Values

pub fn child_spec(
  name: String,
  config: Config,
) -> supervision.ChildSpecification(Worker)

Create a child spec for adding to a static_supervisor.

pub fn new(
  db: client.Db,
  queue_name: String,
  handlers: List(Handler),
) -> Config

Create a new worker config with defaults.

pub fn pool_child_specs(
  name: String,
  config: Config,
  count: Int,
) -> List(supervision.ChildSpecification(Worker))

Create a list of child specs for a worker pool (N workers). Each worker gets a unique worker_id incorporating a unique integer to avoid collisions between pools.

pub fn start(
  config: Config,
) -> Result(actor.Started(Worker), actor.StartError)

Start a worker actor.

pub fn stop(worker: Worker) -> Nil

Stop a worker gracefully.

pub fn with_batch_size(config: Config, size: Int) -> Config

Set the batch size (tasks claimed per poll).

pub fn with_claim_timeout(
  config: Config,
  timeout_secs: Int,
) -> Config

Set the claim timeout in seconds.

pub fn with_max_backoff(
  config: Config,
  max_backoff_ms: Int,
) -> Config

Set the maximum backoff in milliseconds for retrying after claim errors. Default: 60000 (60 seconds).

pub fn with_poll_interval(
  config: Config,
  interval_ms: Int,
) -> Config

Set the poll interval in milliseconds.

pub fn with_worker_id(
  config: Config,
  worker_id: String,
) -> Config

Set the worker ID (used as worker_id in claim_task).

Search Document