DoubleEntryLedger.Workers.CommandWorker.CreateTransactionCommand (double_entry_ledger v0.4.0)

Handles the processing of creation events in the double-entry ledger system.

This module is responsible for transforming event data into ledger transactions and creating those transactions within the accounting system. It implements optimistic concurrency control (OCC) to handle potential conflicts when multiple processes attempt to create transactions simultaneously.

Workflow

  1. Receives an event with action: :create_transaction
  2. Transforms the event's transaction data into a valid transaction map
  3. Attempts to create a transaction in the database
  4. If successful, marks the event as processed and links it to the created transaction
  5. If creation fails because of a concurrency conflict, retries up to the configured maximum
  6. If creation fails for any other reason, marks the event as failed
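
The workflow above can be sketched in plain Elixir. This is a minimal illustration, not the library's implementation: WorkflowSketch, to_transaction_map/1, the create_fun argument, and the shape of the command map are all hypothetical, and the retry branch (step 5) is left out to keep the sketch short.

```elixir
defmodule WorkflowSketch do
  # Hypothetical stand-in for the worker; none of these names come from the library.

  # Step 1: only commands with action :create_transaction are accepted.
  def process(%{action: :create_transaction, payload: payload} = command, create_fun) do
    # Step 2: transform the raw payload into a transaction map.
    with {:ok, transaction_map} <- to_transaction_map(payload),
         # Step 3: attempt the insert (injected so the sketch needs no database).
         {:ok, transaction} <- create_fun.(transaction_map) do
      # Step 4: mark the command as processed and link it to the transaction.
      {:ok, transaction, %{command | status: :processed, transaction_id: transaction.id}}
    else
      # Step 6: any error marks the command as failed and records the reason.
      {:error, reason} ->
        {:error, %{command | status: :failed, errors: [reason | command.errors]}}
    end
  end

  # Assumed validation rule: a transaction needs at least one entry.
  defp to_transaction_map(%{entries: [_ | _] = entries}), do: {:ok, %{entries: entries}}
  defp to_transaction_map(_), do: {:error, :invalid_transaction_data}
end
```

Passing the insert function as an argument is a choice made for the sketch only; it keeps the example runnable without Ecto or a database.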

Error Handling

The module distinguishes three classes of error:

  • Transaction map transformation errors
  • Database transaction errors
  • OCC retry exhaustion errors

Each error is recorded in the event's history for auditability.

Main Functions

Summary

Functions

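build_multi(module, occable_item, repo)

Builds the shared Ecto.Multi pipeline for both success and error flows.

build_transaction(command, transaction_map, instance_id, repo)

Builds the Ecto.Multi for creating a transaction from an event.

handle_build_transaction(multi, command, repo)

Adds the step to mark the event as processed after transaction creation.

handle_occ_final_timeout(command_map, repo)

Handles the case when OCC retries are exhausted.

handle_transaction_map_error(command_map, error, repo)

Handles errors that occur when converting event data to a transaction map.

process(command, repo \\ Repo)

Processes a create event by transforming it into a transaction in the ledger.

process_with_retry(occable_item, repo \\ Repo)

Processes a command with OCC retry logic.

retry(module, occable_item, error_map, attempts, repo)

Finalizes when retry attempts are exhausted.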

Functions

build_multi(module, occable_item, repo)

Builds the shared Ecto.Multi pipeline for both success and error flows.

  1. :transaction_map – converts raw data to a map or returns an error tuple
  2. merges in either the success steps or the error-handling step, depending on the outcome of step 1

Parameters

  • module - the processor module implementing the callbacks
  • occable_item - the Command or TransactionCommandMap being processed
  • repo - the Ecto repo to use for DB ops

Returns

  • An Ecto.Multi containing the steps described above.
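
The two-stage shape described for build_multi/3 (a conversion step followed by a merge that picks the success or error branch) can be illustrated without Ecto. The sketch below is a plain-Elixir stand-in, not the library's code: PipelineSketch, convert/1, and the step bodies are hypothetical. Only the step names :transaction_map, :transaction, :command_success and :command_failure are taken from this page. The error tuple returned by execute/1 is modelled on the four-element failure tuple that Ecto.Multi produces.

```elixir
defmodule PipelineSketch do
  # A pipeline is a list of named steps; each step receives the results so far.
  def new, do: []
  def run(steps, name, fun), do: steps ++ [{:step, name, fun}]

  # A merge step chooses follow-up steps based on the results so far.
  def merge(steps, fun), do: steps ++ [{:merge, fun}]

  def execute(steps), do: do_execute(steps, %{})

  defp do_execute([], changes), do: {:ok, changes}

  defp do_execute([{:merge, fun} | rest], changes),
    do: do_execute(fun.(changes) ++ rest, changes)

  defp do_execute([{:step, name, fun} | rest], changes) do
    case fun.(changes) do
      {:ok, value} -> do_execute(rest, Map.put(changes, name, value))
      {:error, reason} -> {:error, name, reason, changes}
    end
  end

  # Mirrors the two stages described above.
  def build(raw_data) do
    new()
    # Stage 1: store either the converted map or an error tuple.
    |> run(:transaction_map, fn _changes -> {:ok, convert(raw_data)} end)
    # Stage 2: merge in the error branch or the success branch.
    |> merge(fn
      %{transaction_map: {:error, reason}} ->
        run(new(), :command_failure, fn _changes ->
          {:ok, %{status: :failed, error: reason}}
        end)

      %{transaction_map: map} ->
        new()
        |> run(:transaction, fn _changes -> {:ok, Map.put(map, :id, 1)} end)
        |> run(:command_success, fn %{transaction: t} ->
          {:ok, %{status: :processed, transaction_id: t.id}}
        end)
    end)
  end

  # Assumed conversion rule: the data must carry a non-empty entries list.
  defp convert(%{entries: [_ | _]} = data), do: data
  defp convert(_), do: {:error, :invalid_entries}
end
```

Because the conversion step always returns {:ok, _}, a bad payload does not abort the pipeline. The merge can then record the failure as part of the same run, which is why one pipeline can serve both flows.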

build_transaction(command, transaction_map, instance_id, repo)

Builds the Ecto.Multi for creating a transaction from an event.

Parameters

  • command: The event to process.
  • transaction_map: The transaction data map derived from the event.
  • instance_id: The ID of the ledger instance the transaction belongs to.
  • repo: The Ecto repository.

Returns

  • An Ecto.Multi that creates the transaction.

handle_build_transaction(multi, command, repo)

Adds the step to mark the event as processed after transaction creation.

Parameters

  • multi: The Ecto.Multi built so far.
  • command: The event being processed.
  • repo: The Ecto repository (unused).

Returns

  • The updated Ecto.Multi with a :command_success update step.

handle_occ_final_timeout(command_map, repo)

Handles the case when OCC retries are exhausted.

Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_occ_final_timeout/2.

Parameters

  • command_map: The command map for the event being processed.
  • repo: The Ecto repository.

Returns

  • An Ecto.Multi that marks the event as dead-lettered or timed out.

handle_transaction_map_error(command_map, error, repo)

Handles errors that occur when converting event data to a transaction map.

Delegates to DoubleEntryLedger.Workers.CommandWorker.TransactionCommandResponseHandler.handle_transaction_map_error/3.

Parameters

  • command_map: The command map for the event being processed.
  • error: The error encountered during transaction map conversion.
  • repo: The Ecto repository.

Returns

  • An Ecto.Multi that updates the event with error information.

process(command, repo \\ Repo)

Processes a create event by transforming it into a transaction in the ledger.

Takes an event with action: :create_transaction and attempts to transform its data into a valid transaction. Handles the complete lifecycle of transaction creation, including optimistic concurrency control, error handling, and event status updates.

Parameters

  • command: A Command struct containing the transaction data to be processed.
  • repo: (Optional) The Ecto repository to use for database operations, defaults to Repo.

Returns

  • {:ok, transaction, event}: When transaction creation succeeds.
  • {:error, event}: When processing fails due to OCC timeout.
  • {:error, changeset}: When there's a validation error or database error.
  • {:error, reason}: When another error occurs, with a reason explaining the failure.
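
Three of the four return shapes are two-element {:error, _} tuples, so a caller has to tell them apart by the type of the second element. The sketch below shows one way to order the clauses. It is an illustration only: ResultSketch and its nested Command and Changeset structs are hypothetical stand-ins for the library's Command struct and Ecto.Changeset.

```elixir
defmodule ResultSketch do
  # Local stand-ins so the sketch runs without the library or Ecto.
  defmodule Command do
    defstruct [:id, :status]
  end

  defmodule Changeset do
    defstruct errors: []
  end

  # Success: the transaction plus the updated command.
  def describe({:ok, transaction, %Command{}}), do: {:created, transaction.id}

  # OCC timeout: the command itself comes back in the error tuple.
  def describe({:error, %Command{id: id}}), do: {:occ_timeout, id}

  # Validation or database error: a changeset comes back.
  def describe({:error, %Changeset{errors: errors}}), do: {:invalid, errors}

  # Catch-all, which must come last because it matches any reason.
  def describe({:error, reason}), do: {:failed, reason}
end
```

The struct-matching clauses come before the catch-all; with the order reversed, the catch-all would absorb every error and the more specific clauses would never run.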

process_with_retry(occable_item, repo \\ Repo)

Processes a command with OCC retry logic.

Converts command data to a transaction map, builds an Ecto.Multi, and retries on Ecto.StaleEntryError up to the configured maximum.

Parameters

  • occable_item: The command or command map to process
  • repo: The Ecto repository (defaults to Repo)

Returns

  • {:ok, %{transaction: Transaction.t(), command_success: Command.t()}} on success
  • {:ok, %{command_failure: Command.t()}} on failure
  • Ecto.Multi.failure() on unrecoverable error
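
The retry-on-stale behaviour described above can be sketched as a recursive function that rescues a stale-entry exception and tries again until its attempts run out. This is a simplified illustration under stated assumptions: RetrySketch, its StaleError exception (a local stand-in for Ecto.StaleEntryError), the default of three attempts, and the {:error, :occ_timeout} result are all hypothetical.

```elixir
defmodule RetrySketch do
  # Local stand-in for Ecto.StaleEntryError so the sketch has no dependencies.
  defmodule StaleError do
    defexception message: "stale entry"
  end

  # Assumed default; the real maximum is configured in the library.
  @max_retries 3

  def process_with_retry(fun, attempts \\ @max_retries)

  # No attempts left: give up and report the timeout.
  def process_with_retry(_fun, 0), do: {:error, :occ_timeout}

  def process_with_retry(fun, attempts) when attempts > 0 do
    try do
      {:ok, fun.()}
    rescue
      # Another process changed the row first: try again with one attempt fewer.
      StaleError -> process_with_retry(fun, attempts - 1)
    end
  end
end
```

Only the stale-entry exception is rescued. Any other exception propagates to the caller, which matches the distinction this page draws between concurrency conflicts and unrecoverable errors.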

retry(module, occable_item, error_map, attempts, repo)

Finalizes when retry attempts are exhausted.

Invokes Occable.timed_out/3 to mark the item as timed out, merges in handle_occ_final_timeout/2, then runs one last transaction.

Parameters

  • module - the processor module
  • occable_item - the item that timed out
  • error_map - the accumulated errors and retry count
  • attempts - the remaining retry attempts
  • repo - the Ecto repo

Returns

  • The result of the final repo.transaction/1
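
The finalizing clause can be pictured as the base case of a countdown that carries an error map along with it. The sketch below is a hypothetical reduction of that idea: FinalizeSketch, the %{errors: [...], retries: n} shape of the error map, the :stale reason, and the :occ_timeout status are assumptions, and the database work is replaced by a function argument.

```elixir
defmodule FinalizeSketch do
  # Base case: no attempts remain, so mark the item as timed out and
  # attach the errors collected along the way.
  def retry(_attempt_fun, item, error_map, 0) do
    {:error, %{item | status: :occ_timeout, errors: error_map.errors}}
  end

  # Recursive case: try once, and on a stale result record it and count down.
  def retry(attempt_fun, item, error_map, attempts) when attempts > 0 do
    case attempt_fun.(item) do
      {:ok, result} ->
        {:ok, result}

      {:error, :stale} ->
        updated = %{
          error_map
          | errors: [:stale | error_map.errors],
            retries: error_map.retries + 1
        }

        retry(attempt_fun, item, updated, attempts - 1)
    end
  end
end
```

Keeping the accumulated errors in a separate map means the final clause can write the full retry history in one step.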