Bloccs.Router (bloccs v0.1.1)

Copy Markdown View Source

Runtime internals

Infrastructure called by compiler-generated pipelines — not part of the stable user API. You drive this through manifests, not by calling it directly; signatures may change between minor versions.

Routes outbound emits from compiled nodes to the input producers of their downstream targets.

The edge table is registered per-network via register/2 at supervisor startup. dispatch/4 looks up the table and pushes each payload into every matching downstream producer via Bloccs.Producer.push/2. A push that fails (:no_producer, or a {:error, :timeout}) emits a [:bloccs, :dispatch, :error] telemetry event and is collected into dispatch/4's return value so the caller can act on undelivered emits rather than silently dropping them.

A :sink target is a process registered under a Bloccs.Sink name; sinks are typically test collectors (e.g. Bloccs.Sink.Collector) that record what reached an exposed output port.

Summary

Functions

Returns a specification to start this module under a supervisor.

Dispatch a payload from {network_id, from_node, from_port} to every downstream producer. Also notifies any sink listener subscribed to the source endpoint — sinks observe what a port emitted, regardless of whether downstream edges exist.

Return targets for a given source port; used by tests.

Return the canonical Broadway pipeline name for a {network, node}.

Compute the canonical producer process name for a {network, node, port}. The compiler hardcodes this so generated code and the router agree.

Register an edge table for a network. edges is a list of {{from_node, from_port}, [{to_node, to_port}]} pairs.

Register a sink listener (a pid) for a particular exposed port.

Look up a sink. Returns {:ok, pid} or :error.

Start the router. Singleton, name-registered.

Types

edges()

@type edges() :: [{endpoint(), [endpoint()]}]

endpoint()

@type endpoint() :: {atom(), atom()}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

dispatch(network_id, from_node, from_port, payload)

@spec dispatch(atom(), atom(), atom(), term()) ::
  :ok | {:error, [{endpoint(), :no_producer | {:error, :timeout}}]}

Dispatch a payload from {network_id, from_node, from_port} to every downstream producer. Also notifies any sink listener subscribed to the source endpoint — sinks observe what a port emitted, regardless of whether downstream edges exist.

Returns :ok when every push succeeded, or {:error, failures} where failures is a list of {endpoint, reason} for the edges whose push did not succeed. Each failure also emits a [:bloccs, :dispatch, :error] telemetry event before being collected.

lookup(network_id, from_node, from_port)

@spec lookup(atom(), atom(), atom()) :: [endpoint()]

Return targets for a given source port; used by tests.

pipeline_name(network_id, node_id)

@spec pipeline_name(atom(), atom()) :: atom()

Return the canonical Broadway pipeline name for a {network, node}.

producer_name(network_id, node_id, port)

@spec producer_name(atom(), atom(), atom()) :: atom()

Compute the canonical producer process name for a {network, node, port}. The compiler hardcodes this so generated code and the router agree.

register(network_id, edges)

@spec register(atom(), edges()) :: :ok

Register an edge table for a network. edges is a list of {{from_node, from_port}, [{to_node, to_port}]} pairs.

register_sink(network_id, node_id, port, pid)

@spec register_sink(atom(), atom(), atom(), pid()) :: :ok

Register a sink listener (a pid) for a particular exposed port.

sink_lookup(network_id, node_id, port)

Look up a sink. Returns {:ok, pid} or :error.

start_link(opts \\ [])

Start the router. Singleton, name-registered.