DoubleEntryLedger.Workers.CommandWorker.UpdateTransactionCommandMapNoSaveOnError (double_entry_ledger v0.3.0)
View SourceProcesses TransactionCommandMap structures for atomic update of commands and their associated transactions in the Double Entry Ledger system, without saving on error.
Implements the Optimistic Concurrency Control (OCC) pattern to ensure safe concurrent processing of update commands, providing robust error handling, retry logic, and transactional guarantees. This module ensures that update operations are performed atomically and consistently, and that all error and retry scenarios are handled transparently. Unlike the standard update command map processor, this variant does not persist changes on error, but instead returns changesets with error details for client handling.
Features
- Transaction Processing: Handles update of transactions based on the command map's action.
- Atomic Operations: Ensures all command and transaction changes are performed in a single database transaction.
- Error Handling: Maps validation and dependency errors to the appropriate changeset or command state, but does not persist on error.
- Retry Logic: Retries OCC conflicts and schedules retries for dependency errors.
- OCC Integration: Integrates with the OCC processor behavior for safe, idempotent event processing.
Main Functions
process/2— Entry point for processing update command maps with error handling and OCC.build_transaction/3— Constructs Ecto.Multi operations for update actions.handle_build_transaction/3— Adds event update or error handling steps to the Multi.handle_transaction_map_error/3— Returns a changeset with error details, does not persist.handle_occ_final_timeout/2— Handles OCC retry exhaustion, does not persist.
This module ensures that update commands are processed exactly once, even in high-concurrency environments, and that all error and retry scenarios are handled transparently and returned to the caller for further handling.
Summary
Functions
Builds the shared Ecto.Multi pipeline for both success and error flows.
Adds the step to update the event or handle errors after transaction processing.
Returns a changeset with error details for the given event map and error, without persisting the error.
Processes a TransactionCommandMap by creating both a command record and its associated transaction atomically, without saving on error.
Processes a command with OCC retry logic.
Finalizes when retry attempts are exhausted.
Types
@type logable() :: DoubleEntryLedger.Command.t() | DoubleEntryLedger.Command.AccountCommandMap.t() | DoubleEntryLedger.Command.TransactionCommandMap.t() | map()
Functions
@spec build_multi(module(), DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t()) :: Ecto.Multi.t()
Builds the shared Ecto.Multi pipeline for both success and error flows.
:transaction_map– converts raw data to a map or returns an error tuple- merges in either:
handle_transaction_map_error/3when conversion failsbuild_transaction/4+handle_build_transaction/3on success
Parameters
module- the processor module implementing the callbacksoccable_item- the Command or TransactionCommandMap being processedrepo- the Ecto repo to use for DB ops
Returns
- an
Ecto.Multiready forrepo.transaction/1
Adds the step to update the event or handle errors after transaction processing.
This function inspects the results of the previous Ecto.Multi steps and determines
the appropriate next action for the event:
- If both the transaction and event creation succeed, the event is marked as processed.
- If the related create event is not yet processed, the event is reverted to pending.
- If the related create event failed, a retry is scheduled for the update event.
- For all other errors, the event is marked as dead letter.
If an error occurs, a changeset with error details is returned instead of persisting the error state.
Parameters
multi: TheEcto.Multibuilt so far.command_map: The event map being processed._repo: The Ecto repository (unused).
Returns
- The updated
Ecto.Multiwith either an:command_successor:command_failurestep, or a changeset with error details.
Returns a changeset with error details for the given event map and error, without persisting the error.
This function is used to handle errors in transaction mapping, providing a changeset that describes the error without affecting the database state.
@spec process( DoubleEntryLedger.Command.TransactionCommandMap.t(), Ecto.Repo.t() | nil ) :: DoubleEntryLedger.Workers.CommandWorker.success_tuple() | {:error, Ecto.Changeset.t(DoubleEntryLedger.Command.TransactionCommandMap.t()) | String.t()}
Processes a TransactionCommandMap by creating both a command record and its associated transaction atomically, without saving on error.
This function is designed for synchronous use, ensuring that both the command and the transaction are created or updated in one atomic operation. It handles both :create_transaction and :update action types, with appropriate transaction building logic for each case. The entire operation uses Optimistic Concurrency Control (OCC) with retry mechanisms to handle concurrent modifications effectively. If an error occurs, a changeset with error details is returned instead of persisting the error state.
Parameters
command_map: ATransactionCommandMapstruct containing all command and transaction data.repo: The repository to use for database operations (defaults toRepo).
Returns
{:ok, transaction, command}on success, where both the transaction and command are created/updated successfully.{:error, changeset}if validation or dependency errors occur (not persisted).{:error, reason}for other errors, with a string describing the error and the failing step.
Processes a command with OCC retry logic.
Converts command data to a transaction map, builds an Ecto.Multi, and
retries on Ecto.StaleEntryError up to the configured maximum.
Parameters
occable_item: The command or command map to processrepo: The Ecto repository (defaults toRepo)
Returns
{:ok, %{transaction: Transaction.t(), command_success: Command.t()}}on success{:ok, %{command_failure: Command.t()}}on failureEcto.Multi.failure()on unrecoverable error
@spec retry( module(), DoubleEntryLedger.Occ.Occable.t(), DoubleEntryLedger.Command.ErrorMap.t(), non_neg_integer(), Ecto.Repo.t() ) :: {:ok, %{ transaction: DoubleEntryLedger.Transaction.t(), command_success: DoubleEntryLedger.Command.t() }} | {:ok, %{command_failure: DoubleEntryLedger.Command.t()}} | Ecto.Multi.failure()
Finalizes when retry attempts are exhausted.
Invokes Occable.timed_out/3 to mark the item as timed out, merges in
handle_occ_final_timeout/2, then runs one last transaction.
Parameters
module- the processor moduleoccable_item- the item that timed outerror_map- the accumulated errors and retry countrepo- the Ecto repo
Returns
- The result of the final
repo.transaction/1