Each queue has its own dispatcher process that claims jobs from Mnesia and runs them with bounded concurrency.
Configuring queues
# config/config.exs
config :kathikon,
queues: [
default: [concurrency: 10],
emails: [concurrency: 5],
thumbnails: [concurrency: 2]
]On application start, Kathikon.Queue.start_configured/0 starts a dispatcher for each configured queue.
Enqueueing to a specific queue
{:ok, job} =
Kathikon.insert(MyApp.Workers.SendEmail, %{"to" => "a@b.com"},
queue: :emails
)Kathikon.insert/3 calls Kathikon.Queue.ensure_started/1, so dispatchers for ad-hoc queues are started on first insert even if not listed in config (they get default concurrency 10).
Starting a queue manually
:ok = Kathikon.start_queue(:reports)Useful when adding queues at runtime before Phase 3 dynamic queue management.
How concurrency works
Each dispatcher:
- Polls Mnesia every
poll_intervalms (default200) - Atomically claims up to
concurrency - runningjobs - Spawns a
Taskper job to callworker.perform/1 - Updates job state when the task finishes
# Two thumbnail jobs can run in parallel; a third waits
config :kathikon,
queues: [thumbnails: [concurrency: 2]]Claim ordering
When multiple jobs are available on the same queue:
- Higher
priorityfirst - Earlier
available_atas tiebreaker
Kathikon.insert(Worker, %{}, priority: 10) # claimed before priority: 1See Scheduling for priority with scheduled jobs.
Isolation between queues
Queues are failure domains:
- A crashed dispatcher restarts via
DynamicSupervisorwithout affecting other queues - Slow work on
:thumbnailsdoes not block:emails - Each queue has independent concurrency limits
Registry lookup
Dispatchers register as {:dispatcher, queue_name} in Kathikon.Registry:
GenServer.whereis({:via, Registry, {Kathikon.Registry, {:dispatcher, :emails}}})