A GenStage producer used as the input port of a compiled node.
Broadway names producer processes itself (e.g.
MyPipeline.Broadway.Producer_0), so we additionally register a stable
canonical name in Bloccs.Registry keyed by {network_id, node_id, in_port}.
push/3 resolves through the registry.
Back-pressure (v0.4)
push/3 is synchronous and bounded by the port's buffer. When the
buffer is full the caller is parked — the call does not return until a
downstream consumer drains space — so a slow consumer propagates back-pressure
up the graph instead of dropping messages. An unbounded port (buffer: nil)
never parks. The wait is bounded by config :bloccs, :push_timeout (default
:infinity); on timeout push/3 returns {:error, :timeout} rather than
blocking forever.
Summary
Functions
Push a payload into the producer, blocking if the buffer is full until space
frees (see "Back-pressure"). meta becomes the Broadway.Message's
:metadata (the runtime uses :bloccs_attempt for retries).
Re-enqueue a payload after delay_ms (used by the runtime to schedule a retry
with backoff; the timer lives in the supervised producer process). If the
buffer is full when the timer fires, the enqueue is retried shortly rather
than dropped. Returns :ok or :no_producer.
Start a producer. The :name option is the canonical Registry key the router
pushes to; :buffer (optional) bounds the queue for back-pressure.
Functions
Push a payload into the producer, blocking if the buffer is full until space
frees (see "Back-pressure"). meta becomes the Broadway.Message's
:metadata (the runtime uses :bloccs_attempt for retries).
Returns :ok, :no_producer (name not registered), or {:error, :timeout}
(waited past :push_timeout).
@spec push_after(atom(), term(), map(), non_neg_integer()) :: :ok | :no_producer
Re-enqueue a payload after delay_ms (used by the runtime to schedule a retry
with backoff; the timer lives in the supervised producer process). If the
buffer is full when the timer fires, the enqueue is retried shortly rather
than dropped. Returns :ok or :no_producer.
@spec start_link(keyword()) :: GenServer.on_start()
Start a producer. The :name option is the canonical Registry key the router
pushes to; :buffer (optional) bounds the queue for back-pressure.