ExMCP.Transport.Beam.Correlation (ex_mcp v0.10.0)
Request-response correlation using Registry for efficient lookups.
Pending requests are cleaned up automatically when the owning process dies, relying on Registry's built-in process linking. Distributed scenarios are also supported.
Features
- Automatic cleanup when processes terminate
- Efficient O(1) lookup performance
- Distributed operation support
- Timeout handling with automatic cleanup
- Request tracking and metrics
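The automatic cleanup and constant-time lookup listed above are properties of Elixir's standard Registry. The following is a minimal sketch using only the standard library, not ex_mcp's actual implementation; the module name `RegistryCleanupSketch` and its functions are hypothetical.

```elixir
defmodule RegistryCleanupSketch do
  # Hypothetical registry name, used only for this illustration.
  @registry __MODULE__.Registry

  # Starts a unique-key registry; lookups by key are served from ETS.
  def start, do: Registry.start_link(keys: :unique, name: @registry)

  # Registers the calling process under a fresh reference and returns it.
  def register(info \\ %{}) do
    id = make_ref()
    {:ok, _owner} = Registry.register(@registry, id, info)
    id
  end

  # Returns the owning pid and the stored value, or {:error, :not_found}.
  def lookup(id) do
    case Registry.lookup(@registry, id) do
      [{pid, info}] -> {:ok, pid, info}
      [] -> {:error, :not_found}
    end
  end
end
```

In this sketch no explicit cleanup call exists: once the process that called `register/1` exits, the registry drops its entry and `lookup/1` starts returning `{:error, :not_found}`, possibly after a short delay.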
Usage
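alias ExMCP.Transport.Beam.Correlation
# Register a pending request
correlation_id = Correlation.register_request(self(), %{timeout: 5000})
# Send the request with the correlation ID (send_request/2 is application code)
send_request(message, correlation_id)
# Wait for the response (the handle_* functions are application code)
case Correlation.wait_for_response(correlation_id, 5000) do
  {:ok, response} -> handle_response(response)
  {:error, :timeout} -> handle_timeout()
  {:error, :not_found} -> handle_not_found()
end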
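The wait step can be pictured as a selective receive with a timeout. Below is a minimal sketch using plain message passing. It assumes a hypothetical `{:correlation_response, id, response}` message shape and a hypothetical `WaitSketch` module; the message format ex_mcp uses internally is not documented on this page and may differ.

```elixir
defmodule WaitSketch do
  # Blocks until a message tagged with `id` arrives or `timeout` ms elapse.
  # Messages tagged with other ids stay in the mailbox untouched.
  def wait_for_response(id, timeout) do
    receive do
      {:correlation_response, ^id, response} -> {:ok, response}
    after
      timeout -> {:error, :timeout}
    end
  end

  # Delivers `response` to the process waiting on `id`.
  def send_response(pid, id, response) do
    send(pid, {:correlation_response, id, response})
    :ok
  end
end
```

Pinning the id with `^id` is what keeps concurrent requests from the same process from receiving each other's responses.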
Summary
Functions
cancel_request/1: Cancels a pending request.
child_spec/1: Returns a specification to start this module under a supervisor.
get_request_info/1: Gets information about a specific request.
get_stats/0: Gets correlation statistics.
list_active_requests/0: Lists all active request correlation IDs.
register_request/2: Registers a new request and returns a correlation ID.
send_response/2: Sends a response to a pending request.
start_link/1: Starts the correlation manager.
wait_for_response/2: Waits for a response to a specific correlation ID.
Types
@type correlation_id() :: reference()
@type request_info() :: %{ requester_pid: pid(), started_at: integer(), timeout: non_neg_integer(), metadata: map() }
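As an illustration of how the fields of `request_info()` fit together, the sketch below computes the time left before a request expires. It assumes `started_at` is a monotonic timestamp in milliseconds and `timeout` is a duration in milliseconds; this page does not state the units. The `RequestInfoSketch` module is hypothetical.

```elixir
defmodule RequestInfoSketch do
  # Builds a map with the same keys as request_info().
  # Assumption: started_at is a monotonic timestamp in milliseconds.
  def new(pid, timeout, metadata \\ %{}) do
    %{
      requester_pid: pid,
      started_at: System.monotonic_time(:millisecond),
      timeout: timeout,
      metadata: metadata
    }
  end

  # Milliseconds left before the request times out, never negative.
  def remaining_ms(%{started_at: started_at, timeout: timeout}, now) do
    max(timeout - (now - started_at), 0)
  end
end
```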
Functions
@spec cancel_request(correlation_id()) :: :ok | {:error, :not_found}
Cancels a pending request.
Examples
iex> Correlation.cancel_request(correlation_id)
:ok
iex> Correlation.cancel_request(invalid_id)
{:error, :not_found}
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
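Since the module exposes `child_spec/1` and `start_link/1`, it can presumably be listed as a child in a supervision tree like any GenServer. The sketch below uses a hypothetical stand-in module, `ManagerSketch`, so that it runs without ex_mcp; the options accepted by the real `start_link/1` are not documented on this page.

```elixir
defmodule ManagerSketch do
  # `use GenServer` generates a default child_spec/1 for this module.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(opts), do: {:ok, %{opts: opts, pending: %{}}}
end

# The {module, arg} tuple makes the supervisor call ManagerSketch.child_spec([]).
children = [{ManagerSketch, []}]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

With the real module, the child entry would name `ExMCP.Transport.Beam.Correlation` instead of the stand-in.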
@spec get_request_info(correlation_id()) :: {:ok, request_info()} | {:error, :not_found}
Gets information about a specific request.
@spec get_stats() :: map()
Gets correlation statistics.
@spec list_active_requests() :: [correlation_id()]
Lists all active request correlation IDs.
@spec register_request(pid(), map()) :: correlation_id()
Registers a new request and returns a correlation ID.
Examples
iex> correlation_id = Correlation.register_request(self())
#Reference<0.123.456.789>
iex> correlation_id = Correlation.register_request(self(), %{timeout: 10000})
#Reference<0.123.456.790>
@spec send_response(correlation_id(), term()) :: :ok | {:error, :not_found}
Sends a response to a pending request.
Examples
iex> Correlation.send_response(correlation_id, {:ok, result})
:ok
iex> Correlation.send_response(invalid_id, {:error, reason})
{:error, :not_found}
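The `:ok | {:error, :not_found}` return shape can be illustrated with a small table of pending requests. The sketch below is a simplified stand-in built on Agent, not the library's implementation. The `PendingSketch` module, the `{:correlation_response, id, response}` message shape, and the choice to remove an entry once it has been answered are all assumptions made for illustration.

```elixir
defmodule PendingSketch do
  use Agent

  # Holds a map of correlation id => requester pid.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Records `pid` as waiting on a fresh reference and returns that reference.
  def register(pid) do
    id = make_ref()
    Agent.update(__MODULE__, &Map.put(&1, id, pid))
    id
  end

  # Pops the entry, so in this sketch each request is answered at most once.
  def send_response(id, response) do
    case Agent.get_and_update(__MODULE__, &Map.pop(&1, id)) do
      nil ->
        {:error, :not_found}

      pid ->
        send(pid, {:correlation_response, id, response})
        :ok
    end
  end
end
```

Under these assumptions, a response that arrives after the request was already answered or removed yields `{:error, :not_found}` rather than being delivered twice.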
@spec start_link(keyword()) :: GenServer.on_start()
Starts the correlation manager.
@spec wait_for_response(correlation_id(), non_neg_integer()) :: {:ok, term()} | {:error, :timeout | :not_found}
Waits for a response to a specific correlation ID.
Examples
iex> Correlation.wait_for_response(correlation_id, 5000)
{:ok, %{result: "success"}}
iex> Correlation.wait_for_response(correlation_id, 1000)
{:error, :timeout}