A GenServer that tracks request processes to ensure each request is processed at most once.
Storage
A first-time tracked request will store {:processing, {node, pid}} request
state with an expiration date for the provided request ID. Once the request
has completed, put_response/3 must be called to update the cached response.
The response will be stored as {:ok, data} with an expiration date.
The process for a tracked request may halt unexpectedly (e.g. due to raised
exception). This module will track the terminated process and store the value
as {:halted, reason}.
By default, all cached responses will be removed after 24 hours. Configure
with :cache_ttl.
Lookup
For subsequent requests, the state of the first-time tracked request will be
returned in the format of {:cache, {:ok, data}, expires_at}.
If the request payload fingerprint differs,
{:mismatch, {:fingerprint, fingerprint}, expires_at} is returned.
If the request is identical to a first-time request that has not yet
completed, {:processing, {node, pid}, expires_at} is returned.
If the request unexpectedly terminated,
{:cache, {:halted, reason}, expires_at} is returned.
Options
:cache_ttl- the TTL in milliseconds for any objects in the cache store. Defaults to 24 hours.:prune- the interval in milliseconds to prune the cache store for expired objects. Defaults to 60 seconds.:store- the cache store module to use to store the cache objects. Defaults to{IdempotencyPlug.ETSStore, [table: Elixir.IdempotencyPlug.RequestTracker]}.
Telemetry events
The following events are emitted by the tracker:
[:idempotency_plug, :request_tracker, :cache_miss]- dispatched when a request was not found in the cache- Measurement:
%{system_time: integer()} Metadata:
%{request_id: binary(), fingerprint: binary(), store: atom(), expires_at: DateTime.t() | nil, result: :ok | :error, reason: term() | nil}
- Measurement:
[:idempotency_plug, :request_tracker, :cache_hit]- dispatched when a request was found in the cache- Measurement:
%{system_time: integer()} Metadata:
%{request_id: binary(), fingerprint: binary(), store: atom(), expires_at: DateTime.t(), result: :processing | :halted | :ok | :mismatch, reason: term() | nil}
- Measurement:
[:idempotency_plug, :request_tracker, :prune, :start]- dispatched before the cache is pruned- Measurement:
%{monotonic_time: integer(), system_time: integer()} - Metadata:
%{store: atom()}
- Measurement:
[:idempotency_plug, :request_tracker, :prune, :exception]- dispatched on exceptions during cache pruning- Measurement:
%{monotonic_time: integer(), duration: integer()} Metadata:
%{store: atom(), kind: :throw | :error | :exit, reason: term(), stacktrace: list()}
- Measurement:
[:idempotency_plug, :request_tracker, :prune, :stop]- dispatched after successfully pruning the cache- Measurement:
%{monotonic_time: integer(), duration: integer()} - Metadata:
%{store: atom()}
- Measurement:
For :cache_miss telemetry events, :result indicates whether the insert succeeded:
:ok- entry inserted successfully:error- store failed to insert entry
For :cache_hit telemetry events, :result indicates the lookup outcome:
:processing— entry found and the original request process is still running:halted— entry found but the original request crashed:ok— entry found with matching fingerprint:mismatch— entry found but the payload fingerprint differs
The :reason field carries additional context depending on :result:
:cache_miss-:errorresult contains the error from the failed insert attempt:cache_hit-:haltedresult contains the exit reason of the original request process- Otherwise
:reasonisnil
Examples
children = [
{
IdempotencyPlug.RequestTracker,
cache_ttl: :timer.hours(6),
prune: :timer.minutes(1),
store: {IdempotencyPlug.EctoStore, repo: MyApp.Repo}
}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
Summary
Functions
Returns a specification to start this module under a supervisor.
Updates the state for a given request ID.
Tracks a request ID.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec put_response(GenServer.server(), binary(), term()) :: {:ok, DateTime.t()} | {:error, term()}
Updates the state for a given request ID.
@spec track(GenServer.server(), binary(), binary()) :: {:error, term()} | {:init, binary(), DateTime.t()} | {:mismatch, {:fingerprint, binary()}, DateTime.t()} | {:processing, {node(), pid()}, DateTime.t()} | {:cache, {:ok, term()}, DateTime.t()} | {:cache, {:halted, term()}, DateTime.t()}
Tracks a request ID.
This function will return {:init, key, expires_at} for first-time requests.
Subsequent requests will return the request state. If the request payload
fingerprint differs from what was stored, an error is returned.
Emits :cache_hit and :cache_miss telemetry events. See the module
documentation for details.