DoubleEntryLedger.Workers.CommandWorker.CreateTransactionCommandMap (double_entry_ledger v0.4.0)
Processes TransactionCommandMap structures for atomic creation and update of events and their
associated transactions in the Double Entry Ledger system.
This module implements the Optimistic Concurrency Control (OCC) pattern to ensure safe concurrent processing of events, providing robust error handling, retry logic, and transactional guarantees. It supports both creation and update flows for events, ensuring that all operations are performed atomically and consistently.
Features
- Transaction Processing: Handles both creation and update of transactions based on the event map's action.
- Atomic Operations: Ensures all event and transaction changes are performed in a single database transaction.
- Error Handling: Maps validation and dependency errors to the appropriate changeset or event state.
- Retry Logic: Retries OCC conflicts and schedules retries for dependency errors.
- OCC Integration: Integrates with the OCC processor behavior for safe, idempotent event processing.
Main Functions
- process_map/2 - Entry point for processing event maps with error handling and OCC.
- build_transaction/3 - Constructs Ecto.Multi operations for create or update actions.
- handle_build_transaction/3 - Adds event update or error handling steps to the Multi.
This module ensures that events are processed exactly once, even in high-concurrency environments, and that all error and retry scenarios are handled transparently.
Summary
Functions
Builds the shared Ecto.Multi pipeline for both success and error flows.
Builds an Ecto.Multi transaction for processing an event map based on its action type.
Adds the step to update the event or handle errors after transaction processing.
Handles the case when OCC retries are exhausted for an event map.
Handles errors that occur when converting event map data to a transaction map.
Processes a TransactionCommandMap by creating both an event record and its associated transaction atomically.
Processes a command with OCC retry logic.
Finalizes when retry attempts are exhausted.
Types
@type logable() :: DoubleEntryLedger.Command.t() | DoubleEntryLedger.Command.AccountCommandMap.t() | DoubleEntryLedger.Command.TransactionCommandMap.t() | map()
Functions
@spec build_multi(module(), DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t()) :: Ecto.Multi.t()
Builds the shared Ecto.Multi pipeline for both success and error flows.
- :transaction_map - converts raw data to a map or returns an error tuple
- merges in either:
  - handle_transaction_map_error/3 when conversion fails
  - build_transaction/4 + handle_build_transaction/3 on success
Parameters
- module - the processor module implementing the callbacks
- occable_item - the Command or TransactionCommandMap being processed
- repo - the Ecto repo to use for DB ops
Returns
- an Ecto.Multi ready for repo.transaction/1
Builds an Ecto.Multi transaction for processing an event map based on its action type.
This function implements the OCC processor behavior and creates the appropriate
transaction operations depending on whether the event is a :create_transaction or :update action.
For :create_transaction actions:
- Inserts a new event with status :pending
- Creates a new transaction in the ledger
- Updates the event to mark it as processed with the transaction ID
For :update actions:
- Inserts a new event with status :pending
- Retrieves the related "create event" transaction
- Updates the existing transaction with new data
- Updates the event to mark it as processed with the transaction ID
Parameters
- command_map: A TransactionCommandMap struct containing the event details and action type.
- transaction_map: A map containing the transaction data to be created or updated.
- repo: The Ecto repository to use for database operations.
Returns
- An Ecto.Multi struct containing the operations to execute within a transaction.
Adds the step to update the event or handle errors after transaction processing.
This function inspects the results of the previous Ecto.Multi steps and determines
the appropriate next action for the event:
- If both the transaction and event creation succeed, the event is marked as processed.
- If the related create event is not yet processed, the event is reverted to pending.
- If the related create event failed, a retry is scheduled for the update event.
- For all other errors, the event is marked as dead letter.
Parameters
- multi: The Ecto.Multi built so far.
- command_map: The event map being processed.
- _repo: The Ecto repository (unused).
Returns
- The updated Ecto.Multi with either a :command_success or :command_failure step.
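The four outcomes listed above can be pictured as a single dispatch on the result of the earlier steps. The following is only a minimal sketch: the module name, the error atoms, and the returned action atoms are hypothetical placeholders chosen for illustration, not the library's own names, and no Ecto.Multi is involved.

```elixir
defmodule EventOutcomeSketch do
  # Maps the outcome of the earlier steps to the follow-up action.
  # All atoms below are hypothetical; the real module works on Ecto.Multi changes.

  # Transaction and event were both created: mark the event as processed.
  def next_action({:ok, %{transaction: _transaction, event: _event}}), do: :mark_processed

  # The related create event has not been processed yet: back to pending.
  def next_action({:error, :create_event_not_processed}), do: :revert_to_pending

  # The related create event failed: schedule a retry for the update event.
  def next_action({:error, :create_event_failed}), do: :schedule_retry

  # Anything else is treated as unrecoverable.
  def next_action({:error, _other}), do: :dead_letter
end

EventOutcomeSketch.next_action({:error, :create_event_failed})
```

Keeping the catch-all clause last matters here, since Elixir tries function clauses in order and the more specific error atoms must match first.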
Handles the case when OCC retries are exhausted for an event map.
Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_occ_final_timeout/2.
Parameters
- command_map: The event map being processed.
- repo: The Ecto repository.
Returns
- An Ecto.Multi that updates the event as dead letter or timed out.
Handles errors that occur when converting event map data to a transaction map.
Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_transaction_map_error/3.
Parameters
- command_map: The event map being processed.
- error: The error encountered during transaction map conversion.
- repo: The Ecto repository.
Returns
- An Ecto.Multi that updates the event with error information.
@spec process( DoubleEntryLedger.Command.TransactionCommandMap.t(), Ecto.Repo.t() | nil ) :: DoubleEntryLedger.Workers.CommandWorker.success_tuple() | DoubleEntryLedger.Workers.CommandWorker.error_tuple()
Processes a TransactionCommandMap by creating both an event record and its associated transaction atomically.
This function is designed for synchronous use, ensuring that both the event and the transaction
are created or updated in one atomic operation. It handles both :create_transaction and :update action types,
with appropriate transaction building logic for each case. The entire operation uses Optimistic
Concurrency Control (OCC) with retry mechanisms to handle concurrent modifications effectively.
Parameters
- command_map: A TransactionCommandMap struct containing all event and transaction data.
- repo: The repository to use for database operations (defaults to Repo).
Returns
- {:ok, transaction, event} on success, where both the transaction and event are created/updated successfully.
- {:error, event} if the transaction processing fails with an OCC or dependency issue:
  - If there was an OCC timeout, the event will be in the :occ_timeout state and can be retried.
  - If this is an update event and the create event is still in pending state, the event will be in the :pending state.
- {:error, changeset} if validation errors occur:
  - For event validation failures, the TransactionCommandMap changeset will contain event-related errors.
  - For transaction validation failures, the TransactionCommandMap changeset will contain mapped transaction errors.
- {:error, reason} for other errors, with a string describing the error and the failing step.
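A caller would typically branch on these return shapes with pattern matching. The sketch below is an illustration under stated assumptions, not the library's code: the module name and the result atoms are hypothetical, plain maps stand in for the library's structs so the example runs without Ecto, and the `:status` key on the event is an assumed field name.

```elixir
defmodule ProcessResultSketch do
  # Classifies the documented return shapes of process/2.
  # Plain maps stand in for the real structs; result atoms are hypothetical.

  def classify({:ok, _transaction, _event}), do: :done

  # Validation errors: matched on the struct name only, so Ecto is not required.
  def classify({:error, %{__struct__: Ecto.Changeset}}), do: :invalid

  # Event returned after an OCC timeout; `:status` is an assumed field name.
  def classify({:error, %{status: :occ_timeout}}), do: :retry_later

  # Update event whose create event is still pending.
  def classify({:error, %{status: :pending}}), do: :waiting_on_create

  # Any other failure arrives as a descriptive string.
  def classify({:error, reason}) when is_binary(reason), do: {:failed, reason}
end

ProcessResultSketch.classify({:error, %{status: :occ_timeout}})
```

The changeset clause comes before the event clauses so that a changeset is never mistaken for an event, whatever keys it carries.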
Processes a command with OCC retry logic.
Converts command data to a transaction map, builds an Ecto.Multi, and
retries on Ecto.StaleEntryError up to the configured maximum.
Parameters
- occable_item: The command or command map to process
- repo: The Ecto repository (defaults to Repo)
Returns
- {:ok, %{transaction: Transaction.t(), command_success: Command.t()}} on success
- {:ok, %{command_failure: Command.t()}} on failure
- Ecto.Multi.failure() on unrecoverable error
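The retry-on-stale-entry idea described above can be sketched in a few lines. This is a simplified illustration, not the module's implementation: `OccRetrySketch`, its nested `StaleError` (a stand-in for Ecto.StaleEntryError so the example runs without Ecto), the limit of 3 attempts, and the `{:error, :occ_timeout}` return value are all assumptions made for the example.

```elixir
defmodule OccRetrySketch do
  # Stand-in for Ecto.StaleEntryError so the sketch runs without Ecto.
  defmodule StaleError do
    defexception message: "stale entry"
  end

  # Hypothetical attempt budget; the real maximum is configured elsewhere.
  @max_attempts 3

  # Runs `fun`; on a stale-entry conflict, tries again until the budget is spent.
  def with_retry(fun, attempts_left \\ @max_attempts)

  # Budget exhausted: report a timeout instead of raising.
  def with_retry(_fun, 0), do: {:error, :occ_timeout}

  def with_retry(fun, attempts_left) when attempts_left > 0 do
    {:ok, fun.()}
  rescue
    StaleError -> with_retry(fun, attempts_left - 1)
  end
end

# Usage: a function that always conflicts ends in a timeout tuple.
OccRetrySketch.with_retry(fn -> raise OccRetrySketch.StaleError end)
```

Only the stale-entry exception is rescued, so any other error propagates to the caller unchanged rather than being retried.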
@spec retry( module(), DoubleEntryLedger.Occ.Occable.t(), DoubleEntryLedger.Command.ErrorMap.t(), non_neg_integer(), Ecto.Repo.t() ) :: {:ok, %{ transaction: DoubleEntryLedger.Transaction.t(), command_success: DoubleEntryLedger.Command.t() }} | {:ok, %{command_failure: DoubleEntryLedger.Command.t()}} | Ecto.Multi.failure()
Finalizes when retry attempts are exhausted.
Invokes Occable.timed_out/3 to mark the item as timed out, merges in
handle_occ_final_timeout/2, then runs one last transaction.
Parameters
- module - the processor module
- occable_item - the item that timed out
- error_map - the accumulated errors and retry count
- repo - the Ecto repo
Returns
- The result of the final repo.transaction/1