Kathikon (Kathikon v0.1.0)

Copy Markdown View Source

BEAM-native durable job queue and task execution platform.

See the README for an overview. Guides:

Quick start

Define a worker:

defmodule MyApp.EmailWorker do
  use Kathikon.Worker

  @impl true
  def perform(%Kathikon.Job{args: %{"email" => email}}) do
    MyApp.Mailer.deliver(email)
    :ok
  end
end

Enqueue a job:

{:ok, job} = Kathikon.insert(MyApp.EmailWorker, %{"email" => "a@b.com"})

Configure queues in config/config.exs:

config :kathikon,
  queues: [
    default: [concurrency: 10],
    emails: [concurrency: 5]
  ]

Job options

  • :queue — target queue (default :default)
  • :priority — higher runs first (default 0)
  • :max_attempts — retry limit (default from config, 20)
  • :schedule_in — delay in seconds before the job becomes available
  • :schedule_atDateTime when the job becomes available

Telemetry

Kathikon emits [:kathikon, ...] telemetry events for job lifecycle, scheduler ticks, and pruning. See Kathikon.Telemetry and docs/guides/telemetry-and-observability.md.

Summary

Functions

Lists all jobs currently in storage.

Cancels a job that has not yet reached a terminal success state.

Fetches a job by id.

Inserts a job into the durable queue.

Ensures a dispatcher is running for the given queue.

Functions

all()

@spec all() :: [Kathikon.Job.t()]

Lists all jobs currently in storage.

Intended for inspection, debugging, and tests — not for production dashboards.

Example

Kathikon.all()
|> Enum.filter(&(&1.state == :retryable))

cancel(job_id)

@spec cancel(String.t()) :: {:ok, Kathikon.Job.t()} | {:error, term()}

Cancels a job that has not yet reached a terminal success state.

Cancellable states

  • :scheduled, :available, :retryable — returns {:ok, job} with state: :cancelled
  • :executing — returns {:error, :executing} (Phase 1 does not interrupt running tasks)
  • :completed, :cancelled, :discarded — returns {:error, {:invalid_state, state}}

Example

{:ok, job} =
  Kathikon.insert(NewsletterWorker, %{}, schedule_in: 86_400)

{:ok, cancelled} = Kathikon.cancel(job.id)
cancelled.state  # :cancelled

fetch(job_id)

@spec fetch(String.t()) :: {:ok, Kathikon.Job.t()} | {:error, term()}

Fetches a job by id.

Example

{:ok, job} = Kathikon.fetch(job_id)
job.state
job.attempts

Returns {:error, :not_found} if the job was pruned or never existed.

insert(worker, args, opts \\ [])

@spec insert(module(), map(), keyword()) :: {:ok, Kathikon.Job.t()} | {:error, term()}

Inserts a job into the durable queue.

Starts the queue dispatcher if it is not already running. Emits [:kathikon, :job, :insert].

Options

  • :queue — target queue atom (default :default)
  • :priority — non-negative integer, higher claims first (default 0)
  • :max_attempts — retry limit (default from Kathikon.Config)
  • :schedule_in — seconds until the job becomes available
  • :schedule_atDateTime when the job becomes available

Examples

{:ok, job} = Kathikon.insert(MyWorker, %{"id" => "1"})

{:ok, job} =
  Kathikon.insert(EmailWorker, %{"to" => "u@example.com"},
    queue: :emails,
    priority: 5,
    max_attempts: 10
  )

{:ok, job} =
  Kathikon.insert(DigestWorker, %{}, schedule_in: 3600)

Returns

  • {:ok, job} — job persisted; job.state is :available or :scheduled
  • {:error, reason} — storage failure (e.g. duplicate id)

start_queue(queue)

@spec start_queue(atom()) :: :ok

Ensures a dispatcher is running for the given queue.

Called automatically by insert/3. Use directly when starting a queue before any jobs are enqueued.

Example

:ok = Kathikon.start_queue(:imports)