Migration

The DDL lives in the library; the host writes a short migration that delegates to it:

defmodule MyApp.Repo.Migrations.SetupGenDurable do
  use Ecto.Migration

  def up,   do: GenDurable.Migration.up()
  def down, do: GenDurable.Migration.down()
end

up/1 records the installed schema version in a table comment and applies only the missing increments, so the host-facing call stays the same as the library evolves. The :prefix option puts the tables in a non-public Postgres schema.
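The "apply only the missing increments" idea can be sketched as a pure function: given the version read from the table comment (0 when nothing is installed) and an ordered list of increments, plan the increments above that version and the new version to record. This is a hypothetical illustration, not GenDurable.Migration's code; the module name and the step atoms (standing in for real DDL) are made up. A real migration would run each step's SQL with Ecto.Migration.execute/1 and then rewrite the comment.

```elixir
# Hypothetical sketch of versioned, incremental migrations; not the library's code.
# Increments are {version, step} pairs in ascending order; the step atoms
# stand in for real DDL statements.
defmodule VersionedMigrationSketch do
  @increments [
    {1, :create_jobs_table},
    {2, :add_lease_column},
    {3, :add_runnable_index}
  ]

  # Highest version this (hypothetical) release ships.
  def latest_version, do: @increments |> Enum.map(&elem(&1, 0)) |> Enum.max()

  # Increments above the installed version (0 means nothing is installed yet).
  def pending(installed) when is_integer(installed) do
    Enum.filter(@increments, fn {version, _step} -> version > installed end)
  end

  # Steps to run, in order, plus the version to record in the table comment.
  def plan_up(installed) do
    case pending(installed) do
      [] -> {[], installed}
      steps -> {Enum.map(steps, &elem(&1, 1)), latest_version()}
    end
  end
end
```

Running the plan against an up-to-date schema yields no steps, which is why the host's single call can be re-run safely after every library upgrade.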

Starting the engine

Add GenDurable to your supervision tree after your repo, so the repo is already running when the engine starts:

children = [
  MyApp.Repo,
  {GenDurable, repo: MyApp.Repo, queues: [default: 10, checkout: 5]}
]

You don't list your FSM modules: each one is resolved from its row, because the fsm column defaults to the module name. Pass :fsms only to register a machine under a custom :name, or to keep an old :version running alongside a new one.
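Resolution from the row can be pictured as a two-step lookup: an explicitly registered entry (from :fsms) wins, otherwise the fsm column is read as a module name. This is a hypothetical sketch, not GenDurable's internals; the registry shape ({name, version} keys), the row's version field, and the "Elixir."-prefixed module encoding are all assumptions.

```elixir
# Hypothetical sketch of resolving an FSM module from a row; not the library's code.
defmodule FsmResolverSketch do
  # Assumed registry built from :fsms, keyed by {name, version}. A row whose
  # fsm value is not registered falls back to being read as a module name.
  def resolve(%{fsm: fsm, version: version}, registry) when is_binary(fsm) do
    case Map.fetch(registry, {fsm, version}) do
      {:ok, module} -> {:ok, module}
      :error -> module_from_name(fsm)
    end
  end

  # Only converts to atoms that already exist, so arbitrary row data cannot
  # create new atoms (atoms are never garbage-collected).
  defp module_from_name(name) do
    module = String.to_existing_atom("Elixir." <> name)

    if Code.ensure_loaded?(module),
      do: {:ok, module},
      else: {:error, {:unknown_fsm, name}}
  rescue
    ArgumentError -> {:error, {:unknown_fsm, name}}
  end
end
```

Checking the registry first is what lets a custom :name, or an old :version of a machine, coexist with rows that simply store a module name.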

Crash recovery: lease + reaper

Every claimed row holds a lease (lease_expires_at) that a heartbeat extends while the step runs. If a worker dies mid-step, its heartbeats stop, the lease expires, and the reaper returns the row to runnable with attempt incremented by one; the step then re-runs from scratch. This is the at-least-once safety floor: a step that crashed without recording an outcome is retried whether or not your code asked for a retry, so step side effects must be safe to run more than once (idempotent).
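The claim, heartbeat, and reap cycle can be modeled with plain maps. This is a hypothetical in-memory sketch of the behavior described above, not the library's SQL; lease_expires_at and attempt come from the text, while the state field and its :running/:runnable values are assumptions. Times are integer milliseconds to keep it deterministic.

```elixir
# Hypothetical in-memory model of lease + reaper; not the library's code.
defmodule LeaseSketch do
  # Claiming a row sets its lease to expire lease_ttl ms from now.
  def claim(row, now, lease_ttl) do
    %{row | state: :running, lease_expires_at: now + lease_ttl}
  end

  # A heartbeat pushes the expiry forward; only running rows are heartbeated.
  def heartbeat(%{state: :running} = row, now, lease_ttl) do
    %{row | lease_expires_at: now + lease_ttl}
  end

  # The reaper returns every running row whose lease has lapsed (expiry at or
  # before now) to :runnable and bumps attempt; the step re-runs from scratch.
  def reap(rows, now) do
    Enum.map(rows, fn
      %{state: :running, lease_expires_at: expires} = row when expires <= now ->
        %{row | state: :runnable, lease_expires_at: nil, attempt: row.attempt + 1}

      row ->
        row
    end)
  end
end
```

For example, a row claimed at t = 0 with a 60_000 ms TTL and heartbeated at t = 20_000 is leased until t = 80_000; if the worker dies after that heartbeat, a reap at t = 80_000 returns it as runnable with attempt 1.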

Garbage collection

A built-in GC sweep deletes terminal (done or failed) rows, so finished work does not accumulate forever. Each sweep's cost scales with the batch size, not with the table size.

  • :gc_interval (default 60_000 ms): time between sweeps; nil disables GC entirely.
  • :gc_retention (default 86_400_000 ms, i.e. 1 day): how long a row is kept after it reaches a terminal state.
  • :gc_batch (default 10_000): maximum rows deleted per sweep; when a sweep deletes a full batch, the next sweep starts immediately instead of waiting for :gc_interval.

A terminal child whose parent is still mid-join is spared until the parent finishes.
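The selection rules above can be sketched in memory: terminal rows older than the retention window, minus children of unfinished parents, capped at the batch size, with a full batch signalling an immediate re-sweep. This is a hypothetical model, not the library's SQL; the row fields terminated_at and parent_id are assumptions, and it treats any non-terminal parent as still mid-join.

```elixir
# Hypothetical model of one GC sweep's row selection; not the library's code.
defmodule GcSketch do
  @terminal [:done, :failed]

  # Returns {ids_to_delete, sweep_again?} for one GC pass.
  def sweep(rows, now, retention_ms, batch) do
    by_id = Map.new(rows, &{&1.id, &1})

    ids =
      rows
      |> Enum.filter(&(&1.state in @terminal))
      |> Enum.filter(&(&1.terminated_at + retention_ms <= now))
      |> Enum.reject(&parent_unfinished?(&1, by_id))
      |> Enum.take(batch)
      |> Enum.map(& &1.id)

    # A full batch means more rows may be eligible, so sweep again at once.
    {ids, length(ids) == batch}
  end

  # Assumption: a child is spared while its parent has not reached a terminal state.
  defp parent_unfinished?(%{parent_id: nil}, _by_id), do: false

  defp parent_unfinished?(%{parent_id: parent_id}, by_id) do
    case Map.get(by_id, parent_id) do
      %{state: state} -> state not in @terminal
      nil -> false
    end
  end
end
```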

Configuration reference

The engine is started as {GenDurable, opts}:

| Option | Default | Meaning |
|---|---|---|
| :repo | (required) | The host's Ecto.Repo |
| :fsms | [] | Modules to register; needed only for a custom :name or for versioning |
| :queues | [default: 10] | Max concurrently running steps per queue (queue_name => concurrency) |
| :rate_limits | [] | Named token-bucket limits |
| :lease_ttl | 60_000 | ms a claimed row stays leased before the reaper may reclaim it |
| :heartbeat_interval | 20_000 | ms between lease extensions for claimed rows |
| :poll_interval | 1_000 | Base ms between idle polls |
| :reap_interval | 30_000 | ms between reaper sweeps |
| :gc_interval | 60_000 | ms between GC sweeps; nil disables GC |
| :gc_retention | 86_400_000 | ms a terminal row is kept before GC may delete it |
| :gc_batch | 10_000 | Max rows deleted per GC sweep |
| :prefetch | 0 | Rows each queue buffers beyond its running slots |
| :min_demand | 1 | Skip picking unless at least this many slots are free |
| :max_poll_interval | 5_000 | Idle-backoff ceiling for the poll interval, in ms |
| :drain_timeout | 5_000 | ms each queue waits for in-flight steps on shutdown |

Keep heartbeat_interval × 3 ≲ lease_ttl, so a lease survives roughly two missed heartbeats before the reaper may reclaim it. The defaults meet this exactly: 20_000 × 3 = 60_000.
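A small check you might run on your own engine opts before starting GenDurable. The module and function names are hypothetical, and the defaults are copied from the configuration reference; the check uses a strict ≤ where the rule of thumb says "roughly".

```elixir
# Hypothetical helper; not part of GenDurable.
defmodule LeaseMarginCheck do
  # Defaults from the configuration reference (milliseconds).
  @defaults [lease_ttl: 60_000, heartbeat_interval: 20_000]

  # :ok when heartbeat_interval * 3 <= lease_ttl; unrelated opts such as
  # :repo or :queues are ignored.
  def check(opts) do
    opts = Keyword.merge(@defaults, opts)
    ttl = Keyword.fetch!(opts, :lease_ttl)
    beat = Keyword.fetch!(opts, :heartbeat_interval)

    if beat * 3 <= ttl do
      :ok
    else
      {:error, "heartbeat_interval #{beat} ms x 3 exceeds lease_ttl #{ttl} ms"}
    end
  end
end
```

Raising :heartbeat_interval to 30_000 without touching :lease_ttl fails the check; pairing it with a 120_000 ms TTL passes.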

Tuning the feeder

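:prefetch, :min_demand, and :max_poll_interval control how aggressively the feeder claims work. The defaults are conservative: work spreads fairly across nodes and idle queues generate little database traffic. Raising :prefetch claims rows ahead of time into an in-memory buffer. Buffered rows are heartbeated, so a deep buffer is safe against :lease_ttl, but it costs cross-node fairness (one node holds rows another could run), priority freshness (a newly enqueued high-priority row waits behind the buffer), and crash blast radius (every buffered row is stuck until its lease expires if the node dies).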
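How the three knobs interact can be sketched as two pure functions: one decides how many rows to claim, the other computes the next idle delay. This is an assumption-laden model built only from the option descriptions in the configuration reference (capacity is running slots plus :prefetch, picking is skipped below :min_demand free slots, idle polls back off from :poll_interval up to :max_poll_interval); the exact formula and the doubling backoff are guesses, not the library's code.

```elixir
# Hypothetical model of the feeder knobs; formula and backoff shape are assumed.
defmodule FeederSketch do
  # Rows to claim this poll: free capacity across running slots plus the
  # prefetch buffer, or 0 when fewer than min_demand slots are free.
  def demand(%{concurrency: c, prefetch: p, in_flight: in_flight, buffer: buffer, min_demand: min}) do
    free = max(c + p - in_flight - buffer, 0)
    if free >= min, do: free, else: 0
  end

  # Next delay between polls: back to the base interval after a poll that
  # found work, otherwise double it, capped at the ceiling.
  def next_interval(_current, base, _ceiling, true = _found_work?), do: base
  def next_interval(current, _base, ceiling, false = _found_work?), do: min(current * 2, ceiling)
end
```

Under this model, a larger :min_demand trades latency for fewer, larger claims, and a lower :max_poll_interval trades idle database traffic for faster pickup.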

Telemetry

Every event below is emitted under the :gen_durable prefix; for example, the first is [:gen_durable, :step, :stop].

| Event | When | Measurements | Metadata |
|---|---|---|---|
| [:step, :stop] | A step finished | %{duration} | %{id, fsm, step, kind} |
| [:pick, :stop] | A picker batch ran | %{count, demand} | %{queue, worker} |
| [:scheduler, :saturation] | Per-poll gauge | %{in_flight, buffer, concurrency, prefetch} | %{queue} |
| [:scheduler, :drain] | Graceful queue shutdown | %{released, in_flight} | %{queue} |
| [:concurrency, :contended] | A concurrency_key lock was contended | %{count} | %{id, fsm, concurrency_key} |
| [:reaper, :reaped] | Expired leases were reclaimed | %{count} | %{ids} |
| [:gc, :swept] | Terminal rows were deleted | %{count} | %{} |
| [:rate_limit, :throttled] | A rate-limit bucket throttled a request | %{wanted, granted} | %{key, queue} |
| [:rate_limit, :unknown] | A step named an unconfigured limit | %{count} | %{key, name, fsm, step} |
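A handler attaches with :telemetry.attach_many/4 from the :telemetry library. The sketch below logs finished steps and GC sweeps; the module name and handler id are hypothetical, and it assumes, as is conventional for :telemetry spans, that duration is in native time units (the table does not state the unit).

```elixir
# Hypothetical host-side handler; module name and handler id are made up.
defmodule MyApp.DurableTelemetry do
  require Logger

  @events [
    [:gen_durable, :step, :stop],
    [:gen_durable, :gc, :swept]
  ]

  # Call once at application start, e.g. from MyApp.Application.start/2.
  def attach do
    :telemetry.attach_many("my-app-gen-durable", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:gen_durable, :step, :stop], %{duration: duration}, meta, _config) do
    # Assumes native time units; convert to milliseconds for readable logs.
    ms = System.convert_time_unit(duration, :native, :millisecond)

    Logger.info(
      "step #{inspect(meta.fsm)}/#{inspect(meta.step)} id=#{inspect(meta.id)} " <>
        "kind=#{inspect(meta.kind)} took #{ms} ms"
    )
  end

  def handle_event([:gen_durable, :gc, :swept], %{count: count}, _meta, _config) do
    Logger.info("gen_durable GC deleted #{count} rows")
  end
end
```

Capturing the handler as &__MODULE__.handle_event/4 (a remote function) rather than an anonymous function is the form :telemetry recommends for performance.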