BEAM-native durable job queue and task execution platform.
See the README for an overview. Guides:
Quick start
Define a worker:
defmodule MyApp.EmailWorker do
  use Kathikon.Worker

  @impl true
  def perform(%Kathikon.Job{args: %{"email" => email}}) do
    MyApp.Mailer.deliver(email)
    :ok
  end
end
Enqueue a job:
{:ok, job} = Kathikon.insert(MyApp.EmailWorker, %{"email" => "a@b.com"})
Configure queues in config/config.exs:
config :kathikon,
  queues: [
    default: [concurrency: 10],
    emails: [concurrency: 5]
  ]
Job options
:queue - target queue (default :default)
:priority - higher runs first (default 0)
:max_attempts - retry limit (default from config, 20)
:schedule_in - delay in seconds before the job becomes available
:schedule_at - DateTime when the job becomes available
Telemetry
Kathikon emits [:kathikon, ...] telemetry events for job lifecycle,
scheduler ticks, and pruning. See Kathikon.Telemetry and
docs/guides/telemetry-and-observability.md.
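As a rough sketch of consuming these events, the module below logs job inserts. Only the event name [:kathikon, :job, :insert] comes from this page; the module name, the handler id, and the assumption that the :telemetry package is available are illustrative, and the contents of the measurements and metadata maps are not specified here, so the handler only inspects them.

```elixir
defmodule MyApp.KathikonInsertLogger do
  # Hypothetical module; only the event name below is taken from the docs.
  require Logger

  @event [:kathikon, :job, :insert]

  # Attaches the handler. Assumes the :telemetry package is a dependency.
  def attach do
    :telemetry.attach("my-app-kathikon-insert", @event, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive the event name, a measurements map,
  # a metadata map, and the config term passed to attach/4.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Builds the log line. Metadata is reduced to its keys because its
  # shape is not documented on this page.
  def format(event, measurements, metadata) do
    "#{Enum.join(event, ".")} measurements=#{inspect(measurements)} " <>
      "metadata=#{inspect(Map.keys(metadata))}"
  end
end
```

Calling MyApp.KathikonInsertLogger.attach() once at application start would be enough. Passing a captured module function rather than an anonymous one follows the usual :telemetry recommendation.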
Functions
@spec all() :: [Kathikon.Job.t()]
Lists all jobs currently in storage.
Intended for inspection, debugging, and tests — not for production dashboards.
Example
Kathikon.all()
|> Enum.filter(&(&1.state == :retryable))
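For quick inspection in a console or a test, a count of jobs per state can be more useful than the raw list. The helper below is a hypothetical sketch, not part of Kathikon; it assumes only that each job exposes a state field, as the example above does.

```elixir
defmodule JobSummary do
  # Hypothetical helper. Accepts any list of structs or maps with a
  # :state key, such as the result of Kathikon.all/0.
  def count_by_state(jobs) do
    Enum.frequencies_by(jobs, & &1.state)
  end
end
```

Usage would look like Kathikon.all() |> JobSummary.count_by_state().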
@spec cancel(String.t()) :: {:ok, Kathikon.Job.t()} | {:error, term()}
Cancels a job that has not yet reached a terminal success state.
Cancellable states
:scheduled, :available, :retryable - returns {:ok, job} with state: :cancelled
:executing - returns {:error, :executing} (Phase 1 does not interrupt running tasks)
:completed, :cancelled, :discarded - returns {:error, {:invalid_state, state}}
Example
{:ok, job} =
  Kathikon.insert(NewsletterWorker, %{}, schedule_in: 86_400)
{:ok, cancelled} = Kathikon.cancel(job.id)
cancelled.state # :cancelled
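Callers usually need to handle every documented return shape, not just the success case. The sketch below maps each one to a short message; the module and its wording are hypothetical, and the final clause is an assumption that covers other error terms such as a missing job.

```elixir
defmodule CancelOutcome do
  # Hypothetical helper: turns the return values of Kathikon.cancel/1
  # listed above into human-readable messages.
  def describe({:ok, %{state: :cancelled}}), do: "cancelled"

  def describe({:error, :executing}), do: "still running; cannot be interrupted"

  def describe({:error, {:invalid_state, state}}),
    do: "already in terminal state #{state}"

  # Catch-all for any other error term (assumed, not documented).
  def describe({:error, reason}), do: "cancel failed: #{inspect(reason)}"
end
```

A call site could then read job_id |> Kathikon.cancel() |> CancelOutcome.describe().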
@spec fetch(String.t()) :: {:ok, Kathikon.Job.t()} | {:error, term()}
Fetches a job by id.
Example
{:ok, job} = Kathikon.fetch(job_id)
job.state
job.attempts
Returns {:error, :not_found} if the job was pruned or never existed.
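One possible use of fetch/1 is waiting for a job to finish in a test. The sketch below polls until the job reaches one of the terminal states listed under cancel/1. The module name, retry count, and interval are illustrative assumptions, and the fetch function is passed in so the helper does not depend on Kathikon directly.

```elixir
defmodule JobWaiter do
  # Terminal states as listed under cancel/1 on this page.
  @terminal [:completed, :cancelled, :discarded]

  # Polls `fetch_fun` until the job is terminal or the retries run out.
  def await(job_id, fetch_fun, retries \\ 10, interval_ms \\ 500)

  def await(_job_id, _fetch_fun, 0, _interval_ms), do: {:error, :timeout}

  def await(job_id, fetch_fun, retries, interval_ms) do
    case fetch_fun.(job_id) do
      {:ok, %{state: state} = job} when state in @terminal ->
        {:ok, job}

      {:ok, _job} ->
        # Not finished yet: wait, then try again with one fewer retry.
        Process.sleep(interval_ms)
        await(job_id, fetch_fun, retries - 1, interval_ms)

      {:error, _reason} = error ->
        # For example {:error, :not_found} when the job was pruned.
        error
    end
  end
end
```

With Kathikon loaded, the call would be JobWaiter.await(job.id, &Kathikon.fetch/1). Polling is a simple fallback; the telemetry events may be a better fit outside tests.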
@spec insert(module(), map(), keyword()) :: {:ok, Kathikon.Job.t()} | {:error, term()}
Inserts a job into the durable queue.
Starts the queue dispatcher if it is not already running.
Emits [:kathikon, :job, :insert].
Options
:queue - target queue atom (default :default)
:priority - non-negative integer, higher claims first (default 0)
:max_attempts - retry limit (default from Kathikon.Config)
:schedule_in - seconds until the job becomes available
:schedule_at - DateTime when the job becomes available
Examples
{:ok, job} = Kathikon.insert(MyWorker, %{"id" => "1"})
{:ok, job} =
  Kathikon.insert(EmailWorker, %{"to" => "u@example.com"},
    queue: :emails,
    priority: 5,
    max_attempts: 10
  )
{:ok, job} =
  Kathikon.insert(DigestWorker, %{}, schedule_in: 3600)
Returns
{:ok, job} - job persisted; job.state is :available or :scheduled
{:error, reason} - storage failure (e.g. duplicate id)
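The examples above use :schedule_in; the :schedule_at option takes an absolute DateTime instead. The sketch below builds such an option list with the standard library. The helper is hypothetical, and whether Kathikon expects a UTC DateTime is an assumption.

```elixir
defmodule ScheduleOpts do
  # Hypothetical helper: builds insert/3 options that make a job
  # available `hours` hours after `now` (assumed to be a UTC DateTime).
  def hours_from(now, hours) do
    [schedule_at: DateTime.add(now, hours * 3600, :second)]
  end
end
```

It could be used as Kathikon.insert(DigestWorker, %{}, ScheduleOpts.hours_from(DateTime.utc_now(), 2)).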
@spec start_queue(atom()) :: :ok
Ensures a dispatcher is running for the given queue.
Called automatically by insert/3. Use directly when starting
a queue before any jobs are enqueued.
Example
:ok = Kathikon.start_queue(:imports)