Bloccs.Join (bloccs v0.2.0)

Runtime internals

Infrastructure called by compiler-generated pipelines — not part of the stable user API. You drive this through manifests, not by calling it directly; signatures may change between minor versions.

Correlation buffer for [join] nodes — infrastructure, like Bloccs.Idempotency.

A join node has two or more in-ports carrying distinct schemas. Each in-port compiles to its own Broadway pipeline; every arrival is handed here and stashed by correlation key (the value of the node's [join].on field). When all of a node's in-ports have produced a payload for the same key, the partial match is removed and returned complete — the arriving pipeline then runs the node's contract over the correlated %{port => payload} map and dispatches the result.
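The stash-and-complete step described above can be sketched with a plain map in place of the ETS table. This is an illustrative model only, not the Bloccs.Join implementation: the module name `JoinSketch` is hypothetical, the buffer is passed in and returned explicitly, and it assumes payloads are maps with string keys so that the `on` field name can be looked up directly.

```elixir
defmodule JoinSketch do
  # Hypothetical model of the correlation merge; not the library's code.
  # `buffer` maps correlation key => %{port => payload}.
  # `config` carries the :required and :on fields of the config() type.

  def arrive(buffer, config, port, payload) do
    # Assumption: the correlation key is the payload's value under the `on` field.
    case Map.fetch(payload, config.on) do
      :error ->
        {{:error, :no_key}, buffer}

      {:ok, key} ->
        # Merge this arrival into whatever has already been stashed for the key.
        partial = buffer |> Map.get(key, %{}) |> Map.put(port, payload)
        arrived = partial |> Map.keys() |> MapSet.new()

        if MapSet.subset?(config.required, arrived) do
          # Every required in-port has arrived: remove the match and return it whole.
          {{:complete, partial}, Map.delete(buffer, key)}
        else
          # Still waiting on at least one in-port: keep the partial buffered.
          {:partial, Map.put(buffer, key, partial)}
        end
    end
  end
end
```

In this model the caller threads the buffer through each call; in the real module that state is held by the GenServer, which is what keeps the read-modify-write atomic.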

Timeout / deadletter

A partial match that is not completed within the node's [join].timeout_ms is swept: if the node declared [join].deadletter = "port", the partial is emitted there as an %{key, present, payloads} envelope (nothing is lost); otherwise it is dropped with a [:bloccs, :join, :timeout] event.
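A minimal sketch of the sweep decision follows, written as a pure function so it can be read in isolation. The module name `SweepSketch`, the `{key, payloads, inserted_at_ms}` entry shape, and the returned action tuples are all hypothetical. It also assumes that `present` lists the ports that did arrive and that a `nil` timeout means the partial never expires; the source states neither.

```elixir
defmodule SweepSketch do
  # Hypothetical model of the timeout sweep; not the library's code.
  # `entries` is a list of {key, payloads, inserted_at_ms} tuples.
  # `config` carries the :timeout_ms and :deadletter fields of the config() type.

  def sweep(entries, config, now_ms) do
    # Split buffered partials into those past their timeout and those still live.
    {expired, live} =
      Enum.split_with(entries, fn {_key, _payloads, inserted_at} ->
        # Assumption: a nil timeout disables expiry.
        config.timeout_ms != nil and now_ms - inserted_at >= config.timeout_ms
      end)

    actions =
      Enum.map(expired, fn {key, payloads, _inserted_at} ->
        case config.deadletter do
          # No deadletter port declared: the partial is dropped.
          nil ->
            {:drop, key}

          # Deadletter port declared: wrap the partial in the envelope.
          port ->
            envelope = %{key: key, present: Map.keys(payloads), payloads: payloads}
            {:deadletter, port, envelope}
        end
      end)

    {actions, live}
  end
end
```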

State lives in a public ETS table owned by this GenServer. Correlation merges run inside the server so the read-modify-write stays atomic; the sweep runs on a timer (configurable via :bloccs, :join_sweep_ms, default 1s) and sweep_now/0 forces one synchronously (used by tests).
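For example, the sweep interval could be shortened in an application's config file. This is a sketch that assumes the value is an integer number of milliseconds, as the key name suggests; the value 500 is purely illustrative.

```elixir
import Config

# Assumed unit: milliseconds. Sweeps expired partials twice per second
# instead of the default once per second.
config :bloccs, join_sweep_ms: 500
```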

Types

config()

@type config() :: %{
  required: MapSet.t(atom()),
  on: String.t(),
  timeout_ms: pos_integer() | nil,
  deadletter: atom() | nil
}

scope()

@type scope() :: {atom(), atom()}

Functions

arrive(scope, port, payload)

@spec arrive(scope(), atom(), map()) ::
  {:complete, %{required(atom()) => map()}}
  | :partial
  | {:error, :no_config | :no_key}

Record an arrival on port for join scope. Returns {:complete, payloads} (a %{port => payload} map) once every required in-port has arrived for the arrival's correlation key, :partial while still waiting, or {:error, reason} (:no_config / :no_key).
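The three result shapes can be handled with one pattern match per case. The sketch below is illustrative only: `ArrivalHandler` and the action atoms it returns are hypothetical and do not reflect what the generated pipelines actually do with each result.

```elixir
defmodule ArrivalHandler do
  # Hypothetical helper: maps an arrive/3 result onto a next step for the caller.

  # All required in-ports have arrived; `payloads` is the %{port => payload} map.
  def handle({:complete, payloads}), do: {:run_contract, payloads}

  # At least one in-port is still outstanding for this correlation key.
  def handle(:partial), do: :wait

  # The scope was never registered, or the payload had no correlation key.
  def handle({:error, reason}) when reason in [:no_config, :no_key], do: {:fail, reason}
end
```

A call site might then read `Bloccs.Join.arrive(scope, port, payload) |> ArrivalHandler.handle()`, where `scope` is the `{network, node}` tuple the node was registered under.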

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

register(scope, config)

@spec register(scope(), config()) :: :ok

Register a join node's correlation config. Called from a generated network supervisor at boot (alongside Router.register/2). scope is {network, node}.
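As an illustration of the config() shape, the sketch below builds a config map for a two-port join. `JoinConfigSketch`, the port names, the `"order_id"` field, and the scope in the trailing comment are all hypothetical; in practice the generated network supervisor derives these values from the manifest.

```elixir
defmodule JoinConfigSketch do
  # Hypothetical builder for a map matching the config() type.
  def build(ports, on, opts \\ []) do
    %{
      # In-ports that must all arrive before the join completes.
      required: MapSet.new(ports),
      # Name of the correlation field ([join].on).
      on: on,
      # Optional timeout and deadletter port; both are nil when omitted.
      timeout_ms: Keyword.get(opts, :timeout_ms),
      deadletter: Keyword.get(opts, :deadletter)
    }
  end
end

config =
  JoinConfigSketch.build([:order, :payment], "order_id",
    timeout_ms: 5_000,
    deadletter: :unmatched
  )

# With the bloccs application running, registration would then be:
#   Bloccs.Join.register({:orders, :match_payment}, config)
```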

reset()

@spec reset() :: :ok

Drop all buffered partial matches (keeps registered configs). Test-only.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start the buffer (started under the bloccs application supervisor).

sweep_now()

@spec sweep_now() :: :ok

Force a sweep synchronously (deadletter/drop expired partials). Test helper.