Runtime internals
The rendezvous behind Bloccs.call/4 and Bloccs.cast/4. You drive this
through those two functions and a reply = true node, not by calling it
directly.
Correlates a network's reply (or terminal error) back to the caller that injected the request.
bloccs is otherwise fire-and-forget: Bloccs.Producer.push/3 returns once a
message is admitted, not once it is processed. The collector restores
request/response on top of that — without making the pipeline synchronous — by
reusing the per-message trace_id (Bloccs.Lineage) as the correlation key:
Bloccs.call/4mints a root lineage (trace_idT), registersThere (so the collector is expecting it before the request can fail or reply), then pushes the request.- The request flows through the graph;
trace_idrides unchanged on every 1:1 / split / router hop. - A node declared
reply = truereports its emitted payload here (an{:ok, payload}result); a node that fails terminally on the trace reports an{:error, %Bloccs.EffectError{}}result. Both are keyed byT. - The collector matches
Tand hands the result to the blocked caller.
A single process serves every running network; entries are keyed by
{network_id, trace_id} so ids never collide across networks.
Register-before-push (no races, no fire-and-forget overhead)
call/4 / cast/4 register synchronously before pushing, so by the time
the message can produce a reply or error, the collector is already expecting it.
A result for a key that is not registered is dropped immediately (not
buffered) — so a plain Bloccs.Producer.push/3 or a reply = true node used in
a fire-and-forget pipeline costs the collector nothing beyond an O(1) drop. A
result that arrives before the caller blocks on await/2 is held (:ready)
until claimed. The timeout is enforced collector-side, so a late result can
never crash a timed-out caller.
Limitations (current milestone)
- Aggregation is first-wins: the first result reported for a
trace_idis the answer; later ones for the same id are dropped. Fan-out networks with several reply emits per request need an:allpolicy (later milestone). - A
[batch]/[join]on the request path mints a freshtrace_id, breaking correlation — including error correlation. Keep the reply node on the request's 1:1 trace.
Summary
Functions
Block until the result for a previously register/3ed trace_id arrives, or
its deadline fires. Returns the reported result ({:ok, payload} or
{:error, %Bloccs.EffectError{}}), or {:error, :timeout}.
Returns a specification to start this module under a supervisor.
Register a blocking (call/4) request for trace_id on network_id, to be
collected by await/2. Synchronous: returns once the collector is expecting
the id, so the caller can push without racing the reply. The timeout_ms
deadline is enforced collector-side.
Register an async (cast/4 with send_result: true) request: when the result
for trace_id arrives, pid is sent {:bloccs_reply, trace_id, result} (a
{:ok, payload} or {:error, %Bloccs.EffectError{}}), or
{:bloccs_reply, trace_id, {:error, :timeout}} after timeout_ms.
Report a successful reply payload for trace_id. Called from the runtime for a
reply = true node. Dropped if trace_id isn't registered; first report wins.
Report a terminal failure for trace_id. Called from the runtime when a node
on the trace fails terminally. Dropped if trace_id isn't registered; first
report wins (a reply already reported takes precedence).
Types
@type key() :: {network_id(), Bloccs.Lineage.id()}
@type network_id() :: atom()
@type result() :: {:ok, term()} | {:error, Bloccs.EffectError.t()}
Functions
@spec await(network_id(), Bloccs.Lineage.id()) :: result() | {:error, :timeout}
Block until the result for a previously register/3ed trace_id arrives, or
its deadline fires. Returns the reported result ({:ok, payload} or
{:error, %Bloccs.EffectError{}}), or {:error, :timeout}.
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec register(network_id(), Bloccs.Lineage.id(), timeout()) :: :ok
Register a blocking (call/4) request for trace_id on network_id, to be
collected by await/2. Synchronous: returns once the collector is expecting
the id, so the caller can push without racing the reply. The timeout_ms
deadline is enforced collector-side.
@spec register_async(network_id(), Bloccs.Lineage.id(), pid(), timeout()) :: :ok
Register an async (cast/4 with send_result: true) request: when the result
for trace_id arrives, pid is sent {:bloccs_reply, trace_id, result} (a
{:ok, payload} or {:error, %Bloccs.EffectError{}}), or
{:bloccs_reply, trace_id, {:error, :timeout}} after timeout_ms.
@spec report(network_id(), Bloccs.Lineage.id(), term()) :: :ok
Report a successful reply payload for trace_id. Called from the runtime for a
reply = true node. Dropped if trace_id isn't registered; first report wins.
@spec report_error(network_id(), Bloccs.Lineage.id(), Bloccs.EffectError.t()) :: :ok
Report a terminal failure for trace_id. Called from the runtime when a node
on the trace fails terminally. Dropped if trace_id isn't registered; first
report wins (a reply already reported takes precedence).
@spec start_link(keyword()) :: GenServer.on_start()