Bloccs.Collector (bloccs v0.9.1)


Runtime internals

The rendezvous behind Bloccs.call/4 and Bloccs.cast/4. You drive this through those two functions and a reply = true node, not by calling it directly.

Correlates a network's reply (or terminal error) back to the caller that injected the request.

bloccs is otherwise fire-and-forget: Bloccs.Producer.push/3 returns once a message is admitted, not once it is processed. The collector restores request/response on top of that — without making the pipeline synchronous — by reusing the per-message trace_id (Bloccs.Lineage) as the correlation key:

  1. Bloccs.call/4 mints a root lineage (trace_id T), registers T here (so the collector is expecting it before the request can fail or reply), then pushes the request.
  2. The request flows through the graph; trace_id rides unchanged on every 1:1 / split / router hop.
  3. A node declared reply = true reports its emitted payload here (an {:ok, payload} result); a node that fails terminally on the trace reports an {:error, %Bloccs.EffectError{}} result. Both are keyed by T.
  4. The collector matches T and hands the result to the blocked caller (for a cast/4 with send_result: true, it sends the result to the registered pid instead).
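
The four steps can be illustrated with plain processes and no collector at all. This is a minimal sketch under stated assumptions, not how bloccs is built: `TraceDemo`, the `{:result, trace_id, result}` message, running the "network" as a single spawned process, and using `make_ref/0` as a trace_id are all hypothetical. The caller's own mailbox stands in for the collector, so unlike the real collector, results nobody asked for pile up in the mailbox instead of being dropped.

```elixir
defmodule TraceDemo do
  # Hypothetical: a "network" is a list of 1:1 hop functions run in its own
  # process. Each hop rewrites the payload; the trace_id rides along untouched.
  def run_network(caller, trace_id, payload, hops) do
    spawn(fn ->
      result =
        try do
          {:ok, Enum.reduce(hops, payload, fn hop, acc -> hop.(acc) end)}
        rescue
          error -> {:error, error}
        end

      # Stand-in for the reply = true node (or a terminal failure):
      # the result is reported tagged with the same trace_id.
      send(caller, {:result, trace_id, result})
    end)
  end

  # Caller side: mint a unique trace_id, "push", then wait for that id only.
  # Here the caller's mailbox plays the collector's role.
  def call(payload, hops, timeout_ms \\ 1_000) do
    trace_id = make_ref()
    run_network(self(), trace_id, payload, hops)

    receive do
      {:result, ^trace_id, result} -> result
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end
```

The pinned `^trace_id` in the `receive` is the correlation step: results carrying any other trace_id are left alone.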

A single process serves every running network; entries are keyed by {network_id, trace_id} so ids never collide across networks.

Register-before-push (no races, no fire-and-forget overhead)

call/4 and cast/4 register synchronously before pushing, so by the time the message can produce a reply or an error, the collector is already expecting it. A result for an unregistered key is dropped immediately rather than buffered, so a plain Bloccs.Producer.push/3, or a reply = true node used in a fire-and-forget pipeline, costs the collector nothing beyond an O(1) drop. A result that arrives before the caller blocks on await/2 is held (:ready) until claimed. The timeout is enforced collector-side, so a late result can never crash a timed-out caller.
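
A compact GenServer can model the semantics described above: register before push, an O(1) drop for unregistered keys, a `:ready` hold for results that beat `await`, and a collector-side deadline. `MiniCollector`, its message tuples, and its entry states (`:pending`, `:ready`, `:waiting`) are hypothetical; this is a sketch of the behaviour, not the source of `Bloccs.Collector`.

```elixir
defmodule MiniCollector do
  # Hypothetical sketch of register-before-push collection; not bloccs code.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Synchronous: once this returns, results for the key are accepted.
  def register(server, net, trace, timeout_ms),
    do: GenServer.call(server, {:register, {net, trace}, timeout_ms})

  # :infinity on the client side; the deadline lives in the collector.
  def await(server, net, trace),
    do: GenServer.call(server, {:await, {net, trace}}, :infinity)

  def report(server, net, trace, result),
    do: GenServer.cast(server, {:report, {net, trace}, result})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:register, key, timeout_ms}, _from, state) do
    timer = Process.send_after(self(), {:deadline, key}, timeout_ms)
    {:reply, :ok, Map.put(state, key, {:pending, timer})}
  end

  def handle_call({:await, key}, from, state) do
    case Map.get(state, key) do
      # Result arrived before the caller blocked: hand it over now.
      {:ready, result, timer} ->
        Process.cancel_timer(timer)
        {:reply, result, Map.delete(state, key)}

      # Nothing yet: park the caller until a report or the deadline.
      {:pending, timer} ->
        {:noreply, Map.put(state, key, {:waiting, from, timer})}

      # Unknown key (never registered, already claimed, or already awaited).
      _ ->
        {:reply, {:error, :timeout}, state}
    end
  end

  @impl true
  def handle_cast({:report, key, result}, state) do
    case Map.get(state, key) do
      {:pending, timer} ->
        {:noreply, Map.put(state, key, {:ready, result, timer})}

      {:waiting, from, timer} ->
        Process.cancel_timer(timer)
        GenServer.reply(from, result)
        {:noreply, Map.delete(state, key)}

      # Unregistered key, or a result is already held: O(1) drop (first wins).
      _ ->
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:deadline, key}, state) do
    case Map.get(state, key) do
      {:waiting, from, _timer} ->
        GenServer.reply(from, {:error, :timeout})
        {:noreply, Map.delete(state, key)}

      # Nobody is blocked yet: record the timeout as the (first) result.
      {:pending, timer} ->
        {:noreply, Map.put(state, key, {:ready, {:error, :timeout}, timer})}

      # Already resolved or claimed (stale timer message): nothing to do.
      _ ->
        {:noreply, state}
    end
  end
end
```

Because the client waits with `:infinity` and the deadline is a timer inside the collector, each caller receives exactly one reply; a report that arrives after the deadline finds no pending entry and is dropped.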

Observability

The request/response lifecycle is observable in two ways (for the dashboard or your own handlers):

  • Telemetry: [:bloccs, :request, :start] when a request registers, and [:bloccs, :request, :stop] when it resolves (with :outcome set to :reply, :error, or :timeout, a :duration, and the %Bloccs.EffectError{} under :error on a failure). See Bloccs.Telemetry.
  • Snapshot: stats/0 returns the current in-flight request count, in total and per network.

Limitations (current milestone)

  • Aggregation is first-wins: the first result reported for a trace_id is the answer; later ones for the same id are dropped. Fan-out networks with several reply emits per request need an :all policy (later milestone).
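  • A [batch]/[join] on the request path mints a fresh trace_id, which breaks correlation, including error correlation: the result is reported under a trace_id the collector never registered, so it is dropped and the caller eventually gets {:error, :timeout}. Keep the reply node on the request's 1:1 trace.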
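
Both limitations come down to two map lookups. The pure sketch below is hypothetical (`FirstWins` and its `:pending` / `{:done, result}` entries are not bloccs API): the first report for a registered `{network_id, trace_id}` key is kept, and every other report, whether a later one for the same key or one for a key that was never registered, leaves the entries unchanged.

```elixir
defmodule FirstWins do
  # Hypothetical pure model of first-wins aggregation.
  # entries: %{{network_id, trace_id} => :pending | {:done, result}}
  def register(entries, key), do: Map.put(entries, key, :pending)

  def report(entries, key, result) do
    case Map.fetch(entries, key) do
      {:ok, :pending} -> Map.put(entries, key, {:done, result})
      # A result is already recorded: later reports are dropped.
      {:ok, {:done, _}} -> entries
      # Unregistered key (e.g. a trace_id freshly minted by a batch/join): dropped.
      :error -> entries
    end
  end
end
```

For example, a report under `{:orders, :t2}` models a reply emitted after a [batch]/[join] minted a new trace_id: the key was never registered, so the result vanishes and the original `{:orders, :t1}` request can only resolve through its deadline.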


Types

key()

@type key() :: {network_id(), Bloccs.Lineage.id()}

network_id()

@type network_id() :: atom()

result()

@type result() :: {:ok, term()} | {:error, Bloccs.EffectError.t()}

Functions

await(network_id, trace_id)

@spec await(network_id(), Bloccs.Lineage.id()) :: result() | {:error, :timeout}

Block until the result for a trace_id previously registered with register/3 arrives, or until its deadline fires. Returns the reported result ({:ok, payload} or {:error, %Bloccs.EffectError{}}), or {:error, :timeout}.
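
On the caller side, the three documented outcomes map onto three function clauses. In this hypothetical sketch, `ReplyHandling.handle/1` and its return values are illustrative; the point is the clause order, since `{:error, :timeout}` would also match a generic `{:error, error}` clause if that one came first.

```elixir
defmodule ReplyHandling do
  # Hypothetical handling of an await/2 result.
  def handle({:ok, payload}), do: {:reply, payload}
  # Must precede the generic error clause, which would otherwise swallow it.
  def handle({:error, :timeout}), do: :timed_out
  # Anything else under :error is the terminal failure (an EffectError).
  def handle({:error, error}), do: {:failed, error}
end
```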

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

register(network_id, trace_id, timeout_ms)

@spec register(network_id(), Bloccs.Lineage.id(), timeout()) :: :ok

Register a blocking (call/4) request for trace_id on network_id, to be collected by await/2. Synchronous: returns once the collector is expecting the id, so the caller can push without racing the reply. The timeout_ms deadline is enforced collector-side.

register_async(network_id, trace_id, pid, timeout_ms)

@spec register_async(network_id(), Bloccs.Lineage.id(), pid(), timeout()) :: :ok

Register an async (cast/4 with send_result: true) request: when the result for trace_id arrives, pid is sent {:bloccs_reply, trace_id, result} (an {:ok, payload} or {:error, %Bloccs.EffectError{}}), or {:bloccs_reply, trace_id, {:error, :timeout}} after timeout_ms.
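
A process that issues several async requests can track them by trace_id in handle_info/2. `ReplyConsumer`, its `pending` map of trace_id to caller-chosen tag, and `results/1` are hypothetical; the sketch relies only on the `{:bloccs_reply, trace_id, result}` message shape described above.

```elixir
defmodule ReplyConsumer do
  # Hypothetical consumer of async replies; not part of bloccs.
  use GenServer

  # pending: %{trace_id => tag} for requests this process is still waiting on.
  def start_link(pending), do: GenServer.start_link(__MODULE__, pending)

  def results(pid), do: GenServer.call(pid, :results)

  @impl true
  def init(pending), do: {:ok, %{pending: pending, results: %{}}}

  @impl true
  def handle_info({:bloccs_reply, trace_id, result}, state) do
    case Map.pop(state.pending, trace_id) do
      # Not a trace_id we are tracking (or already handled): ignore it.
      {nil, _pending} ->
        {:noreply, state}

      # First reply for this trace_id: record it under the caller's tag.
      {tag, pending} ->
        {:noreply, %{state | pending: pending, results: Map.put(state.results, tag, result)}}
    end
  end

  @impl true
  def handle_call(:results, _from, state), do: {:reply, state.results, state}
end
```

In a real process the `pending` map would be filled as each request is issued; passing it to `start_link/1` just keeps the sketch short.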

report(network_id, trace_id, payload)

@spec report(network_id(), Bloccs.Lineage.id(), term()) :: :ok

Report a successful reply payload for trace_id. Called from the runtime for a reply = true node. Dropped if trace_id isn't registered; first report wins.

report_error(network_id, trace_id, error)

@spec report_error(network_id(), Bloccs.Lineage.id(), Bloccs.EffectError.t()) :: :ok

Report a terminal failure for trace_id. Called from the runtime when a node on the trace fails terminally. Dropped if trace_id isn't registered; first report wins (a reply already reported takes precedence).

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

stats()

@spec stats() :: %{
  in_flight: non_neg_integer(),
  by_network: %{required(network_id()) => pos_integer()}
}

A snapshot of in-flight request/response state: the total still awaiting a result, and a per-network breakdown. For observability (e.g. a dashboard gauge).
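
The shape returned by stats/0 can be derived from in-flight entries keyed by {network_id, trace_id}. This is a hypothetical sketch: `StatsSketch.stats/1` takes the entries map explicitly and counts every entry as in flight. Leaving networks with no in-flight requests out of `by_network` would be consistent with the `pos_integer()` values in the spec.

```elixir
defmodule StatsSketch do
  # Hypothetical: compute %{in_flight, by_network} from a map keyed by
  # {network_id, trace_id}. Networks with zero entries never appear.
  def stats(entries) do
    by_network =
      entries
      |> Map.keys()
      |> Enum.frequencies_by(fn {network_id, _trace_id} -> network_id end)

    %{in_flight: map_size(entries), by_network: by_network}
  end
end
```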