DoubleEntryLedger.CommandQueue.Scheduling (double_entry_ledger v0.4.0)

View Source

Provides scheduling helpers for commands in the command queue.

It exposes functions that manage the full lifecycle of a command in the queue:

  • Scheduling and retrying failed commands with exponential backoff
  • Managing transitions between different command states (pending, processing, failed, dead letter)
  • Handling special cases like updates waiting for create commands
  • Adding errors and tracking retry attempts

The scheduling system uses configurable parameters:

  • Maximum number of retries before a command is sent to dead letter
  • Base delay for first retry attempt
  • Maximum delay cap to prevent excessive wait times
  • Jitter to prevent thundering herd problems during retries

Summary

Functions

Builds a changeset to mark a command as permanently failed (dead letter).

Builds a changeset to mark a command as processed.

Builds a changeset to revert a command to the pending state.

Builds a changeset to schedule a retry for a failed command.

Builds a changeset to schedule the retry of an update command that depends on a failed create command.

Claims a command for processing by marking it as being processed by a specific processor.

Sets the next retry time for a failed command using exponential backoff.

Functions

build_mark_as_dead_letter(command, error)

@spec build_mark_as_dead_letter(DoubleEntryLedger.Command.t(), String.t()) ::
  Ecto.Changeset.t()

Builds a changeset to mark a command as permanently failed (dead letter).

This is used when a command has failed terminally and should not be retried. Adds the provided error message to the command's errors and sets the status to :dead_letter.

Parameters

  • command - The command to mark as dead letter
  • error - The error message explaining why the command is being marked as dead letter

Returns

  • Ecto.Changeset.t() - The changeset for updating the command

build_mark_as_processed(command)

Builds a changeset to mark a command as processed.

This function updates the queue item's status to :processed and records completion metadata.

Parameters

  • command - The Command struct to update

Returns

  • Ecto.Changeset.t() - The changeset for marking the command as processed

build_revert_to_pending(command, error)

@spec build_revert_to_pending(DoubleEntryLedger.Command.t(), any()) ::
  Ecto.Changeset.t()

Builds a changeset to revert a command to the pending state.

Adds the provided error message to the queue item's errors list and changes the status to :pending to allow it to be reprocessed.

Parameters

  • command - The command to revert to pending state
  • error - The error message to add to the command's errors

Returns

  • Ecto.Changeset.t() - The changeset for updating the command

build_schedule_retry_with_reason(command, error, status)

@spec build_schedule_retry_with_reason(
  DoubleEntryLedger.Command.t(),
  String.t() | nil,
  DoubleEntryLedger.CommandQueueItem.state()
) :: Ecto.Changeset.t()

Builds a changeset to schedule a retry for a failed command.

Handles both normal retries and terminal failures (dead letter):

  • If the retry count exceeds the configured maximum, marks as dead letter
  • Otherwise, calculates the next retry time using exponential backoff
  • Sets the appropriate command status, clears processor reference, and adds the error

Parameters

  • command - The command that needs to be retried
  • error - The error message to add to the command's errors
  • status - The status to set (usually :failed)

Returns

  • Ecto.Changeset.t() - The changeset for updating the command

build_schedule_update_retry(command, error)

Builds a changeset to schedule the retry of an update command that depends on a failed create command.

Ensures that update commands don't retry before their prerequisite create commands by scheduling them after the create command's next retry time.

Parameters

  • command - The update command that needs to be retried
  • error - An UpdateCommandError struct containing the create command and error details

Returns

  • Ecto.Changeset.t() - The changeset for updating the command

claim_command_for_processing(id, processor_id, repo \\ Repo)

@spec claim_command_for_processing(Ecto.UUID.t(), String.t(), Ecto.Repo.t()) ::
  {:ok, DoubleEntryLedger.Command.t()} | {:error, atom()}

Claims a command for processing by marking it as being processed by a specific processor.

This function implements optimistic concurrency control to ensure that only one processor can claim a command at a time. It only allows claiming commands with status :pending or :occ_timeout.

Parameters

  • id: The UUID of the command to claim
  • processor_id: A string identifier for the processor claiming the command (defaults to "manual")
  • repo: The Ecto repository to use (defaults to Repo)

Returns

  • {:ok, command}: If the command was successfully claimed
  • {:error, :command_not_found}: If no command with the given ID exists
  • {:error, :command_already_claimed}: If the command was claimed by another processor
  • {:error, :command_not_claimable}: If the command is not in a claimable state (not pending or occ_timeout)

mark_as_dead_letter(command, error, repo \\ Repo)

@spec mark_as_dead_letter(DoubleEntryLedger.Command.t(), String.t(), Ecto.Repo.t()) ::
  {:error, DoubleEntryLedger.Command.t()} | {:error, Ecto.Changeset.t()}

schedule_retry_with_reason(command, reason, status, repo \\ Repo)

Sets the next retry time for a failed command using exponential backoff.

Parameters

  • command - The command that failed and needs retry scheduling
  • error - The error message or reason for failure
  • status - The status to set for the command (defaults to :failed)

Returns

  • {:error, updated_command} - The command with updated retry information
  • {:error, changeset} - Error updating the command