Bloccs.Producer (bloccs v0.2.0)


A GenStage producer used as the input port of a compiled node.

Broadway names producer processes itself (e.g. MyPipeline.Broadway.Producer_0), so we additionally register a stable canonical name in Bloccs.Registry keyed by {network_id, node_id, in_port}. push/3 resolves through the registry.

Back-pressure (v0.4)

push/3 is synchronous and bounded by the port's buffer. When the buffer is full the caller is parked — the call does not return until a downstream consumer drains space — so a slow consumer propagates back-pressure up the graph instead of dropping messages. An unbounded port (buffer: nil) never parks. The wait is bounded by config :bloccs, :push_timeout (default :infinity); on timeout push/3 returns {:error, :timeout} rather than blocking forever.
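As a configuration sketch for the timeout described above: the application and key come from the text, while the file location and the assumption that the value is in milliseconds (the usual Elixir convention for timeouts) are not confirmed by this page.

```elixir
# config/runtime.exs (file location is illustrative)
import Config

# Cap how long push/3 may park a caller on a full buffer.
# Assumed to be milliseconds; the documented default is :infinity.
config :bloccs, :push_timeout, 5_000
```

With a finite value, callers should be prepared to handle {:error, :timeout} from push/3.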

Types

stats()

@type stats() :: %{
  size: non_neg_integer(),
  buffer: pos_integer() | :unbounded,
  blocked: non_neg_integer(),
  pending_demand: non_neg_integer(),
  utilization: float()
}

A point-in-time snapshot of a producer's queue, for observability.

buffer is the configured maximum (:unbounded when the port has no buffer); utilization is size / buffer (0.0 when unbounded).
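The utilization rule can be illustrated with a small standalone helper. StatsSketch is a hypothetical module written for this example, not part of Bloccs; it only mirrors the size / buffer arithmetic stated above.

```elixir
defmodule StatsSketch do
  # Hypothetical helper: reproduces the documented rule for the
  # :utilization field from :size and :buffer.

  # An unbounded port has no maximum, so utilization is reported as 0.0.
  def utilization(_size, :unbounded), do: 0.0

  # A bounded port reports the fraction of the buffer currently in use.
  def utilization(size, buffer)
      when is_integer(size) and size >= 0 and is_integer(buffer) and buffer > 0 do
    size / buffer
  end
end

StatsSketch.utilization(75, 100)
# => 0.75
StatsSketch.utilization(10, :unbounded)
# => 0.0
```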

Functions

push(name, payload, meta \\ %{})

@spec push(atom(), term(), map()) :: :ok | :no_producer | {:error, :timeout}

Push a payload into the producer, blocking if the buffer is full until space frees (see "Back-pressure"). meta becomes the Broadway.Message's :metadata (the runtime uses :bloccs_attempt for retries).

Returns :ok, :no_producer (name not registered), or {:error, :timeout} (waited past :push_timeout).
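A caller might handle the three documented outcomes roughly as follows. PushSketch and the name :orders_in are hypothetical; the push function is injectable only so the sketch can run without a live producer, and it defaults to the documented push/3.

```elixir
defmodule PushSketch do
  # Hypothetical wrapper, not part of Bloccs: normalizes the documented
  # results of push/3 into :ok | {:error, reason} for use in `with` chains.
  def deliver(name, payload, meta \\ %{}, push_fun \\ &Bloccs.Producer.push/3) do
    case push_fun.(name, payload, meta) do
      # The payload was accepted (possibly after waiting for buffer space).
      :ok -> :ok
      # No producer is registered under this name.
      :no_producer -> {:error, :no_producer}
      # The buffer stayed full for longer than :push_timeout.
      {:error, :timeout} -> {:error, :timeout}
    end
  end
end

# Stub standing in for a producer whose buffer never drained in time.
full = fn _name, _payload, _meta -> {:error, :timeout} end
PushSketch.deliver(:orders_in, %{id: 1}, %{}, full)
# => {:error, :timeout}
```

Whether a timeout should be retried, dropped, or surfaced to the caller's own upstream is an application decision that this page does not prescribe.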

push_after(name, payload, meta, delay_ms)

@spec push_after(atom(), term(), map(), non_neg_integer()) :: :ok | :no_producer

Re-enqueue a payload after delay_ms (used by the runtime to schedule a retry with backoff; the timer lives in the supervised producer process). If the buffer is full when the timer fires, the enqueue is retried shortly rather than dropped. Returns :ok or :no_producer.
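To make the retry-with-backoff idea concrete, here is one way a delay could be derived from the attempt counter and handed to push_after/4. This is a sketch under assumptions: RetrySketch, the 100 ms base, the 5 s cap, and the convention that :bloccs_attempt counts retries starting from 1 are all illustrative, and the Bloccs runtime's actual policy is not described on this page.

```elixir
defmodule RetrySketch do
  # Hypothetical backoff policy; the base and cap are illustrative values.
  @base_ms 100
  @max_ms 5_000

  # Delay before retry number `attempt` (1-based): 100, 200, 400, ...
  # doubling each time and capped at @max_ms.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@base_ms * Integer.pow(2, attempt - 1), @max_ms)
  end

  # Bumps :bloccs_attempt in the metadata and schedules the re-enqueue.
  # The scheduling function is injectable so the sketch runs without a
  # producer; it defaults to the documented push_after/4.
  def schedule_retry(name, payload, meta, push_after_fun \\ &Bloccs.Producer.push_after/4) do
    attempt = Map.get(meta, :bloccs_attempt, 0) + 1
    meta = Map.put(meta, :bloccs_attempt, attempt)
    push_after_fun.(name, payload, meta, delay_ms(attempt))
  end
end

RetrySketch.delay_ms(3)
# => 400
```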

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a producer. The :name option is the canonical Registry key the router pushes to; :buffer (optional) bounds the queue for back-pressure.

stats(name)

@spec stats(atom()) :: {:ok, stats()} | :no_producer | {:error, :timeout}

Snapshot a producer's queue state by canonical name. This is a bounded, safe call (5s timeout); use it instead of reaching into the process with :sys. Returns :no_producer if the name isn't registered, or {:error, :timeout} if the producer is wedged.
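A monitoring check built on stats/1 could look something like this. HealthSketch, the 0.9 threshold, and the returned atoms are hypothetical choices for illustration; only the result shapes of stats/1 and the snapshot fields come from this page.

```elixir
defmodule HealthSketch do
  # Hypothetical high-water mark; pick a threshold that suits the workload.
  @high_water 0.9

  # Classifies a producer from its snapshot. The stats function is
  # injectable so the sketch runs without a producer; it defaults to the
  # documented stats/1.
  def check(name, stats_fun \\ &Bloccs.Producer.stats/1) do
    case stats_fun.(name) do
      # Nearly full, or callers are already parked in push/3.
      {:ok, %{utilization: u, blocked: blocked}} when u >= @high_water or blocked > 0 ->
        :pressured

      {:ok, _snapshot} ->
        :healthy

      # The name is not registered.
      :no_producer ->
        :missing

      # The producer did not answer within the call timeout.
      {:error, :timeout} ->
        :wedged
    end
  end
end

# Stub snapshot using the fields of the stats() type.
snapshot = %{size: 95, buffer: 100, blocked: 0, pending_demand: 0, utilization: 0.95}
HealthSketch.check(:orders_in, fn _name -> {:ok, snapshot} end)
# => :pressured
```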