Aerospike.Cluster.NodeTransport behaviour (Aerospike Driver v0.3.1)

Copy Markdown View Source

Behaviour for per-node I/O, isolating cluster logic from transport details.

The Tender, Router, and command modules call this behaviour — they must never reach :gen_tcp directly. The same contract is served by a real TCP implementation in production and a scripted fake in tests, so every cluster-logic path is exercised deterministically without sockets.

This behaviour is intentionally narrow: info/2, unary command/4, multi-frame command_stream/4, optional login/2, and optional stream open/read/close callbacks are the execution shapes supported today. Streaming replies and fan-out orchestration are separate contracts, not hidden variants of command/4.

Implementations are free to choose their own conn representation (a socket, a pid, a reference, a struct). The opaque type prevents transport internals from leaking into callers.

Summary

Types

Authentication mode used by transports that implement login/2.

Options accepted by command/4.

Keyword list accepted by command/4 and command_stream/4.

Opaque connection handle returned by connect/3.

Connection option accepted by connect/3.

Keyword list of connection options accepted by connect/3.

Login option accepted by login/2.

Keyword list accepted by login/2.

Parsed admin-protocol login reply returned by login/2.

Opaque stream handle returned by stream_open/4.

Option accepted by stream_open/4.

Keyword list accepted by stream_open/4.

Callbacks

Closes the connection. Must be idempotent — calling close/1 on an already-closed handle returns :ok.

Sends a pre-encoded request and returns the full reply bytes.

Sends a pre-encoded request and reads a multi-frame reply through the terminal marker, returning the concatenated reply frames or bodies appropriate for opts[:message_type].

Opens a connection to host:port.

Issues one info request containing commands and returns the parsed key/value map from the server's reply.

Runs the admin-protocol login (or session-token authenticate) handshake on an already-connected socket and returns the parsed reply.

Closes an open stream handle. Must be idempotent.

Opens a streaming request and returns an opaque handle for reading frames.

Reads the next frame from an open stream.

Types

auth_mode()

@type auth_mode() :: :internal | :external | :pki

Authentication mode used by transports that implement login/2.

command_option()

@type command_option() ::
  {:use_compression, boolean()}
  | {:message_type, :as_msg | :admin}
  | {:attempt, non_neg_integer()}

Options accepted by command/4.

  • :use_compression — when true, requests whose encoded size exceeds a fixed 128-byte threshold are wrapped in a type-4 (AS_MSG_COMPRESSED) proto frame before being sent. Smaller requests are sent plain even when the flag is set. Implementations that ignore compression must still accept the option without error. Defaults to false.
  • :message_type:as_msg (default) or :admin. :admin is the Aerospike admin-protocol reply type used by security commands.
  • :attempt — zero-based retry attempt index attached to telemetry metadata. Defaults to 0 when omitted.

command_opts()

@type command_opts() :: [command_option()]

Keyword list accepted by command/4 and command_stream/4.

conn()

@type conn() :: term()

Opaque connection handle returned by connect/3.

connect_option()

@type connect_option() :: {atom(), term()}

Connection option accepted by connect/3.

Implementations may support additional keys. The built-in TCP/TLS transports document their concrete option sets in Aerospike.Transport.Tcp and Aerospike.Transport.Tls.

connect_opts()

@type connect_opts() :: [connect_option()]

Keyword list of connection options accepted by connect/3.

login_option()

@type login_option() ::
  {:user, String.t()}
  | {:password, String.t()}
  | {:auth_mode, auth_mode()}
  | {:session_token, binary()}
  | {:login_timeout_ms, pos_integer()}

Login option accepted by login/2.

  • :user — username. Required for password login and session-token authenticate, omitted for PKI login.
  • :password — cleartext password for internal/external login.
  • :auth_mode:internal, :external, or :pki.
  • :session_token — token used to run AUTHENTICATE instead of LOGIN.
  • :login_timeout_ms — read deadline applied to the login reply.

login_opts()

@type login_opts() :: [login_option()]

Keyword list accepted by login/2.

login_reply()

@type login_reply() ::
  :ok_no_token
  | {:session, binary(), non_neg_integer() | nil}
  | :security_not_enabled

Parsed admin-protocol login reply returned by login/2.

  • :ok_no_token — server accepted the login but issued no session token (e.g. PKI with no user mapping).
  • {:session, token, ttl_seconds_or_nil} — server accepted the login and issued token. ttl_seconds is the server-reported TTL (nil if omitted).
  • :security_not_enabled — server has security disabled (result code 52); callers treat this as success with no token.

stream()

@type stream() :: term()

Opaque stream handle returned by stream_open/4.

stream_option()

