DoubleEntryLedger.Stores.CommandStore (double_entry_ledger v0.4.0)

View Source

Provides functions for managing commands in the double-entry ledger system.

This module serves as the primary interface for all command-related operations, including creating, retrieving, processing, and querying commands. It manages the complete lifecycle of commands from creation through processing to completion or failure.

Key Functionality

  • Command Management: Create, retrieve, and track commands.
  • Command Processing: Claim commands for processing, mark commands as processed or failed.
  • Command Queries: Find commands by instance, transaction ID, account ID, or other criteria.
  • Error Handling: Track and manage errors that occur during command processing.

Usage Examples

Creating and processing a new command

Commands can be created and processed immediately or queued for asynchronous processing. If the command is processed immediately, it will create the associated transaction and update the command status. If processing fails, it will be queued and retried.

event_params = %{
  "instance_id" => instance.id,
  "action" => "create_transaction",
  "source" => "payment_system",
  "source_idempk" => "txn_123",
  "payload" => %{
    "status" => "pending",
    "entries" => [
      %{"account_id" => cash_account.id, "amount" => 100_00, "currency" => "USD"},
      %{"account_id" => revenue_account.id, "amount" => 100_00, "currency" => "USD"}
    ]
  }
}

# create and process the command immediately
{:ok, transaction, event} = DoubleEntryLedger.Apis.CommandApi.process_from_params(event_params)

# create command for asynchronous processing later
{:ok, event} = DoubleEntryLedger.Stores.CommandStore.create(event_params)

Retrieving commands for an instance

{:ok, {commands, meta}} = DoubleEntryLedger.Stores.CommandStore.list_for_instance(instance.id)

Retrieving commands for a transaction

{:ok, {commands, meta}} = DoubleEntryLedger.Stores.CommandStore.list_for_transaction(transaction.id)

Retrieving commands for an account

events = DoubleEntryLedger.Stores.CommandStore.list_all_for_account(account.id)

Process command without saving it in the CommandStore on error

If you want more control over error handling, you can process a command without saving it in the CommandStore on error. This allows you to handle the command processing logic without automatically persisting the command, which can be useful for debugging or custom error handling.

{:ok, transaction, event} = DoubleEntryLedger.Apis.CommandApi.process_from_params(event_params, [on_error: :fail])

Implementation Notes

  • The module implements optimistic concurrency control for command claiming and processing, ensuring that commands are processed exactly once even in high-concurrency environments.
  • All queries are paginated and ordered by insertion time descending for efficient retrieval.
  • Error handling is explicit, with clear return values for all failure modes.

Summary

Functions

Creates a new command in the database.

Retrieves a command by its unique ID.

Lists commands for an instance with cursor pagination via Flop.

Lists commands for a transaction with cursor pagination via Flop.

Functions

create(attrs)

Creates a new command in the database.

Parameters

  • attrs: Map of attributes for creating the command

Returns

  • {:ok, command}: If the command was successfully created
  • {:error, changeset}: If validation failed

Examples

iex> {:ok, instance} = InstanceStore.create(%{address: "Sample:Instance"})
iex> account_data = %{address: "Cash:Account", type: :asset, currency: :USD}
iex> {:ok, asset_account} = AccountStore.create(instance.address, account_data, "unique_id_123")
iex> {:ok, liability_account} = AccountStore.create(instance.address, %{account_data | address: "Liability:Account", type: :liability}, "unique_id_456")
iex> transaction_map = %TransactionCommandMap{
...>   instance_address: instance.address,
...>   action: :create_transaction,
...>   source: "from-somewhere",
...>   source_idempk: "unique_1234",
...>   payload: %{
...>     status: :pending,
...>     entries: [
...>       %{account_address: asset_account.address, amount: 100, currency: :USD},
...>       %{account_address: liability_account.address, amount: 100, currency: :USD}
...>     ]}}
iex>   {:ok, command} = CommandStore.create(transaction_map)
iex>  command.command_queue_item.status
:pending

