Workers are modules that implement Kathikon.Worker. The dispatcher calls perform/1 when a job is claimed.
Defining a worker
defmodule MyApp.Workers.ChargeCustomer do
use Kathikon.Worker
@impl true
def perform(%Kathikon.Job{args: %{"invoice_id" => id}} = job) do
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# `job` includes id, queue, attempts, priority, timestamps, errors
case MyApp.Billing.charge(id) do
:ok -> :ok
{:error, :card_declined} -> {:error, :card_declined}
end
end
enduse Kathikon.Worker sets @behaviour Kathikon.Worker. You must implement perform/1.
Return values
| Return | Effect |
|---|---|
:ok | Job → :completed |
{:sleep, seconds} | Job → :scheduled for seconds; not a failure (no attempts or errors) |
{:error, reason} | Job → :retryable with backoff, or :discarded if max_attempts exceeded |
# Success
:ok
# Defer without failure — scheduler promotes back to :available when due
{:sleep, 60}
{:sleep, 300}
# Transient failure — will retry (increments attempts, records error)
{:error, :timeout}
{:error, %{status: 503}}
# Any term is accepted as the error reason
{:error, "payment gateway unavailable"}{:sleep, seconds} requires a positive integer. Invalid values are treated as {:error, {:invalid_sleep, value}}.
Exceptions and throws
Uncaught exceptions and throws are converted to {:error, reason} internally:
def perform(_job), do: raise("boom") # → retry as {:exception, ...}
def perform(_job), do: throw(:cancelled) # → retry as {:throw, :cancelled}Design workers to return {:error, reason} for expected failures; reserve exceptions for programmer errors.
Enqueueing work for a worker
{:ok, job} =
Kathikon.insert(MyApp.Workers.ChargeCustomer, %{"invoice_id" => "INV-99"},
queue: :billing,
max_attempts: 5,
priority: 3
)Args must be a map (JSON-serializable values work best for future cross-node support).
Idempotency
Kathikon provides at-least-once execution. A worker may run more than once for the same job id (retries, future lease recovery). Make perform/1 safe to repeat:
def perform(%{args: %{"order_id" => id}}) do
# Good: billing API keyed by order_id is idempotent
MyApp.Billing.capture_once(id)
:ok
endIO-bound vs CPU-bound work
- IO-bound (HTTP, email, DB): higher concurrency per queue is fine.
- CPU-bound (PDF, image processing): use a dedicated queue with low concurrency.
See Queues & concurrency.
Related
Kathikon.Workermodule — behaviour definition- Retries & errors
- Module reference: Kathikon.Worker