ExMCP.Transport.Beam.Cluster (ex_mcp v0.9.2)
Native BEAM clustering support for distributed MCP servers.
Provides service discovery, load balancing, and fault tolerance for MCP servers running across multiple Erlang nodes. It builds on the BEAM's native distribution, so each registered service is addressed by the node it runs on and its process pid.
Features
- Service Discovery: Automatic registration and discovery of MCP servers
- Load Balancing: Distribute client connections across server instances
- Health Monitoring: Monitor service health and remove failed instances
- Fault Tolerance: Handle node failures and network partitions
- Dynamic Membership: Add and remove cluster nodes at runtime
- Circuit Breakers: Protect against cascading failures
Architecture
The cluster uses a distributed registry pattern with the following components:
- Service Registry: Tracks available MCP services across nodes
- Health Monitor: Monitors service health and availability
- Load Balancer: Routes client requests to appropriate servers
- Partition Detector: Detects and handles network partitions
- Membership Manager: Manages cluster node membership
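The Health Monitor's role can be pictured as a periodic sweep over registered services. The sketch below is a generic heartbeat-timeout check, not ex_mcp's implementation: the `HealthSweepSketch` module and the per-service `last_seen` timestamp (in milliseconds) are hypothetical, and the timeout stands in for a setting such as `service_timeout` in `cluster_config()`.

```elixir
defmodule HealthSweepSketch do
  @moduledoc """
  Hypothetical heartbeat sweep. Each service map is assumed to carry a
  `:last_seen` timestamp in milliseconds; this is not ex_mcp's data model.
  """

  @doc "Splits services into {healthy, stale} relative to `now_ms`."
  def sweep(services, now_ms, timeout_ms) do
    Enum.split_with(services, fn %{last_seen: seen} ->
      now_ms - seen <= timeout_ms
    end)
  end
end

services = [
  %{name: "calculator", last_seen: 9_000},
  %{name: "search", last_seen: 2_000}
]

{healthy, stale} = HealthSweepSketch.sweep(services, 10_000, 5_000)
IO.inspect(Enum.map(healthy, & &1.name), label: "healthy")
# healthy: ["calculator"]
IO.inspect(Enum.map(stale, & &1.name), label: "stale")
# stale: ["search"]
```

A real monitor would run this on a timer (compare `health_check_interval`) and deregister or flag the stale entries.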
Example Usage
alias ExMCP.Transport.Beam.Cluster

# Start a cluster coordinator
{:ok, cluster} = Cluster.start_link(%{
  node_name: :mcp_cluster,
  discovery_strategy: :distributed_registry,
  health_check_interval: 5000
})

# Register an MCP service
# (server_pid is the pid of an MCP server process you have already started)
service_info = %{
  name: "calculator",
  version: "1.0.0",
  capabilities: ["tools"],
  node: node(),
  pid: server_pid
}

{:ok, service_id} = Cluster.register_service(cluster, service_info)

# Discover available services
{:ok, services} = Cluster.discover_services(cluster, %{name: "calculator"})

# Get a service instance with load balancing
{:ok, service} = Cluster.get_service(cluster, "calculator", strategy: :round_robin)
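The `strategy: :round_robin` option above implies rotating requests across the instances registered under one name. A minimal round-robin picker might look like the following; `RoundRobinSketch` is a hypothetical module, not part of ex_mcp, and it threads the counter explicitly where a coordinator process would more likely keep it in its state.

```elixir
defmodule RoundRobinSketch do
  @moduledoc "Hypothetical round-robin picker over a list of service instances."

  @doc """
  Picks the instance at `counter` (modulo the list length) and returns it
  together with the next counter value.
  """
  def pick([], _counter), do: {:error, :no_instances}

  def pick(instances, counter) when is_list(instances) and counter >= 0 do
    index = rem(counter, length(instances))
    {Enum.at(instances, index), counter + 1}
  end
end

instances = [:calc_a, :calc_b, :calc_c]

# Thread the counter through four picks: a, b, c, then wrap back to a.
{picks, _next} =
  Enum.map_reduce(1..4, 0, fn _, counter ->
    RoundRobinSketch.pick(instances, counter)
  end)

IO.inspect(picks)
# => [:calc_a, :calc_b, :calc_c, :calc_a]
```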
Summary
Functions
- add_node/3: Adds a node to the cluster.
- child_spec/1: Returns a specification to start this module under a supervisor.
- discover_services/2: Discovers services matching the given criteria.
- get_service/2: Gets a specific service by ID.
- get_stats/1: Gets cluster statistics and health information.
- heal_partition/1: Heals a simulated network partition.
- list_nodes/1: Lists all nodes in the cluster.
- record_failure/2: Records a failure for a service (used by circuit breakers).
- record_success/2: Records a success for a service (used by circuit breakers).
- register_service/2: Registers an MCP service in the cluster.
- remove_node/2: Removes a node from the cluster.
- simulate_node_failure/2: Simulates a node failure for testing purposes.
- simulate_partition/3: Simulates a network partition for testing.
- start_link/1: Starts a cluster coordinator with the given configuration.
- stop/1: Stops the cluster coordinator.
- unregister_service/2: Removes a service from the cluster registry.
- update_service/3: Updates an existing service registration.
Types
@type cluster_config() :: %{
  node_name: atom() | nil,
  discovery_strategy: :local_registry | :distributed_registry | :dns | :consul,
  health_check_enabled: boolean(),
  health_check_interval: non_neg_integer(),
  service_timeout: non_neg_integer(),
  node_monitoring: boolean(),
  failure_detection_timeout: non_neg_integer(),
  partition_detection: boolean(),
  merge_strategy: :last_writer_wins | :first_writer_wins | :manual,
  cluster_management: boolean(),
  failover_enabled: boolean()
}
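For reference, a map that fills in every `cluster_config()` field could look like this. The keys and the allowed atoms come from the type above; the numeric values and the `ClusterConfigSketch` module are illustrative assumptions, not ex_mcp defaults. Because `start_link/1` also accepts a plain `map()`, the usage example at the top passes only a subset of these keys.

```elixir
defmodule ClusterConfigSketch do
  @moduledoc """
  Illustrative values for every field of `cluster_config()`.
  The values are examples only, not ex_mcp defaults.
  """

  # Allowed atoms copied from the cluster_config() type.
  @discovery_strategies [:local_registry, :distributed_registry, :dns, :consul]
  @merge_strategies [:last_writer_wins, :first_writer_wins, :manual]

  def example do
    %{
      node_name: :mcp_cluster,
      discovery_strategy: :distributed_registry,
      health_check_enabled: true,
      health_check_interval: 5_000,
      service_timeout: 30_000,
      node_monitoring: true,
      failure_detection_timeout: 10_000,
      partition_detection: true,
      merge_strategy: :last_writer_wins,
      cluster_management: true,
      failover_enabled: true
    }
  end

  @doc "Checks only the two enum fields and health_check_interval against the type."
  def valid?(config) when is_map(config) do
    Map.get(config, :discovery_strategy) in @discovery_strategies and
      Map.get(config, :merge_strategy) in @merge_strategies and
      is_integer(Map.get(config, :health_check_interval)) and
      Map.get(config, :health_check_interval) >= 0
  end
end

IO.inspect(ClusterConfigSketch.valid?(ClusterConfigSketch.example()))
# => true
```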
Functions
@spec add_node(GenServer.server(), atom(), map()) :: :ok | {:error, term()}
Adds a node to the cluster.
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec discover_services(GenServer.server(), map()) :: {:ok, [service_info()]} | {:error, term()}
Discovers services matching the given criteria.
Filter Options
- :name - Service name to match
- :version - Service version to match
- :capabilities - Required capabilities (list)
- :node - Specific node to search
- :exclude_circuit_broken - Exclude services with open circuit breakers
- :healthy_only - Only return healthy services
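To make the filter semantics concrete, here is an in-memory sketch of how these options could combine, assuming every given criterion must match. `ServiceFilterSketch` is hypothetical, and so are the `healthy` and `circuit` fields it reads: the documentation does not say how health or breaker state is stored on a service.

```elixir
defmodule ServiceFilterSketch do
  @moduledoc """
  Hypothetical in-memory version of the discover_services/2 filters.
  Assumes each service map has :name, :version, :capabilities, :node,
  plus assumed :healthy (boolean) and :circuit (:closed | :open) fields.
  """

  def matches?(service, criteria) do
    Enum.all?(criteria, fn
      {:name, name} -> service.name == name
      {:version, version} -> service.version == version
      # Every required capability must be present on the service.
      {:capabilities, caps} -> Enum.all?(caps, &(&1 in service.capabilities))
      {:node, node} -> service.node == node
      {:healthy_only, true} -> service.healthy
      {:exclude_circuit_broken, true} -> service.circuit != :open
      # Unknown keys and false-valued flags do not restrict the result.
      {_key, _value} -> true
    end)
  end

  def discover(services, criteria), do: Enum.filter(services, &matches?(&1, criteria))
end

services = [
  %{name: "calculator", version: "1.0.0", capabilities: ["tools"],
    node: :"a@host", healthy: true, circuit: :closed},
  %{name: "calculator", version: "1.0.0", capabilities: ["tools"],
    node: :"b@host", healthy: false, circuit: :closed},
  %{name: "calculator", version: "1.0.0", capabilities: ["tools", "resources"],
    node: :"c@host", healthy: true, circuit: :open}
]

criteria = %{name: "calculator", healthy_only: true, exclude_circuit_broken: true}
IO.inspect(Enum.map(ServiceFilterSketch.discover(services, criteria), & &1.node))
# => [:a@host]
```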
@spec get_service(GenServer.server(), String.t()) :: {:ok, service_info()} | {:error, :not_found}
Gets a specific service by ID.
@spec get_stats(GenServer.server()) :: {:ok, map()}
Gets cluster statistics and health information.
@spec heal_partition(GenServer.server()) :: :ok
Heals a simulated network partition.
@spec list_nodes(GenServer.server()) :: {:ok, [map()]}
Lists all nodes in the cluster.
@spec record_failure(GenServer.server(), String.t()) :: :ok
Records a failure for a service (used by circuit breakers).
@spec record_success(GenServer.server(), String.t()) :: :ok
Records a success for a service (used by circuit breakers).
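`record_failure/2` and `record_success/2` feed the circuit breakers that `discover_services/2` can filter on with `:exclude_circuit_broken`. The sketch below shows a generic consecutive-failure breaker; the threshold, the reset-on-success rule, and the `CircuitBreakerSketch` module are assumptions, and ex_mcp's actual policy may differ (for example by adding a timed half-open state).

```elixir
defmodule CircuitBreakerSketch do
  @moduledoc """
  Hypothetical consecutive-failure circuit breaker: the circuit opens
  after `threshold` failures in a row, and any success closes it and
  resets the count. No half-open state is modelled.
  """

  defstruct failures: 0, threshold: 3, state: :closed

  def new(threshold \\ 3), do: %__MODULE__{threshold: threshold}

  def record_failure(%__MODULE__{} = cb) do
    failures = cb.failures + 1
    state = if failures >= cb.threshold, do: :open, else: cb.state
    %{cb | failures: failures, state: state}
  end

  def record_success(%__MODULE__{} = cb), do: %{cb | failures: 0, state: :closed}

  def open?(%__MODULE__{state: state}), do: state == :open
end

cb =
  CircuitBreakerSketch.new(3)
  |> CircuitBreakerSketch.record_failure()
  |> CircuitBreakerSketch.record_failure()
  |> CircuitBreakerSketch.record_failure()

IO.inspect(CircuitBreakerSketch.open?(cb))
# => true

IO.inspect(cb |> CircuitBreakerSketch.record_success() |> CircuitBreakerSketch.open?())
# => false
```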
@spec register_service(GenServer.server(), service_info()) :: {:ok, String.t()} | {:error, term()}
Registers an MCP service in the cluster.
Returns a unique service ID, which you pass to update_service/3 and unregister_service/2 to modify or remove the registration.
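The ID returned by registration is the handle for later calls. A hypothetical ID-keyed registry shows that lifecycle in pure data; `RegistrySketch` and its `"svc-"` ID format are assumptions, and the real coordinator keeps this state inside its process rather than returning it to the caller.

```elixir
defmodule RegistrySketch do
  @moduledoc """
  Hypothetical ID-keyed service registry held in a plain map, showing how
  an ID returned at registration is reused for updates and removal.
  """

  def new, do: %{}

  # Assumed ID format: "svc-" plus a unique positive integer.
  def register(registry, info) do
    id = "svc-" <> Integer.to_string(System.unique_integer([:positive, :monotonic]))
    {:ok, id, Map.put(registry, id, info)}
  end

  def update(registry, id, info) do
    if Map.has_key?(registry, id) do
      {:ok, Map.put(registry, id, info)}
    else
      {:error, :not_found}
    end
  end

  # Removing an unknown ID is a no-op, so removal always succeeds.
  def unregister(registry, id), do: Map.delete(registry, id)
end

{:ok, id, reg} =
  RegistrySketch.register(RegistrySketch.new(), %{name: "calculator", version: "1.0.0"})

{:ok, reg} = RegistrySketch.update(reg, id, %{name: "calculator", version: "1.1.0"})
reg = RegistrySketch.unregister(reg, id)
IO.inspect(RegistrySketch.update(reg, id, %{}))
# => {:error, :not_found}
```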
@spec remove_node(GenServer.server(), atom()) :: :ok
Removes a node from the cluster.
@spec simulate_node_failure(GenServer.server(), atom()) :: :ok
Simulates a node failure for testing purposes.
@spec simulate_partition(GenServer.server(), [atom()], [atom()]) :: :ok
Simulates a network partition for testing.
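Conceptually, a simulated partition splits nodes into two groups that cannot reach each other until `heal_partition/1` is called. `PartitionSketch` below is a hypothetical model of that reachability rule only; it does not disconnect real nodes and is not how ex_mcp tracks partitions.

```elixir
defmodule PartitionSketch do
  @moduledoc """
  Hypothetical model of a simulated partition: two node groups that
  cannot reach each other until the partition is healed.
  """

  def partition(side_a, side_b), do: {:partitioned, MapSet.new(side_a), MapSet.new(side_b)}

  def heal(_partition), do: :healed

  @doc "Nodes are reachable unless they sit on opposite sides of a partition."
  def reachable?(:healed, _a, _b), do: true

  def reachable?({:partitioned, side_a, side_b}, a, b) do
    crosses =
      (MapSet.member?(side_a, a) and MapSet.member?(side_b, b)) or
        (MapSet.member?(side_b, a) and MapSet.member?(side_a, b))

    not crosses
  end
end

p = PartitionSketch.partition([:"n1@host", :"n2@host"], [:"n3@host"])
IO.inspect(PartitionSketch.reachable?(p, :"n1@host", :"n2@host"))
# => true
IO.inspect(PartitionSketch.reachable?(p, :"n1@host", :"n3@host"))
# => false
IO.inspect(PartitionSketch.reachable?(PartitionSketch.heal(p), :"n1@host", :"n3@host"))
# => true
```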
@spec start_link(cluster_config() | map()) :: GenServer.on_start()
Starts a cluster coordinator with the given configuration.
@spec stop(GenServer.server()) :: :ok
Stops the cluster coordinator.
@spec unregister_service(GenServer.server(), String.t()) :: :ok
Removes a service from the cluster registry.
@spec update_service(GenServer.server(), String.t(), service_info()) :: :ok | {:error, term()}
Updates an existing service registration.