Tasque.Queue (Tasque v1.0.0)
Internal GenServer that manages the FIFO task queue, concurrency gating, and dispatch loop for a single Tasque instance.
Internal module
This module is not part of the public API. Use the functions in Tasque
to interact with the queue. The implementation details documented here
are provided for contributors and the curious.
State
The GenServer maintains the following fields:
- :queue - an Erlang :queue of task entries waiting to be dispatched
- :pending_refs - a map of internal task_ref => entry for running tasks
- :queued_refs - a map of caller_ref => pid for waiting tasks
- :cancelled_refs - a map of caller_ref => true for tombstoned tasks
- :caller_to_task_ref - a map of caller_ref => task_ref for fast timeout lookups
- :max_concurrency - the upper bound on map_size(pending_refs)
- :task_supervisor - the registered name of the companion Task.Supervisor
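The fields above could be modelled roughly as follows. This is a minimal sketch, not the actual Tasque source: the module name QueueStateSketch, the default values, and the free_slots/1 helper are assumptions made for illustration.

```elixir
defmodule QueueStateSketch do
  # Hypothetical struct mirroring the documented fields; defaults are assumptions.
  defstruct queue: :queue.new(),
            pending_refs: %{},
            queued_refs: %{},
            cancelled_refs: %{},
            caller_to_task_ref: %{},
            max_concurrency: 1,
            task_supervisor: nil

  # Hypothetical helper: number of concurrency slots still available,
  # derived from the documented bound on map_size(pending_refs).
  def free_slots(%__MODULE__{} = state) do
    max(state.max_concurrency - map_size(state.pending_refs), 0)
  end
end
```

Keeping running tasks in a map keyed by task_ref makes the slot check a constant-time map_size/1 call.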
Dispatch Algorithm
The private dispatch/1 function is called after every state-changing
event (enqueue, completion, crash, timeout). It greedily fills available
concurrency slots:
- If map_size(pending_refs) >= max_concurrency, return immediately
- If the queue is empty, return immediately
- Otherwise, dequeue the next entry, start it via Task.Supervisor.async_nolink/2, record it in :pending_refs, and recurse to fill any remaining slots
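The three steps above can be sketched as a recursive function over a plain map. This is an illustration under stated assumptions rather than the real private dispatch/1: the module name DispatchSketch and the entry shape %{fun: fun} are hypothetical, and tombstone skipping is left out for brevity.

```elixir
defmodule DispatchSketch do
  # Step 1: all concurrency slots are taken, so return the state unchanged.
  def dispatch(%{pending_refs: pending, max_concurrency: limit} = state)
      when map_size(pending) >= limit do
    state
  end

  def dispatch(state) do
    case :queue.out(state.queue) do
      # Step 2: nothing is waiting.
      {:empty, _queue} ->
        state

      # Step 3: start the next entry, record it, and try to fill another slot.
      {{:value, entry}, rest} ->
        # Assumption: each entry carries a zero-arity function under :fun.
        task = Task.Supervisor.async_nolink(state.task_supervisor, entry.fun)
        pending = Map.put(state.pending_refs, task.ref, entry)
        dispatch(%{state | queue: rest, pending_refs: pending})
    end
  end
end
```

With three queued entries and max_concurrency set to 2, one call starts two tasks and leaves one entry in the queue.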
Message Protocol
Tasks are started with async_nolink, so results arrive as handle_info
messages:
| Message | Meaning | Action |
|---|---|---|
| {task_ref, result} | Successful completion | Deliver {:ok, result}, demonitor, free slot |
| {:DOWN, task_ref, :process, _, reason} | Task crashed | Deliver {:exit, reason}, free slot |
| {:tasque_timeout, caller_ref} | Per-task timeout fired | If queued, tombstone and skip during dispatch; if running, terminate task; in both cases deliver {:exit, :timeout} |
A catch-all handle_info/2 clause silently discards unexpected messages
(e.g., a late :DOWN arriving after a timeout has already handled the task).
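The completion, crash, and catch-all clauses might look roughly like the sketch below. It is not the actual implementation: the module name MessageSketch, the entry shape %{caller: pid, caller_ref: ref}, and delivery via send/2 are assumptions, and the timeout clause and the follow-up dispatch call are omitted.

```elixir
defmodule MessageSketch do
  # Successful completion: the reply is tagged with the task's monitor reference.
  def handle_info({task_ref, result}, state) when is_reference(task_ref) do
    # Drop the monitor and flush any :DOWN that is already in the mailbox.
    Process.demonitor(task_ref, [:flush])
    finish(task_ref, {:ok, result}, state)
  end

  # Crash: the monitor fires with the exit reason.
  def handle_info({:DOWN, task_ref, :process, _pid, reason}, state) do
    finish(task_ref, {:exit, reason}, state)
  end

  # Catch-all: silently discard anything else.
  def handle_info(_unexpected, state), do: {:noreply, state}

  # Deliver the outcome and free the slot. Unknown refs are ignored.
  defp finish(task_ref, outcome, state) do
    case Map.pop(state.pending_refs, task_ref) do
      {nil, _pending} ->
        {:noreply, state}

      {entry, pending} ->
        # Assumption: the caller receives {caller_ref, outcome} as a plain message.
        send(entry.caller, {entry.caller_ref, outcome})
        {:noreply, %{state | pending_refs: pending}}
    end
  end
end
```

Because finish/3 ignores references that are no longer in pending_refs, a late :DOWN for a task that was already handled leaves the state unchanged.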
Timeout Implementation
When a task with a :timeout option is enqueued, the queue schedules
a {:tasque_timeout, caller_ref} message to itself via
Process.send_after/3. If the task is still in the queue when the
timeout fires, it is tombstoned and skipped during dispatch. If it is
currently running, the queue attempts to terminate it and deliver a
timeout result. If the task has already completed by the time the
timeout message is handled, the normal completion or crash message wins.
If the task completes before the timer fires, the timer is cancelled with
Process.cancel_timer/1. Late timeout messages for already-completed
tasks are harmless no-ops.
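The timer handling described above can be illustrated with a small sketch. The module name TimeoutSketch and the classify/2 helper are hypothetical; only Process.send_after/3, Process.cancel_timer/1, and the {:tasque_timeout, caller_ref} message shape come from the text, and actually terminating a running task is left out.

```elixir
defmodule TimeoutSketch do
  # Schedule the timeout message to the calling process and return the timer
  # reference, so it can be cancelled if the task finishes first.
  def schedule(caller_ref, timeout_ms) do
    Process.send_after(self(), {:tasque_timeout, caller_ref}, timeout_ms)
  end

  # Hypothetical helper: decide what a fired timeout means for a given state.
  def classify(caller_ref, state) do
    cond do
      # Still waiting in the queue: tombstone it so dispatch skips it.
      Map.has_key?(state.queued_refs, caller_ref) -> :tombstone
      # Currently running: the task should be terminated.
      Map.has_key?(state.caller_to_task_ref, caller_ref) -> :terminate
      # Already completed: a late timeout is a no-op.
      true -> :noop
    end
  end
end
```

Process.cancel_timer/1 returns the remaining time in milliseconds for a live timer and false for one that has already fired or been cancelled, so calling it on completion is safe in either case.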
Summary
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.