AI.Embeddings.Pool (fnord v0.9.27)
GenServer managing a long-lived embed.exs process in JSONL pool mode.
Spawns the embedding script as an Erlang Port and communicates via line-delimited JSON on stdin/stdout. Each request gets a unique id; responses are matched back to waiting callers by id.
The script handles its own concurrency internally (Task.Supervisor with bounded workers), so this GenServer is the single point of contact for the rest of the application.
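The id-based matching described above can be sketched as a small pure module. This is only an illustration under assumptions: the module name, the state shape (next_id and pending keys), and the use of integer ids are hypothetical and not taken from the fnord source.

```elixir
defmodule PendingSketch do
  # Hypothetical helper, not part of fnord: tracks callers waiting on a response.

  # Registers a caller under a fresh id and returns {id, updated_state}.
  def register(%{next_id: id, pending: pending} = state, from) do
    {id, %{state | next_id: id + 1, pending: Map.put(pending, id, from)}}
  end

  # Looks up the caller for a response id and removes it from the pending map.
  # Unknown ids (for example, a late response) return :unknown and leave state alone.
  def resolve(%{pending: pending} = state, id) do
    case Map.pop(pending, id) do
      {nil, _} -> {:unknown, state}
      {from, rest} -> {{:ok, from}, %{state | pending: rest}}
    end
  end
end

state = %{next_id: 1, pending: %{}}
{id_a, state} = PendingSketch.register(state, :caller_a)
{id_b, state} = PendingSketch.register(state, :caller_b)

# Responses may arrive out of order; each one is matched by id.
{{:ok, :caller_b}, state} = PendingSketch.resolve(state, id_b)
{{:ok, :caller_a}, state} = PendingSketch.resolve(state, id_a)
{:unknown, _state} = PendingSketch.resolve(state, 99)
```

In a real GenServer the stored value would be the `from` tuple passed to `handle_call/3`, and resolving an id would be followed by `GenServer.reply/2`.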
Lifecycle
ensure_started/1 is idempotent and the usual entry point for commands
that need embeddings (ask, index, search, memory,
conversations). It no-ops when the Indexer has been overridden for
tests. A normal GenServer.stop/1 or shutdown/0 marks the pool as
shutting down so the inevitable port-death messages that follow don't
emit bogus "embed process died" warnings.
On unexpected port death (crash, non-zero exit, monitor fired), the
pool fails any in-flight callers with {:error, :port_died}, waits
briefly, and respawns the port.
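As a rough sketch of the failure path above, the function below computes the replies owed to in-flight callers when the port dies and clears the pending map. The module name and the state shape (port and pending keys) are assumptions made for illustration; the actual pool may structure this differently.

```elixir
defmodule PortDeathSketch do
  # Hypothetical helper, not part of fnord.
  # Returns {replies, new_state}, where each reply pairs a waiting caller
  # with the {:error, :port_died} tuple documented above.
  def fail_pending(%{pending: pending} = state) do
    replies = for {_id, from} <- pending, do: {from, {:error, :port_died}}
    {replies, %{state | pending: %{}, port: nil}}
  end
end

state = %{port: :old_port, pending: %{1 => :caller_a, 2 => :caller_b}}
{replies, state} = PortDeathSketch.fail_pending(state)

# Every in-flight caller gets the same error; nothing is left pending.
2 = length(replies)
true = Enum.all?(replies, fn {_from, reply} -> reply == {:error, :port_died} end)
%{port: nil, pending: %{}} = state
```

Inside a GenServer, each reply would typically be delivered with `GenServer.reply/2`, and the respawn could be scheduled with `Process.send_after/3` after a short delay.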
Error shapes
Callers of embed/1 (and AI.Embeddings.get/1, which delegates here)
should expect these error tuples:
- {:error, :pool_not_running} - the pool GenServer is not alive. Call ensure_started/1 first.
- {:error, :port_not_connected} - a call arrived during the restart window after the port died but before the new one spawned. Retrying is usually fine.
- {:error, :port_died} - the port died while a call was in flight.
- {:error, :timeout} - embedding did not complete within the 30-minute call timeout (sized to cover first-invocation cold boot).
- {:error, :shutting_down} - the pool is terminating; the caller's request will not be processed.
- {:error, binary} - structured error surfaced by embed.exs itself (e.g. missing text field, internal exception). The binary is a short description.
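One way a caller might act on these error shapes is to retry only the restart-window error and pass everything else through. The wrapper below is a minimal sketch, not part of fnord: the module name and the attempts parameter are hypothetical, and embed_fun stands in for AI.Embeddings.Pool.embed/1 so the example runs on its own.

```elixir
defmodule EmbedRetrySketch do
  # Hypothetical wrapper. `embed_fun` is any 1-arity function returning
  # {:ok, vector} or one of the documented {:error, reason} tuples.
  def embed_with_retry(text, embed_fun, attempts \\ 3) when attempts >= 1 do
    case embed_fun.(text) do
      # Restart window: the port is being respawned, so try again
      # while attempts remain.
      {:error, :port_not_connected} when attempts > 1 ->
        embed_with_retry(text, embed_fun, attempts - 1)

      # Success, other errors, and the final failed attempt are
      # returned to the caller unchanged.
      result ->
        result
    end
  end
end

# A stub that fails once with :port_not_connected, then succeeds.
{:ok, calls} = Agent.start_link(fn -> 0 end)

flaky = fn _text ->
  case Agent.get_and_update(calls, fn n -> {n, n + 1} end) do
    0 -> {:error, :port_not_connected}
    _ -> {:ok, [0.1, 0.2]}
  end
end

{:ok, [0.1, 0.2]} = EmbedRetrySketch.embed_with_retry("hello", flaky)
```

A real caller may also want a short pause between attempts, since the pool waits briefly before respawning the port.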
Back-pressure
The pool does not cap pending requests. Each embed call becomes a
GenServer.call with a 30-minute timeout and gets queued in the
pending map until embed.exs produces a matching response. In-flight
concurrency on the embed.exs side is bounded by :workers (default
max(System.schedulers_online() - 2, 8) - scales to the host, with an
8-worker floor so small boxes still get reasonable throughput); callers
beyond that wait on the port's stdin pipe.
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec default_workers() :: pos_integer()
Returns the worker count the pool will use when :workers isn't passed.
Exposed for visibility (logging, debug); not used internally.
@spec embed(String.t()) ::
        {:ok, [float()]}
        | {:error, :pool_not_running}
        | {:error, :port_not_connected}
        | {:error, :port_died}
        | {:error, :timeout}
        | {:error, :shutting_down}
        | {:error, binary()}
Embeds a text string, returning a 384-dimensional float vector. Blocks until the result arrives from the embed.exs process.
See the module doc for the full list of possible error tuples.
@spec ensure_started(keyword()) :: :ok
Idempotently ensures the pool is running for commands that produce embeddings (ask, index, search, memory, conversation search).
No-ops when the configured Indexer has been overridden for testing (StubIndexer etc.); those paths never reach AI.Embeddings at all, and spawning a real embed.exs in a temp-dir test harness produces noise.
@spec shutdown() :: :ok
Gracefully stops the pool, suppressing the warnings that would otherwise be triggered by the port-closed messages that follow. Idempotent; safe to call when the pool is not running.
@spec start_link(keyword()) :: GenServer.on_start()
Starts the pool. Options:
- :workers - number of concurrent embedding workers. Defaults to max(System.schedulers_online() - 2, 8) so larger hosts scale up without starving the rest of the BEAM.