Urchin.Session (Urchin v0.2.0)


Per-session state for the Streamable HTTP transport.

A session is created when a client completes the initialize handshake and is addressed thereafter by its MCP-Session-Id. It tracks:

  • negotiated protocol version, client info and capabilities
  • the requested minimum log level and resource subscriptions
  • the connected GET ("general") SSE stream plus a replay buffer for resumption
  • in-flight POST requests, so notifications/cancelled can stop them
  • outbound server-to-client requests, so client responses can be correlated

Transport (Plug) processes and handler tasks talk to this process; it never owns an HTTP connection itself.

Summary

Functions

cancel_outbound(pid, id)

Drops a pending outbound request (e.g. on timeout).

cancelled?(pid, request_id)

Returns true if the given in-flight request has been cancelled.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

finish_request(pid, request_id)

Removes a completed in-flight POST request.

generate_id()

Generates a cryptographically random, visible-ASCII session id.

handle_client_message(pid, message)

Handles a client-originated notification or response delivered over POST.

notify(pid, method, params \\ nil)

Sends an unsolicited notification to the client on the general stream.

register_general_stream(pid, owner, resume_from \\ nil)

Registers (or replaces) the GET general stream, returning events to replay.

register_outbound(pid, caller)

Allocates an outbound request id and records the caller awaiting the response.

set_log_level(pid, level)

Sets the minimum log level the client wishes to receive.

snapshot(pid)

Returns a snapshot of the data needed to build a request context.

start(opts)

Starts a session under the session supervisor and returns its id and pid.

start_request(pid, request_id, task_pid, owner_pid)

Registers an in-flight POST request so it can be cancelled, returning a unique per-session SSE stream id for that request's response stream.

subscribe(pid, uri)

Records a resource subscription.

subscriptions(pid)

Returns the set of subscribed resource URIs.

terminate(pid)

Terminates a session process.

unsubscribe(pid, uri)

Removes a resource subscription.

whereis(id)

Looks up a live session pid by id.

Functions

cancel_outbound(pid, id)

@spec cancel_outbound(pid(), String.t()) :: :ok

Drops a pending outbound request (e.g. on timeout).

cancelled?(pid, request_id)

@spec cancelled?(pid(), Urchin.JSONRPC.id()) :: boolean()

Returns true if the given in-flight request has been cancelled.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

finish_request(pid, request_id)

@spec finish_request(pid(), Urchin.JSONRPC.id()) :: :ok

Removes a completed in-flight POST request.

generate_id()

@spec generate_id() :: String.t()

Generates a cryptographically random, visible-ASCII session id.
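
As an illustration of what "cryptographically random, visible-ASCII" can mean in practice, here is a minimal sketch. It is not Urchin's implementation: the module name SessionIdSketch, the 32-byte length and the choice of URL-safe Base64 are all assumptions made for this example.

```elixir
defmodule SessionIdSketch do
  @moduledoc false

  # Assumption: 32 random bytes, which encode to 43 unpadded Base64 characters.
  @bytes 32

  # Draws bytes from the OS CSPRNG and encodes them with the URL-safe
  # Base64 alphabet (A-Z, a-z, 0-9, "-", "_"), all of which are visible ASCII.
  def generate do
    @bytes
    |> :crypto.strong_rand_bytes()
    |> Base.url_encode64(padding: false)
  end

  # True when the id is non-empty and every byte lies in 0x21..0x7E.
  def visible_ascii?(id) when is_binary(id) do
    id != "" and Enum.all?(:binary.bin_to_list(id), &(&1 in 0x21..0x7E))
  end
end
```

An id produced this way can be placed in an HTTP header value without further escaping.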

handle_client_message(pid, message)

@spec handle_client_message(pid(), Urchin.JSONRPC.decoded()) :: :ok

Handles a client-originated notification or response delivered over POST.

Notifications (notifications/cancelled, notifications/initialized, ...) update session state; responses are correlated to a pending outbound request.
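
The distinction between notifications and responses follows JSON-RPC 2.0: a notification carries a method and no id, while a response carries an id plus either result or error. The sketch below shows that classification on plain string-keyed maps. The shape of Urchin.JSONRPC.decoded() is not given here, so the map representation and the module name MessageKindSketch are assumptions.

```elixir
defmodule MessageKindSketch do
  @moduledoc false

  # A message with a "method" is a request if it also has an "id",
  # otherwise it is a notification.
  def classify(%{"method" => method} = msg) when is_binary(method) do
    if Map.has_key?(msg, "id"), do: :request, else: {:notification, method}
  end

  # A message with an "id" but no "method" is a response; it must carry
  # either "result" or "error".
  def classify(%{"id" => id} = msg) do
    cond do
      Map.has_key?(msg, "result") -> {:response, id}
      Map.has_key?(msg, "error") -> {:error_response, id}
      true -> :invalid
    end
  end

  def classify(_other), do: :invalid
end
```

Under this reading, only the {:notification, _}, {:response, _} and {:error_response, _} cases would be passed on to the session.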

notify(pid, method, params \\ nil)

@spec notify(pid(), String.t(), map() | nil) :: :ok

Sends an unsolicited notification to the client on the general stream.

register_general_stream(pid, owner, resume_from \\ nil)

@spec register_general_stream(pid(), pid(), {String.t(), non_neg_integer()} | nil) ::
  {:ok, String.t(), [{String.t(), iodata()}]}

Registers (or replaces) the GET general stream, returning events to replay.
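
To make the replay step concrete, here is a minimal sketch of how a buffer could be filtered against a resume cursor. It assumes that resume_from is a {stream_id, last_seen_sequence} pair, and that event ids are formed as "stream_id-sequence". Neither detail is stated above, and ReplaySketch is a hypothetical module, not part of Urchin.

```elixir
defmodule ReplaySketch do
  @moduledoc false

  # `buffer` is a list of {seq, data} pairs for one stream, oldest first.
  # With no cursor there is nothing to replay.
  def replay(_buffer, _stream_id, nil), do: []

  # Cursor for this stream: return every buffered event after `last_seq`.
  # The repeated `stream_id` variable requires both values to be equal.
  def replay(buffer, stream_id, {stream_id, last_seq}) do
    for {seq, data} <- buffer, seq > last_seq, do: {event_id(stream_id, seq), data}
  end

  # Cursor from a different stream: events are never replayed across streams.
  def replay(_buffer, _stream_id, {_other_stream, _seq}), do: []

  # Assumed event id format; the real format is not documented here.
  def event_id(stream_id, seq), do: "#{stream_id}-#{seq}"
end
```

The return shape mirrors the [{String.t(), iodata()}] list in the spec, so a transport process could write each pair out as an SSE id and data field.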

register_outbound(pid, caller)

@spec register_outbound(pid(), pid()) :: String.t()

Allocates an outbound request id and records the caller awaiting the response.

set_log_level(pid, level)

@spec set_log_level(pid(), String.t()) :: :ok

Sets the minimum log level the client wishes to receive.

snapshot(pid)

@spec snapshot(pid()) :: map()

Returns a snapshot of the data needed to build a request context.

start(opts)

@spec start(keyword()) :: {:ok, String.t(), pid()} | {:error, term()}

Starts a session under the session supervisor and returns its id and pid.

start_request(pid, request_id, task_pid, owner_pid)

@spec start_request(pid(), Urchin.JSONRPC.id(), pid(), pid()) :: String.t()

Registers an in-flight POST request so it can be cancelled, returning a unique per-session SSE stream id for that request's response stream.
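
The following sketch shows one way a handler might combine start_request/4, cancelled?/2 and finish_request/2, based only on the specs on this page. It polls cancelled?/2 between work items. Whether the session also stops the task itself on notifications/cancelled is not stated here. CancellableWork is a hypothetical module, and the session_mod parameter exists only so the sketch can be run against a stand-in.

```elixir
defmodule CancellableWork do
  @moduledoc false

  # `session_mod` defaults to Urchin.Session; any module exposing the same
  # three functions can be substituted.
  def run(session, request_id, items, fun, session_mod \\ Urchin.Session) do
    # Register the calling process as both the handler task and the stream owner.
    stream_id = session_mod.start_request(session, request_id, self(), self())

    try do
      outcome =
        Enum.reduce_while(items, {:ok, []}, fn item, {:ok, acc} ->
          # Check for cancellation before each unit of work.
          if session_mod.cancelled?(session, request_id) do
            {:halt, :cancelled}
          else
            {:cont, {:ok, [fun.(item) | acc]}}
          end
        end)

      case outcome do
        {:ok, acc} -> {:ok, stream_id, Enum.reverse(acc)}
        :cancelled -> {:cancelled, stream_id}
      end
    after
      # Always drop the in-flight entry, even if `fun` raises.
      session_mod.finish_request(session, request_id)
    end
  end
end
```

Calling finish_request/2 in the after clause keeps the in-flight table from accumulating entries for requests that failed partway through.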

subscribe(pid, uri)

@spec subscribe(pid(), String.t()) :: :ok

Records a resource subscription.

subscriptions(pid)

@spec subscriptions(pid()) :: MapSet.t()

Returns the set of subscribed resource URIs.

terminate(pid)

@spec terminate(pid()) :: :ok

Terminates a session process.

unsubscribe(pid, uri)

@spec unsubscribe(pid(), String.t()) :: :ok

Removes a resource subscription.

whereis(id)

@spec whereis(String.t()) :: pid() | nil

Looks up a live session pid by id.
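
Because whereis/1 returns nil for an unknown or terminated session, a transport can translate the lookup directly into an HTTP outcome. The sketch below assumes the usual Streamable HTTP convention of 400 for a missing session id and 404 for an id that no longer resolves. SessionLookupSketch is hypothetical, and the lookup function is passed in so the example does not depend on a running session.

```elixir
defmodule SessionLookupSketch do
  @moduledoc false

  # No session id on the request at all.
  def resolve(nil, _lookup), do: {:error, 400}

  # `lookup` is any 1-arity function with the same contract as whereis/1:
  # it returns a pid for a live session, or nil otherwise.
  def resolve(id, lookup) when is_binary(id) and is_function(lookup, 1) do
    case lookup.(id) do
      nil -> {:error, 404}
      pid when is_pid(pid) -> {:ok, pid}
    end
  end
end
```

In a real transport the second argument would be &Urchin.Session.whereis/1.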