aion/internal/pump
The workflow-side query pump loop around suspending awaits.
The engine answers workflow queries at yield points (AT-007 C20): when a
query is pending for the workflow, a suspending await returns the
sentinel Error("aion_query:" <> json) instead of resolving. The run function
recognises the sentinel, services the query through
aion_flow_query_pump (handler lookup, try/catch, reply), and re-enters
the same await, which re-resolves identically, so pump iterations are
invisible to history and to replay. Every other result passes through
untouched.
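The loop can be modelled outside the BEAM. The following is a minimal Python sketch, not the module's Gleam implementation. It makes three assumptions. First, Result(String, String) is represented as an ("ok" | "error", payload) tuple. Second, service_query is a hypothetical stand-in for aion_flow_query_pump, passed in as a parameter so the sketch stays self-contained. Third, the JSON query shape used in the usage lines is invented. Only the "aion_query:" prefix comes from the text above.

```python
import json
from typing import Callable, Tuple

# Hypothetical model of Gleam's Result(String, String): ("ok", value) or ("error", reason).
Outcome = Tuple[str, str]

QUERY_PREFIX = "aion_query:"


def run(do: Callable[[], Outcome], service_query: Callable[[object], None]) -> Outcome:
    """Re-enter the await thunk until it produces a non-sentinel result."""
    while True:  # the Gleam version is tail-recursive; a loop is the Python equivalent
        tag, payload = do()
        if tag == "error" and payload.startswith(QUERY_PREFIX):
            # Sentinel: decode the pending query and answer it exactly once.
            service_query(json.loads(payload[len(QUERY_PREFIX):]))
            continue  # re-enter the same await, which re-resolves identically
        return (tag, payload)  # every other result passes through untouched
```

For example, a thunk that surfaces two sentinels before resolving is serviced twice and then returns ("ok", "fired"). A plain ("error", "timeout") is returned as-is, without servicing anything.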
The suspension call-shape contract
Every suspending engine NIF (sleep, receive_signal,
await_activity_result, await_child, collect_*) parks the workflow
process with beamr’s message-wakeable suspension: a wake RE-EXECUTES the
BEAM call instruction that invoked the NIF, and the NIF re-resolves from
recorded history (the engine’s two-phase suspend). Re-execution is only
sound when that call instruction is idempotent. A tail call
(call_ext_last) is NOT idempotent — it deallocates the caller’s stack
frame before the NIF runs, so re-executing it on wake pops a second
frame, desyncing the return path (observed as the NIF’s result value
being called as a function: bad function term {ok, <<"fired">>}).
Therefore every suspending FFI call MUST sit in non-tail position. The
thunks passed to run enforce this with shield: the FFI call is the
argument of a cross-module call, an argument is never in tail position,
and the Erlang compiler cannot inline a remote call (inlining an identity
shield would move the FFI call back into tail position), so the suspending
call always compiles to a plain call_ext whose re-execution is safe.
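The stack hazard can be illustrated with a deliberately simplified Python toy model. This is an assumption-laden sketch, not beamr's or BEAM's actual instruction semantics. A stack is a list of frame names, call_ext leaves it alone, and call_ext_last pops the caller's frame. The hypothetical helper suspend_and_wake executes the instruction once, then again on wake.

```python
from typing import Callable, List

# Toy model only: a stack is a list of frame names, innermost last.
Stack = List[str]


def call_ext(stack: Stack) -> None:
    """Toy non-tail call: leaves the caller's frame in place, so re-running it is harmless."""


def call_ext_last(stack: Stack) -> None:
    """Toy tail call: deallocates the caller's frame before jumping to the callee."""
    stack.pop()


def suspend_and_wake(instruction: Callable[[Stack], None], stack: Stack) -> Stack:
    """Run the instruction that invokes a suspending NIF, then re-execute it on wake."""
    instruction(stack)  # first execution: the NIF parks the process
    instruction(stack)  # wake: the same call instruction runs again
    return stack
```

Starting from ["outer", "caller"], the call_ext path still has both frames after the wake. A single tail call should leave ["outer"], but the re-executed call_ext_last pops "outer" too. That is the second frame pop the paragraph above describes.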
In addition, every thunk’s body must be exactly one shielded FFI call on
captured values: arguments are precomputed outside the thunk, never
derived inside it. Nothing in the thunk may recompute state on re-entry.
This is the same contract the engine documents for hand-rolled await funs
in crates/aion/tests/fixtures/aion_fixture_query.erl, and the pump itself
relies on it when it re-enters the same await after servicing a query.
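A minimal Python sketch shows why re-entry requires captured arguments. The names recorded_sleep, HISTORY, good_thunk and bad_thunk are hypothetical. The shield wrapper has no Python analogue and is omitted. Calling a thunk twice stands in for a wake, or for a pump iteration re-entering the same await.

```python
import itertools
from typing import Callable, Tuple

Outcome = Tuple[str, str]

# Hypothetical recorded history, keyed by the argument the await was issued with.
HISTORY = {"timer-1": "fired"}


def recorded_sleep(timer_id: str) -> Outcome:
    """Stand-in for a suspending NIF that re-resolves from recorded history."""
    if timer_id in HISTORY:
        return ("ok", HISTORY[timer_id])
    return ("error", "unrecorded:" + timer_id)


def good_thunk(timer_id: str) -> Callable[[], Outcome]:
    # Argument computed once, outside the thunk; the body is a single call on it.
    return lambda: recorded_sleep(timer_id)


def bad_thunk() -> Callable[[], Outcome]:
    counter = itertools.count(1)
    # Argument derived inside the thunk: every re-entry computes a new value.
    return lambda: recorded_sleep(f"timer-{next(counter)}")
```

Re-entering good_thunk("timer-1") resolves to ("ok", "fired") every time. The second entry into bad_thunk() asks for "timer-2", which history has never recorded.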
Values
pub fn run(
do: fn() -> Result(String, String),
) -> Result(String, String)
Run a suspending await thunk, servicing any pending queries the engine
surfaces as aion_query: sentinels before the await’s own resolution.
The loop is tail-recursive: each serviced query is answered exactly once,
then the await is re-entered until it resolves with a non-sentinel
result. An exception raised by a query handler never crashes the workflow:
the Erlang pump converts it into a reply_query_error and the loop continues.
Thunk authors: the suspending FFI call inside the thunk MUST be wrapped
in shield (see the module docs for the call-shape contract).
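The handler-failure guarantee can be sketched the same way. This Python function only models what the text attributes to aion_flow_query_pump: handler lookup, try/catch, reply. The query shape, the handlers map, the "reply_query" success tag and the missing-handler behaviour are all hypothetical. Only the reply_query_error name comes from the source.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical reply record: (kind, query name, value or error text).
Reply = Tuple[str, str, Any]


def service_query(
    query: Dict[str, Any],
    handlers: Dict[str, Callable[[Any], Any]],
    replies: List[Reply],
) -> None:
    """Answer one query exactly once; a raising handler becomes an error reply."""
    name = query["name"]
    try:
        # Assumption: a missing handler is reported the same way as a raise.
        result = handlers[name](query.get("args"))
    except Exception as exc:  # never let a handler failure escape into the workflow
        replies.append(("reply_query_error", name, repr(exc)))
    else:
        replies.append(("reply_query", name, result))
```

Because the exception is caught and turned into a reply, the caller's loop keeps running after a broken handler, matching the behaviour described for run.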
pub fn shield(
outcome: Result(String, String),
) -> Result(String, String)
Pin a suspending FFI call out of tail position.
Called as pump.shield(ffi.sleep(...)) from another module, the FFI
call is evaluated as the argument of a remote call. Argument position is
never tail position, and the Erlang compiler never inlines remote calls
(hot code loading can replace the callee module at any time), so the
suspending NIF is always invoked by a re-execution-safe call_ext. The
body re-matches the result so the
function cannot collapse to an identity even under whole-program
optimisation. See the module docs for why a call_ext_last to a
suspending NIF corrupts the stack on wake.