DoubleEntryLedger.Workers.CommandWorker.UpdateTransactionCommandMapNoSaveOnError (double_entry_ledger v0.3.0)

View Source

Processes TransactionCommandMap structures for atomic update of commands and their associated transactions in the Double Entry Ledger system, without saving on error.

Implements the Optimistic Concurrency Control (OCC) pattern to ensure safe concurrent processing of update commands, providing robust error handling, retry logic, and transactional guarantees. This module ensures that update operations are performed atomically and consistently, and that all error and retry scenarios are handled transparently. Unlike the standard update command map processor, this variant does not persist changes on error, but instead returns changesets with error details for client handling.

Features

  • Transaction Processing: Handles update of transactions based on the command map's action.
  • Atomic Operations: Ensures all command and transaction changes are performed in a single database transaction.
  • Error Handling: Maps validation and dependency errors to the appropriate changeset or command state, but does not persist on error.
  • Retry Logic: Retries OCC conflicts and schedules retries for dependency errors.
  • OCC Integration: Integrates with the OCC processor behavior for safe, idempotent event processing.

Main Functions

  • process/2 — Entry point for processing update command maps with error handling and OCC.
  • build_transaction/3 — Constructs Ecto.Multi operations for update actions.
  • handle_build_transaction/3 — Adds event update or error handling steps to the Multi.
  • handle_transaction_map_error/3 — Returns a changeset with error details, does not persist.
  • handle_occ_final_timeout/2 — Handles OCC retry exhaustion, does not persist.

This module ensures that update commands are processed exactly once, even in high-concurrency environments, and that all error and retry scenarios are handled transparently and returned to the caller for further handling.

Summary

Functions

Builds the shared Ecto.Multi pipeline for both success and error flows.

Adds the step to update the event or handle errors after transaction processing.

Returns a changeset with error details for the given event map and error, without persisting the error.

Processes a TransactionCommandMap by creating both a command record and its associated transaction atomically, without saving on error.

Processes a command with OCC retry logic.

Finalizes when retry attempts are exhausted.

Types

Functions

build_multi(module, occable_item, repo)

Builds the shared Ecto.Multi pipeline for both success and error flows.

  1. :transaction_map – converts raw data to a map or returns an error tuple
  2. merges in either:

Parameters

  • module - the processor module implementing the callbacks
  • occable_item - the Command or TransactionCommandMap being processed
  • repo - the Ecto repo to use for DB ops

Returns

error(message, logable, changeset)

@spec error(String.t(), logable(), any()) :: {:ok, String.t()}

handle_build_transaction(multi, command_map, repo)

Adds the step to update the event or handle errors after transaction processing.

This function inspects the results of the previous Ecto.Multi steps and determines the appropriate next action for the event:

  • If both the transaction and event creation succeed, the event is marked as processed.
  • If the related create event is not yet processed, the event is reverted to pending.
  • If the related create event failed, a retry is scheduled for the update event.
  • For all other errors, the event is marked as dead letter.

If an error occurs, a changeset with error details is returned instead of persisting the error state.

Parameters

  • multi: The Ecto.Multi built so far.
  • command_map: The event map being processed.
  • _repo: The Ecto repository (unused).

Returns

  • The updated Ecto.Multi with either an :command_success or :command_failure step, or a changeset with error details.

handle_transaction_map_error(command_map, error, repo)

Returns a changeset with error details for the given event map and error, without persisting the error.

This function is used to handle errors in transaction mapping, providing a changeset that describes the error without affecting the database state.

info(message, logable, schema)

@spec info(String.t(), logable(), any()) :: {:ok, String.t()}

process(command_map, repo \\ Repo)

Processes a TransactionCommandMap by creating both a command record and its associated transaction atomically, without saving on error.

This function is designed for synchronous use, ensuring that both the command and the transaction are created or updated in one atomic operation. It handles both :create_transaction and :update action types, with appropriate transaction building logic for each case. The entire operation uses Optimistic Concurrency Control (OCC) with retry mechanisms to handle concurrent modifications effectively. If an error occurs, a changeset with error details is returned instead of persisting the error state.

Parameters

  • command_map: A TransactionCommandMap struct containing all command and transaction data.
  • repo: The repository to use for database operations (defaults to Repo).

Returns

  • {:ok, transaction, command} on success, where both the transaction and command are created/updated successfully.
  • {:error, changeset} if validation or dependency errors occur (not persisted).
  • {:error, reason} for other errors, with a string describing the error and the failing step.

process_with_retry(occable_item, repo \\ Repo)

Processes a command with OCC retry logic.

Converts command data to a transaction map, builds an Ecto.Multi, and retries on Ecto.StaleEntryError up to the configured maximum.

Parameters

  • occable_item: The command or command map to process
  • repo: The Ecto repository (defaults to Repo)

Returns

  • {:ok, %{transaction: Transaction.t(), command_success: Command.t()}} on success
  • {:ok, %{command_failure: Command.t()}} on failure
  • Ecto.Multi.failure() on unrecoverable error

retry(module, occable_item, error_map, attempts, repo)

Finalizes when retry attempts are exhausted.

Invokes Occable.timed_out/3 to mark the item as timed out, merges in handle_occ_final_timeout/2, then runs one last transaction.

Parameters

  • module - the processor module
  • occable_item - the item that timed out
  • error_map - the accumulated errors and retry count
  • repo - the Ecto repo

Returns

  • The result of the final repo.transaction/1

warn(message, logable)

@spec warn(String.t(), logable()) :: {:ok, String.t()}

warn(message, logable, changeset)

@spec warn(String.t(), logable(), any()) :: {:ok, String.t()}