get_by_id(id)

@spec get_by_id(Ecto.UUID.t()) :: DoubleEntryLedger.Command.t() | nil

Retrieves a command by its unique ID.

Returns the command if found, or nil if no command exists with the given ID.

Parameters

  • id: The UUID of the command to retrieve

Returns

  • Command.t(): The found command
  • nil: If no command with the given ID exists

get_by_instance_address_and_id(instance_address, id)

@spec get_by_instance_address_and_id(String.t(), Ecto.UUID.t()) ::
  DoubleEntryLedger.Command.t() | nil

list_for_instance(instance_or_id, flop_params \\ %{})

@spec list_for_instance(DoubleEntryLedger.Instance.t() | Ecto.UUID.t(), map()) ::
  {:ok, {[DoubleEntryLedger.Command.t()], Flop.Meta.t()}}
  | {:error, Flop.Meta.t()}

Lists commands for an instance with cursor pagination via Flop.

Accepts either an %Instance{} struct or its UUID string. Preloads :command_queue_item and :transaction on each command.

Parameters

  • instance_or_id (Instance.t() | Ecto.UUID.t()): Parent instance.

  • flop_params (map, optional): Flop params. No filterable fields (JSONB command_map filtering deferred).

Returns

  • {:ok, {[Command.t()], Flop.Meta.t()}} on success.
  • {:error, Flop.Meta.t()} on invalid params.

Examples

iex> {:ok, instance} = InstanceStore.create(%{address: "Sample:Instance"})
iex> account_data = %{address: "Cash:Account", type: :asset, currency: :USD}
iex> {:ok, a1} = AccountStore.create(instance.address, account_data, "u1")
iex> {:ok, a2} = AccountStore.create(instance.address, %{account_data | address: "Liab:Account", type: :liability}, "u2")
iex> create_attrs = %{status: :posted, entries: [
...>   %{account_address: a1.address, amount: 100, currency: :USD},
...>   %{account_address: a2.address, amount: 100, currency: :USD}]}
iex> {:ok, _} = TransactionStore.create(instance.address, create_attrs, "idem-1")
iex> {:ok, {commands, %Flop.Meta{}}} = CommandStore.list_for_instance(instance)
iex> length(commands)
3

list_for_transaction(transaction_or_id, flop_params \\ %{})

@spec list_for_transaction(DoubleEntryLedger.Transaction.t() | Ecto.UUID.t(), map()) ::
  {:ok, {[DoubleEntryLedger.Command.t()], Flop.Meta.t()}}
  | {:error, Flop.Meta.t()}

Lists commands for a transaction with cursor pagination via Flop.

Accepts either a %Transaction{} struct or its UUID string.

Parameters

  • transaction_or_id (Transaction.t() | Ecto.UUID.t()): Scoping transaction.

  • flop_params (map, optional): Flop params.

Returns

  • {:ok, {[Command.t()], Flop.Meta.t()}} on success.
  • {:error, Flop.Meta.t()} on invalid params.

Examples

iex> {:ok, instance} = InstanceStore.create(%{address: "Sample:Instance"})
iex> account_data = %{address: "Cash:Account", type: :asset, currency: :USD}
iex> {:ok, a1} = AccountStore.create(instance.address, account_data, "u1")
iex> {:ok, a2} = AccountStore.create(instance.address, %{account_data | address: "Liab:Account", type: :liability}, "u2")
iex> create_attrs = %{status: :pending, entries: [
...>   %{account_address: a1.address, amount: 100, currency: :USD},
...>   %{account_address: a2.address, amount: 100, currency: :USD}]}
iex> {:ok, %{id: id}} = TransactionStore.create(instance.address, create_attrs, "idem-2")
iex> TransactionStore.update(instance.address, id, %{status: :posted}, "idem-2")
iex> {:ok, {commands, %Flop.Meta{}}} = CommandStore.list_for_transaction(id)
iex> length(commands)
2