@type stream_option() :: {:use_compression, boolean()} | {:attempt, non_neg_integer()}

Option accepted by stream_open/4.

The built-in TCP/TLS transports accept :use_compression and :attempt, matching command_opts/0; implementations may ignore keys they do not support.

stream_opts()

@type stream_opts() :: [stream_option()]

Keyword list accepted by stream_open/4.

Callbacks

close(conn)

@callback close(conn()) :: :ok

Closes the connection. Must be idempotent — calling close/1 on an already-closed handle returns :ok.

command(conn, request, deadline_ms, opts)

@callback command(
  conn(),
  request :: iodata(),
  deadline_ms :: non_neg_integer(),
  opts :: command_opts()
) :: {:ok, binary()} | {:error, Aerospike.Error.t()}

Sends a pre-encoded request and returns the full reply bytes.

The request is expected to be complete wire bytes (proto header + body) produced by the command encoder. The reply is the full response payload; framing/parsing is the caller's responsibility.

deadline_ms is a per-socket-read deadline in milliseconds applied to each :gen_tcp.recv/3 call (header and body are read separately on passive {:packet, :raw} sockets; see Aerospike.Transport.Tcp). It is deliberately separate from the caller's total-operation budget: a slow node can blow its read deadline without the caller having to track a monotonic deadline manually. The caller remains responsible for the overall operation budget — the transport does not enforce it.

opts is a keyword list documented by command_opts/0.

Single request, single response — streaming and multi-frame replies (scan, query) are out of scope for this behaviour.

command_stream(conn, request, deadline_ms, opts)

@callback command_stream(
  conn(),
  request :: iodata(),
  deadline_ms :: non_neg_integer(),
  opts :: command_opts()
) :: {:ok, binary()} | {:error, Aerospike.Error.t()}

Sends a pre-encoded request and reads a multi-frame reply through the terminal marker, returning the concatenated reply frames or bodies appropriate for opts[:message_type].

This seam exists for batch-style requests that still complete as one bounded command but arrive as multiple protocol frames. Unary callers must continue using command/4; long-lived incremental consumers should use the stream callbacks.

connect(host, port, opts)

@callback connect(host :: String.t(), port :: :inet.port_number(), opts :: connect_opts()) ::
  {:ok, conn()} | {:error, Aerospike.Error.t()}

Opens a connection to host:port.

On success, returns an opaque connection handle suitable for passing to the other callbacks. On failure, returns an Aerospike.Error tagged with a transport-appropriate code (e.g. :connection_error, :timeout).

info(conn, commands)

@callback info(conn(), commands :: [String.t()]) ::
  {:ok, %{required(String.t()) => String.t()}} | {:error, Aerospike.Error.t()}

Issues one info request containing commands and returns the parsed key/value map from the server's reply.

The single-round-trip shape matches the Aerospike info protocol: all commands ship in one request, all results come back in one response.

login(conn, opts)

(optional)
@callback login(conn(), opts :: login_opts()) ::
  {:ok, login_reply()} | {:error, Aerospike.Error.t()}

Runs the admin-protocol login (or session-token authenticate) handshake on an already-connected socket and returns the parsed reply.

Optional — transports that do not support authentication raise UndefinedFunctionError; callers that use this callback must ensure the configured transport implements it.

Opts:

  • :user — username (required when :session_token is absent).
  • :password — cleartext password (required when :session_token is absent; the transport hashes it as the server requires).
  • :auth_mode:internal, :external, or :pki.
  • :session_token — when present, runs AUTHENTICATE instead of a fresh LOGIN. :user is still required.
  • :login_timeout_ms — read deadline applied to the login reply. Transport-specific default.

stream_close(stream)

(optional)
@callback stream_close(stream()) :: :ok

Closes an open stream handle. Must be idempotent.

stream_open(conn, request, deadline_ms, opts)

(optional)
@callback stream_open(
  conn(),
  request :: iodata(),
  deadline_ms :: non_neg_integer(),
  opts :: stream_opts()
) :: {:ok, stream()} | {:error, Aerospike.Error.t()}

Opens a streaming request and returns an opaque handle for reading frames.

The transport owns the long-lived socket state and delivery mechanics; the caller owns any parsing of returned frames. The request bytes are already encoded when they reach the transport.

stream_read(stream, deadline_ms)

(optional)
@callback stream_read(stream(), deadline_ms :: non_neg_integer()) ::
  {:ok, binary()} | :done | {:error, Aerospike.Error.t()}

Reads the next frame from an open stream.

Returns {:ok, frame} for a delivered frame, :done when the stream has reached end-of-stream, or {:error, error} for a transport-class failure.