Jido.AI.PendingInputServer (Jido AI v2.2.0)

Per-run FIFO queue for ReAct steering input.

The server is owned by the parent ReAct strategy process for the active run. Inputs are queued synchronously, drained by the runtime before LLM turns, and the queue can be sealed at terminal boundaries to reject late arrivals.
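The queue semantics described above (FIFO order, draining, sealing) can be modeled with a small pure-Elixir sketch built on Erlang's `:queue`. This is a simplified illustration, not the library's implementation: the module name `PendingQueueSketch`, the default size limit, and the error terms `:sealed` and `:queue_full` are all assumptions made for the example.

```elixir
defmodule PendingQueueSketch do
  # Hypothetical model of a sealable FIFO queue. Names and error terms
  # are illustrative and are not taken from Jido.AI.PendingInputServer.
  defstruct queue: :queue.new(), len: 0, max: 100, sealed?: false

  def new(max \\ 100), do: %__MODULE__{max: max}

  # A sealed queue rejects every new item.
  def enqueue(%__MODULE__{sealed?: true}, _item), do: {:error, :sealed}

  # A full queue rejects new items until it is drained.
  def enqueue(%__MODULE__{len: len, max: max}, _item) when len >= max,
    do: {:error, :queue_full}

  def enqueue(%__MODULE__{} = q, item),
    do: {:ok, %{q | queue: :queue.in(item, q.queue), len: q.len + 1}}

  # Returns all items in arrival order and an emptied queue.
  def drain(%__MODULE__{} = q),
    do: {:queue.to_list(q.queue), %{q | queue: :queue.new(), len: 0}}

  def seal(%__MODULE__{} = q), do: %{q | sealed?: true}

  # Seals only when nothing is waiting; otherwise reports :pending.
  def seal_if_empty(%__MODULE__{len: 0} = q), do: {:sealed, seal(q)}
  def seal_if_empty(%__MODULE__{} = q), do: {:pending, q}
end
```

In this model, enqueueing two items, draining, and then calling `seal_if_empty/1` yields `:sealed`, after which any further `enqueue/2` call is rejected.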

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

drain(server, timeout \\ 5000)

Returns all queued input items in FIFO order and clears the queue.

drain_result(server, timeout \\ 5000)

Returns all queued input items in FIFO order and clears the queue.

enqueue(server, input_item, timeout \\ 5000)

Enqueues a user-style input item for later consumption by the runtime.

has_pending?(server, timeout \\ 5000)

Returns true when the queue currently contains at least one item.

seal(server, timeout \\ 5000)

Seals the queue so future enqueue attempts are rejected.

seal_if_empty(server, timeout \\ 5000)

Seals the queue only when it is empty.

start(opts)

Starts a pending-input queue without linking it to the caller.

start_link(opts)

Starts a pending-input queue for a single active run.

stop(server)

Stops the queue process, ignoring shutdown races.

Types

input_item()

@type input_item() :: %{
  optional(:id) => String.t(),
  :content => String.t(),
  optional(:source) => term(),
  optional(:refs) => map() | nil,
  optional(:at_ms) => integer()
}
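As a rough illustration of the shape above, the following sketch builds a map that fits `input_item()`. Only `:content` is required by the type. The helper module `InputItemSketch`, the `:user` default for `:source`, the id format, and the reading of `:at_ms` as a wall-clock timestamp in milliseconds are assumptions made for this example.

```elixir
defmodule InputItemSketch do
  # Hypothetical helper that fills in the optional keys of input_item().
  def build(content, opts \\ []) when is_binary(content) do
    %{
      # Assumed id format; the type only requires a string.
      id: Keyword.get_lazy(opts, :id, &default_id/0),
      content: content,
      # :source is term() in the type; :user is an illustrative default.
      source: Keyword.get(opts, :source, :user),
      refs: Keyword.get(opts, :refs),
      # Assumed to be a timestamp in milliseconds.
      at_ms: Keyword.get_lazy(opts, :at_ms, fn -> System.system_time(:millisecond) end)
    }
  end

  defp default_id do
    "input-" <> Integer.to_string(System.unique_integer([:positive]))
  end
end
```

A map built this way could then be passed as the second argument to `enqueue/3`.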

state()

@type state() :: %{
  owner: pid(),
  owner_ref: reference(),
  request_id: String.t() | nil,
  queue: :queue.queue(map()),
  queue_len: non_neg_integer(),
  max_queue_size: pos_integer(),
  sealed?: boolean()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

drain(server, timeout \\ 5000)

@spec drain(GenServer.server(), timeout()) :: [map()]

Returns all queued input items in FIFO order and clears the queue.

drain_result(server, timeout \\ 5000)

@spec drain_result(GenServer.server(), timeout()) :: {:ok, [map()]} | {:error, term()}

Returns all queued input items in FIFO order and clears the queue, wrapping the list in an {:ok, items} tuple.

Unlike drain/2, this reports queue availability failures as {:error, reason}, so callers can fail the active run instead of silently dropping steering state.
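One way a caller might act on the three possible outcomes is sketched below. The module `DrainSketch` and the return values `:continue`, `{:inject, items}`, and `{:fail_run, reason}` are hypothetical. The function takes a zero-arity function so the decision logic can be exercised without a running queue.

```elixir
defmodule DrainSketch do
  # `drain_fun` is expected to return what drain_result/2 is documented
  # to return: {:ok, [map()]} | {:error, term()}.
  def next_step(drain_fun) when is_function(drain_fun, 0) do
    case drain_fun.() do
      # Nothing queued: proceed with the LLM turn unchanged.
      {:ok, []} -> :continue
      # Steering input arrived: hand it to the next turn in FIFO order.
      {:ok, items} -> {:inject, items}
      # Queue unavailable: surface the failure instead of dropping input.
      {:error, reason} -> {:fail_run, reason}
    end
  end
end
```

With a live queue, this could be called as `DrainSketch.next_step(fn -> Jido.AI.PendingInputServer.drain_result(server) end)`, where `server` is the queue process.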

enqueue(server, input_item, timeout \\ 5000)

@spec enqueue(GenServer.server(), input_item(), timeout()) :: :ok | {:error, term()}

Enqueues a user-style input item for later consumption by the runtime.

has_pending?(server, timeout \\ 5000)

@spec has_pending?(GenServer.server(), timeout()) :: boolean()

Returns true when the queue currently contains at least one item.

seal(server, timeout \\ 5000)

@spec seal(GenServer.server(), timeout()) :: :ok | {:error, term()}

Seals the queue so future enqueue attempts are rejected.

seal_if_empty(server, timeout \\ 5000)

@spec seal_if_empty(GenServer.server(), timeout()) ::
  :sealed | :pending | {:error, term()}

Seals the queue only when it is empty.

Returns :sealed when the queue is empty and has been sealed, or :pending when items remain and the queue must be drained before completion.
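The check-then-seal step could drive a completion decision along the following lines. This is a sketch under stated assumptions: `CompletionSketch` and its return values are hypothetical, and the two arguments are zero-arity functions standing in for calls to `seal_if_empty/2` and `drain_result/2`.

```elixir
defmodule CompletionSketch do
  # Decides whether a run may finish, based on the documented return
  # values of seal_if_empty/2 and drain_result/2.
  def try_complete(seal_fun, drain_fun)
      when is_function(seal_fun, 0) and is_function(drain_fun, 0) do
    case seal_fun.() do
      # Queue was empty and is now sealed: the run can end.
      :sealed ->
        :complete

      # Items remain: drain them so the run can process them first.
      :pending ->
        case drain_fun.() do
          {:ok, items} -> {:continue, items}
          {:error, reason} -> {:fail_run, reason}
        end

      {:error, reason} ->
        {:fail_run, reason}
    end
  end
end
```

Because the emptiness check and the seal happen in a single call to the queue process, an input that arrives just before completion shows up as `:pending` rather than being lost.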

start(opts)

@spec start(keyword()) :: GenServer.on_start()

Starts a pending-input queue without linking it to the caller.

The queue already monitors its :owner, so callers that want fault isolation should prefer this entrypoint over start_link/1.
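The monitor-instead-of-link pattern mentioned here can be illustrated with a generic GenServer. This is not the library's code: `OwnerMonitorSketch` is a hypothetical module that is started unlinked with `GenServer.start/2`, monitors the process given as `:owner`, and stops itself when that process exits.

```elixir
defmodule OwnerMonitorSketch do
  use GenServer

  # Unlinked start: a crash here does not propagate to the caller.
  def start(opts), do: GenServer.start(__MODULE__, opts)

  @impl true
  def init(opts) do
    owner = Keyword.fetch!(opts, :owner)
    # The monitor delivers a :DOWN message when the owner exits.
    {:ok, %{owner: owner, owner_ref: Process.monitor(owner)}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state) do
    # The owner is gone, so this process has no reason to stay alive.
    {:stop, :normal, state}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end
```

With this arrangement the dependent process still cleans up after its owner, while an exit on either side is never propagated through a link.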

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a pending-input queue for a single active run.

stop(server)

@spec stop(GenServer.server()) :: :ok

Stops the queue process, ignoring shutdown races.