Per-process MLX stream management for concurrent inference.
MLX dispatches GPU work through Metal command queues. By default
every op shares a single command queue (the default worker thread).
Emily.Stream lets each BEAM process use its own worker thread —
its own Metal command queue — so multiple processes can run
inference concurrently on a shared model.
Public API
new/1— create a stream on:gpuor:cpu. Each stream allocates a dedicated OS thread that owns the MLX stream object; the thread is joined when the stream reference is garbage collected.with_stream/2— install a stream for the current process for the duration of a function call, then restore the previous stream (or the default) on exit. Nesting is safe.close/1— stop a stream's worker deterministically instead of waiting for garbage collection: queued ops are cancelled (their callers raise) and the OS thread is joined off the BEAM schedulers.
How it works
with_stream/2 stores the worker reference in the process
dictionary under :emily_worker. Emily.Backend reads it and
passes it to every NIF call. Each NIF dispatches its work to the
worker's dedicated OS thread where the MLX stream lives. Tensors
allocated by one stream can be read by another (MLX arrays are
refcounted and thread-safe for reads), but lazy tensors must be
evaluated on the stream that created them.
Configuration
Two application-env keys tune worker behaviour (set them in your
config/config.exs):
:worker_queue_limit(default8192) — the maximum number of operations that may be queued on a single worker before further dispatch is rejected with aRuntimeError. Each op is awaited synchronously, so a process holds at most one queued item; this cap is reached only by many processes dispatching to one worker concurrently, and provides back-pressure against a runaway producer.:await_timeout(default:infinity) — milliseconds to wait for a native result before raising.:infinitynever times out; set a finite value to bound how long a caller can block on one operation.
config :emily, worker_queue_limit: 8192, await_timeout: :infinityConcurrent serving patterns
Stream-per-process (shared model, per-process queues):
stream = Emily.Stream.new(:gpu)
Emily.Stream.with_stream(stream, fn ->
Nx.Serving.batched_run(my_serving, input)
end)Each serving worker allocates its own stream once at init. Weights are shared — no duplication.
Pooled servings (K instances behind a pool):
Start K Nx.Serving instances behind poolboy / Registry / etc.
Each loads its own weights and runs on the default stream. No
Emily.Stream needed. Trade-off: each pool member holds its own
weight copy, so memory scales with K.
For small models the pool approach is simpler. For large models (Qwen3-7B+) where duplicating weights is impractical, use stream-per-process.
Examples
iex> stream = Emily.Stream.new(:gpu)
iex> Emily.Stream.with_stream(stream, fn -> 42 end)
42
Summary
Functions
Stop the stream's worker thread.
Create a new stream (Metal command queue) on the given device.
Execute fun with the given stream as the default for MLX ops.
Types
@type t() :: %Emily.Stream{worker: reference()}
Functions
@spec close(t()) :: :ok
Stop the stream's worker thread.
Cancels any operations still queued on the stream — their awaiting
processes get a RuntimeError (:stopped) rather than hanging — lets
the in-flight op finish, and then tears the worker down off the BEAM
schedulers. Idempotent; using the stream after close/1 raises.
Closing is optional: a stream's worker is also stopped when the
%Emily.Stream{} is garbage collected. close/1 lets you release the
worker deterministically instead of waiting for GC.
Examples
iex> stream = Emily.Stream.new(:gpu)
iex> Emily.Stream.close(stream)
:ok
@spec new(:gpu | :cpu) :: t()
Create a new stream (Metal command queue) on the given device.
Each stream is backed by a dedicated OS thread that owns the MLX stream and its Metal command encoder. The worker thread is cleaned up when the resource is garbage collected.
Examples
iex> stream = Emily.Stream.new(:gpu)
iex> match?(%Emily.Stream{}, stream)
true
@spec with_stream(t(), (-> result)) :: result when result: var
Execute fun with the given stream as the default for MLX ops.
Stores the worker reference in the process dictionary so that
Emily.Backend passes it to every NIF call. The previous worker
(if any) is restored in an after block, so nesting is safe.
Examples
iex> stream = Emily.Stream.new(:gpu)
iex> Emily.Stream.with_stream(stream, fn ->
...> Nx.tensor([1.0, 2.0, 3.0], backend: Emily.Backend)
...> |> Nx.sum()
...> |> Nx.to_number()
...> end)
6.0