Intro
Kathikon is a BEAM-native durable job queue. Jobs live in Mnesia and run through explicit states: scheduled → available → executing → completed | retryable | discarded | cancelled.
Run the setup cells top-to-bottom, then try each demo section.
Setup
Install & configure
Livebook runs on a named node without a Mnesia disc directory — use mnesia_copies: :ram (also auto-detected when the node name contains livebook).
Use env: :prod so Mix.install only pulls runtime deps (:telemetry). env: :dev would require dev-only tools (ex_doc, sobelow, …) that are not fetched in Livebook.
Mix.install(
[
{:kathikon, path: Path.expand("..", __DIR__), env: :prod},
{:kino, "~> 0.19.0"},
{:kino_vega_lite, "~> 0.1.11"}
],
config: [
kathikon: [
mnesia_copies: :ram,
poll_interval: 150,
scheduler_interval: 400,
prune_interval: 2_000,
retention_period: 3_000,
max_attempts: 3,
queues: [
default: [concurrency: 2],
emails: [concurrency: 1],
priority: [concurrency: 1]
]
]
]
)Reset demo jobs
Mix.install starts Kathikon (and Mnesia) automatically.
:ok = Kathikon.Storage.clear_jobs!()Activity log
defmodule Demo.Log do
@name __MODULE__
def start, do: Agent.start_link(fn -> [] end, name: @name)
def append(line), do: Agent.update(@name, &(&1 ++ [line]))
def entries, do: Agent.get(@name, & &1)
def clear, do: Agent.update(@name, fn _ -> [] end)
end
{:ok, _} = Demo.Log.start()Helpers
defmodule Demo do
def await(job_id, state, timeout \\ 10_000) do
deadline = System.monotonic_time(:millisecond) + timeout
poll(job_id, state, deadline)
end
defp poll(job_id, state, deadline) do
case Kathikon.fetch(job_id) do
{:ok, %{state: ^state} = job} ->
{:ok, job}
{:ok, _} ->
if System.monotonic_time(:millisecond) > deadline do
{:error, :timeout}
else
Process.sleep(100)
poll(job_id, state, deadline)
end
other ->
other
end
end
def job_table(jobs) do
jobs
|> Enum.map(fn j ->
%{
id: String.slice(j.id, 0, 8),
queue: j.queue,
state: j.state,
attempts: j.attempts,
priority: j.priority
}
end)
|> Kino.DataTable.new()
end
def dispatcher(queue) do
GenServer.whereis({:via, Registry, {Kathikon.Registry, {:dispatcher, queue}}})
end
endWorkers
SendEmailWorker
defmodule Demo.SendEmailWorker do
use Kathikon.Worker
@impl true
def perform(%{args: %{"to" => to, "subject" => subject}}) do
Process.sleep(250)
Demo.Log.append("Sent \"#{subject}\" to #{to}")
:ok
end
endFlakyPaymentWorker
Fails once, then succeeds — shows retries.
defmodule Demo.FlakyPaymentWorker do
use Kathikon.Worker
@impl true
def perform(job) do
if job.attempts + 1 >= 2 do
Demo.Log.append("Payment captured")
:ok
else
Demo.Log.append("Payment gateway timeout")
{:error, :timeout}
end
end
endAlwaysFailWorker
defmodule Demo.AlwaysFailWorker do
use Kathikon.Worker
@impl true
def perform(_job), do: {:error, :declined}
endThumbnailWorker
Slow jobs — good for watching concurrency.
defmodule Demo.ThumbnailWorker do
use Kathikon.Worker
@impl true
def perform(%{args: %{"asset" => asset}}) do
Demo.Log.append("Thumbnail started: #{asset}")
Process.sleep(600)
Demo.Log.append("Thumbnail done: #{asset}")
:ok
end
endFlashSaleWorker
defmodule Demo.FlashSaleWorker do
use Kathikon.Worker
@impl true
def perform(%{args: %{"label" => label}}) do
Demo.Log.append("Flash sale: #{label}")
:ok
end
endAsyncStore
Simulates a 3rd-party API that returns :pending until the async job is ready.
defmodule Demo.AsyncStore do
@name __MODULE__
@ready_after 3
def start, do: Agent.start_link(fn -> %{} end, name: @name)
def status(request_url) do
Agent.get_and_update(@name, fn state ->
polls = Map.get(state, request_url, 0) + 1
new_state = Map.put(state, request_url, polls)
result =
if polls >= @ready_after do
{:ok, %{"status" => "ready", "url" => request_url}}
else
:pending
end
{result, new_state}
end)
end
def forget(request_url), do: Agent.update(@name, &Map.delete(&1, request_url))
def reset!, do: Agent.update(@name, fn _ -> %{} end)
end
{:ok, _} = Demo.AsyncStore.start()AsyncRequestWorker
Typical {:sleep, seconds} use case: poll an external endpoint until the result is ready.
defmodule Demo.AsyncRequestWorker do
use Kathikon.Worker
@poll_interval 3
@impl true
def perform(%{args: %{"request_url" => request_url}}) do
case Demo.AsyncStore.status(request_url) do
{:ok, data} ->
Demo.Log.append("Data received — processing #{inspect(data)}")
Demo.AsyncStore.forget(request_url)
:ok
:pending ->
Demo.Log.append("Poll #{request_url}: still pending")
{:sleep, @poll_interval}
end
end
endExample scenarios
| Scenario | Worker | Features |
|---|---|---|
| Welcome email | SendEmailWorker | insert, execute |
| Email queue | SendEmailWorker | queue: :emails |
| Weekly digest | SendEmailWorker | schedule_in |
| Flash sale | FlashSaleWorker | priority |
| Payment retry | FlakyPaymentWorker | backoff |
| Async poll | AsyncRequestWorker | {:sleep, seconds} |
| Dead letter | AlwaysFailWorker | max_attempts |
| Cancel newsletter | SendEmailWorker | cancel/1 |
| Thumbnails | ThumbnailWorker | concurrency |
1. Welcome email
Demo.Log.clear()
{:ok, job} =
Kathikon.insert(Demo.SendEmailWorker, %{
"to" => "ada@example.com",
"subject" => "Welcome"
})
{:ok, done} = Demo.await(job.id, :completed)
{job.state, done.attempts}Demo.job_table(Kathikon.all())2. Dedicated email queue
:ok = Kathikon.start_queue(:emails)
{:ok, job} =
Kathikon.insert(
Demo.SendEmailWorker,
%{"to" => "grace@example.com", "subject" => "Password reset"},
queue: :emails
)
Demo.await(job.id, :completed)3. Scheduled digest
Insert, wait for the scheduler to promote, then wait for completion — all in one cell so job stays in scope.
{:ok, job} =
Kathikon.insert(
Demo.SendEmailWorker,
%{"to" => "linus@example.com", "subject" => "Weekly digest"},
schedule_in: 3
)
IO.inspect(job.state, label: "initial")
Process.sleep(500)
case Kathikon.fetch(job.id) do
{:ok, promoted} ->
IO.inspect(promoted.state, label: "after scheduler tick")
Demo.await(job.id, :completed)
{:error, :not_found} ->
{:error, :not_found}
end4. Flash sale priority
:ok = Kathikon.start_queue(:priority)
Demo.Log.clear()
for {label, pri} <- [{"VIP", 10}, {"General", 1}] do
Kathikon.insert(Demo.FlashSaleWorker, %{"label" => label},
queue: :priority,
priority: pri,
schedule_in: 1
)
end
Process.sleep(1_500)
Demo.Log.entries()5. Payment retry
~5 seconds between attempts (exponential backoff).
Demo.Log.clear()
{:ok, job} =
Kathikon.insert(Demo.FlakyPaymentWorker, %{"invoice" => "INV-42"}, max_attempts: 3)
{:ok, done} = Demo.await(job.id, :completed, 20_000)
{done.attempts, done.errors}6. Async polling
Poll a 3rd-party endpoint until the async result is ready. While status is :pending, return {:sleep, seconds} — the job moves to :scheduled and runs again later without incrementing attempts or appending to errors (contrast with Payment retry above). Production intervals are often much longer (e.g. {:sleep, 360}); we use 3s here so the demo finishes quickly.
Demo.Log.clear()
Demo.AsyncStore.reset!()
{:ok, job} =
Kathikon.insert(Demo.AsyncRequestWorker, %{
"request_url" => "https://api.example.com/jobs/abc-123"
})
Process.sleep(400)
{:ok, deferred} = Kathikon.fetch(job.id)
{deferred.state, deferred.attempts, deferred.errors}{:ok, done} = Demo.await(job.id, :completed, 20_000)
{done.attempts, done.errors, Demo.Log.entries()}7. Discard
{:ok, job} =
Kathikon.insert(Demo.AlwaysFailWorker, %{"card" => "4111"}, max_attempts: 2)
{:ok, done} = Demo.await(job.id, :discarded, 25_000)
{done.state, done.errors}8. Cancel
{:ok, job} =
Kathikon.insert(
Demo.SendEmailWorker,
%{"to" => "unsub@example.com", "subject" => "Newsletter"},
schedule_in: 60
)
{:ok, cancelled} = Kathikon.cancel(job.id)
Kathikon.cancel(job.id)9. Concurrency
Demo.Log.clear()
for asset <- ["a.png", "b.png", "c.png"] do
Kathikon.insert(Demo.ThumbnailWorker, %{"asset" => asset})
end
Process.sleep(2_500)
Demo.Log.entries()10. Pruning
Terminal jobs are removed after retention_period (3s here).
{:ok, job} =
Kathikon.insert(Demo.SendEmailWorker, %{"to" => "x@y.com", "subject" => "Temp"})
{:ok, _} = Demo.await(job.id, :completed)
Process.sleep(6_000)
Kathikon.fetch(job.id)11. Inspect jobs
Demo.job_table(Kathikon.all())Kathikon.all() |> Enum.frequencies_by(& &1.state)Cleanup
Kathikon.Storage.clear_jobs!()
Demo.Log.clear()
Demo.AsyncStore.reset!()