ExMCP.SessionManager (ex_mcp v0.9.2)

Session management for streamable HTTP connections.

This module manages sessions for MCP servers that use streamable HTTP transports such as Server-Sent Events (SSE). It handles the session lifecycle, event buffering, and session resumption via the Last-Event-ID header.

Features

  • Session lifecycle management (create, update, terminate)
  • Event buffering and replay for connection resumption
  • Last-Event-ID support for seamless reconnection
  • Session expiration and cleanup
  • Memory-efficient event storage with configurable limits
  • Session health monitoring and metrics

Session Lifecycle

  1. Session Creation: New sessions are created when SSE connections are established
  2. Event Storage: Events are buffered with unique IDs for potential replay
  3. Session Resumption: Clients can reconnect using the Last-Event-ID header
  4. Session Termination: Sessions are terminated on an explicit DELETE request or on timeout

Configuration

  • :max_events_per_session - Maximum events to buffer per session (default: 1000)
  • :session_ttl_seconds - Session TTL in seconds (default: 3600)
  • :cleanup_interval_ms - Cleanup interval in milliseconds (default: 60000)
  • :storage_backend - Storage backend (:ets or :persistent_term, default: :ets)
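
As a minimal sketch of how these options fit together, the hypothetical SessionConfigSketch module below holds the documented defaults and applies caller overrides. It assumes the options are passed as a keyword list to start_link/1; this page does not state that explicitly.

```elixir
defmodule SessionConfigSketch do
  # Defaults copied from the option list above.
  @defaults [
    max_events_per_session: 1000,
    session_ttl_seconds: 3600,
    cleanup_interval_ms: 60_000,
    storage_backend: :ets
  ]

  # Returns the defaults with any caller overrides applied.
  def build(overrides \\ []), do: Keyword.merge(@defaults, overrides)
end

# Assumption: the resulting keyword list is what start_link/1 accepts, e.g.
#   opts = SessionConfigSketch.build(session_ttl_seconds: 600)
#   {:ok, _pid} = ExMCP.SessionManager.start_link(opts)
```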

Usage

# Start the session manager
{:ok, _pid} = ExMCP.SessionManager.start_link([])

# Create a new session
session_id = ExMCP.SessionManager.create_session(%{
  transport: :sse,
  client_info: %{user_agent: "my-client/1.0"}
})

# Store an event
ExMCP.SessionManager.store_event(session_id, %{
  id: "event-123",
  type: "notification",
  data: %{message: "Hello"},
  timestamp: System.system_time(:microsecond)
})

# Replay events after a specific event ID
events = ExMCP.SessionManager.replay_events_after(session_id, "event-122")

# Terminate session
ExMCP.SessionManager.terminate_session(session_id)

Summary

Functions

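child_spec(init_arg)
Returns a specification to start this module under a supervisor.

create_session(metadata \\ %{})
Creates a new session with the given metadata.

get_session(session_id)
Gets session information.

get_stats()
Gets session statistics.

list_sessions()
Lists all active sessions.

replay_events_after(session_id, last_event_id \\ nil)
Replays events for a session after the given event ID.

replay_events_after(session_id, last_event_id, handler_pid)
Replays events for a session after the given event ID and sends them to the handler.

start_link(opts \\ [])
Starts the session manager with optional configuration.

store_event(session_id, event_data)
Stores an event for the given session.

terminate_session(session_id)
Terminates a session and cleans up its events.

update_session(session_id, updates)
Updates session metadata or activity timestamp.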

Types

config()

@type config() :: %{
  max_events_per_session: pos_integer(),
  session_ttl_seconds: pos_integer(),
  cleanup_interval_ms: pos_integer(),
  storage_backend: :ets | :persistent_term
}

event_data()

@type event_data() :: %{
  id: event_id(),
  session_id: session_id(),
  type: String.t(),
  data: term(),
  timestamp: integer()
}

event_id()

@type event_id() :: String.t()

session_data()

@type session_data() :: %{
  id: session_id(),
  transport: atom(),
  client_info: map(),
  created_at: integer(),
  last_activity: integer(),
  event_count: non_neg_integer(),
  status: :active | :terminated
}

session_id()

@type session_id() :: String.t()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
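
A minimal sketch of placing the session manager in a supervision tree. SupervisionSketch is a hypothetical module; it assumes child_spec/1 forwards its argument to start_link/1, which is the default behavior for modules built with use GenServer.

```elixir
defmodule SupervisionSketch do
  # Child list for a supervision tree. The {module, arg} tuple makes the
  # supervisor call ExMCP.SessionManager.child_spec(arg).
  def children(opts \\ []) do
    [{ExMCP.SessionManager, opts}]
  end

  # Starts a supervisor that owns the session manager.
  # Calling this requires ex_mcp to be available.
  def start(opts \\ []) do
    Supervisor.start_link(children(opts), strategy: :one_for_one)
  end
end
```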

create_session(metadata \\ %{})

@spec create_session(map()) :: session_id()

Creates a new session with the given metadata.

Returns a unique session ID that can be used for subsequent operations.
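
The sketch below shows one way to derive the metadata map from an incoming request before creating a session. SessionOpenSketch and the header format (a list of {name, value} tuples with lowercase names) are assumptions for illustration; the metadata keys mirror the Usage example.

```elixir
defmodule SessionOpenSketch do
  # Builds session metadata from request headers given as {name, value} tuples.
  def metadata(headers) do
    user_agent =
      case List.keyfind(headers, "user-agent", 0) do
        {_name, value} -> value
        nil -> "unknown"
      end

    %{transport: :sse, client_info: %{user_agent: user_agent}}
  end

  # Creates a session and returns its ID. The create function is injectable
  # so the helper can be exercised without a running session manager.
  def open(headers, create \\ &ExMCP.SessionManager.create_session/1) do
    headers |> metadata() |> create.()
  end
end
```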

get_session(session_id)

@spec get_session(session_id()) ::
  {:ok, session_data()} | {:error, :session_not_found}

Returns the session data for the given ID, or {:error, :session_not_found} if no such session exists.
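
Both return shapes from the spec can be handled with a single case expression. The hypothetical SessionLookupSketch module below is a sketch only; the lookup function is injectable so that the example runs without the library.

```elixir
defmodule SessionLookupSketch do
  # Returns a one-line description of a session, covering both outcomes
  # listed in the get_session/1 spec.
  def describe(session_id, get \\ &ExMCP.SessionManager.get_session/1) do
    case get.(session_id) do
      {:ok, %{status: status, event_count: count}} ->
        "#{session_id}: #{status}, #{count} events"

      {:error, :session_not_found} ->
        "#{session_id}: not found"
    end
  end
end
```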

get_stats()

@spec get_stats() :: %{
  total_sessions: non_neg_integer(),
  active_sessions: non_neg_integer(),
  total_events: non_neg_integer(),
  memory_usage: non_neg_integer()
}

Returns aggregate statistics: total and active session counts, the total event count, and memory usage.
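
As a sketch of how the returned map might feed a health check, the hypothetical StatsSketch module below derives an average event count per session. The unit of :memory_usage is not stated on this page, so the value is passed through unchanged.

```elixir
defmodule StatsSketch do
  # Derives a rough per-session average from a stats map shaped like the
  # get_stats/0 return type. Guards against division by zero.
  def summarize(%{total_sessions: sessions, total_events: events, memory_usage: memory}) do
    average = if sessions > 0, do: events / sessions, else: 0.0
    %{events_per_session: average, memory_usage: memory}
  end

  # Fetches live stats and summarizes them (requires a running session manager).
  def report(get_stats \\ &ExMCP.SessionManager.get_stats/0) do
    summarize(get_stats.())
  end
end
```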

list_sessions()

@spec list_sessions() :: [session_data()]

Lists all active sessions.
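
Because the result is a plain list of session_data() maps, it can be processed with Enum. The hypothetical SessionListSketch module below picks the sessions with the most events; it is an illustration, not part of the library.

```elixir
defmodule SessionListSketch do
  # Returns the IDs of the n sessions with the highest event_count,
  # busiest first.
  def busiest(sessions, n) do
    sessions
    |> Enum.sort_by(& &1.event_count, :desc)
    |> Enum.take(n)
    |> Enum.map(& &1.id)
  end
end

# With the library running, the input would come from:
#   ExMCP.SessionManager.list_sessions() |> SessionListSketch.busiest(5)
```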

replay_events_after(session_id, last_event_id \\ nil)

@spec replay_events_after(session_id(), event_id() | nil) ::
  [event_data()] | {:error, :session_not_found}

Replays events for a session after the given event ID.

This is used when clients reconnect with a Last-Event-ID header to resume from where they left off.
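
The selection that a replay performs can be pictured with the self-contained sketch below. ReplaySketch is hypothetical and is not the library's implementation. It assumes events are held oldest first, that a nil ID means "replay everything", and that an unknown ID also replays everything; the library's actual behavior in those cases is not documented here.

```elixir
defmodule ReplaySketch do
  # No Last-Event-ID supplied: replay the whole buffer (assumption).
  def events_after(events, nil), do: events

  # Returns the events that follow the one whose id equals last_event_id.
  def events_after(events, last_event_id) do
    case Enum.split_while(events, fn event -> event.id != last_event_id end) do
      # The matching event was found: everything after it is new to the client.
      {_seen, [_last_seen | newer]} -> newer
      # The ID is not in the buffer: fall back to a full replay (assumption).
      {all, []} -> all
    end
  end
end
```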

replay_events_after(session_id, last_event_id, handler_pid)

@spec replay_events_after(session_id(), event_id() | nil, pid()) ::
  :ok | {:error, :session_not_found}

Replays events for a session after the given event ID and sends them to the handler.

This is the callback function referenced in SSEHandler for session replay.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the session manager with optional configuration.

store_event(session_id, event_data)

@spec store_event(session_id(), event_data()) :: :ok | {:error, :session_not_found}

Stores an event for the given session.

Events are stored with their ID, type, data, and timestamp for potential replay during session resumption.
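
A sketch of building an event with the four fields named above and handling both return values of store_event/2. EventSketch and its sequence-number ID scheme are hypothetical; the timestamp follows the Usage example.

```elixir
defmodule EventSketch do
  # Builds an event map with the id, type, data, and timestamp fields.
  # The "event-N" ID scheme is an assumption borrowed from the Usage example.
  def build(seq, type, data) do
    %{
      id: "event-#{seq}",
      type: type,
      data: data,
      timestamp: System.system_time(:microsecond)
    }
  end

  # Stores the event and returns its ID so the caller can send it to the
  # client as the SSE event ID. The store function is injectable.
  def record(session_id, event, store \\ &ExMCP.SessionManager.store_event/2) do
    case store.(session_id, event) do
      :ok -> {:ok, event.id}
      {:error, :session_not_found} = error -> error
    end
  end
end
```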

terminate_session(session_id)

@spec terminate_session(session_id()) :: :ok

Terminates a session and cleans up its events.

This should be called when SSE connections are explicitly closed or when DELETE requests are received.
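
One way to make sure termination always happens when a connection ends is to wrap the connection's work in try/after. SessionCleanupSketch below is a hypothetical helper, shown as a sketch; the terminate function is injectable so the example runs without the library.

```elixir
defmodule SessionCleanupSketch do
  # Runs fun with the session ID and terminates the session afterwards,
  # even if fun raises.
  def with_session(session_id, fun, terminate \\ &ExMCP.SessionManager.terminate_session/1) do
    try do
      fun.(session_id)
    after
      terminate.(session_id)
    end
  end
end
```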

update_session(session_id, updates)

@spec update_session(session_id(), map()) :: :ok | {:error, :session_not_found}

Updates session metadata or activity timestamp.
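
A sketch of updating a session and reacting to a missing one. This page does not list which keys the updates map accepts; the :client_info key used below is an assumption based on the session_data() type, and SessionUpdateSketch is a hypothetical module.

```elixir
defmodule SessionUpdateSketch do
  # Records a new user agent for the session. The shape of the updates map
  # is an assumption, not documented behavior.
  def set_user_agent(session_id, user_agent, update \\ &ExMCP.SessionManager.update_session/2) do
    case update.(session_id, %{client_info: %{user_agent: user_agent}}) do
      :ok -> :ok
      # The session expired or was never created; the caller decides what to do.
      {:error, :session_not_found} -> :unknown_session
    end
  end
end
```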