Runtime internals
Infrastructure called by compiler-generated pipelines — not part of the stable user API. You drive this through manifests, not by calling it directly; signatures may change between minor versions.
Correlation buffer for [join] nodes — infrastructure, like Bloccs.Idempotency.
A join node has two or more in-ports carrying distinct schemas. Each in-port
compiles to its own Broadway pipeline; every arrival is handed here and stashed
by correlation key (the value of the node's [join].on field). When all of
a node's in-ports have produced a payload for the same key, the partial match
is removed and returned complete — the arriving pipeline then runs the node's
contract over the correlated %{port => payload} map and dispatches the result.
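The correlation step described above can be modelled as a pure function. This is a minimal sketch for illustration, not the module's actual implementation: JoinSketch, its arrive/5 signature, and the example port names are hypothetical, and the real buffer keeps its state in ETS behind a GenServer rather than in a plain map.

```elixir
defmodule JoinSketch do
  # Hypothetical model of the correlation merge; not the Bloccs implementation.
  # `buffer` maps correlation key => %{port => payload} for partial matches.
  # Returns {result, new_buffer}.
  def arrive(buffer, required_ports, key, port, payload) do
    partial = buffer |> Map.get(key, %{}) |> Map.put(port, payload)

    if Enum.all?(required_ports, &Map.has_key?(partial, &1)) do
      # Every in-port has produced a payload for this key:
      # remove the partial match and return it complete.
      {{:complete, partial}, Map.delete(buffer, key)}
    else
      # Still waiting on at least one in-port: stash and report :partial.
      {:partial, Map.put(buffer, key, partial)}
    end
  end
end

ports = [:orders, :payments]
{:partial, buf} = JoinSketch.arrive(%{}, ports, "o-1", :orders, %{id: "o-1", total: 40})
{{:complete, payloads}, buf} = JoinSketch.arrive(buf, ports, "o-1", :payments, %{order_id: "o-1"})
```

After the second call, payloads holds one entry per in-port and the buffer no longer contains the key, which mirrors the "removed and returned complete" behaviour described above.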
Timeout / deadletter
A partial match that is not completed within the node's [join].timeout_ms is
swept. If the node declared [join].deadletter = "port", the partial is emitted
there as an envelope map with key, present, and payloads entries, so nothing is
lost; otherwise it is dropped with a [:bloccs, :join, :timeout] event.
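The sweep decision can be sketched as follows. Everything here is an assumption made for illustration: SweepSketch, the inserted_at bookkeeping field, the strict "older than timeout" comparison, and the reading of present as the list of ports that did arrive are not confirmed by this page.

```elixir
defmodule SweepSketch do
  # Hypothetical model of the timeout sweep; not the Bloccs implementation.
  # `buffer` maps key => %{payloads: %{port => payload}, inserted_at: ms}.
  # Returns {actions, live_buffer}.
  def sweep(buffer, now_ms, timeout_ms, deadletter_port) do
    {expired, live} =
      Enum.split_with(buffer, fn {_key, entry} ->
        now_ms - entry.inserted_at > timeout_ms
      end)

    actions =
      for {key, entry} <- expired do
        case deadletter_port do
          # No deadletter port declared: the partial is dropped
          # (the real module reports this with a [:bloccs, :join, :timeout] event).
          nil ->
            {:drop, key}

          # Deadletter port declared: emit the partial as an envelope.
          port ->
            envelope = %{key: key, present: Map.keys(entry.payloads), payloads: entry.payloads}
            {:deadletter, port, envelope}
        end
      end

    {actions, Map.new(live)}
  end
end

buffer = %{
  "o-1" => %{payloads: %{orders: %{id: "o-1"}}, inserted_at: 0},
  "o-2" => %{payloads: %{orders: %{id: "o-2"}}, inserted_at: 900}
}

{actions, live} = SweepSketch.sweep(buffer, 1_000, 500, :unmatched)
```

With a 500 ms timeout at time 1000, only "o-1" has expired, so it becomes a deadletter envelope while "o-2" stays buffered.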
State lives in a public ETS table owned by this GenServer. Correlation merges
run inside the server so the read-modify-write stays atomic; the sweep runs on
a timer (configurable via :bloccs, :join_sweep_ms, default 1s) and
sweep_now/0 forces one synchronously (used by tests).
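As a sketch of how the sweep interval might be tuned, assuming :bloccs, :join_sweep_ms is read from the application environment (the file name and the 250 ms value are only examples):

```elixir
# config/test.exs (hypothetical location)
import Config

# Sweep every 250 ms instead of the default 1 s.
config :bloccs, join_sweep_ms: 250
```

In tests, calling sweep_now/0 is likely the more deterministic choice, since it does not depend on timer timing.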
Functions
@spec arrive(scope(), atom(), map()) :: {:complete, %{required(atom()) => map()}} | :partial | {:error, :no_config | :no_key}
Record an arrival on port for the join node identified by scope. Returns
{:complete, payloads} (a %{port => payload} map) once every required in-port
has arrived for the arrival's correlation key, :partial while still waiting,
or {:error, reason}, where reason is :no_config or :no_key.
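A caller would typically branch on the three return shapes. The helper below is a hypothetical sketch (ArrivalHandling and the atoms it returns are not part of this module); it only shows that the documented shapes can be matched exhaustively.

```elixir
defmodule ArrivalHandling do
  # Hypothetical caller-side helper: maps each documented return shape
  # of arrive/3 to a next step. Not part of the Bloccs API.
  def next_step({:complete, payloads}) when is_map(payloads), do: {:run_contract, payloads}
  def next_step(:partial), do: :wait
  def next_step({:error, reason}) when reason in [:no_config, :no_key], do: {:reject, reason}
end

step = ArrivalHandling.next_step({:complete, %{orders: %{id: "o-1"}, payments: %{order_id: "o-1"}}})
```

Matching on the specific error atoms, rather than on a catch-all, makes an unexpected return value fail loudly if the signature changes between minor versions.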
Returns a specification to start this module under a supervisor.
See Supervisor.
Register a join node's correlation config. Called from a generated network
supervisor at boot (alongside Router.register/2). scope is {network, node}.
@spec reset() :: :ok
Drop all buffered partial matches (keeps registered configs). Test-only.
@spec start_link(keyword()) :: GenServer.on_start()
Start the buffer (started under the bloccs application supervisor).
@spec sweep_now() :: :ok
Force a sweep synchronously (deadletter/drop expired partials). Test helper.