X3m.System.Scheduler delivers a X3m.System.Message at some point in the future. Think
of it as a persistable Process.send_after/3: scheduled messages survive a restart
because you persist them, and they are dispatched through the normal
X3m.System.Dispatcher when their time comes.
The scheduler is backend-agnostic and standalone — it doesn't require aggregates or any particular store.
Defining a scheduler
use X3m.System.Scheduler and implement the callbacks that persist and load alarms:
defmodule MyApp.Scheduler do
use X3m.System.Scheduler
alias X3m.System.Message, as: SysMsg
@impl X3m.System.Scheduler
def save_alarm(%SysMsg{} = msg, aggregate_id, repo) do
repo.insert_alarm(msg, aggregate_id, msg.assigns.dispatch_at)
:ok
end
@impl X3m.System.Scheduler
def load_alarms(from, until, repo) do
{:ok, repo.alarms_between(from, until)}
end
@impl X3m.System.Scheduler
def service_responded(%SysMsg{} = msg, repo) do
repo.delete_alarm(msg.id)
:ok
end
endStart it with the state that every callback receives as its last argument — typically your repo or any context the callbacks need:
{:ok, _pid} = MyApp.Scheduler.start_link(MyApp.AlarmsRepo)(Add it to your supervision tree the same way.)
Scheduling a message
Use dispatch/3 with either a delay in: milliseconds or an absolute at:
DateTime:
msg = X3m.System.Message.new(:send_reminder, raw_request: %{"id" => "acc-1"})
# in one hour
MyApp.Scheduler.dispatch(msg, "acc-1", in: 60 * 60 * 1_000)
# at a specific time
MyApp.Scheduler.dispatch(msg, "acc-1", at: ~U[2026-01-01 09:00:00Z])The scheduler assigns the resolved :dispatch_at onto the message, calls your
save_alarm/3 so it is persisted, and arms delivery. When the time arrives the message
is sent through X3m.System.Dispatcher.dispatch/2 to its service_name.
flowchart TD
S["Scheduler.dispatch(msg, id, at:/in:)"] --> SA["save_alarm/3 (persist)"]
SA --> WIN{"due within the in-memory window?"}
WIN -->|"yes: loaded by load_alarms/3"| ARM["armed in memory"]
WIN -->|"far future"| LATER["stays persisted; loaded later / after restart"]
LATER -.->|"window reached"| ARM
ARM -->|"time arrives"| DISP["Dispatcher.dispatch/2"]
DISP --> SR["service_responded/2"]
SR -->|":ok"| DONE["delete alarm"]
SR -->|"{:retry, in_ms, msg}"| ARMPersistence and the in-memory window
Not every future alarm is kept in memory. The scheduler loads alarms in bulk every
in_memory_interval/0 milliseconds, covering the next 2 * in_memory_interval/0
window. On startup it calls load_alarms/3 with from: nil; after that, from is the
previous until. This is why you persist alarms in save_alarm/3 and reload them in
load_alarms/3 — a far-future alarm may be persisted now and only loaded into memory
shortly before it is due, and it is reloaded after a restart.
If a message with the same X3m.System.Message.id is already scheduled in memory, it is
ignored (so duplicate scheduling is safe within a window).
Retries
After a scheduled message is dispatched, service_responded/2 is called with the
resolved message. Return:
:ok— delivery succeeded; drop the alarm (delete it from your store).{:retry, in_ms, message}— redelivermessageinin_msmilliseconds.
You can track attempts via the message's assigns:
@impl X3m.System.Scheduler
def service_responded(%SysMsg{response: {:ok, _}} = msg, repo) do
repo.delete_alarm(msg.id)
:ok
end
def service_responded(%SysMsg{assigns: %{dispatch_attempts: n}} = msg, _repo) when n < 5 do
{:retry, 30_000, msg}
end
def service_responded(%SysMsg{} = msg, repo) do
repo.mark_failed(msg.id)
:ok
enddispatch_attempts is incremented for you on each delivery.
Optional callbacks
Two callbacks have defaults you can override:
in_memory_interval/0— how often (ms) alarms are loaded into memory. Default is 6 hours.dispatch_timeout/1— how long (ms) to wait for the dispatched service to respond. Default is 5000.