Public API and runtime modules for Kathikon v0.1.0. For narrative guides see the documentation index or quick start.


Kathikon

Main entry point. Enqueue, cancel, and inspect jobs.

insert/3

@spec insert(module(), map(), keyword()) :: {:ok, Job.t()} | {:error, term()}

Enqueues a job. Starts the target queue dispatcher if needed. Emits [:kathikon, :job, :insert].

Options

OptionDefaultDescription
:queue:defaultTarget queue atom
:priority0Higher values claim first
:max_attemptsKathikon.Config.max_attempts()Retry limit
:schedule_inSeconds until available
:schedule_atDateTime when job becomes available

Examples

# Immediate job
{:ok, job} = Kathikon.insert(MyWorker, %{"id" => "1"})

# Email queue, high priority
{:ok, job} =
  Kathikon.insert(EmailWorker, %{"to" => "a@b.com"},
    queue: :emails,
    priority: 5
  )

# Delayed job
{:ok, job} =
  Kathikon.insert(DigestWorker, %{},
    schedule_in: 3600
  )

# Fixed time
{:ok, job} =
  Kathikon.insert(ReportWorker, %{},
    schedule_at: ~U[2026-12-25 09:00:00Z]
  )

cancel/1

@spec cancel(String.t()) :: {:ok, Job.t()} | {:error, term()}
{:ok, job} = Kathikon.cancel("abc123...")
# {:error, :executing}
# {:error, {:invalid_state, :completed}}

fetch/1

@spec fetch(String.t()) :: {:ok, Job.t()} | {:error, :not_found}
{:ok, job} = Kathikon.fetch(job_id)
job.state
job.attempts
job.errors

all/0

@spec all() :: [Job.t()]

Returns every job in storage. Intended for tests and debugging.

Kathikon.all()
|> Enum.count(&(&1.state == :completed))

start_queue/1

@spec start_queue(atom()) :: :ok

Ensures a dispatcher is running for the queue.

:ok = Kathikon.start_queue(:imports)

Kathikon.Job

Struct representing a durable job obligation.

Fields

FieldTypeDescription
idString.t()Unique hex id
queueatom()Target queue
workermodule()Worker module
argsmap()Arguments for perform/1
stateatomCurrent lifecycle state
prioritynon_neg_integer()Claim ordering
max_attemptspos_integer()Retry ceiling
attemptsnon_neg_integer()Completed attempts
scheduled_atDateTime | nilWhen scheduled job becomes due
available_atDateTime | nilWhen job can be claimed
inserted_atDateTime | nilInsert time
started_atDateTime | nilLast claim time
completed_atDateTime | nilSuccess or discard time
cancelled_atDateTime | nilCancel time
errors[map()]Failure history

States

:scheduled:available:executing:completed | :retryable | :discarded | :cancelled

build/3

@spec build(module(), map(), keyword()) :: t()

Low-level job construction. Prefer Kathikon.insert/3.

job = Kathikon.Job.build(MyWorker, %{"x" => 1}, queue: :default, priority: 2)

claimable?/2

@spec claimable?(t(), DateTime.t()) :: boolean()
Kathikon.Job.claimable?(job, DateTime.utc_now())
# true when state in [:available, :retryable] and available_at <= now

backoff_seconds/1

Kathikon.Job.backoff_seconds(2)  # 20

Kathikon.Worker

Behaviour for job workers.

Callback

@callback perform(Kathikon.Job.t()) :: :ok | {:error, term()} | {:sleep, pos_integer()}

Example

defmodule MyApp.Workers.Webhook do
  use Kathikon.Worker

  @impl true
  def perform(%{args: %{"url" => url, "body" => body}}) do
    case MyApp.HTTP.post(url, body) do
      {:ok, _} -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end

Kathikon.Config

Runtime configuration readers. Set values in config :kathikon, ....

FunctionReturns
queues/0Keyword of queue configs
queue_names/0[atom()]
queue_config/1Keyword for one queue
concurrency/1Max tasks for queue
poll_interval/0Dispatcher poll ms
scheduler_interval/0Scheduler tick ms
prune_interval/0Pruner tick ms
retention_period/0Terminal job retention ms
max_attempts/0Default retry limit
mnesia_copies/0:ram or :disc
Kathikon.Config.concurrency(:emails)  # 5

Kathikon.Storage

Facade over Kathikon.Backend.Storage. Used internally; embedders may call setup/0.

FunctionDescription
setup/0Create Mnesia schema and tables
clear_jobs!/0Delete all jobs (tests)
reset!/0Drop and recreate tables (tests)
insert/1, update/1, fetch/1, delete/1Job CRUD
claim/2Atomic queue claim
promote_scheduled/1Batch schedule promotion
all/0List jobs
backend/0, backend/1Get/set backend module
:ok = Kathikon.Storage.setup()

Kathikon.Telemetry

attach_default_logger/0

Attaches a logger handler for all Kathikon events.

Kathikon.Telemetry.attach_default_logger()

Events

See Telemetry guide.


Runtime processes

These modules are started by Kathikon.Application. You typically do not call them directly.

Kathikon.Application

OTP application callback. Calls Storage.setup/0 and starts the supervision tree.

Kathikon.Queue

DynamicSupervisor for per-queue dispatchers.

FunctionDescription
ensure_started/1Start dispatcher for queue
start_configured/0Start all configured queues

Kathikon.Dispatcher

GenServer — polls Mnesia, claims jobs, spawns Task workers.

Registered as {:dispatcher, queue} in Kathikon.Registry.

start_link/1 options: :queue, :config, :poll_interval, :storage (defaults to Kathikon.Storage).

Kathikon.Scheduler

GenServer — ticks on scheduler_interval, calls storage.promote_scheduled/1.

Registered as Kathikon.Scheduler when started by the application.

start_link/1 options: :interval, :storage, :name (false for unnamed test instances).

Kathikon.Pruner

GenServer — deletes terminal jobs past retention_period.

Registered as Kathikon.Pruner when started by the application.

start_link/1 options: :interval, :storage, :name (false for unnamed test instances).


Placeholders (not implemented)

Kathikon.Cron

Kathikon.Cron.insert(worker, args, opts)
# {:error, :not_implemented}  — Phase 3

Kathikon.Lifeline

Kathikon.Lifeline.start_link()
# {:error, :not_implemented}  — Phase 2

Internal (extension points)

ModuleRole
Kathikon.Backend.StorageStorage behaviour
Kathikon.Backend.Storage.MnesiaMnesia implementation

Configure via storage_backend: Kathikon.Backend.Storage.Mnesia.