Context module for delivery planning, attempt recording, and status transitions.
Delivery rows are idempotent per (notification_id, channel). Attempt rows are append-only — each adapter call produces a new attempt row.
Functions
@spec apply_planning_decision(Chimeway.Delivery.t(), map()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Persists planning-time orchestration facts on the canonical delivery row.
@spec apply_render_identity(Chimeway.Delivery.t(), map()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Persists render identity on the canonical delivery row for reused planning paths.
@spec apply_render_result(Chimeway.Delivery.t(), map()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Persists the validated render result on the canonical delivery row.
@spec apply_workflow_linkage(Chimeway.Delivery.t(), map()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Persists workflow run and active-step linkage on the canonical delivery row.
@spec begin_recovery(binary() | Chimeway.Delivery.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Claims a recoverable delivery row by stamping durable recovery metadata on the
canonical row. Returns {:noop, delivery} if the row is no longer eligible.
@spec cancel_deferred_delivery(binary() | Chimeway.Delivery.t(), String.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Cancels a deferred delivery row in place, preserving the same delivery identity.
Returns {:noop, delivery} when the row is no longer a pending deferred delivery.
@spec exhaust_delivery(Chimeway.Delivery.t()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Transitions a :failed delivery to :cancelled with suppression_reason: "retries_exhausted".
This is the ONLY entry point for the :failed -> :cancelled transition. The general
transition_status/2 path intentionally rejects :failed -> :cancelled (the
@allowed_transitions table allows only failed: [:dispatched]). exhaust_delivery/1
performs a direct change/2 |> Repo.update() that bypasses the transition table,
mirroring how suppress_delivery/3 writes the :suppressed terminal state
from any non-terminal status.
Called from the Oban worker when
job.attempt == job.max_attempts and the adapter classification was :temporary
(REL-03 D-10/D-11). Records policy_checkpoint: "perform" in metadata so traces
preserve the explanation that exhaustion happened at perform time.
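The exhaustion condition described above can be sketched as a small predicate. This is a minimal illustration, not Chimeway's code: the module name ExhaustionSketch and the function exhausted?/2 are hypothetical, and the job argument is assumed to be any map or struct exposing :attempt and :max_attempts.

```elixir
defmodule ExhaustionSketch do
  # Hypothetical helper, not part of Chimeway. Returns true only when the
  # job is on its final attempt AND the adapter classified the failure as
  # :temporary, which is the condition the docs give for exhaust_delivery/1.
  def exhausted?(%{attempt: attempt, max_attempts: max_attempts}, classification) do
    attempt == max_attempts and classification == :temporary
  end
end
```

Under this sketch, a worker would call exhaust_delivery/1 only when the predicate holds; a :temporary failure on an earlier attempt would be left to the normal retry path.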
@spec fetch_delivery(binary()) :: {:ok, Chimeway.Delivery.t()} | {:error, :not_found}
Fetches a delivery by ID without raising. Pairs with get_delivery!/1 for
queue-boundary callers that prefer explicit {:error, :not_found}.
Added in Phase 33 to satisfy D-06 (worker must stop using raising lookup
paths at the queue boundary). Used by Chimeway.Webhooks.ProcessFeedbackWorker.
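A rough sketch of how a queue-boundary caller might consume the non-raising lookup. FetchBoundarySketch and process/2 are hypothetical names, the lookup is passed in as a function so the example runs on its own, and mapping a missing row to {:cancel, reason} is an assumption about what a worker might choose to return.

```elixir
defmodule FetchBoundarySketch do
  # Hypothetical worker-side helper. `fetch` is any 1-arity function with
  # the same return shape as fetch_delivery/1.
  def process(delivery_id, fetch) when is_function(fetch, 1) do
    case fetch.(delivery_id) do
      {:ok, delivery} ->
        {:ok, delivery}

      # Assumption: a missing row is not retryable, so the job is cancelled
      # with an explanatory reason instead of raising.
      {:error, :not_found} ->
        {:cancel, "delivery #{delivery_id} not found"}
    end
  end
end
```

The point of the pattern is that a missing row becomes an explicit value the worker can act on, rather than an exception that the queue would treat as a retryable crash.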
@spec get_delivery!(binary()) :: Chimeway.Delivery.t()
Fetches a delivery by ID, raising if not found.
@spec get_delivery_by_provider_message_id(String.t()) :: {:ok, Chimeway.Delivery.t()} | {:error, :not_found}
Fetches a delivery by a provider message ID from its attempts.
@spec list_due_deferred_deliveries(keyword()) :: [Chimeway.Delivery.t()]
Lists deferred delivery rows that are still pending and due for resume.
@spec list_recoverable_deliveries(keyword()) :: [Chimeway.Delivery.t()]
Lists delivery rows that are still pending, ready, older than the supplied threshold, and not already claimed for recovery.
@spec list_recoverable_events(keyword()) :: [Chimeway.Events.Event.t()]
Lists persisted events that are older than the supplied threshold, have at least one notification, and still have zero delivery rows planned.
@spec mark_digest_immediate(Chimeway.Delivery.t(), binary(), String.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Releases a digest-held source row back to the normal ready lifecycle.
@spec mark_digest_skipped(Chimeway.Delivery.t(), binary(), String.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Marks a digest-held source row as skipped at flush with an explicit reason.
@spec mark_digested(Chimeway.Delivery.t(), binary(), String.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Marks a digest-held source row as included in an emitted digest.
@spec plan_delivery(binary(), atom() | binary(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:error, Ecto.Changeset.t() | term()}
Plans a delivery row for the given notification_id and channel. Idempotent: duplicate calls on the same (notification_id, channel) create exactly one row.
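The idempotency guarantee can be illustrated with an in-memory model. This is only a sketch under stated assumptions: PlanSketch is a hypothetical module, a plain map keyed by {notification_id, channel} stands in for the deliveries table, channels are assumed to be normalized to strings, and :pending is assumed to be the initial status. The real function presumably relies on a database uniqueness constraint, which this example does not show.

```elixir
defmodule PlanSketch do
  # In-memory stand-in for the deliveries table (hypothetical).
  # Returns {:ok, row, updated_rows}.
  def plan(rows, notification_id, channel) do
    channel = to_string(channel)
    key = {notification_id, channel}
    candidate = %{notification_id: notification_id, channel: channel, status: :pending}

    # Map.put_new/3 keeps the existing row when the key is already present,
    # so a duplicate call never creates a second row.
    rows = Map.put_new(rows, key, candidate)
    {:ok, Map.fetch!(rows, key), rows}
  end
end
```

Calling PlanSketch.plan/3 twice with the same notification ID and channel returns the same row both times and leaves exactly one entry in the map.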
@spec record_attempt(Chimeway.Delivery.t(), map()) :: {:ok, %{delivery: Chimeway.Delivery.t(), attempt: Chimeway.DeliveryAttempt.t()}} | {:error, atom(), term(), map()}
Atomically inserts an attempt row and transitions the delivery status.
Returns {:ok, %{delivery: updated_delivery, attempt: attempt}} on success, or
{:error, step, reason, changes} if any step fails (both operations roll back).
Phase 14 contract additions
- Acquires a SELECT ... FOR UPDATE row lock on the delivery via the :lock_delivery Multi step BEFORE computing attempt_number (W8 preemptive fix). This serializes concurrent record_attempt/2 callers for the same delivery and makes attempt_number contiguity invariant under concurrent execution. The pending -> dispatched transition that Executor.run_delivery/1 performs BEFORE calling this function is a secondary serialization layer.
- Computes attempt_number inside the Multi via the :next_attempt_number step (RESEARCH Pattern 4).
- Routes error_class permanent/bounced outcomes to :cancelled with the appropriate suppression_reason inside the same transaction (RESEARCH Pitfall 2). This makes sync and Oban paths converge on a terminal state without forking; sync gains REL-03 convergence automatically.
- Telemetry stop metadata now includes attempt_number and error_class, preserving the Phase 10 correlation_id/notification_key keys.
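A minimal sketch of how a caller might pattern match on the two documented return shapes. AttemptResultSketch and summarize/1 are hypothetical, and the :status field on the delivery is an assumption; the four-element error tuple follows the {:error, step, reason, changes} shape stated above.

```elixir
defmodule AttemptResultSketch do
  # Success: both the updated delivery and the new attempt row come back.
  def summarize({:ok, %{delivery: delivery, attempt: attempt}}) do
    {:recorded, delivery.status, attempt.attempt_number}
  end

  # Failure: `step` names the step that failed. Nothing was persisted,
  # because both operations roll back together.
  def summarize({:error, step, reason, _changes_so_far}) do
    {:rolled_back, step, reason}
  end
end
```

Matching on the step name lets a caller tell a lock failure apart from a validation failure without inspecting the reason term.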
@spec recover_delivery(binary() | Chimeway.Delivery.t(), keyword()) :: {:ok, map()} | {:noop, map()} | {:error, term()}
Re-drives a recoverable delivery row through the configured dispatcher while preserving canonical delivery identity and durable recovery metadata.
@spec recover_event(binary() | Chimeway.Events.Event.t(), keyword()) :: {:ok, map()} | {:noop, map()} | {:error, term()}
Re-drives a persisted event whose notifications exist but deliveries were never planned, using persisted render declarations instead of notifier callbacks.
@spec resume_deferred_delivery(binary() | Chimeway.Delivery.t(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:noop, Chimeway.Delivery.t()}
Promotes a due deferred delivery row back to :ready without changing delivery identity.
Returns {:noop, delivery} when the row is no longer pending, deferred, and due.
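The {:ok, _} / {:noop, _} contract lends itself to a sweep loop that tolerates races with other workers. The sketch below is illustrative only: ResumeSweepSketch and sweep/2 are hypothetical, and the resume function is injected as a 1-arity stand-in with the same return shape as resume_deferred_delivery/2.

```elixir
defmodule ResumeSweepSketch do
  # Counts how many rows were promoted versus skipped. A :noop is not an
  # error: it means the row stopped being eligible before we reached it.
  def sweep(deliveries, resume) when is_function(resume, 1) do
    Enum.reduce(deliveries, %{resumed: 0, noop: 0}, fn delivery, acc ->
      case resume.(delivery) do
        {:ok, _delivery} -> Map.update!(acc, :resumed, &(&1 + 1))
        {:noop, _delivery} -> Map.update!(acc, :noop, &(&1 + 1))
      end
    end)
  end
end
```

In a real sweep the input would likely come from list_due_deferred_deliveries/1, with the noop count serving as a cheap signal of contention between sweepers.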
@spec suppress_delivery(Chimeway.Delivery.t(), atom()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Transitions a delivery to :suppressed and persists the suppression_reason.
@spec suppress_delivery(Chimeway.Delivery.t(), atom(), keyword()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Transitions a delivery to :suppressed, persists suppression_reason, and records
policy checkpoint metadata (planning or perform).
Returns the list of terminal delivery states — used by the dispatcher (Plan 02-02) to short-circuit dispatch for already-terminal deliveries.
@spec transition_status(Chimeway.Delivery.t(), atom()) :: {:ok, Chimeway.Delivery.t()} | {:error, term()}
Transitions a delivery to a new status, respecting the allowed transition table. Returns {:error, {:invalid_transition, from: current, to: new}} for disallowed transitions.
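A table-driven transition check might look roughly like the following. This is a sketch, not the module's actual table: TransitionSketch is hypothetical, and only the failed: [:dispatched] entry and the pending -> dispatched edge are taken from this page. The :delivered status and the remaining edges are assumptions made for illustration.

```elixir
defmodule TransitionSketch do
  # Hypothetical table. Only `failed: [:dispatched]` and the
  # pending -> dispatched edge appear in the docs; the rest is assumed.
  @allowed_transitions %{
    pending: [:dispatched],
    dispatched: [:delivered, :failed],
    failed: [:dispatched]
  }

  def transition(%{status: current} = delivery, new) do
    # Statuses missing from the table have no outgoing edges.
    if new in Map.get(@allowed_transitions, current, []) do
      {:ok, %{delivery | status: new}}
    else
      {:error, {:invalid_transition, from: current, to: new}}
    end
  end
end
```

With this table, TransitionSketch.transition(%{status: :failed}, :cancelled) returns an :invalid_transition error, which matches the documented behavior that :failed -> :cancelled must go through exhaust_delivery/1.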