PtcRunner.Lisp.Eval.ParallelRunner (PtcRunner v0.11.0)


Heap-capped, slot-bounded parallel execution of untrusted PTC-Lisp work (pmap/pcalls).

Why not Task.async_stream

max_heap_size is a spawn-time BEAM option: it must be in force the instant a process is created, because the closure (and everything it captures — e.g. a large data/ snapshot bound by an enclosing let) is copied onto the new process heap before any line of the worker function runs. Task.async_stream creates the worker process itself, so a heap cap set from inside the worker body is too late by construction: the unbounded closure copy has already happened.

ParallelRunner owns the worker lifecycle so it can pass {:max_heap_size, ...} to Process.spawn at creation. The cap is then in force from process birth and the closure copy lands on a limited heap — a worker whose captured environment alone exceeds the budget is killed at the first garbage collection.
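A minimal sketch of spawn-time capping, using Process.spawn/2 directly rather than ParallelRunner. The 100_000-word cap and the oversized list are hypothetical values chosen for illustration, not PtcRunner defaults. The worker allocates well past its cap and a monitor reports how it died.

```elixir
# Hypothetical cap: 100_000 words, kill on breach, no error-logger report.
cap = %{size: 100_000, kill: true, error_logger: false}

# The cap is part of the spawn options, so it is in force from process birth.
{pid, ref} =
  Process.spawn(
    fn ->
      # Builds a list far larger than the cap allows.
      1..5_000_000 |> Enum.to_list() |> length()
    end,
    [:monitor, {:max_heap_size, cap}]
  )

# Wait for the monitor notification and keep the exit reason.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    5_000 -> :no_exit_observed
  end

IO.inspect(reason, label: "worker exit reason")
```

A worker killed by its heap cap is expected to exit with reason :killed, which is the reason the runner maps to :memory_exceeded.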

Heap model: fixed per-worker cap + global slot budget

Every parallel worker, top-level AND nested, is spawned with the same fixed max_heap_size (worker_max_heap), with shared-binary accounting enabled. The heap cap is NOT divided by concurrency: dividing is unsound for nested parallelism, because a parent worker stays alive while its nested children run, so a parent and all of its children hold heap at the same time.

Instead, a single shared PtcRunner.Lisp.Eval.ParallelBudget semaphore (capacity max_parallel_workers) bounds how many workers may be alive at once across the whole Lisp.run/2, at every nesting depth. Each worker acquires exactly one slot before spawn and releases it on every termination path. The aggregate guarantee is:

max live parallel heap ≤ max_parallel_workers × worker_max_heap
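As a worked example of this bound, the arithmetic can be sketched as follows. Both configuration values are hypothetical, chosen only to make the numbers easy to follow. Since max_heap_size is expressed in words, the byte figure depends on the VM's word size.

```elixir
# Hypothetical configuration values, for illustration only.
max_parallel_workers = 8
worker_max_heap = 1_250_000

# Upper bound on live parallel heap, in words.
bound_words = max_parallel_workers * worker_max_heap

# Convert words to bytes using the word size of the running VM
# (8 bytes on a 64-bit VM).
word_size = :erlang.system_info(:wordsize)
bound_bytes = bound_words * word_size

IO.puts("bound: #{bound_words} words (#{bound_bytes} bytes)")
```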

A worker that wants to spawn nested parallelism with no slot free fails fast with :parallel_capacity_exceeded — slot acquisition is a non-blocking try-acquire, so it never deadlocks waiting on a slot that can only free when the worker itself finishes.
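The non-blocking try-acquire behaviour can be sketched with a counter built on :atomics. BudgetSketch and its functions are hypothetical names invented for this illustration; the source does not say how ParallelBudget is implemented, so this is only one way such a semaphore could behave.

```elixir
defmodule BudgetSketch do
  # Hypothetical stand-in for a slot semaphore, NOT the real ParallelBudget.

  # Creates a counter holding `capacity` free slots.
  def new(capacity) when is_integer(capacity) and capacity > 0 do
    ref = :atomics.new(1, signed: true)
    :atomics.put(ref, 1, capacity)
    ref
  end

  # Takes one slot if available; never blocks.
  def try_acquire(ref) do
    if :atomics.sub_get(ref, 1, 1) >= 0 do
      :ok
    else
      # No slot was free: undo the decrement and fail fast.
      :atomics.add(ref, 1, 1)
      {:error, :parallel_capacity_exceeded}
    end
  end

  # Returns one slot; must be called on every termination path.
  def release(ref), do: :atomics.add(ref, 1, 1)
end

budget = BudgetSketch.new(2)
first = BudgetSketch.try_acquire(budget)
second = BudgetSketch.try_acquire(budget)
third = BudgetSketch.try_acquire(budget)
IO.inspect({first, second, third})
```

Because the third caller gets an error immediately instead of waiting, a parent worker holding a slot can never block on a slot that only its own completion would free.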

Other guarantees

  • Heap cap at birth — every worker is spawned with {:max_heap_size, %{size: worker_max_heap, kill: true, include_shared_binaries: true, ...}}.
  • One shared deadline — a single absolute deadline_mono bounds the whole operation. Nested runner calls inherit the same deadline.
  • Orphan cleanup — workers are linked to the calling (sandbox) process, so if it is killed mid-operation the BEAM tears the workers down with it. On the normal failure/success path the runner explicitly kills any worker still alive.
  • Deterministic error classification — abnormal exits map to stable atoms: :killed → :memory_exceeded, past-deadline → :timeout, any other abnormal exit → :runtime_error.
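The classification rule in the last bullet can be sketched as a pure function. ClassifySketch is a hypothetical helper, not part of PtcRunner. It assumes the deadline is checked before the exit reason, since a worker that the runner kills after the deadline would otherwise also look like a :killed exit; the source does not state the actual order.

```elixir
defmodule ClassifySketch do
  # Hypothetical helper, not PtcRunner's API. Only abnormal exits are
  # expected here; a :normal exit would be handled by the caller.

  # Assumption: a past-deadline exit is reported as a timeout regardless
  # of the raw exit reason.
  def classify(_exit_reason, index, true = _past_deadline?), do: {:timeout, index}

  # A worker killed by its heap cap exits with reason :killed.
  def classify(:killed, index, false), do: {:memory_exceeded, index}

  # Any other abnormal exit keeps its reason for diagnostics.
  def classify(other, index, false), do: {:runtime_error, index, other}
end

IO.inspect(ClassifySketch.classify(:killed, 2, false))
```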

Each worker is spawned with BOTH :link and :monitor:

  • :link gives orphan cleanup for free when the caller dies. A plain :monitor does not — a monitored worker outlives its dead monitor. Since run/3 executes synchronously inside the sandbox process, the only signal that reaches a worker when the sandbox is Process.exit(pid, :kill)-ed is a link signal.
  • :monitor gives the {:DOWN, ...} notification used to classify a worker's exit reason without ambiguity.

A worker killed by its heap cap exits :killed, which would propagate through the link and take the sandbox down too. To prevent that, run/3 enables trap_exit only for the duration of the call and restores the prior flag in an after block, converting worker link signals into {:EXIT, _, _} messages.
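The link, monitor and temporary trap_exit combination can be sketched for a single worker. TrapSketch is a hypothetical module written for this illustration and is much simpler than run/3: it has no budget, no deadline and no concurrency window.

```elixir
defmodule TrapSketch do
  # Hypothetical single-worker illustration, NOT PtcRunner's run/3.
  def run_one(fun, heap_words) do
    # Trap exits only for this call; remember the caller's prior setting.
    prior = Process.flag(:trap_exit, true)

    try do
      cap = %{size: heap_words, kill: true, error_logger: false}
      {pid, ref} = Process.spawn(fun, [:link, :monitor, {:max_heap_size, cap}])

      # The monitor message carries the unambiguous exit reason.
      reason =
        receive do
          {:DOWN, ^ref, :process, ^pid, reason} -> reason
        end

      # The link signal arrives as a message because exits are trapped;
      # drain it so it does not linger in the caller's mailbox.
      receive do
        {:EXIT, ^pid, _reason} -> :ok
      end

      reason
    after
      # Restore the flag on every path, including raises.
      Process.flag(:trap_exit, prior)
    end
  end
end

killed = TrapSketch.run_one(fn -> Enum.to_list(1..5_000_000) end, 100_000)
normal = TrapSketch.run_one(fn -> :ok end, 100_000)
IO.inspect({killed, normal})
```

The caller survives the worker's heap-cap kill because the link signal is converted to a message instead of terminating it.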

Because the sandbox process may itself be linked to its caller (the link: true mode of Sandbox.execute/3, used by the MCP request worker), the temporary trap_exit must not swallow that caller's cancellation. run/3 therefore distinguishes {:EXIT, ...} signals by source: signals from its own workers are handled as worker exits, while an abnormal exit from any non-worker is treated as real cancellation — all workers are killed and the exit is re-propagated so linked cancellation still tears the sandbox down. :normal exits from non-workers are ignored.
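The source-based distinction can be sketched as a small decision function. ExitSourceSketch and its return values are hypothetical names for this illustration; how run/3 tracks its worker pids is not described in the source, so a MapSet is assumed here.

```elixir
defmodule ExitSourceSketch do
  # Hypothetical decision helper, not PtcRunner's API.
  # `workers` is assumed to be a MapSet of the pids this call spawned.
  def classify_exit({:EXIT, pid, reason}, workers) do
    cond do
      # One of our own workers: treat as a worker exit.
      MapSet.member?(workers, pid) -> {:worker_exit, pid, reason}
      # A non-worker exiting normally is not a cancellation.
      reason == :normal -> :ignore
      # Abnormal exit from a non-worker: real cancellation. The caller
      # would kill all workers and then re-propagate with exit(reason).
      true -> {:cancel, reason}
    end
  end
end

worker = spawn(fn -> :ok end)
outsider = spawn(fn -> :ok end)
workers = MapSet.new([worker])

IO.inspect(ExitSourceSketch.classify_exit({:EXIT, outsider, :shutdown}, workers))
```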

Summary

Types

opts(): Options for run/3.

worker_result(): Per-worker payload returned by fun.

Functions

run(items, fun, opts): Runs fun over items in parallel under a fixed per-worker heap cap and a shared global worker-slot budget.

Types

opts()

@type opts() :: [
  worker_max_heap: pos_integer() | nil,
  max_concurrency: pos_integer(),
  budget: PtcRunner.Lisp.Eval.ParallelBudget.t() | nil,
  deadline_mono: integer(),
  trace_ctx: term(),
  spawn_fun: (function(), list() -> {pid(), reference()})
]

Options for run/3.

  • :worker_max_heap - FIXED max_heap_size (in words) applied to every worker at spawn time. nil means no cap (only when the sandbox is uncapped).
  • :max_concurrency - local scheduling window: max workers this call keeps alive at once (>= 1).
  • :budget - shared ParallelBudget semaphore (the HARD global cap on parallel workers across the whole Lisp.run). nil disables the slot budget (used only when there is no global cap configured).
  • :deadline_mono - absolute monotonic-time deadline in ms shared by the whole operation (including nested runner calls).
  • :trace_ctx - trace context captured in the parent, re-attached inside each worker.
  • :spawn_fun - the 2-arity (fun, spawn_opts) -> {pid, ref} used to create each worker (default: &Process.spawn/2). A seam for fault-injection tests that need a spawn to raise partway through filling the window; production callers never set it.
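A sketch of building an options list, mainly to show that :deadline_mono is an absolute point on the monotonic clock rather than a duration. All numeric values are hypothetical, and whether omitted keys fall back to defaults is not stated in the source, so this list is an assumption about a plausible call site rather than a required shape.

```elixir
# Absolute deadline: "now" on the monotonic clock plus an allowance.
# The 5_000 ms allowance is a hypothetical choice.
deadline_mono = System.monotonic_time(:millisecond) + 5_000

opts = [
  # Per-worker cap in words (hypothetical value).
  worker_max_heap: 1_250_000,
  # Local scheduling window (hypothetical value).
  max_concurrency: 4,
  # nil disables the slot budget, per the option description above.
  budget: nil,
  deadline_mono: deadline_mono
]

# Because the deadline is absolute, any nesting depth can derive the
# time left from the same clock without passing durations around.
remaining_ms = max(deadline_mono - System.monotonic_time(:millisecond), 0)

IO.inspect(opts[:max_concurrency], label: "window")
IO.inspect(remaining_ms, label: "remaining ms")
```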

worker_result()

@type worker_result() :: {:ok, term()} | {:error, term()}

Per-worker payload returned by fun.

Functions

run(items, fun, opts)

@spec run([term()], (term() -> worker_result()), opts()) ::
  {:ok, [term()]} | {:error, term()}

Runs fun over items in parallel under a fixed per-worker heap cap and a shared global worker-slot budget.

fun is invoked as fun.(item) inside a freshly spawned, heap-capped worker process and must return a worker_result. The worker re-attaches the supplied trace context before calling fun.

Returns {:ok, results} (per-worker return values, in input order) or {:error, reason} on the first failure. reason is one of:

  • {:memory_exceeded, index} - worker index was killed by its heap cap
  • {:timeout, index} - the shared deadline elapsed before worker index finished
  • :parallel_capacity_exceeded - the global worker-slot budget was exhausted, so a worker could not be started
  • {:runtime_error, index, term} - worker index exited abnormally
  • any term from an {:error, term} returned by fun itself
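A caller might pattern-match on these shapes as sketched below. ResultSketch is a hypothetical helper and the message strings are invented for illustration. The final clause is a catch-all for errors returned by fun itself, so an error term from fun that happens to match one of the earlier shapes would be reported under that shape.

```elixir
defmodule ResultSketch do
  # Hypothetical helper that turns a run/3 result into a log message.
  def describe({:ok, results}), do: "ok: #{length(results)} results"

  def describe({:error, {:memory_exceeded, index}}),
    do: "worker #{index} exceeded its heap cap"

  def describe({:error, {:timeout, index}}),
    do: "deadline elapsed before worker #{index} finished"

  def describe({:error, :parallel_capacity_exceeded}),
    do: "no worker slot was available"

  def describe({:error, {:runtime_error, index, term}}),
    do: "worker #{index} exited abnormally: #{inspect(term)}"

  # Anything else is an {:error, term} returned by fun itself.
  def describe({:error, other}), do: "fun returned an error: #{inspect(other)}"
end

IO.puts(ResultSketch.describe({:error, {:memory_exceeded, 3}}))
```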