Status: draft v1. Companion to gen_durable_spec.md (normative). This document is the how;
the spec is the what. Where this plan and the spec disagree, the spec wins — fix this plan.
The engine is an Elixir library: a Postgres-backed durable FSM runtime on top of GenServer. The one guarantee we are building toward: on step completion, the new state is committed to the database before execution proceeds (spec §1). Everything below serves that sentence.
0. Tech stack and key decisions
These are the choices that shape every later phase. Each has a recommendation and the reasoning; items marked CONFIRM are worth a yes/no before we lean on them.
| Decision | Recommendation | Why |
|---|---|---|
| Language / build | Elixir library, Mix, lib/gen_durable/… | Spec is GenServer-based. |
| DB driver | ecto_sql + postgrex (DECIDED) | Use Ecto.Migration for the DDL and an Ecto.Repo for connection pooling, but run hot-path statements as raw SQL (Repo.query!/Postgrex). The spec is SQL-first (FOR UPDATE SKIP LOCKED, generated columns, advisory locks); we do not hide those behind the query DSL. |
| New-work latency | Polling only. LISTEN/NOTIFY is banned, permanently. (DECIDED) | Spec §6 mandates a "dumb picker". Polling is trivially correct; NOTIFY is not an option in this codebase. Do not add it, do not propose it. |
| State encoding | Typed struct per FSM, jsonb ↔ struct via Ecto embedded schema (DECIDED) | Each FSM declares its own state struct; the engine loads jsonb → struct before step/2 and dumps struct → jsonb on the outcome. ctx.state is that struct, not a raw map. Living with a plain map is allowed but unsupported: you're on your own. |
| FSM definition | use GenDurable.FSM behaviour, step/2 + handle/2 callbacks, state: struct module | The step text column maps to a function-clause head; state: names the embedded schema. |
| FSM versioning | Explicit registry {fsm_name, fsm_version} => module in config/start (DECIDED) | Spec §8: old instances finish on their fsm_version. Old versions (Checkout.V1, Checkout.V2) coexist as explicitly-registered modules; we resolve module per row, never assume "latest", no compile-time magic. |
| Defaults (timings) | lease 60s · heartbeat 20s · poll 1s · reaper 30s ("Balanced", DECIDED) | All config-overridable. Margin: heartbeat × 3 = lease. |
| Signal consume | Engine auto-deletes the snapshotted signal ids on the outcome txn (DECIDED) | Outcomes (§3) carry no signal info; engine snapshots the ids it loaded into ctx and deletes exactly those. Signals arriving mid-step survive. |
| Step ↔ connection | one DB connection is held for the whole step when partition_key is set | Session-level pg_try_advisory_lock must live on a connection that survives the step, which runs outside any transaction. This ties a connection to each in-flight partitioned step — call it out in capacity planning. |
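The explicit-registry row above can be pictured as a plain lookup table. The following is a minimal sketch, not the library's actual `GenDurable.Registry`: the module name `RegistrySketch`, the functions `new/1` and `fetch!/2`, and the choice of string FSM names are all assumptions made for illustration.

```elixir
defmodule RegistrySketch do
  @moduledoc "Hypothetical lookup table: {fsm_name, fsm_version} => module."

  # Build the table once, at start-up, from explicit configuration.
  def new(entries) when is_list(entries), do: Enum.reduce(entries, %{}, &register/2)

  # A duplicate key is treated as a configuration error rather than overwritten.
  defp register({{name, version} = key, module}, acc)
       when is_binary(name) and is_integer(version) and is_atom(module) do
    if Map.has_key?(acc, key) do
      raise ArgumentError, "duplicate registration: #{inspect(key)}"
    end

    Map.put(acc, key, module)
  end

  # Resolve per row; there is deliberately no fallback to the newest version.
  def fetch!(registry, {_name, _version} = key) do
    case Map.fetch(registry, key) do
      {:ok, module} -> module
      :error -> raise ArgumentError, "no module registered for #{inspect(key)}"
    end
  end
end
```

With this shape, a row stored as `{"checkout", 1}` keeps resolving to its original module even after a version 2 is registered next to it.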
The advisory-lock subtlety (read this before Phase 9)
partition_key serialization (spec §6, Pick) uses a session-level advisory lock, not
pg_advisory_xact_lock, because the step body runs outside a transaction. Consequences:
- The lock must be taken on, and held by, the same connection for the lock's lifetime, then released after the outcome commits (pg_advisory_unlock, or drop the connection).
- The picking UPDATE runs in a short transaction on a pooled connection; the lock attempt and the step run on a (possibly different) checked-out connection. Be deliberate about which connection owns the lock.
- Worker death drops the connection → Postgres releases the advisory lock automatically. This is the free recovery the spec relies on; do not "optimize" it by reusing connections across steps.
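The ownership rule above (lock, step, commit, and unlock all on one connection) can be sketched with the connection abstracted away. This is an illustration under assumptions: `PartitionLockSketch`, `with_lock/3`, and the injected `query` function are hypothetical names. In real code `query` would be bound to a single checked-out connection, for example inside a `Repo.checkout/2` block.

```elixir
defmodule PartitionLockSketch do
  @try_lock "SELECT pg_try_advisory_lock(hashtext($1))"
  @unlock "SELECT pg_advisory_unlock(hashtext($1))"

  # `query` is a hypothetical 2-arity function (sql, params) bound to ONE
  # connection. It returns {:ok, %{rows: rows}}, the shape Postgrex results have.
  def with_lock(query, partition_key, step_and_commit) when is_function(query, 2) do
    case query.(@try_lock, [partition_key]) do
      {:ok, %{rows: [[true]]}} ->
        try do
          {:ok, step_and_commit.()}
        after
          # Runs once the step and its outcome commit return (or raise),
          # on the same connection that took the lock.
          query.(@unlock, [partition_key])
        end

      {:ok, %{rows: [[false]]}} ->
        # Another session owns the key; the caller returns the row to runnable.
        :contended
    end
  end
end
```

The `after` clause covers normal returns and raised exceptions only. If the worker process is killed, the unlock never runs and the release comes from the dropped connection, which is the recovery path described above.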
1. Architecture overview
GenDurable.Application (supervisor)
├── GenDurable.Repo (Ecto repo / Postgrex pool)
├── GenDurable.Registry ({fsm, version} -> module)
├── GenDurable.Reaper (periodic: expired leases -> runnable, attempt+1)
└── GenDurable.Queue.Supervisor (one child per configured queue/pool)
└── GenDurable.Queue "default"
├── Scheduler (GenServer: poll, run Pick for free slots, hand rows to workers)
├── Heartbeat (extends lease_expires_at for in-flight rows)
└── Task.Supervisor (one Task per executing step; local concurrency = pool size)
Data flow of one step:
- Scheduler picks runnable rows (short txn, FOR UPDATE SKIP LOCKED) → executing + lease.
- If partition_key set, Executor takes the advisory lock; on conflict, return row to runnable.
- Executor loads pending signals (if relevant), resolves the module, calls step/2 under try.
- Outcome (or handle/2 outcome on exception) is written in one transaction, deleting consumed signals in that same transaction (spec §5).
- Advisory lock released, heartbeat stopped.
Crash with no outcome → lease expires → Reaper resets to runnable, attempt + 1, step re-runs
from scratch (spec §4.3). handle/2 is not called on a reap — that is the at-least-once floor.
2. Module layout
lib/gen_durable.ex # public API: insert/2, insert_all/2, signal/4
lib/gen_durable/application.ex
lib/gen_durable/repo.ex
lib/gen_durable/fsm.ex # `use GenDurable.FSM`, behaviour + __using__
lib/gen_durable/state.ex # `use GenDurable.State` — embedded schema + load/dump codec
lib/gen_durable/outcome.ex # outcome constructors / validation (the §3 table)
lib/gen_durable/context.ex # ctx struct passed to step/2 and handle/2
lib/gen_durable/registry.ex # {name, version} -> module
lib/gen_durable/queries.ex # ALL raw SQL from spec §10, one function each
lib/gen_durable/executor.ex # run one picked row end-to-end
lib/gen_durable/scheduler.ex # per-queue picker loop + concurrency accounting
lib/gen_durable/heartbeat.ex
lib/gen_durable/reaper.ex
lib/gen_durable/signals.ex # sender-side delivery
lib/gen_durable/migration.ex # GenDurable.Migration.up/1 down/1 — host app calls it (Oban-style)
test/support/test_repo/migrations/..._setup.exs # test repo only; just calls GenDurable.Migration.up()
queries.ex is the single home for the spec's SQL so §10 maps 1:1 to functions and stays auditable.
The library ships no Ecto migration files of its own — the DDL lives in GenDurable.Migration
and the host app writes a one-line migration that delegates to it (see §4).
3. Public API (target shape)
# Typed state struct — one per FSM, an Ecto embedded schema
defmodule Checkout.State do
use GenDurable.State # thin wrapper over `embedded_schema`
embedded_schema do
field :order, :integer
field :n, :integer, default: 0
field :shipped, :boolean, default: false
end
end
# Defining a machine
defmodule Checkout do
use GenDurable.FSM, version: 1, queue: "checkout", state: Checkout.State
@impl true
def step("start", %{state: s} = _ctx), do: {:next, "await_pay", %{s | n: s.n + 1}}
def step("await_pay", ctx), do: {:await, "payment_confirmed", ctx.state}
def step("ship", _ctx), do: {:done, %{"shipped" => true}}
@impl true
def handle(reason, ctx) do
if ctx.attempt < 5, do: {:replay, ctx.state, backoff(ctx.attempt)}, else: {:stop, reason}
end
end
# Enqueue — `state:` is cast into Checkout.State
{:ok, id} = GenDurable.insert(Checkout,
state: %{order: 42},
partition_key: "order:42",
priority: 0,
unique_key: <<...>>, unique_scope: ["runnable", "executing", "awaiting_signal"])
# Batch (single statement, dedup via the partial unique index)
GenDurable.insert_all(Checkout, [ %{...}, %{...} ])
# Signal into an await (durable, at-least-once)
GenDurable.signal(id, "payment_confirmed", %{"amount" => 100}, dedup_key: "evt-7")
ctx.state is a %Checkout.State{} struct (loaded from jsonb before the call); step/2 and handle/2
return a struct of the same type in :next/:replay/:await, which the engine dumps back to jsonb.
result in :done stays a plain string-keyed map (terminal payload, never re-loaded into step code).
ctx for step/2: %Context{id, fsm, fsm_version, step, attempt, state, signals}.
ctx for handle/2: same minus signals (spec §4.2: %{id, fsm, fsm_version, step, attempt, state}).
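The `handle/2` example above calls `backoff(ctx.attempt)`, which this plan does not define. One plausible shape is capped exponential backoff. The sketch below is an assumption throughout: the module name, the 1s base, the 60s cap, and the premise that `attempt` starts at 0 are not taken from the spec.

```elixir
defmodule BackoffSketch do
  # Both constants are illustrative, not spec values.
  @base_ms 1_000
  @cap_ms 60_000

  # Delay in milliseconds for a :replay outcome: base * 2^attempt, capped.
  def backoff(attempt) when is_integer(attempt) and attempt >= 0 do
    min(@base_ms * Integer.pow(2, attempt), @cap_ms)
  end
end
```

The returned integer slots directly into `{:replay, state, delay_ms}`.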
Outcomes returned by step/2 and handle/2 are exactly the spec §3 table:
{:next, step, state} · {:replay, state, delay_ms} · {:await, name, state} ·
{:done, result} · {:stop, reason}. Outcome validates the shape before it touches SQL.
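Shape validation of the five outcomes can be done with guarded function heads. This is a minimal sketch rather than the real `GenDurable.Outcome`: the module name, the `{:error, {:invalid_outcome, term}}` return, and the exact guards (string step names, non-negative integer delay, map state) are assumptions.

```elixir
defmodule OutcomeSketch do
  # State is accepted as any map, which includes structs.
  def validate({:next, step, state}) when is_binary(step) and is_map(state), do: :ok

  def validate({:replay, state, delay_ms})
      when is_map(state) and is_integer(delay_ms) and delay_ms >= 0,
      do: :ok

  def validate({:await, name, state}) when is_binary(name) and is_map(state), do: :ok

  # The terminal result is a plain map, never a state struct.
  def validate({:done, result}) when is_map(result) and not is_struct(result), do: :ok

  def validate({:stop, _reason}), do: :ok

  # Anything else is rejected before it can reach SQL.
  def validate(other), do: {:error, {:invalid_outcome, other}}
end
```

Rejecting a malformed tuple here means a buggy `step/2` fails with a clear error instead of producing a half-applied UPDATE.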
4. Data model
Migration: library-owned, Oban-style
The DDL lives in GenDurable.Migration, not in copy-pasted Ecto migration files. The host app writes
a thin migration that delegates:
defmodule MyApp.Repo.Migrations.SetupGenDurable do
use Ecto.Migration
def up, do: GenDurable.Migration.up() # opts: [prefix: "public", version: :latest]
def down, do: GenDurable.Migration.down()
end
- up(opts \\ []) / down(opts \\ []): both run execute/1 of the spec §9 DDL verbatim (the generated column, partial indexes, and enum are clearer as raw SQL than via the migration DSL).
- :prefix is the target Postgres schema (default public); :version is the schema version to migrate to (default latest). v1 ships version 1 only.
- The installed schema version is recorded in COMMENT ON TABLE gen_durable (Oban's trick), read on up so only missing increments apply. This keeps the user-facing call stable when we add a v2.
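The "only missing increments apply" rule reduces to a small pure function once the table comment has been read. The sketch below assumes the comment holds a bare integer version; that format, the module name, and both function names are hypothetical.

```elixir
defmodule MigrationVersionSketch do
  # v1 ships schema version 1 only.
  @latest 1

  # A missing or unparseable comment is treated as "nothing installed".
  def installed(nil), do: 0

  def installed(comment) when is_binary(comment) do
    case Integer.parse(comment) do
      {version, ""} when version >= 0 -> version
      _ -> 0
    end
  end

  # Versions that `up` still has to apply, in order.
  def pending_up(comment, target \\ @latest) do
    current = installed(comment)
    if current < target, do: Enum.to_list((current + 1)..target), else: []
  end
end
```

A host app that already ran version 1 would get an empty list today, and only the new step once a version 2 exists.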
Tables created: gen_durable, signals, the durable_status enum, the three indexes
(gen_durable_pick, gen_durable_lease, gen_durable_unique) plus signals_target.
Spec deviation D1 — unique_scope must be durable_status[], not text[]
Spec §7/§9 define unique_scope text[] and the generated column as
status::text = any(unique_scope). Postgres rejects this: a generated always as (...) stored
expression must be IMMUTABLE, but the enum→text cast (enum_out) is only STABLE (enum labels can be
renamed). Result: ERROR 42P17 generation expression is not immutable.
Fix applied in GenDurable.Migration (v1): store unique_scope as durable_status[] and compare
status = any(unique_scope) (enum = enum, enum_eq is IMMUTABLE — the cast disappears). The §7
semantics are unchanged. Downstream impact: inserts pass scope as $scope::durable_status[], and the
engine reads scope as enum-label strings. The normative spec §7/§9 should be patched to match.
Notes to verify during implementation:
- unique_guard is generated always as (...) stored; the partial unique index is on it.
- Batch insert uses ON CONFLICT (unique_guard) WHERE unique_guard is not null DO NOTHING. The inference predicate must match the partial index predicate exactly, or PG won't pick the index.
- signals has unique (target_id, dedup_key); dedup_key NULL ⇒ no dedup (NULLs don't conflict).
5. Component design
5.1 Queries (queries.ex)
One function per spec §10 block, all parameterized, all raw SQL:
pick/3, heartbeat/2, reap/0, complete_next/…, complete_replay/…, complete_await/…,
complete_done/…, complete_stop/…, deliver_signal/…, insert/…, insert_all/….
The five complete_* functions run the outcome UPDATE and the consumed-signal DELETE in one txn.
5.2 Executor (executor.ex)
Given a picked row map, run the step to a committed outcome:
1. If partition_key, pg_try_advisory_lock(hashtext(partition_key)) on the step connection; on false, reset row to runnable (drop lease) and stop: another worker owns the key.
2. Load pending signals for id (snapshot their ids; we delete exactly these on commit, so signals arriving mid-step aren't lost).
3. Resolve module via Registry.fetch!({fsm, fsm_version}); load jsonb → state struct (the FSM's state: embedded schema, via Ecto.embedded_load); build Context with the struct.
4. try → module.step(step, ctx); on raised exception → module.handle(reason, ctx); if handle/2 itself raises → write :stop/failed (spec §4.2).
5. Dump the returned state struct → jsonb (Ecto.embedded_dump), then apply the outcome via the matching complete_* query, deleting the snapshotted consumed signals in the same txn.
6. pg_advisory_unlock, stop heartbeat.
A worker process crash before step 5 means no outcome row → reaper path. We do not trap and
convert crashes into handle/2; crash ≠ caught exception (spec §4 distinguishes the three sources).
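The distinction between a caught exception and a crash can be made concrete with a sketch of the dispatch step alone. It is an illustration under assumptions: `ExecutorSketch`, `FlakySketch`, the plain-map `ctx`, and the `{:handle_raised, message}` stop reason are hypothetical, and nothing here touches the database.

```elixir
defmodule ExecutorSketch do
  # Returns the outcome to persist. Only raised exceptions are rescued;
  # exits and kills propagate, so a crashed worker produces no outcome at all.
  def run(module, step, ctx) do
    try do
      module.step(step, ctx)
    rescue
      exception -> handle(module, exception, ctx)
    end
  end

  defp handle(module, exception, ctx) do
    # handle/2 gets the same context minus :signals (spec §4.2).
    module.handle(exception, Map.delete(ctx, :signals))
  rescue
    handler_error -> {:stop, {:handle_raised, Exception.message(handler_error)}}
  end
end

# A toy machine exercising the three paths: success, rescued, handler raises.
defmodule FlakySketch do
  def step("ok", ctx), do: {:next, "done", ctx.state}
  def step(_other, _ctx), do: raise("step failed")

  def handle(_reason, %{step: "double"}), do: raise("handler failed")
  def handle(_reason, ctx), do: {:replay, ctx.state, 1_000}
end
```

Because `rescue` does not intercept exits, killing the Task mid-step leaves the row in executing with a lease, which is exactly what hands it to the reaper.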
5.3 Scheduler (scheduler.ex)
Per queue. Tracks free slots = pool_size − in_flight. On tick (poll interval, default 1s per §0,
config-overridable) and when slots free up: run pick(queues, free_slots), spawn one Task per returned row
under the queue's Task.Supervisor, decrement slots; on Task completion (any reason) increment slots.
Demand-driven so we never pick more than we can run.
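The slot accounting is simple enough to sketch as a pure data structure, separate from the GenServer that would own it. The module and function names below are hypothetical, and the default pool size of 10 is an arbitrary placeholder.

```elixir
defmodule SlotsSketch do
  defstruct pool_size: 10, in_flight: 0

  def new(pool_size) when is_integer(pool_size) and pool_size > 0 do
    %__MODULE__{pool_size: pool_size}
  end

  # Demand passed to the picker; zero means the pick query can be skipped.
  def free(%__MODULE__{pool_size: size, in_flight: busy}), do: max(size - busy, 0)

  # Called with the number of rows the picker returned.
  def started(%__MODULE__{} = slots, count) when is_integer(count) and count >= 0 do
    if count > free(slots) do
      raise ArgumentError, "picked more rows than free slots"
    end

    %{slots | in_flight: slots.in_flight + count}
  end

  # Called once per Task completion, whatever the exit reason.
  def finished(%__MODULE__{in_flight: busy} = slots) when busy > 0 do
    %{slots | in_flight: busy - 1}
  end
end
```

Keeping this logic pure makes the "never pick more than we can run" invariant unit-testable without a database or processes.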
5.4 Heartbeat (heartbeat.ex)
Extends lease_expires_at for in-flight rows on an interval < lease_ttl (e.g. lease_ttl/3).
Implementable as one process per queue ticking over the in-flight set, or per-step timer. Lease TTL
and heartbeat interval are config; document the safety margin (heartbeat × k ≤ lease_ttl; the §0 defaults use k = 3).
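A start-up check can enforce the margin instead of only documenting it. The sketch below uses the §0 "Balanced" defaults; the option names, the module name, and the choice of a non-strict comparison (so that 20s × 3 = 60s passes) are assumptions.

```elixir
defmodule TimingsSketch do
  # The "Balanced" defaults from §0, in milliseconds.
  @defaults [lease_ms: 60_000, heartbeat_ms: 20_000, poll_ms: 1_000, reaper_ms: 30_000]

  # `k` is how many heartbeat intervals must fit inside one lease.
  def validate(opts \\ [], k \\ 3) do
    timings = Keyword.merge(@defaults, opts)

    if timings[:heartbeat_ms] * k <= timings[:lease_ms] do
      {:ok, timings}
    else
      {:error, {:heartbeat_margin, timings[:heartbeat_ms], timings[:lease_ms], k}}
    end
  end
end
```

Failing fast at boot turns a misconfigured heartbeat into a startup error rather than occasional double execution under load.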
5.5 Reaper (reaper.ex)
Periodic GenServer running the spec §10 Reaper UPDATE. Global (not per-queue). Interval config.
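The reap rule itself (expired lease → runnable, attempt + 1) can be modelled in memory, which is handy for property tests. This sketch is not the spec §10 UPDATE: rows are plain maps, the field names mirror the columns mentioned in this plan, and clearing `lease_expires_at` on reap is an assumption.

```elixir
defmodule ReaperSketch do
  # In-memory model of one reaper pass. `now` is passed in so tests control time.
  def reap(rows, %DateTime{} = now) do
    Enum.map(rows, fn
      %{status: :executing, lease_expires_at: %DateTime{} = expires_at} = row ->
        if DateTime.compare(expires_at, now) == :lt do
          # Lease expired with no outcome: make the row runnable and count the attempt.
          %{row | status: :runnable, lease_expires_at: nil, attempt: row.attempt + 1}
        else
          row
        end

      row ->
        # Rows that are not executing are never touched by the reaper.
        row
    end)
  end
end
```

Note that nothing in this path calls `handle/2`; the step simply runs again from scratch on the next pick.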
5.6 Signals (signals.ex)
signal/4: the spec §10 sender transaction — insert the signal row (with ON CONFLICT … DO NOTHING
for dedup) then flip awaiting_signal+matching awaits to runnable. Insert-before-flip so the
re-executed step already sees the signal in its inbox (spec §5).
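The insert-before-flip ordering and the dedup rule can be illustrated with an in-memory model of the sender transaction. Everything about the data shape is hypothetical: the `store` map, the `awaits` field standing in for "the signal name the row is waiting on", and the module name.

```elixir
defmodule SignalSketch do
  # store = %{signals: [signal], rows: %{id => row}}; both shapes are illustrative.
  def deliver(store, target_id, name, payload, dedup_key \\ nil) do
    store
    |> insert_signal(target_id, name, payload, dedup_key)
    |> flip_if_awaiting(target_id, name)
  end

  # Step 1: insert, skipping duplicates. A nil dedup_key never conflicts.
  defp insert_signal(store, target_id, name, payload, dedup_key) do
    duplicate? =
      dedup_key != nil and
        Enum.any?(store.signals, &(&1.target_id == target_id and &1.dedup_key == dedup_key))

    if duplicate? do
      store
    else
      signal = %{target_id: target_id, name: name, payload: payload, dedup_key: dedup_key}
      %{store | signals: store.signals ++ [signal]}
    end
  end

  # Step 2: only after the insert, wake a row that awaits exactly this name.
  defp flip_if_awaiting(store, target_id, name) do
    case store.rows[target_id] do
      %{status: :awaiting_signal, awaits: ^name} = row ->
        %{store | rows: Map.put(store.rows, target_id, %{row | status: :runnable})}

      _ ->
        store
    end
  end
end
```

Because the flip runs after the insert, any worker that picks the woken row is guaranteed to find the signal in the inbox, and a redelivered event with the same `dedup_key` changes nothing.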
6. Milestones
Status: M0–M10 all ✅ DONE. 33 tests green (Elixir 1.18 / OTP 27 + Postgres 17 in the
devcontainer). The whole engine — typed/map state, :next loop, await/signal, handle→
:replay→:stop, reaper crash-recovery, uniqueness, and partition_key serialization — runs
end-to-end. See module map in §2; tests in test/.
Each milestone ends in something testable. Dependencies are roughly linear; signals (M7) and uniqueness (M8) can proceed in parallel once persistence (M2) lands.
- M0: Scaffolding. ✅ DONE. Mix project (Elixir 1.18 / OTP 27 via .devcontainer + Postgres), deps (ecto_sql 3.14, postgrex 0.22, jason), GenDurable.Migration.up/down (spec §9 DDL, Oban-style facade with version-in-table-comment), GenDurable.Test.Repo, test migration, smoke tests. down → up round-trip verified (enum + comment torn down cleanly). All green. Deviation found & applied: see "Spec deviation D1" in §4.
- M1: Core types (pure). ✅ FSM behaviour + __using__, State (embedded schema + jsonb load/dump codec), Outcome, Context, Registry, step dispatch. No DB. Tests: outcome_test, state_test, registry_test.
- M2: Persistence ops. ✅ All of queries.ex against a real DB, called directly. Tests: queries_test drives pick → each outcome, reaper, signals, uniqueness, batch insert.
- M3: Executor. ✅ executor.ex runs one picked row end-to-end, incl. try / handle/2 / handle-raises. Crash ≠ caught exception (no handle/2 on a process crash).
- M4: Scheduler + pool + heartbeat. ✅ scheduler.ex: per-queue picker loop, concurrency cap, batched lease heartbeat. Test: Counter/MapCounter drive to :done.
- M5: Reaper. ✅ reaper.ex. Test: Reborn kills the worker mid-step → reaper resets → attempt+1 → succeeds; handle/2 not invoked.
- M6: :replay & backoff plumbing. ✅ attempt reset on :next, +1 on :replay, eligible_at honored (see queries_test + Crasher).
- M7: await + signals. ✅ :await parks; signal/4 delivers durably (insert-before-flip); re-executed step sees the signal and the engine deletes it in the outcome txn. Test: Awaiter.
- M8: Uniqueness + batch insert. ✅ unique_guard dedup, per-job scope, single-statement insert_all. Test: dedup vs existing rows and within the batch; scope-exit frees the key.
- M9: partition_key serialization. ✅ Session advisory lock on a checked-out connection held for the step; contended keys returned to runnable; auto-release on worker death. Test: PartitionInc (N instances sharing a key, no lost updates).
- M10: Hardening. ✅ Telemetry ([:gen_durable, :step, :stop], [:gen_durable, :reaper, :reaped]), config surface (GenDurable.Supervisor opts), README, --warnings-as-errors clean.
Open follow-ups (post-v1, not blocking)
- D2: spec §10 :done update lists state = $state, but the {:done, result} outcome carries no state. The engine writes result only and leaves state as-is. Worth reconciling in the spec.
- Signal-consumption edge: on a progressing outcome the engine deletes the whole inbox snapshot; a non-matching signal present at that moment is dropped. Fine for one-await-one-signal (the common case); revisit if multi-name inboxes become real (spec §5 wants non-matching signals to persist).
- Always-load-signals: every step does one signals_target SELECT. Cheap, but optimizable later.
7. Testing strategy
The guarantee is concurrency- and crash-shaped, so tests must be too. Ecto.Adapters.SQL.Sandbox
fights advisory locks and multi-connection flows — run engine tests against a real DB,
async: false, with explicit cleanup.
- Outcome correctness: for each §3 outcome, assert the resulting row matches the §10 UPDATE (status, attempt, eligible_at, lease cleared, signals deleted).
- At-least-once on crash: kill the step Task mid-flight; assert reap → re-run, and that an effect guarded by idempotency runs at-least-once (and an unguarded one may run twice; this is documented).
- await/signal: loss-free delivery (signal always wakes), duplicate-tolerant (dedup_key), and the "insert before flip" ordering so the woken step sees its signal.
- Uniqueness: concurrent insert_all from multiple connections dedups; leaving an occupied status frees the key for re-insertion; unique_key IS NULL never conflicts.
- partition_key: spawn many machines sharing a key; assert steps never overlap and preserve order; assert cross-key parallelism; assert lock auto-release on simulated connection drop.
- Property test (optional): random interleavings of pick/heartbeat/reap/outcome preserve "state committed before proceed" and never lose a signal.
8. Decisions (all settled)
- DB: ecto_sql + raw hot-path SQL.
- Latency: polling only; LISTEN/NOTIFY banned permanently. Do not add it.
- State: typed struct per FSM (Ecto embedded schema), jsonb ↔ struct; plain-map state is unsupported. result stays a plain string-keyed map.
- Timings (Balanced): lease 60s · heartbeat 20s · poll 1s · reaper 30s, all config-overridable.
- Registry: explicit {name, version} => module config; old versions coexist as registered modules.
- Signals: engine auto-deletes the snapshotted consumed signal ids in the outcome txn.
9. Risks / watch-items
- Connection pressure from partitioned steps (§0): each in-flight partitioned step holds a connection for its whole duration. Pool sizing must account for it.
- ON CONFLICT inference on the generated column must match the partial index predicate exactly.
- Heartbeat vs lease race: if heartbeat starves (slow DB), a live step can be reaped and run twice. The TTL margin and the at-least-once contract make this safe, but tune the margin.
- handle/2 re-entrancy: a caught exception calls handle/2, which may :replay; ensure the re-run path is identical to a fresh run (no leftover lease/lock state).