AI.Embeddings.Pool (fnord v0.9.23)

GenServer managing a long-lived embed.exs process in JSONL pool mode.

Spawns the embedding script as an Erlang Port and communicates via line-delimited JSON on stdin/stdout. Each request gets a unique id; responses are matched back to waiting callers by id.
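The id-based matching described above can be sketched in plain Elixir. This is a minimal illustration only: the module name PendingSketch, the integer counter used for ids, and the shape of the state map are assumptions made for the example, not the pool's actual internals.

```elixir
defmodule PendingSketch do
  # Hypothetical helper, not part of fnord: tracks callers waiting on a response.

  # Register a caller under a fresh id; returns the id and the updated state.
  def register(%{next_id: id, pending: pending} = state, caller) do
    {id, %{state | next_id: id + 1, pending: Map.put(pending, id, caller)}}
  end

  # Match a response id back to its caller and drop it from the pending map.
  def resolve(%{pending: pending} = state, id) do
    case Map.pop(pending, id) do
      {nil, _} -> {:unknown, state}
      {caller, rest} -> {{:ok, caller}, %{state | pending: rest}}
    end
  end
end

state = %{next_id: 1, pending: %{}}
{id_a, state} = PendingSketch.register(state, :caller_a)
{id_b, state} = PendingSketch.register(state, :caller_b)

# Responses may arrive out of order; each is still routed by its id.
{{:ok, :caller_b}, state} = PendingSketch.resolve(state, id_b)
{{:ok, :caller_a}, state} = PendingSketch.resolve(state, id_a)
{:unknown, _state} = PendingSketch.resolve(state, 99)
```

Keying the waiting callers by id is what lets a single port serve many concurrent requests without requiring responses to come back in request order.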

The script handles its own concurrency internally (Task.Supervisor with bounded workers), so this GenServer is the single point of contact for the rest of the application.

Lifecycle

ensure_started/1 is idempotent and the usual entry point for commands that need embeddings (ask, index, search, memory, conversations). It no-ops when the Indexer has been overridden for tests. A normal GenServer.stop/1 or shutdown/0 marks the pool as shutting down so the inevitable port-death messages that follow don't emit bogus "embed process died" warnings.

On unexpected port death (crash, non-zero exit, monitor fired), the pool fails any in-flight callers with {:error, :port_died}, waits briefly, and respawns the port.

Error shapes

Callers of embed/1 (and AI.Embeddings.get/1, which delegates here) should expect these error tuples:

  • {:error, :pool_not_running} - the pool GenServer is not alive. Call ensure_started/1 first.
  • {:error, :port_not_connected} - a call arrived during the restart window after the port died but before the new one spawned. Retrying is usually fine.
  • {:error, :port_died} - the port died while a call was in flight.
  • {:error, :timeout} - embedding did not complete within the 30-minute call timeout (sized to cover first-invocation cold boot).
  • {:error, :shutting_down} - the pool is terminating; the caller's request will not be processed.
  • {:error, binary} - structured error surfaced by embed.exs itself (e.g. missing text field, internal exception). The binary is a short description.
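A caller might handle these tuples roughly as follows. This is a sketch under stated assumptions: EmbedRetrySketch, its attempt count, and its delay are hypothetical, and the embedding function is passed in as an argument so the example runs without a live pool. In real use it could be &AI.Embeddings.Pool.embed/1.

```elixir
defmodule EmbedRetrySketch do
  # Hypothetical wrapper, not part of fnord.
  # `embed_fun` is any 1-arity function returning the tuples listed above.
  def embed_with_retry(embed_fun, text, attempts \\ 3, delay_ms \\ 100) do
    case embed_fun.(text) do
      {:ok, vector} ->
        {:ok, vector}

      # Restart window: retrying is usually fine, so wait briefly and try again.
      {:error, :port_not_connected} when attempts > 1 ->
        Process.sleep(delay_ms)
        embed_with_retry(embed_fun, text, attempts - 1, delay_ms)

      # Every other error is returned unchanged for the caller to decide on.
      {:error, _reason} = error ->
        error
    end
  end
end

# Stub standing in for the pool: always reports a dead port.
stub = fn _text -> {:error, :port_died} end
IO.inspect(EmbedRetrySketch.embed_with_retry(stub, "hello"))
```

Only :port_not_connected is retried here because it is the one error the list above describes as transient. Whether to retry :port_died or :timeout is left to the caller.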

Back-pressure

The pool does not cap pending requests. Each embed call becomes a GenServer.call with a 30-minute timeout and is queued in the pending map until embed.exs produces a matching response. In-flight concurrency on the embed.exs side is bounded by :workers, which defaults to max(System.schedulers_online() - 2, 8): the count scales with the host, with an 8-worker floor so small boxes still get reasonable throughput. Callers beyond that limit wait on the port's stdin pipe.
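To make the default concrete, the formula can be evaluated for a few scheduler counts. WorkersSketch is a hypothetical module that only mirrors the documented expression; it takes the scheduler count as an argument so different hosts can be compared.

```elixir
defmodule WorkersSketch do
  # Mirrors the documented default: max(schedulers - 2, 8).
  def default_for(schedulers), do: max(schedulers - 2, 8)
end

WorkersSketch.default_for(4)   # => 8  (the floor applies)
WorkersSketch.default_for(16)  # => 14 (two schedulers left for the rest of the BEAM)

# Value for the current host.
IO.inspect(WorkersSketch.default_for(System.schedulers_online()))
```

The floor takes effect on any host with 10 or fewer schedulers. Above that, the worker count grows with the machine.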

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

default_workers()

@spec default_workers() :: pos_integer()

Returns the worker count the pool will use when :workers isn't passed. Exposed for visibility (logging, debug); not used internally.

embed(text)

@spec embed(String.t()) ::
  {:ok, [float()]}
  | {:error, :pool_not_running}
  | {:error, :port_not_connected}
  | {:error, :port_died}
  | {:error, :timeout}
  | {:error, :shutting_down}
  | {:error, binary()}

Embeds a text string, returning a 384-dimensional float vector. Blocks until the result arrives from the embed.exs process.

See the module doc for the full list of possible error tuples.

ensure_started(opts \\ [])

@spec ensure_started(keyword()) :: :ok

Idempotently ensures the pool is running for commands that produce embeddings (ask, index, search, memory, conversation search).

No-ops when the configured Indexer has been overridden for testing (StubIndexer etc.); those paths never reach AI.Embeddings at all, and spawning a real embed.exs in a temp-dir test harness produces noise.

shutdown()

@spec shutdown() :: :ok

Gracefully stops the pool, suppressing the warnings that would otherwise be triggered by the port-closed messages that follow. Idempotent; safe to call when the pool is not running.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the pool. Options:

  • :workers - number of concurrent embedding workers. Defaults to max(System.schedulers_online() - 2, 8) so larger hosts scale up without starving the rest of the BEAM.