DoubleEntryLedger.CommandQueue.Scheduling (double_entry_ledger v0.2.0)
View SourceProvides scheduling helpers for commands in the command queue.
It exposes functions that manage the full lifecycle of a command in the queue:
- Scheduling and retrying failed commands with exponential backoff
- Managing transitions between different command states (pending, processing, failed, dead letter)
- Handling special cases like updates waiting for create commands
- Adding errors and tracking retry attempts
The scheduling system uses configurable parameters:
- Maximum number of retries before a command is sent to dead letter
- Base delay for first retry attempt
- Maximum delay cap to prevent excessive wait times
- Jitter to prevent thundering herd problems during retries
Summary
Functions
Builds a changeset to mark a command as permanently failed (dead letter).
Builds a changeset to mark a command as processed.
Builds a changeset to revert a command to the pending state.
Builds a changeset to schedule a retry for a failed command.
Builds a changeset to schedule the retry of an update command that depends on a failed create command.
Claims a command for processing by marking it as being processed by a specific processor.
Sets the next retry time for a failed command using exponential backoff.
Functions
@spec build_mark_as_dead_letter(DoubleEntryLedger.Command.t(), String.t()) :: Ecto.Changeset.t()
Builds a changeset to mark a command as permanently failed (dead letter).
This is used when a command has failed terminally and should not be retried.
Adds the provided error message to the command's errors and sets the status
to :dead_letter.
Parameters
command- The command to mark as dead lettererror- The error message explaining why the command is being marked as dead letter
Returns
Ecto.Changeset.t()- The changeset for updating the command
@spec build_mark_as_processed(DoubleEntryLedger.Command.t()) :: Ecto.Changeset.t(DoubleEntryLedger.Command.t())
Builds a changeset to mark a command as processed.
This function updates the queue item's status to :processed and records completion metadata.
Parameters
command- The Command struct to update
Returns
Ecto.Changeset.t()- The changeset for marking the command as processed
@spec build_revert_to_pending(DoubleEntryLedger.Command.t(), any()) :: Ecto.Changeset.t()
Builds a changeset to revert a command to the pending state.
Adds the provided error message to the queue item's errors list and
changes the status to :pending to allow it to be reprocessed.
Parameters
command- The command to revert to pending stateerror- The error message to add to the command's errors
Returns
Ecto.Changeset.t()- The changeset for updating the command
@spec build_schedule_retry_with_reason( DoubleEntryLedger.Command.t(), String.t() | nil, DoubleEntryLedger.CommandQueueItem.state() ) :: Ecto.Changeset.t()
Builds a changeset to schedule a retry for a failed command.
Handles both normal retries and terminal failures (dead letter):
- If the retry count exceeds the configured maximum, marks as dead letter
- Otherwise, calculates the next retry time using exponential backoff
- Sets the appropriate command status, clears processor reference, and adds the error
Parameters
command- The command that needs to be retriederror- The error message to add to the command's errorsstatus- The status to set (usually :failed)
Returns
Ecto.Changeset.t()- The changeset for updating the command
@spec build_schedule_update_retry( DoubleEntryLedger.Command.t(), DoubleEntryLedger.Workers.CommandWorker.UpdateCommandError.t() ) :: Ecto.Changeset.t()
Builds a changeset to schedule the retry of an update command that depends on a failed create command.
Ensures that update commands don't retry before their prerequisite create commands by scheduling them after the create command's next retry time.
Parameters
command- The update command that needs to be retriederror- An UpdateCommandError struct containing the create command and error details
Returns
Ecto.Changeset.t()- The changeset for updating the command
@spec claim_command_for_processing(Ecto.UUID.t(), String.t(), Ecto.Repo.t()) :: {:ok, DoubleEntryLedger.Command.t()} | {:error, atom()}
Claims a command for processing by marking it as being processed by a specific processor.
This function implements optimistic concurrency control to ensure that only one processor can claim a command at a time. It only allows claiming commands with status :pending or :occ_timeout.
Parameters
id: The UUID of the command to claimprocessor_id: A string identifier for the processor claiming the command (defaults to "manual")repo: The Ecto repository to use (defaults to Repo)
Returns
{:ok, command}: If the command was successfully claimed{:error, :command_not_found}: If no command with the given ID exists{:error, :command_already_claimed}: If the command was claimed by another processor{:error, :command_not_claimable}: If the command is not in a claimable state (not pending or occ_timeout)
@spec mark_as_dead_letter(DoubleEntryLedger.Command.t(), String.t(), Ecto.Repo.t()) :: {:error, DoubleEntryLedger.Command.t()} | {:error, Ecto.Changeset.t()}
@spec schedule_retry_with_reason( DoubleEntryLedger.Command.t(), String.t(), DoubleEntryLedger.CommandQueueItem.state(), Ecto.Repo.t() ) :: {:error, DoubleEntryLedger.Command.t()} | {:error, Ecto.Changeset.t()}
Sets the next retry time for a failed command using exponential backoff.
Parameters
command- The command that failed and needs retry schedulingerror- The error message or reason for failurestatus- The status to set for the command (defaults to:failed)
Returns
{:error, updated_command}- The command with updated retry information{:error, changeset}- Error updating the command