Bloccs.Runtime (bloccs v0.2.0)

Copy Markdown View Source

Runtime internals

Infrastructure called by compiler-generated pipelines — not part of the stable user API. You drive this through manifests, not by calling it directly; signatures may change between minor versions.

The execution core invoked from every compiler-emitted Broadway pipeline.

Generated handle_message/3 is a thin shim that builds a Config from the node's manifest plus its routing identity and calls process/2. Keeping the logic here — rather than templated into the codegen string — means inbound schema validation, timeout, retry, idempotency, and telemetry are unit-testable as ordinary library code instead of brittle generated text.

The contract with a node implementation:

pure_core(data, ctx)  :: {:ok, intermediate} | {:error, reason}
effect_shell(mid, ctx) ::
    {:emit, port, payload}            # emit one message on one out-port
  | {:emit, [{port, payload}, ...]}   # split / multi-port: many messages at once
  | :drop                             # filter: consume the message, emit nothing
  | {:error, reason}

A shell that returns :drop (or an empty emit list) is a filter — the message is acked and a [:bloccs, :node, :dropped] event is emitted, but nothing is dispatched downstream. A list of {port, payload} is a split — each pair is dispatched independently (a port may even appear more than once), so one invocation can fan a message out to several out-ports with distinct payloads. The bare {:emit, port, payload} form is the common single-emit case and is unchanged.

Summary

Functions

Prepare one message for an aggregate (batch) node: validate it against the in-port schema, then tag it for the batcher. An invalid message is failed immediately — it never enters the batch. Called from the generated handle_message/3 of a batch node.

Record one arrival at a join node's in-port. Once the arrival's correlation key ([join].on) has a payload on every in-port, the join contract runs over the correlated %{port => payload} map and the result is dispatched. A partial arrival is acked and held; a payload missing the correlation field fails the message. Called from the generated per-in-port handle_message/3 of a join.

Run one Broadway message through a node: validate inbound against the port schema, then pure_coreeffect_shell → router dispatch.

Run an aggregate (batch) node over a batch of messages. pure_core receives the list of payloads (not a single one) and reduces them; effect_shell emits the aggregate result (or splits / drops, like any node). Returns the (possibly failed) messages — called from the generated handle_batch/4.

Functions

batch_message(msg, config)

Prepare one message for an aggregate (batch) node: validate it against the in-port schema, then tag it for the batcher. An invalid message is failed immediately — it never enters the batch. Called from the generated handle_message/3 of a batch node.

join_arrival(msg, cfg, port)

Record one arrival at a join node's in-port. Once the arrival's correlation key ([join].on) has a payload on every in-port, the join contract runs over the correlated %{port => payload} map and the result is dispatched. A partial arrival is acked and held; a payload missing the correlation field fails the message. Called from the generated per-in-port handle_message/3 of a join.

process(msg, cfg)

Run one Broadway message through a node: validate inbound against the port schema, then pure_coreeffect_shell → router dispatch.

Returns the (possibly failed) Broadway.Message. A failed message carries a structured reason; it never reaches pure_core/effect_shell if inbound validation rejects it.

process_batch(messages, cfg)

@spec process_batch([Broadway.Message.t()], Bloccs.Runtime.Config.t()) :: [
  Broadway.Message.t()
]

Run an aggregate (batch) node over a batch of messages. pure_core receives the list of payloads (not a single one) and reduces them; effect_shell emits the aggregate result (or splits / drops, like any node). Returns the (possibly failed) messages — called from the generated handle_batch/4.

Best-effort / at-least-once: a failed aggregate fails every message in the batch, and Broadway re-delivers per its config. The batch path does not run per-message retry / idempotency / timeout (the validator rejects those on a [batch] node).