DatagroutConduit.Transport.Ws (DataGrout Conduit v0.6.0)

Copy Markdown View Source

WebSocket transport for datagrout-jsonrpc.v1.

Manages a single wss:// connection with concurrent JSON-RPC request multiplexing and server-push subscriptions. Concurrent requests are correlated by JSON-RPC id with no head-of-line blocking.

Usage

{:ok, ws} = DatagroutConduit.Transport.Ws.start_link(
  url: "wss://gateway.datagrout.ai/servers/<uuid>/ws",
  auth: {:bearer, "token"}
)

{:ok, result} = DatagroutConduit.Transport.Ws.send_request(ws, "tools/list")

Push subscriptions

Subscribe to dotted-namespace topics and receive events as messages in the calling process's mailbox:

{:ok, sub_id} = DatagroutConduit.Transport.Ws.subscribe(ws, "agents.my-agent-id.events")

receive do
  {:subscription_event, ^sub_id, event} ->
    IO.inspect(event)
end

:ok = DatagroutConduit.Transport.Ws.unsubscribe(ws, sub_id)

Wire protocol

  • Subprotocol: datagrout-jsonrpc.v1
  • Connect URL: wss://<gateway>/servers/<uuid>/ws
  • Frame format: JSON-RPC 2.0, text frames only
  • Auth: Authorization: Bearer <token> in the upgrade headers

Summary

Functions

Returns a specification to start this module under a supervisor.

Send a JSON-RPC request over the WebSocket and wait for the response.

Start a supervised WebSocket transport process.

Subscribe to a dotted-namespace push topic.

Cancel a server-side push subscription.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

send_request(pid, method, params \\ nil, timeout \\ 30000)

@spec send_request(GenServer.server(), String.t(), map() | nil, timeout()) ::
  {:ok, term()} | {:error, term()}

Send a JSON-RPC request over the WebSocket and wait for the response.

Returns {:ok, result} or {:error, reason}.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a supervised WebSocket transport process.

subscribe(pid, topic, timeout \\ 10000)

@spec subscribe(GenServer.server(), String.t(), timeout()) ::
  {:ok, String.t()} | {:error, term()}

Subscribe to a dotted-namespace push topic.

Events are delivered as {:subscription_event, subscription_id, event} messages to the calling process's mailbox, where event is a map with :event and :data keys.

Returns {:ok, subscription_id} on success.

unsubscribe(pid, subscription_id, timeout \\ 10000)

@spec unsubscribe(GenServer.server(), String.t(), timeout()) :: :ok | {:error, term()}

Cancel a server-side push subscription.

Removes the subscription locally and notifies the server. Returns :ok.