Eventual consistency policy for projection queries with read-after-write guarantees.
Waits for projections to reach a required consistency level before returning query results, enabling proper read-after-write semantics in eventually consistent systems.
Policy types
MinVersionPolicy - Retries until projection.version >= required_version
ExactVersionPolicy - Returns only if projection.version == required_version
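The difference between the two policy types comes down to the comparison applied to the projection version. The sketch below is illustrative only: `VersionCheckSketch` and `satisfied?/3` are hypothetical names, not part of the library, and the atoms `:min` and `:exact` merely stand in for the two policy structs.

```elixir
defmodule VersionCheckSketch do
  # Hypothetical helper, not the library's implementation.
  # :min   mirrors MinVersionPolicy   (projection.version >= required_version)
  # :exact mirrors ExactVersionPolicy (projection.version == required_version)
  @spec satisfied?(:min | :exact, non_neg_integer(), non_neg_integer()) :: boolean()
  def satisfied?(:min, current, required), do: current >= required
  def satisfied?(:exact, current, required), do: current == required
end

# A projection that has already moved past the required version
# satisfies the minimum check but not the exact one.
true = VersionCheckSketch.satisfied?(:min, 7, 5)
false = VersionCheckSketch.satisfied?(:exact, 7, 5)
```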
Examples
alias Trogon.Commanded.ConsistencyPolicy
alias Trogon.Commanded.ConsistencyPolicy.{MinVersionPolicy, ExactVersionPolicy, VersionedData}
# Integer milliseconds
policy = MinVersionPolicy.new!(%{version: 5, timeout: 5000, delay: 100})
# Or Duration
policy = MinVersionPolicy.new!(%{
  version: 5,
  timeout: Duration.new!(second: 5),
  delay: 100
})
# Run a query under the policy; the callback reports the version it observed
{:ok, order} = ConsistencyPolicy.run(policy, fn ->
  order = Repo.get(OrderProjection, order_id)
  stream_version = order.version
  {:ok, VersionedData.new(stream_version, order)}
end)
# No consistency requirement
{:ok, data} = ConsistencyPolicy.run(nil, fn ->
  result = Repo.get(OrderProjection, order_id)
  {:ok, VersionedData.new(0, result)}
end)
Summary
Types
callback_fn() - Callback function triggered by the policy.
milliseconds() - Positive integer representing milliseconds.
policy() - Policy or nil (no consistency check).
t() - MinVersionPolicy or ExactVersionPolicy.
version() - Event stream version (projection position).
Functions
from_proto/1 - Build policy from protobuf Consistency.
run/2 - Run a query under this policy.
to_duration/1 - Normalize timeout/delay to Duration.
to_proto/1 - Convert policy to protobuf Consistency.
Types
@type callback_fn() :: (-> {:ok, Trogon.Commanded.ConsistencyPolicy.VersionedData.t()} | {:error, term()})
Zero-arity callback function triggered by the policy. It returns the query result wrapped in VersionedData, or an error tuple.
@type milliseconds() :: pos_integer()
Positive integer representing milliseconds.
@type policy() :: t() | nil
Policy or nil (no consistency check).
@type t() ::
  Trogon.Commanded.ConsistencyPolicy.MinVersionPolicy.t()
  | Trogon.Commanded.ConsistencyPolicy.ExactVersionPolicy.t()
MinVersionPolicy or ExactVersionPolicy.
@type version() :: non_neg_integer()
Event stream version (projection position).
Functions
@spec from_proto(TrogonProto.Consistency.V1Alpha1.Consistency.t() | nil) :: policy()
Build policy from protobuf Consistency.
Returns nil if proto is nil or has no requirement.
Dispatches to the appropriate policy based on the requirement type.
@spec run(policy(), callback_fn()) ::
  {:ok, term()}
  | {:error, Trogon.Commanded.ConsistencyPolicy.TimeoutError.t()}
  | {:error, Trogon.Commanded.ConsistencyPolicy.VersionMismatchError.t()}
  | {:error, term()}
Run a query under this policy.
Retries the callback until the consistency requirement is met or the timeout elapses.
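To make the retry behaviour concrete, here is a minimal sketch of a minimum-version wait loop with a deadline and a fixed delay between attempts. It is not the library's code: `RetrySketch`, `run_min_version/4`, the `{version, data}` tuple (standing in for VersionedData), the `:timeout` atom (standing in for TimeoutError), and the choice to return callback errors without retrying are all assumptions made for illustration.

```elixir
defmodule RetrySketch do
  # Hypothetical stand-in for a min-version policy run; names are illustrative.
  @type callback :: (-> {:ok, {non_neg_integer(), term()}} | {:error, term()})

  @spec run_min_version(non_neg_integer(), pos_integer(), pos_integer(), callback()) ::
          {:ok, term()} | {:error, term()}
  def run_min_version(required, timeout_ms, delay_ms, callback) do
    # Use the monotonic clock so wall-clock adjustments cannot skew the deadline.
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    loop(required, deadline, delay_ms, callback)
  end

  defp loop(required, deadline, delay_ms, callback) do
    case callback.() do
      # The projection has caught up: unwrap and return the data.
      {:ok, {version, data}} when version >= required ->
        {:ok, data}

      # The projection is stale: sleep and retry unless that would pass the deadline.
      {:ok, {_stale_version, _data}} ->
        if System.monotonic_time(:millisecond) + delay_ms > deadline do
          {:error, :timeout}
        else
          Process.sleep(delay_ms)
          loop(required, deadline, delay_ms, callback)
        end

      # Assumption: callback errors are returned as-is rather than retried.
      {:error, _reason} = error ->
        error
    end
  end
end

# Simulate a projection that advances by one version on every read.
{:ok, agent} = Agent.start_link(fn -> 0 end)

callback = fn ->
  version = Agent.get_and_update(agent, fn v -> {v + 1, v + 1} end)
  {:ok, {version, "order at v#{version}"}}
end

{:ok, "order at v3"} = RetrySketch.run_min_version(3, 1_000, 10, callback)
```

The delay bounds how often the projection is polled, while the timeout bounds how long a caller can block in total.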
@spec to_duration(milliseconds() | Duration.t()) :: Duration.t()
Normalize timeout/delay to Duration.
Accepts a positive integer (milliseconds) or a Duration struct.
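One way such a normalization could look is sketched below, assuming Elixir 1.17 or later, where `Duration` ships with the standard library. `DurationSketch` is a hypothetical module, and representing milliseconds as a microsecond value with precision 3 is an assumption; the library may build the struct differently.

```elixir
defmodule DurationSketch do
  # Hypothetical normalizer, not the library's implementation.
  @spec to_duration(pos_integer() | Duration.t()) :: Duration.t()
  # A Duration struct is passed through unchanged.
  def to_duration(%Duration{} = duration), do: duration

  # A positive integer is read as milliseconds and stored as
  # microseconds with millisecond precision (3 fractional digits).
  def to_duration(ms) when is_integer(ms) and ms > 0 do
    Duration.new!(microsecond: {ms * 1000, 3})
  end
end

%Duration{microsecond: {100_000, 3}} = DurationSketch.to_duration(100)
```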
@spec to_proto(t()) :: TrogonProto.Consistency.V1Alpha1.Consistency.t()
Convert policy to protobuf Consistency.
Dispatches to the appropriate policy's to_proto/1.