Bloccs.Producer (bloccs v0.1.0)


A GenStage producer used as the input port of a compiled node.

Broadway names producer processes itself (e.g. MyPipeline.Broadway.Producer_0), so we additionally register a stable canonical name in Bloccs.Registry keyed by {network_id, node_id, in_port}. push/3 resolves through the registry.
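The canonical-name lookup described above can be sketched with Elixir's standard Registry. This is only an illustration, not the library's code: the module name CanonicalNameSketch and its registry name are hypothetical stand-ins for Bloccs.Registry, and the sketch assumes a unique-keys registry.

```elixir
defmodule CanonicalNameSketch do
  # Hypothetical registry name; the real module registers in Bloccs.Registry.
  @registry CanonicalNameSketch.Registry

  def start_registry, do: Registry.start_link(keys: :unique, name: @registry)

  # Must be called from inside the process that should own the canonical name.
  def register(network_id, node_id, in_port) do
    Registry.register(@registry, {network_id, node_id, in_port}, nil)
  end

  # Resolve the canonical key to a pid, mirroring the :no_producer result.
  def whereis(network_id, node_id, in_port) do
    case Registry.lookup(@registry, {network_id, node_id, in_port}) do
      [{pid, _value}] -> {:ok, pid}
      [] -> :no_producer
    end
  end
end
```

Keying on the tuple keeps the lookup independent of whatever name Broadway assigns to the underlying producer process.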

Back-pressure (v0.4)

push/3 is synchronous and bounded by the port's buffer. When the buffer is full, the caller is parked: the call does not return until a downstream consumer frees space. A slow consumer therefore propagates back-pressure up the graph instead of dropping messages. An unbounded port (buffer: nil) never parks. The wait is bounded by config :bloccs, :push_timeout (default :infinity); on timeout, push/3 returns {:error, :timeout} rather than blocking forever.
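One common way to park a caller is to withhold the GenServer reply until space frees. The sketch below shows that technique with a plain GenServer rather than a GenStage producer. BoundedBufferSketch, its pop/1 function (standing in for downstream demand), and the per-call timeout argument are all assumptions made for illustration and are not part of Bloccs.

```elixir
defmodule BoundedBufferSketch do
  use GenServer

  # limit: maximum number of queued items, or nil for an unbounded buffer.
  def start_link(limit), do: GenServer.start_link(__MODULE__, limit)

  # Blocks while the buffer is full; gives up after `timeout` milliseconds.
  def push(pid, item, timeout \\ :infinity) do
    GenServer.call(pid, {:push, item}, timeout)
  catch
    :exit, {:timeout, _} -> {:error, :timeout}
  end

  # Stands in for a downstream consumer taking one item.
  def pop(pid), do: GenServer.call(pid, :pop)

  @impl true
  def init(limit) do
    {:ok, %{limit: limit, items: :queue.new(), parked: :queue.new()}}
  end

  @impl true
  def handle_call({:push, item}, from, state) do
    if state.limit != nil and :queue.len(state.items) >= state.limit do
      # Full: withhold the reply so the caller stays blocked in GenServer.call/3.
      # Simplification: a caller that times out leaves its item parked here.
      {:noreply, %{state | parked: :queue.in({from, item}, state.parked)}}
    else
      {:reply, :ok, %{state | items: :queue.in(item, state.items)}}
    end
  end

  def handle_call(:pop, _from, state) do
    case :queue.out(state.items) do
      {{:value, item}, rest} ->
        {:reply, {:ok, item}, admit_parked(%{state | items: rest})}

      {:empty, _} ->
        {:reply, :empty, state}
    end
  end

  # A slot opened up: enqueue the oldest parked item and unblock its caller.
  defp admit_parked(state) do
    case :queue.out(state.parked) do
      {{:value, {from, item}}, rest} ->
        GenServer.reply(from, :ok)
        %{state | items: :queue.in(item, state.items), parked: rest}

      {:empty, _} ->
        state
    end
  end
end
```

With a limit of 1, a second push blocks until pop/1 is called or the timeout elapses. With a limit of nil, the fullness check never fires, so no caller is parked.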

Functions

push(name, payload, meta \\ %{})

@spec push(atom(), term(), map()) :: :ok | :no_producer | {:error, :timeout}

Push a payload into the producer, blocking if the buffer is full until space frees (see "Back-pressure"). meta becomes the Broadway.Message's :metadata (the runtime uses :bloccs_attempt for retries).

Returns :ok, :no_producer (name not registered), or {:error, :timeout} (waited past :push_timeout).
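A caller typically matches on all three documented results. The sketch below is one hedged way to do that. PushCallerSketch, deliver/4, and the tags it returns are hypothetical, and the push function is passed in so the example runs without Bloccs loaded.

```elixir
defmodule PushCallerSketch do
  # push_fun is any 3-arity function with push/3's documented return values,
  # for example &Bloccs.Producer.push/3 in an application that depends on Bloccs.
  def deliver(push_fun, name, payload, meta \\ %{}) do
    case push_fun.(name, payload, meta) do
      # The payload was accepted into the producer's buffer.
      :ok -> :delivered
      # No producer is registered under this name.
      :no_producer -> {:dropped, :no_producer}
      # The buffer stayed full for longer than :push_timeout.
      {:error, :timeout} -> {:retry_later, :timeout}
    end
  end
end
```

What to do in each branch (drop, retry, escalate) is an application decision; the tags here only make the three outcomes explicit.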

push_after(name, payload, meta, delay_ms)

@spec push_after(atom(), term(), map(), non_neg_integer()) :: :ok | :no_producer

Re-enqueue a payload after delay_ms. The runtime uses this to schedule a retry with backoff; the timer lives in the supervised producer process. If the buffer is full when the timer fires, the enqueue is retried shortly rather than dropped. Returns :ok or :no_producer.
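The delayed re-enqueue can be pictured as a timer owned by the server process that re-arms itself when the buffer is full. This is a minimal sketch under stated assumptions: DelayedEnqueueSketch, its pop/1 helper, and the 10 ms retry interval are hypothetical, and it uses a plain GenServer with Process.send_after/3 rather than the library's producer.

```elixir
defmodule DelayedEnqueueSketch do
  use GenServer

  # Hypothetical pause before retrying an enqueue into a full buffer.
  @retry_ms 10

  def start_link(limit), do: GenServer.start_link(__MODULE__, limit)

  # Returns immediately; the timer belongs to the server, not the caller.
  def push_after(pid, item, delay_ms) do
    GenServer.call(pid, {:push_after, item, delay_ms})
  end

  def pop(pid), do: GenServer.call(pid, :pop)

  @impl true
  def init(limit), do: {:ok, %{limit: limit, items: []}}

  @impl true
  def handle_call({:push_after, item, delay_ms}, _from, state) do
    Process.send_after(self(), {:enqueue, item}, delay_ms)
    {:reply, :ok, state}
  end

  def handle_call(:pop, _from, %{items: [item | rest]} = state) do
    {:reply, {:ok, item}, %{state | items: rest}}
  end

  def handle_call(:pop, _from, state), do: {:reply, :empty, state}

  @impl true
  def handle_info({:enqueue, item}, state) do
    if length(state.items) >= state.limit do
      # Buffer full when the timer fired: try again shortly instead of dropping.
      Process.send_after(self(), {:enqueue, item}, @retry_ms)
      {:noreply, state}
    else
      {:noreply, %{state | items: state.items ++ [item]}}
    end
  end
end
```

Because the timer message is sent to the server itself, a caller that crashes after push_after/3 returns does not cancel the pending retry.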

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a producer. The :name option is the canonical Registry key the router pushes to; :buffer (optional) bounds the queue for back-pressure.