IdempotencyPlug.RequestTracker (IdempotencyPlug v0.2.2)


A GenServer that tracks request processes to ensure each request is processed at most once.

Storage

A first-time tracked request will store {:processing, {node, pid}} request state with an expiration date for the provided request ID. Once the request has completed, put_response/3 must be called to update the cached response. The response will be stored as {:ok, data} with an expiration date.

The process for a tracked request may halt unexpectedly (e.g. due to a raised exception). In that case, this module detects the terminated process and stores the value as {:halted, reason}.

By default, all cached responses will be removed after 24 hours. Configure with :cache_ttl.
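
The lifecycle above can be sketched as follows. This is a minimal illustration, not the library's own integration code: MyApp.TrackedWork, run/4 and its :stored and :not_first return tuples are hypothetical names, and tracker is assumed to be the name or pid of a running tracker. Only track/3 and put_response/3 come from this module.

```elixir
defmodule MyApp.TrackedWork do
  # Hypothetical wrapper, not part of IdempotencyPlug.
  alias IdempotencyPlug.RequestTracker

  # Runs `fun` only for a first-time request and stores its result.
  def run(tracker, request_id, fingerprint, fun) when is_function(fun, 0) do
    case RequestTracker.track(tracker, request_id, fingerprint) do
      {:init, _key, _expires_at} ->
        # First-time request: the stored state is now {:processing, {node, pid}}.
        response = fun.()

        # Replace the processing state with {:ok, response}.
        case RequestTracker.put_response(tracker, request_id, response) do
          {:ok, expires_at} -> {:stored, response, expires_at}
          {:error, reason} -> {:error, reason}
        end

      other ->
        # Cached, processing, mismatch, and error results are passed through untouched.
        {:not_first, other}
    end
  end
end
```

If fun raises, the calling process terminates before put_response/3 is reached, which is the case the tracker records as {:halted, reason}.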

Lookup

For subsequent requests, the state of the first-time tracked request will be returned in the format of {:cache, {:ok, data}, expires_at}.

If the request payload fingerprint differs, {:mismatch, {:fingerprint, fingerprint}, expires_at} is returned.

If the request is identical to a first-time request that has not yet completed, {:processing, {node, pid}, expires_at} is returned.

If the request unexpectedly terminated, {:cache, {:halted, reason}, expires_at} is returned.
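
A caller typically pattern matches on these tuples. The sketch below covers the lookup results listed above plus the {:init, key, expires_at} and {:error, reason} shapes from the track/3 spec. MyApp.TrackResult and the atoms it returns are hypothetical and only illustrate the matching.

```elixir
defmodule MyApp.TrackResult do
  # Hypothetical helper; the returned atoms are illustrative labels only.
  def classify({:init, _key, _expires_at}), do: :first_request
  def classify({:cache, {:ok, _data}, _expires_at}), do: :replay_response
  def classify({:cache, {:halted, _reason}, _expires_at}), do: :original_halted
  def classify({:processing, {_node, _pid}, _expires_at}), do: :still_processing
  def classify({:mismatch, {:fingerprint, _fingerprint}, _expires_at}), do: :payload_mismatch
  def classify({:error, _reason}), do: :tracker_error
end
```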

Options

  • :cache_ttl - the TTL in milliseconds for any objects in the cache store. Defaults to 24 hours.

  • :prune - the interval in milliseconds to prune the cache store for expired objects. Defaults to 60 seconds.

  • :store - the cache store module to use to store the cache objects. Defaults to {IdempotencyPlug.ETSStore, [table: Elixir.IdempotencyPlug.RequestTracker]}.
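
For reference, the documented defaults can be written out explicitly. This is a sketch based only on the option list above; the variable name default_opts is hypothetical, and the :timer helpers are used just to make the millisecond values readable.

```elixir
# Documented defaults spelled out as an options list.
default_opts = [
  # 24 hours, i.e. 86_400_000 ms
  cache_ttl: :timer.hours(24),
  # 60 seconds, i.e. 60_000 ms
  prune: :timer.seconds(60),
  store: {IdempotencyPlug.ETSStore, [table: IdempotencyPlug.RequestTracker]}
]
```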

Telemetry events

The following events are emitted by the tracker:

  • [:idempotency_plug, :request_tracker, :cache_miss] - dispatched when a request was not found in the cache

    • Measurement: %{system_time: integer()}
    • Metadata: %{request_id: binary(), fingerprint: binary(), store: atom(), expires_at: DateTime.t() | nil, result: :ok | :error, reason: term() | nil}

  • [:idempotency_plug, :request_tracker, :cache_hit] - dispatched when a request was found in the cache

    • Measurement: %{system_time: integer()}
    • Metadata: %{request_id: binary(), fingerprint: binary(), store: atom(), expires_at: DateTime.t(), result: :processing | :halted | :ok | :mismatch, reason: term() | nil}

  • [:idempotency_plug, :request_tracker, :prune, :start] - dispatched before the cache is pruned

    • Measurement: %{monotonic_time: integer(), system_time: integer()}
    • Metadata: %{store: atom()}

  • [:idempotency_plug, :request_tracker, :prune, :exception] - dispatched on exceptions during cache pruning

    • Measurement: %{monotonic_time: integer(), duration: integer()}
    • Metadata: %{store: atom(), kind: :throw | :error | :exit, reason: term(), stacktrace: list()}

  • [:idempotency_plug, :request_tracker, :prune, :stop] - dispatched after successfully pruning the cache

    • Measurement: %{monotonic_time: integer(), duration: integer()}
    • Metadata: %{store: atom()}

For :cache_miss telemetry events, :result indicates whether the insert succeeded:

  • :ok - entry inserted successfully
  • :error - store failed to insert entry

For :cache_hit telemetry events, :result indicates the lookup outcome:

  • :processing - entry found and the original request process is still running
  • :halted - entry found but the original request crashed
  • :ok - entry found with matching fingerprint
  • :mismatch - entry found but the payload fingerprint differs

The :reason field carries additional context depending on :result:

  • :cache_miss - :error result contains the error from the failed insert attempt
  • :cache_hit - :halted result contains the exit reason of the original request process
  • Otherwise :reason is nil
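
One way to consume the :cache_hit and :cache_miss events is to attach a handler with :telemetry.attach_many/4. The sketch below assumes the :telemetry package is available; MyApp.IdempotencyTelemetry, the handler ID, and the log format are hypothetical choices, while the event names and metadata keys are the ones listed above.

```elixir
defmodule MyApp.IdempotencyTelemetry do
  # Hypothetical handler module, not part of IdempotencyPlug.
  require Logger

  @events [
    [:idempotency_plug, :request_tracker, :cache_miss],
    [:idempotency_plug, :request_tracker, :cache_hit]
  ]

  # Call once, e.g. at application start. Requires the :telemetry package.
  def attach do
    :telemetry.attach_many("my-app-idempotency", @events, &__MODULE__.handle_event/4, nil)
  end

  # Logs one line per tracker event.
  def handle_event([:idempotency_plug, :request_tracker, event], _measurements, metadata, _config) do
    Logger.info(format(event, metadata))
  end

  # Builds the log line; kept separate so it can be checked without :telemetry.
  def format(event, %{request_id: request_id, result: result, reason: reason}) do
    "idempotency #{event} request_id=#{request_id} result=#{result} reason=#{inspect(reason)}"
  end
end
```

The handler is passed as an external function capture rather than an anonymous function, which is the form the :telemetry documentation recommends for handlers.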

Examples

children = [
  # Keep cached responses for 6 hours and prune expired entries every minute.
  {
    IdempotencyPlug.RequestTracker,
    cache_ttl: :timer.hours(6),
    prune: :timer.minutes(1),
    store: {IdempotencyPlug.EctoStore, repo: MyApp.Repo}
  }
]

Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

put_response(name_or_pid, request_id, response)

@spec put_response(GenServer.server(), binary(), term()) ::
  {:ok, DateTime.t()} | {:error, term()}

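Updates the state for a given request ID. The response is stored as {:ok, response} with an expiration date. Returns {:ok, expires_at} on success, or {:error, reason} if the state could not be updated.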

start_link(opts)

track(name_or_pid, request_id, fingerprint)

@spec track(GenServer.server(), binary(), binary()) ::
  {:error, term()}
  | {:init, binary(), DateTime.t()}
  | {:mismatch, {:fingerprint, binary()}, DateTime.t()}
  | {:processing, {node(), pid()}, DateTime.t()}
  | {:cache, {:ok, term()}, DateTime.t()}
  | {:cache, {:halted, term()}, DateTime.t()}

Tracks a request ID.

This function will return {:init, key, expires_at} for first-time requests. Subsequent requests will return the request state. If the request payload fingerprint differs from what was stored, {:mismatch, {:fingerprint, fingerprint}, expires_at} is returned.

Emits :cache_hit and :cache_miss telemetry events. See the module documentation for details.