GenDurable.Queries (gen_durable v0.2.0)


Every database statement, one function each, as raw SQL.

All functions take the repo explicitly. Each complete_* function runs the outcome UPDATE and the consumed-signal DELETE in a single transaction. The advisory-lock helpers that serialize work per concurrency_key also live here; the caller must hold a single connection (Repo.checkout/1) for as long as the lock is held.
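The connection-pinning requirement can be sketched as a small wrapper. This is only an illustration under stated assumptions: LockSketch and with_key_lock/4 are hypothetical names, and the sketch assumes advisory_try_lock/2 returns true when the lock is acquired, which this page does not confirm.

```elixir
defmodule LockSketch do
  # Hypothetical helper, not part of gen_durable.
  # `queries` defaults to GenDurable.Queries; it is a parameter only so the
  # sketch can be exercised with a stand-in module.
  def with_key_lock(repo, key, fun, queries \\ GenDurable.Queries) do
    # Repo.checkout pins one connection for everything run inside the function,
    # so the lock and the unlock go through the same database session.
    repo.checkout(fn ->
      # ASSUMPTION: advisory_try_lock/2 returns true when the lock was acquired.
      if queries.advisory_try_lock(repo, key) == true do
        try do
          {:ok, fun.()}
        after
          # Always release, even if fun raises.
          queries.advisory_unlock(repo, key)
        end
      else
        :busy
      end
    end)
  end
end
```

A caller would run something like `LockSketch.with_key_lock(MyApp.Repo, key, fn -> do_work() end)`, where MyApp.Repo and do_work/0 are placeholders.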

Every statement has static SQL text and goes through the connection-level prepared-statement cache (cache_statement:), so Postgres parses and plans it once per connection instead of on every call. Bulk inserts pass rows as parallel arrays via unnest, which keeps the text static and the parameter count fixed, clear of the 65535-parameter protocol cap. The one exception is upsert_rate_configs, which builds a dynamic VALUES list and runs only at boot. Hosts behind a transaction-pooling proxy set prepare: :unnamed on the repo to bypass the cache.
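The parallel-array technique might look roughly like the sketch below. The module name, table, and columns are hypothetical and are not taken from gen_durable; the point is only that the SQL text and the parameter count stay the same for any number of rows.

```elixir
defmodule UnnestSketch do
  # Hypothetical table and column names, for illustration only.
  @sql """
  INSERT INTO jobs (queue, state)
  SELECT q, s::jsonb
  FROM unnest($1::text[], $2::text[]) AS t(q, s)
  """

  # The text is identical for 1 row or 10_000 rows, so it can stay prepared.
  def sql, do: @sql

  # Transposes row maps into one list per column: always exactly two parameters.
  def params(rows) do
    [Enum.map(rows, & &1.queue), Enum.map(rows, & &1.state_json)]
  end
end
```

With an Ecto SQL repo this would be run as `repo.query!(UnnestSketch.sql(), UnnestSketch.params(rows))`. A multi-row VALUES list, by contrast, changes its text and its parameter count with every batch size.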

JSON values (state, result, signal payloads) arrive here as encoded JSON text and are bound as TEXT parameters that are cast server-side ($n::text::jsonb). A bare $n::jsonb parameter would make the driver JSON-encode the already-encoded string and store a double-encoded jsonb scalar (a JSON string) instead of an object, which ->> lookups and jsonb indexes cannot see into. Rows written by versions that did exactly that still decode correctly, because the read paths accept both formats.
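A minimal sketch of the cast, assuming a hypothetical jobs table with a jsonb state column (JsonParamSketch and update_state/3 are illustrative names, not this module's API):

```elixir
defmodule JsonParamSketch do
  # Hypothetical table and column names, for illustration only.
  @sql "UPDATE jobs SET state = $2::text::jsonb WHERE id = $1"

  # state_json must already be JSON text, e.g. ~s({"count":1}).
  # It is bound as TEXT, and Postgres parses it into jsonb on the server.
  def update_state(repo, id, state_json) when is_binary(state_json) do
    repo.query!(@sql, [id, state_json])
  end
end
```

Had the placeholder been written as $2::jsonb, the driver would treat the binary as an Elixir string to encode, and the column would end up holding the JSON string "{\"count\":1}" rather than the object {"count":1}.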

Summary

Functions

advisory_try_lock(repo, key)

advisory_unlock(repo, key)

complete_await(repo, id, worker, state_json, names, next_step, presented_ids, timeout_ms)

complete_done(repo, id, worker, result_json)

complete_next(repo, id, worker, step, state_json, consumed_ids, rate_limit, weight)

complete_retry(repo, id, worker, state_json, delay_ms)

complete_schedule_childs(repo, parent_id, worker, next_step, state_json, children, consumed_ids)

complete_stop(repo, id, worker, reason_text)

deliver_signal(repo, target, name, payload_json, dedup_key)

expire_awaits(repo)

gc(repo, retention_ms, batch)

gc_buckets(repo)

heartbeat(repo, ids, worker, lease_ttl_ms)

insert(repo, p)

insert_all(repo, rows)

load_signals(repo, target_id)

pick(repo, queue, batch, worker, lease_ttl_ms)

reap(repo)

reclaim_orphans(repo, queue, prefix, margin_ms)

release(repo, ids, worker)

reset_to_runnable(repo, id, worker)

upsert_rate_configs(repo, configs)