DoubleEntryLedger.Workers.CommandWorker.CreateTransactionCommand (double_entry_ledger v0.4.0)
View SourceHandles the processing of creation events in the double-entry ledger system.
This module is responsible for transforming event data into ledger transactions and creating those transactions within the accounting system. It implements optimistic concurrency control (OCC) to handle potential conflicts when multiple processes attempt to create transactions simultaneously.
Workflow
- Receives an event with
action: :create_transaction - Transforms the event's transaction data into a valid transaction map
- Attempts to create a transaction in the database
- If successful, marks the event as processed and links it to the created transaction
- If unsuccessful due to concurrency issues, implements retry logic
- If unsuccessful due to other errors, marks the event as failed
Error Handling
The module implements comprehensive error handling with specific error types:
- Transaction map transformation errors
- Database transaction errors
- OCC retry exhaustion errors
Each error is recorded in the event's history for auditability.
Main Functions
process/2— Entry point for processing a create event.build_transaction/3— Constructs the Ecto.Multi for transaction creation.handle_build_transaction/3— Adds event update step to the Multi.handle_transaction_map_error/3— Handles errors in transaction map conversion.handle_occ_final_timeout/2— Handles OCC retry exhaustion.
Summary
Functions
Builds the shared Ecto.Multi pipeline for both success and error flows.
Builds the Ecto.Multi for creating a transaction from an event.
Adds the step to mark the event as processed after transaction creation.
Handles the case when OCC retries are exhausted.
Handles errors that occur when converting event data to a transaction map.
Processes a create event by transforming it into a transaction in the ledger.
Processes a command with OCC retry logic.
Finalizes when retry attempts are exhausted.
Functions
@spec build_multi(module(), DoubleEntryLedger.Occ.Occable.t(), Ecto.Repo.t()) :: Ecto.Multi.t()
Builds the shared Ecto.Multi pipeline for both success and error flows.
:transaction_map– converts raw data to a map or returns an error tuple- merges in either:
handle_transaction_map_error/3when conversion failsbuild_transaction/4+handle_build_transaction/3on success
Parameters
module- the processor module implementing the callbacksoccable_item- the Command or TransactionCommandMap being processedrepo- the Ecto repo to use for DB ops
Returns
- an
Ecto.Multiready forrepo.transaction/1
Builds the Ecto.Multi for creating a transaction from an event.
Parameters
event: The event to process.transaction_map: The transaction data map derived from the event.repo: The Ecto repository.
Returns
- An
Ecto.Multithat inserts the transaction.
Adds the step to mark the event as processed after transaction creation.
Parameters
multi: The Ecto.Multi built so far.event: The event being processed._repo: The Ecto repository (unused).
Returns
- The updated
Ecto.Multiwith an:command_successupdate step.
Handles the case when OCC retries are exhausted.
Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_occ_final_timeout/2.
Parameters
command_map: The event being processed.repo: The Ecto repository.
Returns
- An
Ecto.Multithat updates the event as dead letter or timed out.
Handles errors that occur when converting event data to a transaction map.
Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_transaction_map_error/3.
Parameters
command_map: The event being processed.error: The error encountered during transaction map conversion.repo: The Ecto repository.
Returns
- An
Ecto.Multithat updates the event with error information.
@spec process(DoubleEntryLedger.Command.t(), Ecto.Repo.t()) :: DoubleEntryLedger.Workers.CommandWorker.success_tuple() | DoubleEntryLedger.Workers.CommandWorker.error_tuple()
Processes a create event by transforming it into a transaction in the ledger.
Takes an event with action: :create_transaction and attempts to transform its data into
a valid transaction. Handles the complete lifecycle of transaction creation,
including optimistic concurrency control, error handling, and event status updates.
Parameters
event: AnCommandstruct containing the transaction data to be processed.repo: (Optional) The Ecto repository to use for database operations, defaults toRepo.
Returns
{:ok, transaction, event}: When transaction creation succeeds.{:error, event}: When processing fails due to OCC timeout.{:error, changeset}: When there's a validation error or database error.{:error, reason}: When another error occurs, with a reason explaining the failure.
Processes a command with OCC retry logic.
Converts command data to a transaction map, builds an Ecto.Multi, and
retries on Ecto.StaleEntryError up to the configured maximum.
Parameters
occable_item: The command or command map to processrepo: The Ecto repository (defaults toRepo)
Returns
{:ok, %{transaction: Transaction.t(), command_success: Command.t()}}on success{:ok, %{command_failure: Command.t()}}on failureEcto.Multi.failure()on unrecoverable error
@spec retry( module(), DoubleEntryLedger.Occ.Occable.t(), DoubleEntryLedger.Command.ErrorMap.t(), non_neg_integer(), Ecto.Repo.t() ) :: {:ok, %{ transaction: DoubleEntryLedger.Transaction.t(), command_success: DoubleEntryLedger.Command.t() }} | {:ok, %{command_failure: DoubleEntryLedger.Command.t()}} | Ecto.Multi.failure()
Finalizes when retry attempts are exhausted.
Invokes Occable.timed_out/3 to mark the item as timed out, merges in
handle_occ_final_timeout/2, then runs one last transaction.
Parameters
module- the processor moduleoccable_item- the item that timed outerror_map- the accumulated errors and retry countrepo- the Ecto repo
Returns
- The result of the final
repo.transaction/1