ReqManagedAgents.Provider behaviour (ReqManagedAgents v0.1.0)

Copy Markdown View Source

Contract a streaming agent backend implements so one ReqManagedAgents.Session loop can drive ANY provider. A provider owns its transport mode and invocation end-to-end:

  • :streaming (push) — the server holds a connection open and pushes events; the client posts inputs out-of-band; a turn ends on a boundary event (turn_boundary?/1).
  • :request_response (pull) — the client calls (poll_turn/2) and the server answers the whole turn; resume re-sends the conversation delta.

The canonical vocabulary uses Anthropic's custom_tool_use / custom_tool_result terms and names ONLY the client-side / return-of-control species; server-side tools are observe-only.

Raw preservation. Normalization is additive, never lossy: turn_outcome carries the raw, JSON-decoded provider events it was derived from alongside the normalized fields, so a consumer can always cross-reference the provider's own wire documentation.

Summary

Types

Provider-private connection / session handle.

A raw, decoded provider event (string-keyed wire map).

Provider-private handle to a provisioned, reusable server-side resource.

Provider-private input that drives the next turn.

A provider-agnostic agent definition — the input to provisioning and the cache key.

Callbacks

Input that kicks off the conversation (the initial user message).

Fold a turn's accumulated events into the canonical turn outcome (carries raw events).

Establish the connection/session; for :streaming, open the event stream to subscriber.

:request_response only — run one turn synchronously: send input, return the turn's events.

Create (or look up) the provider-side agent resource for spec; return a durable handle.

:streaming only — post input; events arrive asynchronously at the subscriber.

:streaming only — after a stream drop, re-open the stream (delivering to subscriber) and return any unanswered tool calls to re-drive locally, plus the grown seen set.

Input that resumes the loop after local tools ran (the mode's resume contract).

Delete the provider-side resource named by handle. opts carries the client / test seam.

:streaming only — does this event close a turn (so accumulated events form one)?

Input for a follow-up user message into a running session.

Functions

Extract a canonical %ToolResult{} from a Tools.run/6 wire event (user.custom_tool_result shape), given the tool-use id it answers.

Types

conn()

@type conn() :: term()

Provider-private connection / session handle.

event()

@type event() :: %{required(String.t()) => term()}

A raw, decoded provider event (string-keyed wire map).

handle()

@type handle() :: term()

Provider-private handle to a provisioned, reusable server-side resource.

input()

@type input() :: term()

Provider-private input that drives the next turn.

spec()

@type spec() :: %{
  system_prompt: String.t(),
  tools: [map()],
  terminal_tool: String.t() | nil,
  model_config: term()
}

A provider-agnostic agent definition — the input to provisioning and the cache key.

terminal()

@type terminal() :: :end_turn | :requires_action | :terminated

Callbacks

kickoff_input(opts)

@callback kickoff_input(opts :: keyword()) :: input()

Input that kicks off the conversation (the initial user message).

mode()

@callback mode() :: :streaming | :request_response

normalize(list)

@callback normalize([event()]) :: ReqManagedAgents.TurnResult.t()

Fold a turn's accumulated events into the canonical turn outcome (carries raw events).

open(opts, subscriber)

@callback open(opts :: keyword(), subscriber :: pid()) :: {:ok, conn()} | {:error, term()}

Establish the connection/session; for :streaming, open the event stream to subscriber.

poll_turn(conn, input)

(optional)
@callback poll_turn(conn(), input()) :: {:ok, [event()], conn()} | {:error, term()}

:request_response only — run one turn synchronously: send input, return the turn's events.

provision(spec, opts)

@callback provision(spec(), opts :: keyword()) :: {:ok, handle()} | {:error, term()}

Create (or look up) the provider-side agent resource for spec; return a durable handle.

push_input(conn, input)

(optional)
@callback push_input(conn(), input()) :: :ok | {:error, term()}

:streaming only — post input; events arrive asynchronously at the subscriber.

reconnect(conn, subscriber, seen)

(optional)
@callback reconnect(conn(), subscriber :: pid(), seen :: MapSet.t()) ::
  {:ok, conn(), [ReqManagedAgents.ToolUse.t()], MapSet.t()} | {:error, term()}

:streaming only — after a stream drop, re-open the stream (delivering to subscriber) and return any unanswered tool calls to re-drive locally, plus the grown seen set.

resume_input(tool_uses, results)

@callback resume_input(
  tool_uses :: [ReqManagedAgents.ToolUse.t()],
  results :: [ReqManagedAgents.ToolResult.t()]
) :: input()

Input that resumes the loop after local tools ran (the mode's resume contract).

teardown(handle, opts)

(optional)
@callback teardown(handle(), opts :: keyword()) :: :ok | {:error, term()}

Delete the provider-side resource named by handle. opts carries the client / test seam.

turn_boundary?(event)

(optional)
@callback turn_boundary?(event()) :: boolean()

:streaming only — does this event close a turn (so accumulated events form one)?

user_input(text)

@callback user_input(text :: String.t()) :: input()

Input for a follow-up user message into a running session.

Functions

result_of(id, tool_event)

@spec result_of(String.t(), event()) :: ReqManagedAgents.ToolResult.t()

Extract a canonical %ToolResult{} from a Tools.run/6 wire event (user.custom_tool_result shape), given the tool-use id it answers.