Trogon.Commanded.ConsistencyPolicy (Trogon.Commanded v1.0.0)

Eventual consistency policy for projection queries with read-after-write guarantees.

Waits for projections to reach a required consistency level before returning query results, enabling proper read-after-write semantics in eventually consistent systems.

Policy types

  • MinVersionPolicy - Retries until projection.version >= required_version
  • ExactVersionPolicy - Returns only if projection.version == required_version
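
The difference between the two policy types comes down to the comparison applied to the projection version. The following is a minimal sketch of those two comparisons only, not the library's implementation; `VersionCheckSketch` and `satisfied?/3` are hypothetical names introduced for illustration.

```elixir
defmodule VersionCheckSketch do
  # Hypothetical helper: module and function names are illustrative only
  # and are not part of Trogon.Commanded.

  # Min-version semantics: a projection at or beyond the required version passes.
  def satisfied?(:min_version, projection_version, required_version),
    do: projection_version >= required_version

  # Exact-version semantics: only the precise required version passes.
  def satisfied?(:exact_version, projection_version, required_version),
    do: projection_version == required_version
end

# A projection at version 7 satisfies a minimum of 5 but not an exact match on 5.
VersionCheckSketch.satisfied?(:min_version, 7, 5)
VersionCheckSketch.satisfied?(:exact_version, 7, 5)
```

A projection that has already advanced past the required version therefore still passes the minimum check, but no longer passes the exact check.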

Examples

alias Trogon.Commanded.ConsistencyPolicy
alias Trogon.Commanded.ConsistencyPolicy.{MinVersionPolicy, ExactVersionPolicy, VersionedData}

# Timeout and delay given as integer milliseconds
policy = MinVersionPolicy.new!(%{version: 5, timeout: 5000, delay: 100})

# Or give the timeout as a Duration
policy = MinVersionPolicy.new!(%{
  version: 5,
  timeout: Duration.new!(second: 5),
  delay: 100
})

{:ok, order} = ConsistencyPolicy.run(policy, fn ->
  order = Repo.get(OrderProjection, order_id)
  stream_version = order.version
  # Pair the projection with the version it was read at so the policy can check it
  {:ok, VersionedData.new(stream_version, order)}
end)

# No consistency requirement
{:ok, data} = ConsistencyPolicy.run(nil, fn ->
  result = Repo.get(OrderProjection, order_id)
  {:ok, VersionedData.new(0, result)}
end)

Summary

Types

callback_fn()

Callback function triggered by the policy.

milliseconds()

Positive integer representing milliseconds.

policy()

Policy or nil (no consistency check).

t()

MinVersionPolicy or ExactVersionPolicy.

version()

Event stream version (projection position).

Functions

from_proto(proto)

Build policy from protobuf Consistency.

run(policy, callback_fn)

Run a query under this policy.

to_duration(ms)

Normalize timeout/delay to Duration.

to_proto(policy)

Convert policy to protobuf Consistency.

Types

callback_fn()

@type callback_fn() :: (-> {:ok, Trogon.Commanded.ConsistencyPolicy.VersionedData.t()}
                     | {:error, term()})

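Zero-arity callback function invoked by the policy. It returns {:ok, versioned_data}, where versioned_data is a VersionedData.t() carrying the query result and its version, or {:error, reason}.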

milliseconds()

@type milliseconds() :: pos_integer()

Positive integer representing milliseconds.

policy()

@type policy() :: t() | nil

Policy or nil (no consistency check).

t()

MinVersionPolicy or ExactVersionPolicy.

version()

@type version() :: non_neg_integer()

Event stream version (projection position).

Functions

from_proto(proto)

Build policy from protobuf Consistency.

Returns nil if proto is nil or has no requirement. Otherwise, dispatches to the appropriate policy module based on the requirement type.

run(policy, callback_fn)

Run a query under this policy.

Retries the callback until the consistency requirement is met or the timeout elapses. A nil policy applies no consistency check.
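
The retry behaviour can be pictured with a small self-contained loop. This is a sketch under stated assumptions rather than the library's code: `RetrySketch` is a hypothetical module, callback results are modelled as a plain `{version, data}` tuple instead of `VersionedData`, only min-version semantics are shown, and both the `{:error, :timeout}` result and the choice to return callback errors without retrying are assumptions made for illustration.

```elixir
defmodule RetrySketch do
  # Hypothetical stand-in for a min-version policy loop.
  # Expects the required version plus timeout and delay in milliseconds.
  def run(%{version: required, timeout: timeout, delay: delay}, callback)
      when is_function(callback, 0) do
    deadline = System.monotonic_time(:millisecond) + timeout
    loop(required, delay, deadline, callback)
  end

  defp loop(required, delay, deadline, callback) do
    case callback.() do
      # Fresh enough: hand the data back to the caller.
      {:ok, {version, data}} when version >= required ->
        {:ok, data}

      # Stale read: wait and try again if time remains.
      {:ok, _stale} ->
        retry_or_timeout(required, delay, deadline, callback)

      # Assumption: callback errors are returned immediately, not retried.
      {:error, _reason} = error ->
        error
    end
  end

  defp retry_or_timeout(required, delay, deadline, callback) do
    if System.monotonic_time(:millisecond) + delay > deadline do
      # Assumption: this timeout result shape is invented for the sketch.
      {:error, :timeout}
    else
      Process.sleep(delay)
      loop(required, delay, deadline, callback)
    end
  end
end

# Simulated projection whose version advances by one on every read.
{:ok, counter} = Agent.start_link(fn -> 0 end)

read = fn ->
  version = Agent.get_and_update(counter, fn v -> {v + 1, v + 1} end)
  {:ok, {version, %{id: "order-1"}}}
end

# The first two reads are stale; the third satisfies the requirement.
RetrySketch.run(%{version: 3, timeout: 1_000, delay: 10}, read)
```

Using a monotonic clock for the deadline keeps the loop unaffected by wall-clock adjustments while it waits.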

to_duration(ms)

@spec to_duration(milliseconds() | Duration.t()) :: Duration.t()

Normalize timeout/delay to Duration.

Accepts a positive integer (milliseconds) or a Duration struct.
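
One way such a normalization could look is sketched below. `DurationSketch` is a hypothetical module, not the library's implementation, and the split into whole seconds plus a millisecond-precision remainder is an assumption about how the integer might be represented. It relies on the `Duration` struct available in Elixir 1.17 and later.

```elixir
defmodule DurationSketch do
  # Hypothetical helper mirroring the documented contract of to_duration/1.

  # A Duration is passed through unchanged.
  def to_duration(%Duration{} = duration), do: duration

  # A positive integer is treated as milliseconds and split into whole
  # seconds plus the remainder, stored as microseconds with precision 3.
  def to_duration(ms) when is_integer(ms) and ms > 0 do
    Duration.new!(
      second: div(ms, 1000),
      microsecond: {rem(ms, 1000) * 1000, 3}
    )
  end
end

DurationSketch.to_duration(5000)
DurationSketch.to_duration(Duration.new!(second: 5))
```

Any other input, such as zero or a negative integer, matches no clause in this sketch and raises a FunctionClauseError.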

to_proto(policy)

Convert policy to protobuf Consistency.

Dispatches to the appropriate policy's to_proto/1.