barrel_mcp_session (barrel_mcp v2.0.2)

MCP Session Management.

Provides ETS-based session management for MCP Streamable HTTP transport. Sessions track client connections, protocol versions, and activity.

Types

handler_type/0

-type handler_type() :: tool | resource | prompt | resource_template | completion.

log_level/0

-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency.

Functions

broadcast_list_changed(Kind)

-spec broadcast_list_changed(handler_type()) -> ok.

Push a notifications/<kind>/list_changed envelope to every session that has an active SSE channel. Tolerates a missing session manager (e.g. during stdio-only operation).

cancel_in_flight(SessionId, RequestId)

-spec cancel_in_flight(binary(), integer() | binary()) -> ok.

Cancel an in-flight tool call. Sends {cancel, RequestId} to the worker and {cancelled, RequestId} to the waiter, then drops the entry. Idempotent: a missing entry returns ok.
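
The message flow described above can be sketched without a running session manager. This is a minimal simulation, not the library's implementation: the module name `in_flight_sketch`, the worker loop, and the choice to stop the worker on cancel are assumptions made for illustration. Only the `{cancel, RequestId}` and `{cancelled, RequestId}` message shapes come from the description.

```erlang
-module(in_flight_sketch).
-export([demo/0]).

%% Simulates the two messages that cancel_in_flight/2 is documented to send.
%% With the real library, the waiter would first call
%% barrel_mcp_session:record_in_flight(SessionId, RequestId, Worker, self()).
demo() ->
    RequestId = 1,
    Waiter = self(),
    Worker = spawn(fun() -> worker(RequestId) end),
    Ref = erlang:monitor(process, Worker),
    %% Stand-in for barrel_mcp_session:cancel_in_flight(SessionId, RequestId).
    Worker ! {cancel, RequestId},
    Waiter ! {cancelled, RequestId},
    wait(RequestId, Ref).

%% Hypothetical worker: blocks until told to cancel, then stops.
worker(RequestId) ->
    receive
        {cancel, RequestId} -> ok
    end.

%% Hypothetical waiter: sees the cancellation, then confirms the worker exited.
wait(RequestId, Ref) ->
    receive
        {cancelled, RequestId} ->
            receive
                {'DOWN', Ref, process, _Pid, _Reason} -> cancelled
            after 1000 -> worker_still_running
            end
    after 1000 -> timeout
    end.
```

A real worker would typically release resources before stopping; how it reacts to `{cancel, RequestId}` is up to the tool implementation.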

cleanup_expired(TTL)

-spec cleanup_expired(pos_integer()) -> non_neg_integer().

Clean up sessions older than TTL milliseconds. The call routes through the gen_server, which owns the ETS table now that the table uses protected visibility; the handler deletes expired entries inline.
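
As a rough illustration of an inline TTL sweep over a protected ETS table, the sketch below deletes rows whose timestamp is older than a cutoff. The module name `ttl_sweep_sketch`, the `{Id, Timestamp}` row layout, and the idea that the returned integer is the number of deleted rows are all assumptions; the library's actual table layout is not documented here.

```erlang
-module(ttl_sweep_sketch).
-export([demo/0, sweep/3]).

%% Delete every {Id, Timestamp} row with Timestamp < Now - TTL.
%% ets:select_delete/2 returns the number of rows it removed.
sweep(Tab, Now, TTL) ->
    Cutoff = Now - TTL,
    ets:select_delete(Tab, [{{'_', '$1'}, [{'<', '$1', Cutoff}], [true]}]).

demo() ->
    %% protected: only the owning process may write, any process may read.
    Tab = ets:new(sessions_sketch, [set, protected]),
    Now = erlang:monotonic_time(millisecond),
    ets:insert(Tab, {<<"fresh">>, Now}),
    ets:insert(Tab, {<<"stale">>, Now - 60000}),
    Deleted = sweep(Tab, Now, 30000),
    Remaining = [Id || {Id, _} <- ets:tab2list(Tab)],
    ets:delete(Tab),
    {Deleted, Remaining}.
```

With the library itself, the equivalent call would be along the lines of `barrel_mcp_session:cleanup_expired(timer:minutes(30))`, since `timer:minutes/1` yields milliseconds.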

clear_in_flight(SessionId, RequestId)

-spec clear_in_flight(binary(), integer() | binary()) -> ok.

Drop an in-flight entry (called by the waiter after a normal completion).

create(Opts)

-spec create(Opts) -> {ok, binary()} when Opts :: #{client_info => map(), protocol_version => binary()}.

Create a new session.

delete(SessionId)

-spec delete(binary()) -> ok.

Delete a session.

deliver_response(Id, Response)

-spec deliver_response(binary() | integer(), map()) -> ok | {error, unknown_id}.

Deliver a JSON-RPC response from the client back to the waiting caller. Called by the HTTP handler when an inbound POST contains a result or error for a server-initiated id.

elicit_create(SessionId, Params, Opts)

-spec elicit_create(binary(), map(), map()) ->
                       {ok, map()} | {error, timeout | not_supported | no_sse | not_found | term()}.

Send elicitation/create to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared elicitation capability in initialize.

events_since(SessionId, LastId)

-spec events_since(binary(), binary()) -> {ok, [{binary(), map()}]} | truncated | {error, not_found}.

Return SSE events newer than LastId (oldest first), or truncated when LastId is older than the oldest buffered event.
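
The replay semantics can be sketched with a small bounded buffer. This is an illustration under stated assumptions, not the library's data structure: the module name `sse_ring_sketch` is hypothetical, and event ids are assumed to be integer-valued binaries such as `<<"3">>` so that "older" has a defined meaning.

```erlang
-module(sse_ring_sketch).
-export([new/1, append/3, events_since/2]).

%% Buffer is {Max, Events}, with Events kept oldest first.
new(Max) when is_integer(Max), Max > 0 -> {Max, []}.

%% Append an event and drop the oldest ones once Max is exceeded.
append({Max, Events}, EventId, Payload) ->
    Appended = Events ++ [{EventId, Payload}],
    Excess = length(Appended) - Max,
    Kept = case Excess > 0 of
               true -> lists:nthtail(Excess, Appended);
               false -> Appended
           end,
    {Max, Kept}.

%% Return events newer than LastId, or truncated when LastId predates
%% the oldest event still buffered.
events_since({_Max, []}, _LastId) ->
    {ok, []};
events_since({_Max, [{OldestId, _} | _] = Events}, LastId) ->
    Last = binary_to_integer(LastId),
    case Last < binary_to_integer(OldestId) of
        true -> truncated;
        false -> {ok, [E || {Id, _} = E <- Events, binary_to_integer(Id) > Last]}
    end.
```

A caller that receives `truncated` cannot replay the missed events and would normally have to resynchronise its state some other way.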

generate_id()

-spec generate_id() -> binary().

Generate a unique session ID.

get(SessionId)

-spec get(binary()) -> {ok, map()} | {error, not_found}.

Get a session by ID.

get_log_level(SessionId)

-spec get_log_level(binary()) -> {ok, log_level()} | {error, not_found}.

Read the current log level for a session. Defaults to info before any logging/setLevel is received.

get_protocol_version(SessionId)

-spec get_protocol_version(binary()) -> {ok, binary()} | {error, not_found}.

Look up the negotiated protocol version for a session.

get_sse_pid(SessionId)

-spec get_sse_pid(binary()) -> {ok, pid()} | {error, not_found | no_sse}.

Get the SSE process pid for a session. The spec distinguishes an unknown session ({error, not_found}) from a session with no SSE pid set ({error, no_sse}).

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

has_elicitation(SessionId)

-spec has_elicitation(binary()) -> boolean().

Whether a session declared elicitation capability in its initialize request.

has_roots(SessionId)

-spec has_roots(binary()) -> boolean().

Whether a session declared roots capability in its initialize request.

has_sampling(SessionId)

-spec has_sampling(binary()) -> boolean().

Whether a session declared sampling capability in its initialize request.

init(_)

list()

-spec list() -> [map()].

List all sessions.

list_elicitation_capable()

-spec list_elicitation_capable() -> [binary()].

List session ids whose client declared elicitation capability.

list_roots_capable()

-spec list_roots_capable() -> [binary()].

List session ids whose client declared roots capability.

list_sampling_capable()

-spec list_sampling_capable() -> [binary()].

List session ids whose client declared sampling capability.

log_level_priority(Level)

-spec log_level_priority(log_level() | binary()) -> 0..7 | error.

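Numeric priority for the eight RFC 5424 levels (debug=0, emergency=7). A higher number means a more severe level; note that this is the reverse of RFC 5424's own numeric codes, where emergency is 0 and debug is 7. Used for filtering: a notification at priority N is delivered if and only if N is greater than or equal to the priority of the session's configured level. The spec also allows an error return, presumably for input that is not one of the eight levels.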
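
The filtering rule can be sketched as follows. The module name `log_filter_sketch` and the function `should_deliver/2` are hypothetical, and the intermediate priorities (info=1 through alert=6) are assumed from the order of the log_level() type; only debug=0 and emergency=7 are stated in the description.

```erlang
-module(log_filter_sketch).
-export([priority/1, should_deliver/2]).

%% Levels in ascending severity, matching the order of log_level().
levels() ->
    [debug, info, notice, warning, error, critical, alert, emergency].

%% Map an atom or binary level to 0..7, or error for anything else.
priority(Level) when is_binary(Level) ->
    Table = [{atom_to_binary(L, utf8), L} || L <- levels()],
    case lists:keyfind(Level, 1, Table) of
        {_, Atom} -> priority(Atom);
        false -> error
    end;
priority(Level) when is_atom(Level) ->
    index(Level, levels(), 0);
priority(_) ->
    error.

index(Level, [Level | _], N) -> N;
index(Level, [_ | Rest], N) -> index(Level, Rest, N + 1);
index(_, [], _) -> error.

%% Deliver only when the message is at least as severe as the configured level.
should_deliver(MessageLevel, ConfiguredLevel) ->
    case {priority(MessageLevel), priority(ConfiguredLevel)} of
        {M, C} when is_integer(M), is_integer(C) -> M >= C;
        _ -> false
    end.
```

With a session configured at `warning`, an `error` notification passes the filter and a `debug` notification does not.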

notify_progress(SessionId, Token, Progress, Total)

-spec notify_progress(binary(), term(), number(), number() | undefined) -> ok.

Push a notifications/progress envelope to a specific session over its SSE channel. Token is the progressToken the client supplied on the originating request.
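
For orientation, a notifications/progress envelope as defined by the MCP specification might be built like this. The module name `progress_sketch`, the use of binary map keys, and the choice to omit `total` when it is `undefined` are assumptions; how the library encodes the envelope internally is not documented here.

```erlang
-module(progress_sketch).
-export([envelope/3]).

%% Build a JSON-RPC notification map for notifications/progress.
%% Token is the progressToken taken from the originating request.
envelope(Token, Progress, Total) when is_number(Progress) ->
    Base = #{<<"progressToken">> => Token, <<"progress">> => Progress},
    Params = case Total of
                 undefined -> Base;
                 _ when is_number(Total) -> Base#{<<"total">> => Total}
             end,
    #{<<"jsonrpc">> => <<"2.0">>,
      <<"method">> => <<"notifications/progress">>,
      <<"params">> => Params}.
```

A tool handler reporting step 3 of 10 would call `barrel_mcp_session:notify_progress(SessionId, Token, 3, 10)` and pass `undefined` as the last argument when the total is unknown.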

record_in_flight(SessionId, RequestId, WorkerPid, WaiterPid)

-spec record_in_flight(binary(), integer() | binary(), pid(), pid()) -> ok.

Record an in-flight tool call so a later notifications/cancelled can find the worker and waiter.

record_sse_event(SessionId, EventId, Payload)

-spec record_sse_event(binary(), binary(), map()) -> ok.

Append an SSE event to the session's ring buffer for later replay via Last-Event-ID.

roots_list(SessionId, Opts)

-spec roots_list(binary(), map()) ->
                    {ok, [map()]} | {error, timeout | not_supported | no_sse | not_found | term()}.

Send roots/list to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared roots capability in initialize.
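
One way a caller might handle the documented return values is sketched below. The module name `roots_sketch`, the empty options map, and the policy of treating `not_supported` as "no roots" are assumptions made for illustration; the keys accepted in Opts are not documented here.

```erlang
-module(roots_sketch).
-export([fetch_roots/1, interpret/1]).

%% Ask the client for its roots. Passing #{} as Opts is an assumption.
fetch_roots(SessionId) ->
    interpret(barrel_mcp_session:roots_list(SessionId, #{})).

%% Pure interpretation of the documented result shapes.
interpret({ok, Roots}) when is_list(Roots) ->
    {ok, Roots};
interpret({error, not_supported}) ->
    %% Client never declared the roots capability: treat as no roots.
    {ok, []};
interpret({error, Reason}) ->
    %% timeout, no_sse, not_found, or any other term.
    {error, Reason}.
```

The same shape applies to `elicit_create/3` and `sampling_create_message/3`, which share the `timeout | not_supported | no_sse | not_found` error set.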

sampling_create_message(SessionId, Params, Opts)

-spec sampling_create_message(binary(), map(), map()) ->
                                 {ok, map(), map()} |
                                 {error, timeout | not_supported | no_sse | not_found | term()}.

Send sampling/createMessage to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared sampling capability in initialize.

set_client_capabilities(SessionId, Capabilities)

-spec set_client_capabilities(binary(), map()) -> ok | {error, not_found}.

Set the client_capabilities map for a session. Called from the protocol handler after parsing the initialize request.
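
As a sketch of how capability checks such as has_roots/1 could be derived from the stored map, the function below tests whether a capability key is present with an object value. The module name `capability_sketch` and the assumption that the map uses binary keys as decoded from the initialize request's JSON are illustrative; the library's stored representation is not documented here.

```erlang
-module(capability_sketch).
-export([declares/2]).

%% True when the client declared Capability in its initialize request,
%% e.g. #{<<"sampling">> => #{}} declares sampling.
declares(Capability, Capabilities)
  when is_binary(Capability), is_map(Capabilities) ->
    is_map(maps:get(Capability, Capabilities, undefined)).
```

For example, a client sending `"capabilities": {"roots": {"listChanged": true}, "sampling": {}}` declares roots and sampling but not elicitation.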

set_log_level(SessionId, Level)

-spec set_log_level(binary(), log_level() | binary()) -> ok | {error, not_found | invalid_level}.

Set the per-session log level (driven by logging/setLevel). Level must be one of the eight RFC 5424 levels accepted by the MCP spec; any other value is rejected with {error, invalid_level}.

set_protocol_version(SessionId, Version)

-spec set_protocol_version(binary(), binary()) -> ok | {error, not_found}.

Record the negotiated protocol version on a session. Called by the HTTP transport after a successful initialize so later requests on the same session can fall back to it when the client omits the MCP-Protocol-Version header.
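
The header fallback described above might look like this on the transport side. The module name `version_fallback_sketch`, the use of `undefined` for a missing header, and the `{error, no_version}` result are assumptions for illustration.

```erlang
-module(version_fallback_sketch).
-export([resolve/2, resolve_for_session/2]).

%% Prefer the MCP-Protocol-Version header; otherwise fall back to the
%% value stored on the session (the result of get_protocol_version/1).
resolve(Header, _Stored) when is_binary(Header) -> {ok, Header};
resolve(undefined, {ok, Version}) -> {ok, Version};
resolve(undefined, {error, not_found}) -> {error, no_version}.

%% Only consult the session when the header is actually missing.
resolve_for_session(Header, _SessionId) when is_binary(Header) ->
    {ok, Header};
resolve_for_session(undefined, SessionId) ->
    resolve(undefined, barrel_mcp_session:get_protocol_version(SessionId)).
```

Keeping `resolve/2` free of side effects makes the fallback rule easy to test without a running session manager.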

set_sse_buffer_max(SessionId, Max)

-spec set_sse_buffer_max(binary(), pos_integer()) -> ok | {error, not_found}.

Configure the maximum number of SSE events buffered per session for replay.

set_sse_pid(SessionId, Pid)

-spec set_sse_pid(binary(), pid() | undefined) -> ok | {error, not_found}.

Set the SSE process pid for a session.

start_link()

-spec start_link() -> {ok, pid()} | {error, term()}.

Start the session manager.

subscribe_resource(SessionId, Uri)

-spec subscribe_resource(binary(), binary()) -> ok.

Subscribe a session to resource updates for a given URI.

subscribers_for(Uri)

-spec subscribers_for(binary()) -> [binary()].

Return all session ids that subscribed to a URI.

terminate(Reason, State)

unsubscribe_resource(SessionId, Uri)

-spec unsubscribe_resource(binary(), binary()) -> ok.

Unsubscribe a session from resource updates for a given URI.

update_activity(SessionId)

-spec update_activity(binary()) -> ok | {error, not_found}.

Update the last-activity timestamp of a session.