DurableServer.ObjectStore (durable_server v0.1.1)

Manages Tigris bucket operations for Fly apps.

Focused solely on bucket management and credential generation. For object operations, use Req + ReqS3 functionality directly.

Consistency

All operations are consistent by default. Consistent operations send the x-tigris-consistent: true header, which guarantees consistent reads and writes regardless of the client's region. Clients can opt into local-region requests with consistent: false on a case-by-case basis, favoring lower latency at the cost of consistency.
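As a rough sketch of the opt-out described above, the helper below maps a :consistent option to request headers. The module and function names are hypothetical and not part of DurableServer.ObjectStore; only the header name and the default of true come from the text. It also assumes the header is simply omitted when consistent: false is passed.

```elixir
defmodule ConsistencySketch do
  # Hypothetical helper, not part of DurableServer.ObjectStore.
  # Returns the extra request headers implied by the :consistent option.
  def headers(opts \\ []) do
    if Keyword.get(opts, :consistent, true) do
      # Default: ask for a globally consistent read or write.
      [{"x-tigris-consistent", "true"}]
    else
      # Assumption: a local-region request just omits the header.
      []
    end
  end
end

IO.inspect(ConsistencySketch.headers())
IO.inspect(ConsistencySketch.headers(consistent: false))
```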

Functions

copy_object(client, source_bucket, source_key, dest_bucket, dest_key)

Copies an object within S3 (used for moving to trash).

create_bucket(client, bucket_name, opts \\ [])

Creates a bucket for a Fly app. Returns {:ok, bucket_info} on success, or {:error, reason} if creation fails.

Note: If the bucket already exists, this function will return an error, which can be handled by the caller.

create_bucket_with_credentials(client, bucket_name)

Creates a bucket for a Fly app and generates credentials for it in a single operation. This is a convenience function that combines create_bucket/2 and generate_bucket_credentials/2.

Returns:

  • {:ok, scoped_obj_store} on success
  • an error tuple if bucket creation fails
  • an error tuple if credential generation fails

delete_bucket(client, bucket_name)

Deletes a bucket. This will fail if the bucket is not empty. Returns :ok on success, or {:error, reason} if deletion fails.

delete_object(client, key)

Deletes an object from S3.

ensure_bucket_exists(client)

generate_bucket_credentials(client, bucket_name)

Generates bucket-scoped credentials for a Fly app to access its own bucket.

This implementation uses Tigris IAM API to:

  1. Create an access key using CreateAccessKey
  2. Create a policy using CreatePolicy to restrict access to the specified bucket
  3. Attach the policy to the access key using AttachUserPolicy

Returns {:ok, credentials} on success, or {:error, reason} if creation fails.
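The three steps above can be sketched as a with chain that stops at the first failure. Everything here is illustrative: CredentialFlowSketch, the iam map of stub functions, and the shape of the returned credentials are assumptions, not the library's implementation. The policy document follows the usual IAM JSON layout for limiting access to a single bucket and its objects.

```elixir
defmodule CredentialFlowSketch do
  # Hypothetical illustration of the three-step flow. The `iam` map of
  # functions stands in for real Tigris IAM calls and is an assumption.
  def generate(iam, bucket_name) do
    with {:ok, key} <- iam.create_access_key.(),
         {:ok, policy_arn} <- iam.create_policy.(policy_document(bucket_name)),
         :ok <- iam.attach_user_policy.(key.access_key_id, policy_arn) do
      {:ok, key}
    end
  end

  # An IAM-style policy limiting access to one bucket and its objects.
  def policy_document(bucket_name) do
    %{
      "Version" => "2012-10-17",
      "Statement" => [
        %{
          "Effect" => "Allow",
          "Action" => ["s3:*"],
          "Resource" => [
            "arn:aws:s3:::#{bucket_name}",
            "arn:aws:s3:::#{bucket_name}/*"
          ]
        }
      ]
    }
  end
end

# Stubbed IAM functions so the sketch runs without network access.
iam = %{
  create_access_key: fn -> {:ok, %{access_key_id: "key-id", secret_access_key: "secret"}} end,
  create_policy: fn _doc -> {:ok, "arn:aws:iam::123456789012:policy/example"} end,
  attach_user_policy: fn _key_id, _arn -> :ok end
}

IO.inspect(CredentialFlowSketch.generate(iam, "my-bucket"))
```

Because with returns the first non-matching value unchanged, a failure in any step surfaces to the caller as that step's error.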

get_object(client, key, opts \\ [])

Gets an object from storage.

Options

  • :consistent - Whether to make a consistent request to the default region. Default: true.

Examples

iex> get_object(ObjectStore.new(), "my-key")
{:ok, %{body: "my-value", etag: "..."}}

Returns {:ok, %{body: body, etag: etag}} on success, or {:error, reason} on failure.

get_policy_details(client, policy_arn)

Gets detailed information about a specific IAM policy including the policy document.

Uses the GetPolicy and GetPolicyVersion API operations to retrieve full policy details.

Examples

iex> ObjectStore.get_policy_details(client, "arn:aws:iam::123456789012:policy/bucket-policy-my-bucket-abc123")
{:ok, %{
  arn: "arn:aws:iam::123456789012:policy/bucket-policy-my-bucket-abc123",
  name: "bucket-policy-my-bucket-abc123",
  document: "{\"Version\":\"2012-10-17\",\"Statement\":[...]}"
}}

list_all_objects_stream(client, prefix, opts \\ [])

Streams all object keys in a bucket with optional prefix filtering.

CAUTION: use this with care, since it will stream over every matching object in the bucket. While the stream enumerates objects efficiently without loading them all into memory at once, a broadly matching prefix can still cause it to enumerate the entire object space.

Returns a Stream that automatically handles pagination using continuation tokens. This is memory-efficient for large buckets as it only loads one page at a time.

Options

  • :error_handler - Function to handle errors. Receives error reason and should return :halt to stop the stream or :continue to skip the error. Defaults to raising the error.

Examples

# Stream all keys with prefix
ObjectStore.list_all_objects_stream(store, "my-prefix/")
|> Stream.take(100)
|> Enum.map(fn %{key: key, etag: etag} = _obj -> ... end)

# With custom error handling
ObjectStore.list_all_objects_stream(store, "prefix/",
  error_handler: fn error_reason ->
    Logger.warning("List error: #{inspect(error_reason)}")
    :continue
  end)
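The pagination behaviour described for list_all_objects_stream/3 can be approximated with Stream.resource/3. This is a minimal sketch under stated assumptions, not the library's code: PaginationSketch and fetch_page are hypothetical, fetch_page stands in for one list request, and a nil continuation token is assumed to mean "no more pages". Error handling is left out.

```elixir
defmodule PaginationSketch do
  # Hypothetical sketch: `fetch_page` stands in for one list request and
  # returns {:ok, objects, next_token}; a nil token means no more pages.
  def stream(fetch_page) do
    Stream.resource(
      # The initial state marks that no page has been requested yet.
      fn -> :start end,
      fn
        :done ->
          {:halt, :done}

        state ->
          # The first request carries no continuation token.
          token = if state == :start, do: nil, else: state

          case fetch_page.(token) do
            {:ok, objects, nil} -> {objects, :done}
            {:ok, objects, next_token} -> {objects, next_token}
          end
      end,
      # Nothing to clean up for this in-memory example.
      fn _state -> :ok end
    )
  end
end

# Two in-memory "pages" keyed by continuation token.
pages = %{
  nil => {:ok, [%{key: "a/1", etag: "e1"}, %{key: "a/2", etag: "e2"}], "t1"},
  "t1" => {:ok, [%{key: "a/3", etag: "e3"}], nil}
}

fetch_page = fn token -> Map.fetch!(pages, token) end

PaginationSketch.stream(fetch_page)
|> Enum.map(& &1.key)
|> IO.inspect()
```

Only one page is held at a time, and a consumer such as Stream.take/2 that stops early prevents later pages from being requested.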

list_bucket_policies(client, bucket_name)

Lists IAM policies that match a given bucket name pattern.

Uses the ListPolicies API operation to find policies related to a bucket.

Examples

iex> ObjectStore.list_bucket_policies(client, "my-bucket")
{:ok, [
  %{arn: "arn:aws:iam::123456789012:policy/bucket-policy-my-bucket-abc123", name: "bucket-policy-my-bucket-abc123"}
]}

list_buckets(client)

Lists all buckets in the account. Returns {:ok, buckets} on success, or {:error, reason} if listing fails.

list_objects(client, prefix, opts \\ [])

Lists objects in a bucket with optional prefix filtering.

Options

  • :consistent - Whether to make a consistent request to the default region. Default: true.

new(opts)

new(client, opts)

new_req(client, opts \\ [])

put_object(client, key, data, opts \\ [])

Puts an object to S3.

Options

  • :max_retries - The maximum number of times to retry put. Default 0.
  • :etag - The existing etag to match. Conflicts return {:error, :conflict}
  • :timeout - Total time in ms for the operation, including retries. Once exceeded, no further retries are attempted. Default: no timeout (retries are limited only by :max_retries).
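One way :max_retries and :timeout could interact is sketched below. RetrySketch and attempt_fun are hypothetical names; attempt_fun stands in for a single put request. Treating {:error, :conflict} as non-retryable is an assumption made for the illustration, not confirmed behaviour.

```elixir
defmodule RetrySketch do
  # Hypothetical retry loop: `attempt_fun` stands in for a single put
  # request and returns :ok or {:error, reason}.
  def with_retries(attempt_fun, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 0)

    deadline =
      case Keyword.get(opts, :timeout, :infinity) do
        :infinity -> :infinity
        ms -> System.monotonic_time(:millisecond) + ms
      end

    loop(attempt_fun, max_retries, deadline)
  end

  defp loop(attempt_fun, retries_left, deadline) do
    case attempt_fun.() do
      :ok ->
        :ok

      # Assumption: an etag conflict is reported immediately, not retried.
      {:error, :conflict} ->
        {:error, :conflict}

      {:error, reason} ->
        # Retry only while attempts remain and the deadline has not passed.
        if retries_left > 0 and before?(deadline) do
          loop(attempt_fun, retries_left - 1, deadline)
        else
          {:error, reason}
        end
    end
  end

  defp before?(:infinity), do: true
  defp before?(deadline), do: System.monotonic_time(:millisecond) < deadline
end

# A stub that fails twice and then succeeds.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  case Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end) do
    n when n < 3 -> {:error, :unavailable}
    _ -> :ok
  end
end

IO.inspect(RetrySketch.with_retries(flaky, max_retries: 5))
```

With the default max_retries of 0 the operation is attempted exactly once, which matches the documented default.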

try_claim(client, key, body)

Attempts to claim a key in a bucket using a CAS (Compare-And-Swap) operation. Uses an if-match header to ensure the key doesn't exist when creating.

Args:

  • client: A configured %ObjectStore{} client
  • key: The key to claim
  • body: The content to write if claim succeeds

Returns:

  • a success tuple if the claim succeeds
  • an error if the key already exists
  • an error for other failures
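The claim semantics (create the key only if it does not already exist) can be illustrated with an in-memory stand-in. ClaimSketch is hypothetical and uses an Agent instead of a bucket, and the return values :ok and {:error, :already_claimed} are placeholders chosen for the sketch, not the library's actual return shapes.

```elixir
defmodule ClaimSketch do
  # Hypothetical in-memory stand-in for a bucket. The Agent serializes
  # access, so the existence check and the write happen as one step.
  def start, do: Agent.start_link(fn -> %{} end)

  def try_claim(store, key, body) do
    Agent.get_and_update(store, fn objects ->
      if Map.has_key?(objects, key) do
        # Someone else already holds the key; leave it untouched.
        {{:error, :already_claimed}, objects}
      else
        {:ok, Map.put(objects, key, body)}
      end
    end)
  end
end

{:ok, store} = ClaimSketch.start()
IO.inspect(ClaimSketch.try_claim(store, "leader", "node-a"))
IO.inspect(ClaimSketch.try_claim(store, "leader", "node-b"))
```

Only the first caller wins; later callers see the key as taken and the stored body is unchanged.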

update_object(client, key, update_fn, opts \\ [])

Atomically updates an object using etag-based conflict resolution.

Uses a read-modify-write pattern with etag verification to avoid conflicts. If a conflict is detected, it will retry up to the specified maximum retries.

  • key - The object key to update
  • update_fn - Function that takes current data and returns {:ok, new_data} or {:error, reason}
    • To proceed with write, return {:ok, new_data}
    • To abort write, return {:error, reason}

Options

  • :timeout - Operation timeout (default: :infinity)
  • :max_retries - Maximum number of retry attempts (default: 5)
  • :consistent - Use consistent reads (default: true)
  • :content_type - Content type for the object (default: "application/octet-stream")
  • :task_supervisor - Task supervisor for async operations (default: uses client.task_supervisor)

Returns:

  • a success tuple on successful update
  • an error if the key doesn't exist
  • an error if retries are exhausted
  • an error for other failures

Examples

store = ObjectStore.new()
ObjectStore.update_object(store, "my-key", fn %{body: current_data, etag: _current_etag} ->
  updated = current_data <> " - updated"
  {:ok, updated}
end, timeout: :infinity, max_retries: 5)
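The read-modify-write pattern with etag verification can be sketched against an in-memory store. CasSketch is hypothetical: it uses an integer version as the etag, and its return values ({:error, :not_found}, {:error, :conflict}, {:error, :retries_exhausted}) are illustrative names rather than the library's documented results.

```elixir
defmodule CasSketch do
  # Hypothetical in-memory store mapping key => {body, etag}. The etag is
  # a version counter here; a real store would return an opaque string.
  def start(initial), do: Agent.start_link(fn -> initial end)

  def get(store, key) do
    case Agent.get(store, &Map.fetch(&1, key)) do
      {:ok, {body, etag}} -> {:ok, %{body: body, etag: etag}}
      :error -> {:error, :not_found}
    end
  end

  # Writes only if the stored etag still equals `expected_etag`.
  def put_if_match(store, key, body, expected_etag) do
    Agent.get_and_update(store, fn objects ->
      case objects do
        %{^key => {_old_body, ^expected_etag}} ->
          {:ok, Map.put(objects, key, {body, expected_etag + 1})}

        _ ->
          {{:error, :conflict}, objects}
      end
    end)
  end

  # Read, apply update_fn, write conditionally; re-read and retry on conflict.
  def update(store, key, update_fn, max_retries \\ 5) do
    with {:ok, %{etag: etag} = current} <- get(store, key),
         {:ok, new_body} <- update_fn.(current) do
      case put_if_match(store, key, new_body, etag) do
        :ok -> {:ok, new_body}
        {:error, :conflict} when max_retries > 0 -> update(store, key, update_fn, max_retries - 1)
        {:error, :conflict} -> {:error, :retries_exhausted}
      end
    end
  end
end

{:ok, store} = CasSketch.start(%{"my-key" => {"hello", 1}})

CasSketch.update(store, "my-key", fn %{body: body} -> {:ok, body <> " - updated"} end)
|> IO.inspect()
```

Because each retry re-reads the object, update_fn may run more than once and should be free of side effects.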