Aerospike.Transport.Tcp (Aerospike Driver v0.3.1)

Plaintext :gen_tcp implementation of Aerospike.Cluster.NodeTransport.

One socket per connection, no pooling, and compression off by default. Each call is a single request/response over a passive (active: false) socket with packet: :raw framing. The 8-byte Aerospike protocol header determines the body length, so framing is owned by this module, not by :gen_tcp.

The connection struct stores the socket tagged with the module that owns it (:gen_tcp here; :ssl when the handle is produced by Aerospike.Transport.Tls). Every post-connect operation dispatches through that module, so framing, compression, auth, and telemetry are shared between the plaintext and TLS transports — the only real difference is how the socket was opened. Streaming follows the same split: the transport owns raw frame delivery and the stream handle only carries socket lifecycle state. Aerospike.Transport.Tls delegates its command/4, command_stream/4, info/2, stream_open/4, stream_read/2, stream_close/1, close/1, and login/2 callbacks here for that reason.

Reads use :gen_tcp.recv(socket, N, timeout) with N > 0, which blocks the calling process until exactly N bytes arrive or the timeout expires, so server-side TCP fragmentation is transparent: the header and body are read in two exact recv calls regardless of how the kernel delivers them. Coalescing the two reads is not possible because the body length lives in the header, and a per-connection read buffer would only pay off for pipelined in-flight requests, which this transport does not support.
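The two-read pattern described above can be sketched as follows. This is a minimal illustration, not the driver's code: the module name FrameReadSketch and its functions are hypothetical, the header layout (1-byte version, 1-byte message type, 48-bit big-endian length) is an assumption about the Aerospike wire format, and the socket is assumed to have been opened in binary mode with active: false and packet: :raw.

```elixir
defmodule FrameReadSketch do
  # Hypothetical helper for illustration only.
  @header_bytes 8

  # Splits the 8-byte header into version, message type, and a 48-bit
  # big-endian body length (layout assumed, see the lead-in).
  def parse_header(<<version::8, type::8, length::48>>), do: {:ok, {version, type, length}}
  def parse_header(_other), do: {:error, :bad_header}

  # Reads one frame from a passive socket with two exact-length recv calls.
  # Any {:error, reason} from recv or parse_header falls through unchanged.
  def read_frame(socket, timeout_ms) do
    with {:ok, header} <- :gen_tcp.recv(socket, @header_bytes, timeout_ms),
         {:ok, {_version, type, length}} <- parse_header(header),
         {:ok, body} <- read_body(socket, length, timeout_ms) do
      {:ok, type, body}
    end
  end

  # recv/3 with a length of 0 means "return whatever is available", so an
  # empty body is handled without touching the socket.
  defp read_body(_socket, 0, _timeout_ms), do: {:ok, <<>>}
  defp read_body(socket, length, timeout_ms), do: :gen_tcp.recv(socket, length, timeout_ms)
end
```

Because each recv asks for an exact byte count, the caller never sees a partial header or a partial body, which is why no read buffer is kept between calls.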

The read deadline is supplied per command call by the caller rather than stored on the connection, so a retry layer can budget each attempt independently. info/2 still uses the default connect-time timeout because it is only issued from the Tender path, which has no per-call deadline of its own.
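A retry layer that budgets each attempt independently might look roughly like the sketch below. Everything here is hypothetical: RetryBudgetSketch is not part of the driver, send_fun stands in for a call such as command/4 with the remaining time passed as deadline_ms and the attempt index passed as :attempt, and deadline_ms is assumed to be a duration in milliseconds rather than an absolute timestamp. Since sockets are not reused after an error, send_fun is also assumed to pick a fresh connection for each attempt.

```elixir
defmodule RetryBudgetSketch do
  # Hypothetical retry driver; `send_fun` receives the remaining budget in
  # milliseconds and the zero-based attempt number.
  def run(send_fun, total_budget_ms, max_attempts)
      when is_function(send_fun, 2) and max_attempts > 0 do
    started = System.monotonic_time(:millisecond)
    attempt(send_fun, started, total_budget_ms, max_attempts, 0, nil)
  end

  # All attempts used: report the last error seen.
  defp attempt(_send_fun, _started, _budget, max_attempts, n, last_error)
       when n >= max_attempts do
    {:error, last_error}
  end

  defp attempt(send_fun, started, budget, max_attempts, n, last_error) do
    remaining = budget - (System.monotonic_time(:millisecond) - started)

    if remaining <= 0 do
      {:error, last_error || :timeout}
    else
      # Each attempt gets only the time still left in the overall budget.
      case send_fun.(remaining, n) do
        {:ok, body} -> {:ok, body}
        {:error, reason} -> attempt(send_fun, started, budget, max_attempts, n + 1, reason)
      end
    end
  end
end
```

Keeping the deadline out of the connection struct is what makes this possible: the same connection handle can serve a first attempt with a long deadline and a later attempt with a much shorter one.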

Failures are returned as {:error, %Aerospike.Error{}} — sockets are not reused after an error and the caller is expected to close/1 them.

connect/3 options

Every option is a key in the opts keyword list. Unknown keys are ignored so the same keyword list can be shared across transport implementations.

  • :connect_timeout_ms — milliseconds to wait for the TCP handshake and for the :gen_tcp.send/2 write buffer to drain. Defaults to 5000 ms.
  • :info_timeout — read deadline applied to every info/2 call. Defaults to :connect_timeout_ms so a caller that sets one value gets consistent behaviour across connect and info probes.
  • :tcp_nodelay — boolean. When true (default), the socket is opened with {:nodelay, true} so small info probes are not delayed by Nagle. Set false to let the kernel coalesce writes.
  • :tcp_keepalive — boolean. When true (default), the socket is opened with {:keepalive, true} so the kernel probes a half-open peer independently of the driver's tend loop.
  • :tcp_sndbuf — positive integer. When set, translates to {:sndbuf, n}. Left unset (default nil) the kernel picks its own send-buffer size.
  • :tcp_rcvbuf — positive integer. When set, translates to {:recbuf, n} (the :gen_tcp spelling of SO_RCVBUF). Left unset (default nil) the kernel picks its own receive-buffer size.
  • :node_name — opaque label stashed on the returned connection handle and attached to every telemetry event emitted for that handle. nil (default) when the caller does not know the node name yet — e.g. seed bootstrap and peer-discovery probes open sockets before the node info key has been read.
  • :auth_mode — :internal (default), :external, or :pki. External auth includes the clear password in the login frame and should only be used over TLS. PKI sends an empty login frame and relies on the TLS client certificate.
  • :user / :password — internal/external session login credentials. When both are present, connect/3 runs the admin-protocol login handshake immediately after the TCP handshake and returns the authenticated socket. On a server with security disabled the login result code SECURITY_NOT_ENABLED (52) is treated as a successful no-op and the socket is returned as usual. Any other non-zero result closes the socket and surfaces as an %Aerospike.Error{}.
  • :session_token — opaque session token issued by an earlier login. When present, connect/3 sends an AUTHENTICATE command on the fresh socket instead of the full password handshake. :user must be present alongside the token. The transport returns {:error, %Aerospike.Error{code: :expired_session}} when the server rejects the token; the caller is expected to retry with :user/:password to acquire a fresh token.
  • :login_timeout_ms — read deadline applied to the login reply. Defaults to :connect_timeout_ms.

See :inet.setopts/2 for the underlying semantics of the socket tuning keys. Option translation happens once in connect/3; callers pass the public names above.
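The translation from the public option names to :gen_tcp socket options could be sketched like this. SocketOptsSketch and to_gen_tcp/1 are hypothetical names, and the base options (:binary, active: false, packet: :raw) are an assumption drawn from the module description rather than the driver's actual code.

```elixir
defmodule SocketOptsSketch do
  # Hypothetical translation of the public connect/3 keys into :gen_tcp
  # socket options. Unknown keys are simply never looked up.
  def to_gen_tcp(opts) when is_list(opts) do
    base = [
      :binary,
      active: false,
      packet: :raw,
      nodelay: Keyword.get(opts, :tcp_nodelay, true),
      keepalive: Keyword.get(opts, :tcp_keepalive, true)
    ]

    base
    |> put_buffer(:sndbuf, Keyword.get(opts, :tcp_sndbuf))
    |> put_buffer(:recbuf, Keyword.get(opts, :tcp_rcvbuf))
  end

  # nil means "let the kernel choose", so no option is emitted.
  defp put_buffer(acc, _key, nil), do: acc
  defp put_buffer(acc, key, size) when is_integer(size) and size > 0, do: acc ++ [{key, size}]
end
```

For example, passing tcp_rcvbuf: 65_536 adds {:recbuf, 65536} to the socket options, while leaving :tcp_sndbuf unset emits no :sndbuf entry at all.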

Telemetry

Every command/4 and command_stream/4 call emits a [:aerospike, :command, :send] span around the socket write and a [:aerospike, :command, :recv] span around the response read. Every info/2 call emits a [:aerospike, :info, :rpc] span around the full round trip. Metadata keys follow Aerospike.Telemetry's taxonomy (:node_name, :attempt, :deadline_ms for commands; :node_name, :commands for info). When a command/4 caller does not pass an :attempt key, the metadata value defaults to 0; every retry-driver call passes one.

Summary

Types

command_option()

Command option accepted by command/4 and command_stream/4.

command_opts()

Keyword list accepted by command/4 and command_stream/4.

conn()

Opaque TCP connection handle.

connect_option()

TCP connection option accepted by connect/3.

connect_opts()

Keyword list accepted by connect/3.

login_opts()

Keyword list accepted by login/2.

stream()

Opaque stream handle owned by a dedicated socket worker.

stream_opts()

Keyword list accepted by stream_open/4.

t()

Concrete connection handle returned by connect/3.

Functions

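close(tcp)

Closes the TCP or TLS socket.

command(conn, request, deadline_ms, opts \\ [])

Sends one pre-encoded command frame and returns one decoded response body.

command_stream(conn, request, deadline_ms, opts \\ [])

Sends one pre-encoded command frame and reads a bounded multi-frame response.

connect(host, port, opts \\ [])

Opens a plaintext TCP connection to an Aerospike node.

info(conn, commands)

Sends one or more info commands and returns the decoded response map.

login(conn, opts)

Runs the admin-protocol login or authenticate handshake on an open socket.

maybe_login_after_handshake(conn, opts, host, port)

Runs the optional login/authenticate handshake after a TLS upgrade.

stream_close(stream)

Closes a stream handle.

stream_last_frame?(arg1)

Returns true when a stream frame is marked as the terminal frame.

stream_open(conn, request, deadline_ms, opts \\ [])

Sends a streaming request and returns a stream handle for incremental reads.

stream_read(stream, deadline_ms)

Reads the next frame from a stream opened by stream_open/4.

wrap_ssl_socket(ssl_socket, opts)

Wraps an upgraded SSL socket in the TCP transport connection struct.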

Types

command_option()

Command option accepted by command/4 and command_stream/4.

See Aerospike.Cluster.NodeTransport.command_option/0 for the transport behaviour contract.

command_opts()

@type command_opts() :: [command_option()]

Keyword list accepted by command/4 and command_stream/4.

conn()

@opaque conn()

Opaque TCP connection handle.

connect_option()

@type connect_option() ::
  {:connect_timeout_ms, pos_integer()}
  | {:info_timeout, pos_integer()}
  | {:login_timeout_ms, pos_integer()}
  | {:tcp_nodelay, boolean()}
  | {:tcp_keepalive, boolean()}
  | {:tcp_sndbuf, pos_integer() | nil}
  | {:tcp_rcvbuf, pos_integer() | nil}
  | {:node_name, String.t() | nil}
  | {:auth_mode, Aerospike.Cluster.NodeTransport.auth_mode()}
  | {:user, String.t()}
  | {:password, String.t()}
  | {:session_token, binary()}

TCP connection option accepted by connect/3.

Timeout and socket tuning keys are applied during the TCP handshake. Auth keys trigger the admin-protocol login/authenticate handshake after connect. :node_name is an internal telemetry label injected by pool startup.

connect_opts()

@type connect_opts() :: [connect_option()]

Keyword list accepted by connect/3.

login_opts()

Keyword list accepted by login/2.

stream()

@opaque stream()

Opaque stream handle owned by a dedicated socket worker.

stream_opts()

Keyword list accepted by stream_open/4.

t()

@type t() :: %Aerospike.Transport.Tcp{
  info_timeout: non_neg_integer(),
  node_name: String.t() | nil,
  socket: :gen_tcp.socket() | :ssl.sslsocket(),
  socket_mod: :gen_tcp | :ssl
}

Concrete connection handle returned by connect/3.

Functions

close(tcp)

@spec close(conn()) :: :ok

Closes the TCP or TLS socket.

command(conn, request, deadline_ms, opts \\ [])

@spec command(conn(), iodata(), non_neg_integer(), command_opts()) ::
  {:ok, binary()} | {:error, Aerospike.Error.t()}

Sends one pre-encoded command frame and returns one decoded response body.

Accepts :use_compression, :message_type, and :attempt in opts.

command_stream(conn, request, deadline_ms, opts \\ [])

@spec command_stream(conn(), iodata(), non_neg_integer(), command_opts()) ::
  {:ok, binary()} | {:error, Aerospike.Error.t()}

Sends one pre-encoded command frame and reads a bounded multi-frame response.

Accepts the same options as command/4.

connect(host, port, opts \\ [])

@spec connect(String.t(), :inet.port_number(), connect_opts()) ::
  {:ok, conn()} | {:error, Aerospike.Error.t()}

Opens a plaintext TCP connection to an Aerospike node.

Options mirror the cluster :connect_opts accepted by Aerospike.start_link/1, including timeouts, TCP socket tuning, node name, and optional auth credentials. See connect_option/0 and the module docs for supported keys.
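The module docs say a caller should retry with :user/:password when a :session_token is rejected with :expired_session. A caller-side sketch of that fallback is shown below. TokenFallbackSketch is a hypothetical name, and connect_fun is a stand-in for something like a one-argument wrapper around connect/3, so the example can run without a server. The error is matched on its :code field only.

```elixir
defmodule TokenFallbackSketch do
  # `connect_fun` takes the connect options and returns
  # {:ok, conn} | {:error, error}, mirroring the shape of connect/3.
  def connect(connect_fun, base_opts, user, password, token)
      when is_function(connect_fun, 1) do
    token_opts = Keyword.merge(base_opts, user: user, session_token: token)

    case connect_fun.(token_opts) do
      {:error, %{code: :expired_session}} ->
        # The token was rejected: fall back to the full password login.
        connect_fun.(Keyword.merge(base_opts, user: user, password: password))

      other ->
        other
    end
  end
end
```

Any other error is returned untouched, so a refused connection or a timeout is not masked by a second login attempt.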

info(conn, commands)

@spec info(conn(), [String.t()]) ::
  {:ok, %{required(String.t()) => String.t()}} | {:error, Aerospike.Error.t()}

Sends one or more info commands and returns the decoded response map.
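As an illustration of what "decoded response map" could mean, the sketch below turns a raw info body into a map of command name to value. It assumes the info body is a sequence of newline-terminated lines with a tab between name and value; InfoDecodeSketch is a hypothetical module, and the driver's own decoder may handle more cases.

```elixir
defmodule InfoDecodeSketch do
  # Decodes "name\tvalue\n" lines into a map (format assumed, see lead-in).
  def decode(body) when is_binary(body) do
    body
    |> String.split("\n", trim: true)
    |> Map.new(fn line ->
      # Split on the first tab only, so values may themselves contain tabs.
      case String.split(line, "\t", parts: 2) do
        [name, value] -> {name, value}
        [name] -> {name, ""}
      end
    end)
  end
end
```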

login(conn, opts)

Runs the admin-protocol login or authenticate handshake on an open socket.

The Tender uses this callback to obtain and cache session tokens. Callers own the socket and are expected to close it after an error. See Aerospike.Cluster.NodeTransport.login_opts/0 for supported keys.

maybe_login_after_handshake(conn, opts, host, port)

@spec maybe_login_after_handshake(
  conn(),
  connect_opts(),
  String.t(),
  :inet.port_number()
) ::
  {:ok, conn()} | {:error, Aerospike.Error.t()}

Runs the optional login/authenticate handshake after a TLS upgrade.

This package-internal helper lets Aerospike.Transport.Tls share the TCP transport's auth behavior.

stream_close(stream)

@spec stream_close(stream()) :: :ok

Closes a stream handle.

stream_last_frame?(arg1)

@spec stream_last_frame?(binary()) :: boolean()

Returns true when a stream frame is marked as the terminal frame.

stream_open(conn, request, deadline_ms, opts \\ [])

@spec stream_open(conn(), iodata(), non_neg_integer(), stream_opts()) ::
  {:ok, stream()} | {:error, Aerospike.Error.t()}

Sends a streaming request and returns a stream handle for incremental reads.

Accepts :use_compression and :attempt in opts; unsupported keys are ignored.

stream_read(stream, deadline_ms)

@spec stream_read(stream(), non_neg_integer()) ::
  {:ok, binary()} | :done | {:error, Aerospike.Error.t()}

Reads the next frame from a stream opened by stream_open/4.
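Given the return type above, a consumer loop might be sketched as follows. StreamDrainSketch is hypothetical, and read_fun stands in for a zero-arity wrapper around stream_read/2 so the example runs without a socket.

```elixir
defmodule StreamDrainSketch do
  # Collects frames until the reader reports :done or an error.
  def drain(read_fun) when is_function(read_fun, 0), do: drain(read_fun, [])

  defp drain(read_fun, acc) do
    case read_fun.() do
      {:ok, frame} -> drain(read_fun, [frame | acc])
      :done -> {:ok, Enum.reverse(acc)}
      {:error, _reason} = error -> error
    end
  end
end
```

A real caller would presumably call stream_close/1 once the loop returns, on both the success and the error path, and would process frames incrementally rather than accumulating them when responses are large.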

wrap_ssl_socket(ssl_socket, opts)

@spec wrap_ssl_socket(:ssl.sslsocket(), connect_opts()) :: conn()

Wraps an upgraded SSL socket in the TCP transport connection struct.

This package-internal helper is used by Aerospike.Transport.Tls after the TLS handshake succeeds.