Intro

Kathikon is a BEAM-native durable job queue. Jobs live in Mnesia and run through explicit states: scheduled → available → executing → completed | retryable | discarded | cancelled.

Run the setup cells top-to-bottom, then try each demo section.

Setup

Install & configure

Livebook runs on a named node without a Mnesia disc directory — use mnesia_copies: :ram (also auto-detected when the node name contains livebook).

Use env: :prod so Mix.install only pulls runtime deps (:telemetry). env: :dev would require dev-only tools (ex_doc, sobelow, …) that are not fetched in Livebook.

# Installs Kathikon from the parent directory (plus Kino for rendering) and sets
# the :kathikon application config that every demo below relies on.
Mix.install(
  [
    # env: :prod so only Kathikon's runtime deps are fetched; dev-only tools are
    # not available in Livebook.
    {:kathikon, path: Path.expand("..", __DIR__), env: :prod},
    {:kino, "~> 0.19.0"},
    {:kino_vega_lite, "~> 0.1.11"}
  ],
  config: [
    kathikon: [
      # RAM-only Mnesia tables: the Livebook node has no Mnesia disc directory.
      mnesia_copies: :ram,
      # Timings, presumably all in milliseconds, kept short so demos finish quickly.
      poll_interval: 150,
      scheduler_interval: 400,
      prune_interval: 2_000,
      retention_period: 3_000,
      # Default attempt budget; some inserts below override it with max_attempts:.
      max_attempts: 3,
      # Queues to run and their per-queue concurrency limits.
      queues: [
        default: [concurrency: 2],
        emails: [concurrency: 1],
        priority: [concurrency: 1]
      ]
    ]
  ]
)

Reset demo jobs

Mix.install starts Kathikon (and Mnesia) automatically.

# Wipe jobs left over from a previous run so each demo starts from an empty table;
# the `:ok =` match makes the cell fail loudly if clearing returns anything else.
:ok = Kathikon.Storage.clear_jobs!()

Activity log

defmodule Demo.Log do
  @moduledoc """
  Activity log that workers append to and demo cells read back.

  Backed by a named `Agent`. Lines are stored newest-first (O(1) prepend) and
  reversed on read, so `entries/0` returns them in the order they were appended.
  """

  @name __MODULE__

  @doc """
  Starts the log agent, or returns the one that is already running.

  Returns `{:ok, pid}` in both cases, so re-evaluating this cell does not fail
  with `{:error, {:already_started, pid}}` when a previous agent is still
  registered under the same name.
  """
  def start do
    case Agent.start_link(fn -> [] end, name: @name) do
      {:error, {:already_started, pid}} -> {:ok, pid}
      result -> result
    end
  end

  @doc "Appends `line` to the log."
  def append(line), do: Agent.update(@name, &[line | &1])

  @doc "Returns all logged lines, oldest first."
  def entries, do: Agent.get(@name, &Enum.reverse/1)

  @doc "Discards all logged lines."
  def clear, do: Agent.update(@name, fn _ -> [] end)
end

{:ok, _} = Demo.Log.start()

Helpers

defmodule Demo do
  @moduledoc """
  Helpers shared by the demo cells: waiting for a job to reach a state,
  rendering job lists as a table, and looking up a queue's dispatcher process.
  """

  # States a job does not leave on its own, per the state diagram in the intro.
  # NOTE(review): assumes Kathikon never moves a job out of these — confirm.
  @terminal_states [:completed, :discarded, :cancelled]

  @doc """
  Polls `Kathikon.fetch/1` every 100 ms until job `job_id` is in `state`.

  Returns:

    * `{:ok, job}` once the job is in `state`;
    * `{:error, {:unexpected_state, other}}` as soon as the job settles in a
      different terminal state, instead of waiting out the whole timeout;
    * `{:error, :timeout}` once `timeout` milliseconds have elapsed;
    * any other `Kathikon.fetch/1` result (e.g. `{:error, :not_found}`) unchanged.
  """
  def await(job_id, state, timeout \\ 10_000) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(job_id, state, deadline)
  end

  defp poll(job_id, state, deadline) do
    case Kathikon.fetch(job_id) do
      {:ok, %{state: ^state} = job} ->
        {:ok, job}

      # Target not reached and the job can no longer change on its own: fail fast.
      {:ok, %{state: other}} when other in @terminal_states ->
        {:error, {:unexpected_state, other}}

      {:ok, _} ->
        if System.monotonic_time(:millisecond) > deadline do
          {:error, :timeout}
        else
          Process.sleep(100)
          poll(job_id, state, deadline)
        end

      other ->
        other
    end
  end

  @doc """
  Renders `jobs` as a `Kino.DataTable` with a shortened id (first 8 characters)
  plus queue, state, attempts and priority columns.
  """
  def job_table(jobs) do
    jobs
    |> Enum.map(fn j ->
      %{
        id: String.slice(j.id, 0, 8),
        queue: j.queue,
        state: j.state,
        attempts: j.attempts,
        priority: j.priority
      }
    end)
    |> Kino.DataTable.new()
  end

  @doc """
  Returns the pid registered under `{:dispatcher, queue}` in `Kathikon.Registry`,
  or `nil` if none is registered.

  NOTE(review): relies on Kathikon's internal registry key — confirm it is stable.
  """
  def dispatcher(queue) do
    GenServer.whereis({:via, Registry, {Kathikon.Registry, {:dispatcher, queue}}})
  end
end

Workers

SendEmailWorker

defmodule Demo.SendEmailWorker do
  @moduledoc """
  Pretends to send an email: pauses 250 ms, then records the delivery in
  `Demo.Log`. Expects job args with the string keys `"to"` and `"subject"`.
  """
  use Kathikon.Worker

  @impl true
  def perform(%{args: %{"subject" => subject, "to" => recipient}}) do
    # Stand-in for the SMTP round-trip.
    Process.sleep(250)

    message = ~s(Sent "#{subject}" to #{recipient})
    Demo.Log.append(message)

    :ok
  end
end

FlakyPaymentWorker

Fails once, then succeeds — shows retries.

defmodule Demo.FlakyPaymentWorker do
  @moduledoc """
  Simulates a flaky payment gateway: the first attempt fails with
  `{:error, :timeout}` and every later attempt succeeds. Both outcomes are logged.
  """
  use Kathikon.Worker

  @impl true
  def perform(job) do
    # Presumably `attempts` counts earlier runs, so this run is number attempts + 1.
    current_attempt = job.attempts + 1

    if current_attempt < 2 do
      Demo.Log.append("Payment gateway timeout")
      {:error, :timeout}
    else
      Demo.Log.append("Payment captured")
      :ok
    end
  end
end

AlwaysFailWorker

defmodule Demo.AlwaysFailWorker do
  @moduledoc """
  Never succeeds: every attempt returns `{:error, :declined}`, so the job uses
  up its `max_attempts` and is expected to end up `:discarded`.
  """
  use Kathikon.Worker

  @impl true
  def perform(_job) do
    {:error, :declined}
  end
end

ThumbnailWorker

Slow jobs — good for watching concurrency.

defmodule Demo.ThumbnailWorker do
  @moduledoc """
  Simulates a slow (600 ms) thumbnail render for the `"asset"` arg. It logs when
  it starts and when it finishes, so overlapping runs are visible in `Demo.Log`.
  """
  use Kathikon.Worker

  # Simulated render time in milliseconds.
  @render_ms 600

  @impl true
  def perform(%{args: %{"asset" => name}}) do
    Demo.Log.append("Thumbnail started: #{name}")
    Process.sleep(@render_ms)
    Demo.Log.append("Thumbnail done: #{name}")
    :ok
  end
end

FlashSaleWorker

defmodule Demo.FlashSaleWorker do
  @moduledoc """
  Immediately logs `"Flash sale: <label>"`. The order of these log lines shows
  which job the priority queue ran first.
  """
  use Kathikon.Worker

  @impl true
  def perform(%{args: %{"label" => label}}) do
    entry = "Flash sale: #{label}"
    Demo.Log.append(entry)
    :ok
  end
end

AsyncStore

Simulates a 3rd-party API that returns :pending until the async job is ready (here, on the third poll of a given request URL).

defmodule Demo.AsyncStore do
  @moduledoc """
  Fake third-party API for the async-polling demo.

  Counts how many times each request URL has been polled. `status/1` answers
  `:pending` until the `@ready_after`-th poll of a URL (the third, here), then
  returns the ready payload.
  """

  @name __MODULE__
  @ready_after 3

  @doc """
  Starts the store, or returns the one that is already running.

  Returns `{:ok, pid}` in both cases, so re-evaluating this cell does not fail
  with `{:error, {:already_started, pid}}` when a previous agent is still
  registered under the same name.
  """
  def start do
    case Agent.start_link(fn -> %{} end, name: @name) do
      {:error, {:already_started, pid}} -> {:ok, pid}
      result -> result
    end
  end

  @doc """
  Records one poll of `request_url` and reports its status.

  Returns `:pending` for the first `@ready_after - 1` polls of a URL, and
  `{:ok, %{"status" => "ready", "url" => request_url}}` from then on.
  """
  def status(request_url) do
    Agent.get_and_update(@name, fn state ->
      polls = Map.get(state, request_url, 0) + 1
      new_state = Map.put(state, request_url, polls)

      result =
        if polls >= @ready_after do
          {:ok, %{"status" => "ready", "url" => request_url}}
        else
          :pending
        end

      {result, new_state}
    end)
  end

  @doc "Drops the poll counter for `request_url`, so the next poll starts over."
  def forget(request_url), do: Agent.update(@name, &Map.delete(&1, request_url))

  @doc "Drops all poll counters."
  def reset!, do: Agent.update(@name, fn _ -> %{} end)
end

{:ok, _} = Demo.AsyncStore.start()

AsyncRequestWorker

Typical {:sleep, seconds} use case: poll an external endpoint until the result is ready.

defmodule Demo.AsyncRequestWorker do
  @moduledoc """
  Polls `Demo.AsyncStore` for the `"request_url"` arg.

  While the store answers `:pending`, the job returns `{:sleep, @poll_interval}`
  so it is run again later. Once data is ready, it is logged, the URL's poll
  counter is dropped, and the job succeeds.
  """
  use Kathikon.Worker

  # Seconds to wait before the next poll.
  @poll_interval 3

  @impl true
  def perform(%{args: %{"request_url" => url}}) do
    url
    |> Demo.AsyncStore.status()
    |> handle_status(url)
  end

  defp handle_status(:pending, url) do
    Demo.Log.append("Poll #{url}: still pending")
    {:sleep, @poll_interval}
  end

  defp handle_status({:ok, data}, url) do
    Demo.Log.append("Data received — processing #{inspect(data)}")
    Demo.AsyncStore.forget(url)
    :ok
  end
end

Example scenarios

| Scenario | Worker | Features |
|---|---|---|
| Welcome email | SendEmailWorker | insert, execute |
| Email queue | SendEmailWorker | queue: :emails |
| Weekly digest | SendEmailWorker | schedule_in |
| Flash sale | FlashSaleWorker | priority |
| Payment retry | FlakyPaymentWorker | backoff |
| Async poll | AsyncRequestWorker | {:sleep, seconds} |
| Dead letter | AlwaysFailWorker | max_attempts |
| Cancel newsletter | SendEmailWorker | cancel/1 |
| Thumbnails | ThumbnailWorker | concurrency |

1. Welcome email

# Start from an empty activity log so only this demo's lines show up.
Demo.Log.clear()

# No queue: option, so the job presumably goes to the :default queue.
{:ok, job} =
  Kathikon.insert(Demo.SendEmailWorker, %{
    "to" => "ada@example.com",
    "subject" => "Welcome"
  })

# Block (up to the 10 s default) until the job has run and is :completed.
{:ok, done} = Demo.await(job.id, :completed)
# `job` is the snapshot returned by insert; `done` is the re-fetched completed job.
{job.state, done.attempts}
Demo.job_table(Kathikon.all())

2. Dedicated email queue

# Ensure the :emails queue is running; the `:ok =` match fails the cell if
# start_queue/1 returns anything else.
:ok = Kathikon.start_queue(:emails)

# Route this job to :emails instead of the default queue.
{:ok, job} =
  Kathikon.insert(
    Demo.SendEmailWorker,
    %{"to" => "grace@example.com", "subject" => "Password reset"},
    queue: :emails
  )

# Evaluates to {:ok, job} once completed, or an error tuple (e.g. on timeout).
Demo.await(job.id, :completed)

3. Scheduled digest

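Insert a job with schedule_in: 3 and peek at its state after 500 ms; it is usually still :scheduled, because the 3-second delay has not elapsed yet. Then wait for the scheduler to promote it and a worker to complete it. Everything happens in one cell so that job keeps referring to this job.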

# schedule_in: 3 delays the job (presumably by 3 seconds), so it should be
# inserted in the :scheduled state.
{:ok, job} =
  Kathikon.insert(
    Demo.SendEmailWorker,
    %{"to" => "linus@example.com", "subject" => "Weekly digest"},
    schedule_in: 3
  )

IO.inspect(job.state, label: "initial")

# NOTE(review): 500 ms is roughly one scheduler_interval (400 ms) but much shorter
# than the 3 s delay, so the state printed below is most likely still :scheduled.
Process.sleep(500)

case Kathikon.fetch(job.id) do
  {:ok, promoted} ->
    IO.inspect(promoted.state, label: "after scheduler tick")
    # Wait (up to 10 s) for the scheduler to promote the job and a worker to run it.
    Demo.await(job.id, :completed)

  # Only :not_found is handled; any other fetch result raises CaseClauseError.
  {:error, :not_found} ->
    {:error, :not_found}
end

4. Flash sale priority

:ok = Kathikon.start_queue(:priority)
Demo.Log.clear()

# Both jobs share the same schedule_in, so they should become available together.
# With :priority at concurrency 1, the dispatcher has to pick one first, and the
# log order shows which priority value wins.
jobs =
  for {label, pri} <- [{"VIP", 10}, {"General", 1}] do
    {:ok, job} =
      Kathikon.insert(Demo.FlashSaleWorker, %{"label" => label},
        queue: :priority,
        priority: pri,
        schedule_in: 1
      )

    job
  end

# Wait for both jobs rather than sleeping a fixed 1.5 s. The 1 s schedule_in, plus
# up to one scheduler_interval (400 ms), plus up to one poll_interval (150 ms) can
# exceed 1.5 s, which could leave the log empty or half-filled.
Enum.each(jobs, fn job -> {:ok, _} = Demo.await(job.id, :completed) end)

Demo.Log.entries()

5. Payment retry

~5 seconds between attempts (exponential backoff).

Demo.Log.clear()

# max_attempts: 3 leaves room for the worker to fail once and succeed on the retry.
{:ok, job} =
  Kathikon.insert(Demo.FlakyPaymentWorker, %{"invoice" => "INV-42"}, max_attempts: 3)

# 20 s timeout covers the backoff delay between the failed and successful attempts.
{:ok, done} = Demo.await(job.id, :completed, 20_000)
# Attempt count plus the error(s) recorded for the failed attempt.
{done.attempts, done.errors}

6. Async polling

Poll a 3rd-party endpoint until the async result is ready. While status is :pending, return {:sleep, seconds} — the job moves to :scheduled and runs again later without incrementing attempts or appending to errors (contrast with Payment retry above). Production intervals are often much longer (e.g. {:sleep, 360}); we use 3s here so the demo finishes quickly.

# Reset both the activity log and the fake API's per-URL poll counters.
Demo.Log.clear()
Demo.AsyncStore.reset!()

{:ok, job} =
  Kathikon.insert(Demo.AsyncRequestWorker, %{
    "request_url" => "https://api.example.com/jobs/abc-123"
  })

# Give the dispatcher time to run the first poll. The poll answers :pending, so
# the worker returns {:sleep, 3}.
Process.sleep(400)

# Expected: back in :scheduled, with attempts/errors not bumped by the sleep.
{:ok, deferred} = Kathikon.fetch(job.id)
{deferred.state, deferred.attempts, deferred.errors}
# The store reports ready on the third poll, i.e. after two 3 s sleeps — well within 20 s.
{:ok, done} = Demo.await(job.id, :completed, 20_000)
{done.attempts, done.errors, Demo.Log.entries()}

7. Discard

# Every attempt returns {:error, :declined}. With max_attempts: 2 the job should be
# discarded after its second failure; 25 s allows for the backoff between attempts.
{:ok, job} =
  Kathikon.insert(Demo.AlwaysFailWorker, %{"card" => "4111"}, max_attempts: 2)

{:ok, done} = Demo.await(job.id, :discarded, 25_000)
# Final state, plus the errors presumably recorded for each failed attempt.
{done.state, done.errors}

8. Cancel

# Scheduled far out (schedule_in: 60), so the job is still waiting when it is cancelled.
{:ok, job} =
  Kathikon.insert(
    Demo.SendEmailWorker,
    %{"to" => "unsub@example.com", "subject" => "Newsletter"},
    schedule_in: 60
  )

{:ok, cancelled} = Kathikon.cancel(job.id)
# Cancelling again shows how cancel/1 treats a job that is already cancelled.
Kathikon.cancel(job.id)

9. Concurrency

Demo.Log.clear()

# Three 600 ms jobs, presumably on the :default queue (concurrency 2). Expect two
# to start together and the third to start only after one of them finishes.
for asset <- ["a.png", "b.png", "c.png"] do
  Kathikon.insert(Demo.ThumbnailWorker, %{"asset" => asset})
end

# About 1.2 s of work plus polling latency; 2.5 s leaves headroom before reading the log.
Process.sleep(2_500)
Demo.Log.entries()

10. Pruning

Terminal jobs are removed after retention_period (3s here). Pruning runs every prune_interval (2s), so the 6-second wait below leaves time for both.

{:ok, job} =
  Kathikon.insert(Demo.SendEmailWorker, %{"to" => "x@y.com", "subject" => "Temp"})

{:ok, _} = Demo.await(job.id, :completed)
# retention_period is 3 s and pruning presumably runs every prune_interval (2 s),
# so 6 s should be enough for the completed job to be deleted.
Process.sleep(6_000)
# Expected to return an error (e.g. {:error, :not_found}) once the job is pruned.
Kathikon.fetch(job.id)

11. Inspect jobs

# Every job still stored (terminal ones disappear once pruned).
Demo.job_table(Kathikon.all())
# Number of stored jobs per state.
Kathikon.all() |> Enum.frequencies_by(& &1.state)

Cleanup

# Leave nothing behind: delete all jobs and reset both demo agents.
Kathikon.Storage.clear_jobs!()
Demo.Log.clear()
Demo.AsyncStore.reset!()