ExMCP.Transport.Beam.Correlation (ex_mcp v0.10.0)

View Source

Request-response correlation using Registry for efficient lookups.

Provides automatic cleanup when processes die and supports distributed scenarios. Uses Registry's built-in process linking for automatic cleanup.

Features

  • Automatic cleanup when processes terminate
  • Efficient O(1) lookup performance
  • Distributed operation support
  • Timeout handling with automatic cleanup
  • Request tracking and metrics

Usage

# Register a pending request
correlation_id = Correlation.register_request(self(), %{timeout: 5000})

# Send request with correlation ID
send_request(message, correlation_id)

# Wait for response
case Correlation.wait_for_response(correlation_id, 5000) do
  {:ok, response} -> handle_response(response)
  {:error, :timeout} -> handle_timeout()
end

Summary

Functions

Cancels a pending request.

Returns a specification to start this module under a supervisor.

Gets information about a specific request.

Gets correlation statistics.

Lists all active request correlation IDs.

Registers a new request and returns a correlation ID.

Sends a response to a pending request.

Starts the correlation manager.

Waits for a response to a specific correlation ID.

Types

correlation_id()

@type correlation_id() :: reference()

request_info()

@type request_info() :: %{
  requester_pid: pid(),
  started_at: integer(),
  timeout: non_neg_integer(),
  metadata: map()
}

Functions

cancel_request(correlation_id)

@spec cancel_request(correlation_id()) :: :ok | {:error, :not_found}

Cancels a pending request.

Examples

iex> Correlation.cancel_request(correlation_id)
:ok

iex> Correlation.cancel_request(invalid_id)
{:error, :not_found}

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_request_info(correlation_id)

@spec get_request_info(correlation_id()) ::
  {:ok, request_info()} | {:error, :not_found}

Gets information about a specific request.

get_stats()

@spec get_stats() :: map()

Gets correlation statistics.

list_active_requests()

@spec list_active_requests() :: [correlation_id()]

Lists all active request correlation IDs.

register_request(requester_pid, metadata \\ %{})

@spec register_request(pid(), map()) :: correlation_id()

Registers a new request and returns a correlation ID.

Examples

iex> correlation_id = Correlation.register_request(self())
#Reference<0.123.456.789>

iex> correlation_id = Correlation.register_request(self(), %{timeout: 10000})
#Reference<0.123.456.790>

send_response(correlation_id, response)

@spec send_response(correlation_id(), term()) :: :ok | {:error, :not_found}

Sends a response to a pending request.

Examples

iex> Correlation.send_response(correlation_id, {:ok, result})
:ok

iex> Correlation.send_response(invalid_id, {:error, reason})
{:error, :not_found}

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the correlation manager.

wait_for_response(correlation_id, timeout \\ 30000)

@spec wait_for_response(correlation_id(), non_neg_integer()) ::
  {:ok, term()} | {:error, :timeout | :not_found}

Waits for a response to a specific correlation ID.

Examples

iex> Correlation.wait_for_response(correlation_id, 5000)
{:ok, %{result: "success"}}

iex> Correlation.wait_for_response(correlation_id, 1000)
{:error, :timeout}