Exgit.ObjectStore.SharedPromisor (exgit v0.1.0)


GenServer wrapper around Exgit.ObjectStore.Promisor that serializes cache access across processes.

The plain Promisor is a pure value: two concurrent resolve(p, sha_a) / resolve(p, sha_b) calls from the same p each fetch independently and only one caller's grown cache survives — a benign but wasteful cache race.
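To make the race concrete, here is a minimal sketch that uses a plain map as a stand-in for the Promisor cache. ToyCache and its resolve/2 are hypothetical and not part of Exgit; the only assumption carried over is that a pure resolve hands back a grown copy of the cache instead of mutating anything.

```elixir
defmodule ToyCache do
  # Hypothetical stand-in for a pure-value cache; not part of Exgit.
  # resolve/2 returns the object plus a NEW cache and leaves its input untouched.
  def resolve(cache, sha) do
    case Map.fetch(cache, sha) do
      {:ok, object} ->
        {:ok, object, cache}

      :error ->
        # Pretend this object came from the transport.
        object = "object-" <> sha
        {:ok, object, Map.put(cache, sha, object)}
    end
  end
end

cache = %{}

# Both tasks start from the same empty value and each grows its own copy.
[{:ok, _, cache_a}, {:ok, _, cache_b}] =
  ["sha_a", "sha_b"]
  |> Enum.map(fn sha -> Task.async(fn -> ToyCache.resolve(cache, sha) end) end)
  |> Task.await_many()

Map.keys(cache_a) # ["sha_a"]
Map.keys(cache_b) # ["sha_b"]
```

Neither result contains both objects, so whichever copy the caller keeps, the other fetch is wasted.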

For workloads that genuinely do concurrent bulk reads against the same repo (a grep agent spawning N tasks, a CI worker hydrating a cache from many branches in parallel), wrap the Promisor in a SharedPromisor:

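{:ok, pid} = SharedPromisor.start_link(promisor)

# Both tasks see and grow the same cache, serialized by the
# GenServer. No race, no duplicate fetches.
task_a = Task.async(fn -> SharedPromisor.resolve(pid, sha_a) end)
task_b = Task.async(fn -> SharedPromisor.resolve(pid, sha_b) end)

# Collect both results; 30_000 ms matches resolve's default timeout.
[result_a, result_b] = Task.await_many([task_a, task_b], 30_000)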

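The API mirrors Promisor.resolve/2, put/2, empty?/1, and fetch_with_filter/3. Each call is a synchronous GenServer call: the server handles one request at a time, applies it to the internal Promisor, and replies, so no two callers ever touch the cache concurrently.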
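The serialization pattern can be sketched with a toy GenServer that owns a plain map. ToyShared is hypothetical and is not Exgit's implementation; it only illustrates how routing every call through one process lets all callers grow a single cache.

```elixir
defmodule ToyShared do
  # Hypothetical sketch of the wrapper pattern; not Exgit's implementation.
  use GenServer

  def start_link(cache), do: GenServer.start_link(__MODULE__, cache)

  def resolve(server, sha, timeout \\ 30_000),
    do: GenServer.call(server, {:resolve, sha}, timeout)

  def snapshot(server), do: GenServer.call(server, :snapshot)

  @impl true
  def init(cache), do: {:ok, cache}

  @impl true
  def handle_call({:resolve, sha}, _from, cache) do
    case Map.fetch(cache, sha) do
      {:ok, object} ->
        {:reply, {:ok, object}, cache}

      :error ->
        # Pretend this object came from the transport; the grown cache
        # becomes the server state seen by the next call.
        object = "object-" <> sha
        {:reply, {:ok, object}, Map.put(cache, sha, object)}
    end
  end

  def handle_call(:snapshot, _from, cache), do: {:reply, cache, cache}
end

{:ok, pid} = ToyShared.start_link(%{})

["sha_a", "sha_b"]
|> Enum.map(fn sha -> Task.async(fn -> ToyShared.resolve(pid, sha) end) end)
|> Task.await_many()

merged = ToyShared.snapshot(pid)
merged |> Map.keys() |> Enum.sort() # ["sha_a", "sha_b"]
```

Because the server processes its mailbox one message at a time, both objects end up in the same map regardless of which task runs first.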

When NOT to use this

  • Single-process agent loops. The plain pure-value Promisor is faster (no message-passing overhead).
  • Short-lived scripts. The setup cost of spinning up a GenServer is ~200µs, usually more than the work saved.
  • Repos you'd rather snapshot. Share the %Promisor{} struct by value; two tasks holding the same struct see the same cache deterministically.

Failure semantics

If the wrapped Promisor call raises (e.g. a transport misconfiguration), the exception propagates through to the caller and the GenServer terminates. Supervise it under a :one_for_one strategy in your app tree if you want auto-restart; callers will need to re-discover the new pid and re-seed if they want state to persist across restarts.
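One way to avoid re-discovering the pid after a restart is to register the process under a name. The sketch below builds a child spec by hand around start_link/2 with the :name option. MyApp.CacheSupervisor and MyApp.Cache are hypothetical names, and the restarted cache is seeded only from the promisor passed in at boot.

```elixir
defmodule MyApp.CacheSupervisor do
  # Hypothetical supervisor; module and process names are illustrative.
  use Supervisor

  def start_link(promisor) do
    Supervisor.start_link(__MODULE__, promisor, name: __MODULE__)
  end

  @impl true
  def init(promisor) do
    children = [
      %{
        id: MyApp.Cache,
        # After a crash the supervisor runs this again, so the cache restarts
        # from `promisor` as given at boot, not from the state that was lost.
        start:
          {Exgit.ObjectStore.SharedPromisor, :start_link,
           [promisor, [name: MyApp.Cache]]}
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Callers can then pass MyApp.Cache wherever a server() is expected, for example SharedPromisor.resolve(MyApp.Cache, sha), and the name keeps working across restarts.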

Telemetry

Each public call emits a [:exgit, :object_store, :shared_promisor, op] span event where op is :resolve | :put | :has? | :get. Use this to track lock-contention latency separately from the underlying [:exgit, :object_store, :*] Promisor events.
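A handler for these events might look like the sketch below. It assumes the spans follow the :telemetry.span/3 convention, where each event name gets a :start, :stop, or :exception suffix and :stop carries a :duration measurement in native time units. That convention, the handler id, and the MyApp.SharedPromisorMetrics module are assumptions, and the :telemetry package must be available for attach/0 to work.

```elixir
defmodule MyApp.SharedPromisorMetrics do
  # Hypothetical helper module; names are illustrative.
  require Logger

  @prefix [:exgit, :object_store, :shared_promisor]
  @ops [:resolve, :put, :has?, :get]

  # Assumes :telemetry.span/3 naming: prefix ++ [op, :stop] for each op.
  def stop_events, do: for(op <- @ops, do: @prefix ++ [op, :stop])

  # Requires the :telemetry package at runtime.
  def attach do
    :telemetry.attach_many(
      "shared-promisor-latency",
      stop_events(),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(event, %{duration: duration}, _metadata, _config) do
    # The op is the fourth element, right after the three-part prefix.
    op = Enum.at(event, 3)
    micros = System.convert_time_unit(duration, :native, :microsecond)
    Logger.debug("shared_promisor #{op} took #{micros}µs")
  end
end
```

Calling MyApp.SharedPromisorMetrics.attach() once at application start would log one line per completed call.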

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

empty?(server, timeout \\ 5000)

True if the wrapped cache has no objects.

fetch_with_filter(server, wants, opts, timeout \\ 60000)

Fetch with an explicit filter spec and merge results into the cache. See Promisor.fetch_with_filter/3.

get(server, sha, timeout \\ 5000)

Pure-read lookup. Does NOT grow the cache; returns {:error, :not_found} on miss.

has_object?(server, sha, timeout \\ 5000)

True if sha is in the local cache. Does NOT trigger a fetch.

put(server, object, timeout \\ 30000)

Insert an object into the shared cache. Returns {:ok, sha} on success, or {:error, :cache_overfull} if the cache is full and the wrapped Promisor has :on_overfull => :error.

replace(server, new_promisor, timeout \\ 5000)

Replace the wrapped Promisor. Use sparingly: this discards the current cache. Intended for testing and for swapping transports.

resolve(server, sha, timeout \\ 30000)

Look up sha, fetching from the transport on cache miss.

snapshot(server, timeout \\ 5000)

Snapshot the current Promisor state. Useful for serialization, debugging, or handing back a pure value after the shared cache has served its purpose.

start_link(promisor, opts \\ [])

Start a SharedPromisor process wrapping the given Promisor.

Types

server()

@type server() :: GenServer.server()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

empty?(server, timeout \\ 5000)

@spec empty?(server(), timeout()) :: boolean()

True if the wrapped cache has no objects.

fetch_with_filter(server, wants, opts, timeout \\ 60000)

@spec fetch_with_filter(server(), [binary()], keyword(), timeout()) ::
  :ok | {:error, term()}

Fetch with an explicit filter spec and merge results into the cache. See Promisor.fetch_with_filter/3.

get(server, sha, timeout \\ 5000)

@spec get(server(), binary(), timeout()) :: {:ok, Exgit.Object.t()} | {:error, term()}

Pure-read lookup. Does NOT grow the cache; returns {:error, :not_found} on miss.

has_object?(server, sha, timeout \\ 5000)

@spec has_object?(server(), binary(), timeout()) :: boolean()

True if sha is in the local cache. Does NOT trigger a fetch.

put(server, object, timeout \\ 30000)

@spec put(server(), Exgit.Object.t(), timeout()) ::
  {:ok, binary()} | {:error, :cache_overfull}

Insert an object into the shared cache. Returns {:ok, sha} on success, or {:error, :cache_overfull} if the cache is full and the wrapped Promisor has :on_overfull => :error.

replace(server, new_promisor, timeout \\ 5000)

@spec replace(server(), Exgit.ObjectStore.Promisor.t(), timeout()) :: :ok

Replace the wrapped Promisor. Use sparingly — this discards the current cache. Intended for testing and for swapping transports.

resolve(server, sha, timeout \\ 30000)

@spec resolve(server(), binary(), timeout()) ::
  {:ok, Exgit.Object.t()} | {:error, term()}

Look up sha, fetching from the transport on cache miss.

Unlike Promisor.resolve/2, does NOT return a promisor in the success tuple — the cache grew inside the GenServer, and the next call will see the updated state automatically.

Returns one of:

  • {:ok, object} — cache hit or successful fetch
  • {:error, reason} — transport failure, cache unchanged
  • {:error, :not_found} — fetch returned a pack that didn't contain the requested SHA; sibling objects that WERE returned are now in the cache and will benefit future calls.
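When matching on these results, the {:error, :not_found} clause has to come before the generic {:error, reason} clause, otherwise it is never reached. A minimal sketch follows; ResolveOutcome and the atoms it returns are hypothetical.

```elixir
defmodule ResolveOutcome do
  # Hypothetical helper; the returned atoms are illustrative.
  def classify({:ok, _object}), do: :ok

  # Must come before the catch-all error clause below.
  def classify({:error, :not_found}), do: :not_in_pack

  def classify({:error, _reason}), do: :transport_failure
end

ResolveOutcome.classify({:error, :not_found}) # :not_in_pack
ResolveOutcome.classify({:error, :timeout})   # :transport_failure
```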

snapshot(server, timeout \\ 5000)

@spec snapshot(server(), timeout()) :: Exgit.ObjectStore.Promisor.t()

Snapshot the current Promisor state. Useful for serialization, debugging, or handing back a pure value after the shared cache has served its purpose.

start_link(promisor, opts \\ [])

Start a SharedPromisor process wrapping the given Promisor.

The first argument is the Promisor to wrap. opts accepts the usual GenServer.start_link/3 options (:name, :timeout, etc.).

Examples

{:ok, pid} = SharedPromisor.start_link(my_promisor)
{:ok, pid} = SharedPromisor.start_link(my_promisor, name: MyApp.Cache)