Runtime internals
Infrastructure called by compiler-generated pipelines — not part of the stable user API. You drive this through manifests, not by calling it directly; signatures may change between minor versions.
The execution core invoked from every compiler-emitted Broadway pipeline.
Generated handle_message/3 is a thin shim that builds a Config from the
node's manifest plus its routing identity and calls process/2. Keeping the
logic here — rather than templated into the codegen string — means inbound
schema validation, timeout, retry, idempotency, and telemetry are
unit-testable as ordinary library code instead of brittle generated text.
The contract with a node implementation:
pure_core(data, ctx) :: {:ok, intermediate} | {:error, reason}
effect_shell(mid, ctx) ::
{:emit, port, payload} # emit one message on one out-port
| {:emit, [{port, payload}, ...]} # split / multi-port: many messages at once
| :drop # filter: consume the message, emit nothing
| {:error, reason}A shell that returns :drop (or an empty emit list) is a filter — the
message is acked and a [:bloccs, :node, :dropped] event is emitted, but
nothing is dispatched downstream. A list of {port, payload} is a split —
each pair is dispatched independently (a port may even appear more than once),
so one invocation can fan a message out to several out-ports with distinct
payloads. The bare {:emit, port, payload} form is the common single-emit
case and is unchanged.
Summary
Functions
Prepare one message for an aggregate (batch) node: validate it against the
in-port schema, then tag it for the batcher. An invalid message is failed
immediately — it never enters the batch. Called from the generated
handle_message/3 of a batch node.
Record one arrival at a join node's in-port. Once the arrival's correlation key
([join].on) has a payload on every in-port, the join contract runs over
the correlated %{port => payload} map and the result is dispatched. A partial
arrival is acked and held; a payload missing the correlation field fails the
message. Called from the generated per-in-port handle_message/3 of a join.
Run one Broadway message through a node: validate inbound against the port
schema, then pure_core → effect_shell → router dispatch.
Run an aggregate (batch) node over a batch of messages. pure_core receives
the list of payloads (not a single one) and reduces them; effect_shell
emits the aggregate result (or splits / drops, like any node). Returns the
(possibly failed) messages — called from the generated handle_batch/4.
Functions
@spec batch_message(Broadway.Message.t(), Bloccs.Runtime.Config.t()) :: Broadway.Message.t()
Prepare one message for an aggregate (batch) node: validate it against the
in-port schema, then tag it for the batcher. An invalid message is failed
immediately — it never enters the batch. Called from the generated
handle_message/3 of a batch node.
@spec join_arrival(Broadway.Message.t(), Bloccs.Runtime.Config.t(), atom()) :: Broadway.Message.t()
Record one arrival at a join node's in-port. Once the arrival's correlation key
([join].on) has a payload on every in-port, the join contract runs over
the correlated %{port => payload} map and the result is dispatched. A partial
arrival is acked and held; a payload missing the correlation field fails the
message. Called from the generated per-in-port handle_message/3 of a join.
@spec process(Broadway.Message.t(), Bloccs.Runtime.Config.t()) :: Broadway.Message.t()
Run one Broadway message through a node: validate inbound against the port
schema, then pure_core → effect_shell → router dispatch.
Returns the (possibly failed) Broadway.Message. A failed message carries a
structured reason; it never reaches pure_core/effect_shell if inbound
validation rejects it.
@spec process_batch([Broadway.Message.t()], Bloccs.Runtime.Config.t()) :: [ Broadway.Message.t() ]
Run an aggregate (batch) node over a batch of messages. pure_core receives
the list of payloads (not a single one) and reduces them; effect_shell
emits the aggregate result (or splits / drops, like any node). Returns the
(possibly failed) messages — called from the generated handle_batch/4.
Best-effort / at-least-once: a failed aggregate fails every message in the
batch, and Broadway re-delivers per its config. The batch path does not run
per-message retry / idempotency / timeout (the validator rejects those on a
[batch] node).