Module em_filter_server

WebSocket Client for em_disco Connectivity.

Behaviours: gen_server.

Authors: Steve Roques.

Description

Manages a single persistent WebSocket connection to ONE em_disco node. em_filter_sup starts one server per configured disco node.

Connection lifecycle

1. init/1 sends self() ! connect and returns immediately.
2. handle_info(connect, ...) opens a Gun connection, upgrades to WebSocket and completes the 2-step handshake.
3. On connection loss, a reconnect is scheduled after reconnect_interval_ms milliseconds (application env).

The server process never stops due to transient network failures — reconnection is handled internally. ETS memory (if configured) survives reconnects because it is owned by the server process.
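
The lifecycle above can be sketched as a small gen_server. This is a minimal illustration, not the module's actual code: ConnectFun is a hypothetical stand-in for the Gun open, upgrade and handshake sequence, the connection_lost message stands in for Gun's gun_down notification, and the 5000 ms default interval is an assumption.

```erlang
-module(reconnect_sketch).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% connect_fun is hypothetical: it returns {ok, Conn} | {error, Reason}.
-record(state, {connect_fun, conn = undefined, reconnect_timer = undefined}).

start_link(ConnectFun) ->
    gen_server:start_link(?MODULE, ConnectFun, []).

init(ConnectFun) ->
    %% Defer the connection attempt so init/1 returns immediately.
    self() ! connect,
    {ok, #state{connect_fun = ConnectFun}}.

handle_call(status, _From, #state{conn = undefined} = State) ->
    {reply, disconnected, State};
handle_call(status, _From, State) ->
    {reply, connected, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(connect, #state{connect_fun = Connect} = State) ->
    case Connect() of
        {ok, Conn} ->
            {noreply, State#state{conn = Conn, reconnect_timer = undefined}};
        {error, _Reason} ->
            %% A failed attempt is not fatal: try again later.
            {noreply, schedule_reconnect(State#state{conn = undefined})}
    end;
handle_info(connection_lost, State) ->
    {noreply, schedule_reconnect(State#state{conn = undefined})};
handle_info(_Other, State) ->
    {noreply, State}.

terminate(_Reason, #state{reconnect_timer = undefined}) ->
    ok;
terminate(_Reason, #state{reconnect_timer = Timer}) ->
    _ = erlang:cancel_timer(Timer),
    ok.

schedule_reconnect(State) ->
    %% The default of 5000 ms is an assumption for this sketch.
    Interval = application:get_env(em_filter, reconnect_interval_ms, 5000),
    Timer = erlang:send_after(Interval, self(), connect),
    State#state{reconnect_timer = Timer}.
```

Because a failed or lost connection only schedules another connect message, the process keeps running and its state is preserved across attempts.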

Authentication

em_disco requires a JWT passed as ?token=<jwt> in the WebSocket upgrade URL. The token is read from (in order of priority):

1. jwt_token key in the agent Config map
2. jwt_token application env in the em_filter application

If no token is configured, the upgrade is rejected with 401; the server logs a warning and schedules a reconnect.
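
The lookup order can be sketched as follows. The function names resolve_token/1 and upgrade_path/2 and the "/ws" base path used in the usage note are hypothetical and only serve the illustration; the real endpoint path is not stated here.

```erlang
-module(token_sketch).
-export([resolve_token/1, upgrade_path/2]).

%% Prefer the agent Config map, then fall back to the em_filter
%% application env. Returns undefined when neither is set.
resolve_token(Config) when is_map(Config) ->
    case maps:find(jwt_token, Config) of
        {ok, Token} -> Token;
        error -> application:get_env(em_filter, jwt_token, undefined)
    end.

%% Append the token as a query parameter to a (hypothetical) base path.
%% Without a token the path is returned unchanged, so the upgrade would
%% be expected to fail with 401.
upgrade_path(BasePath, undefined) ->
    iolist_to_binary(BasePath);
upgrade_path(BasePath, Token) ->
    iolist_to_binary([BasePath, "?token=", Token]).
```

For example, upgrade_path("/ws", resolve_token(#{jwt_token => <<"abc">>})) yields <<"/ws?token=abc">>.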

Transport

tcp - plain WebSocket (ws://)
tls - TLS WebSocket (wss://)

TLS uses verify_peer with the system CA store. SNI is set to the target host so wildcard certificates are validated correctly.
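
One way such transport options might be assembled for gun:open/3 is shown below. This is a sketch under assumptions: it uses the tls_opts key of Gun 2.x (Gun 1.x used transport_opts), public_key:cacerts_get/0 from OTP 25 or later for the system CA store, and it adds customize_hostname_check, which is commonly needed for wildcard matching. Whether em_filter_server sets these exact options is not confirmed here.

```erlang
-module(transport_sketch).
-export([scheme/1, gun_opts/2]).

%% URL scheme that corresponds to each transport.
scheme(tcp) -> "ws://";
scheme(tls) -> "wss://".

%% Options map for gun:open(Host, Port, Opts). Host is a string().
gun_opts(tcp, _Host) ->
    #{transport => tcp, protocols => [http]};
gun_opts(tls, Host) ->
    #{transport => tls,
      protocols => [http],
      tls_opts => [
          %% Reject servers whose certificate chain cannot be verified.
          {verify, verify_peer},
          %% System CA store (OTP 25+).
          {cacerts, public_key:cacerts_get()},
          %% SNI set to the target host.
          {server_name_indication, Host},
          %% Assumption: HTTPS-style matching so wildcard names are accepted.
          {customize_hostname_check,
           [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]}
      ]}.
```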

Function Index

code_change/3
handle_call/3    Handle a direct HTTP query forwarded by em_filter_http.
handle_cast/2    No asynchronous casts handled.
handle_info/2    Handles incoming WebSocket frames from em_disco.
init/1           Initialises the server state and schedules the first connection attempt.
start_link/5     Starts a server linked to one specific disco node.
terminate/2      Cancels the reconnect timer, closes the Gun connection, and deletes the ETS memory table if one was created.

Function Details

code_change/3

code_change(OldVsn, State, Extra) -> any()

handle_call/3

handle_call(Req::term(), From::{pid(), term()}, State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> {reply, {ok, binary()} | {error, term()} | ok, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}

Handle a direct HTTP query forwarded by em_filter_http.

Delegates to the same dispatch/2 function used by the WebSocket path. The agent's handler module and memory are therefore shared between both transports — the caller's transport is invisible to the handler.

Returns {ok, Result} where Result is the JSON binary produced by handler_module:handle/2. em_filter_http decodes this before embedding it in its response body.
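
The shared-dispatch idea can be illustrated with a small sketch. Everything here is an assumption made for illustration: the {Handler, Memory} state shape, the {Result, NewMemory} return value of handle/2, the echo handler itself, and the send_frame tuple that stands in for writing a WebSocket frame.

```erlang
-module(dispatch_sketch).
-export([via_http/2, via_websocket/2, handle/2]).

%% Hypothetical handler callback: echoes the body and counts calls.
handle(Body, Memory) ->
    Count = maps:get(calls, Memory, 0) + 1,
    {<<"{\"echo\":\"", Body/binary, "\"}">>, Memory#{calls => Count}}.

%% Single dispatch function used by both entry points.
dispatch(Body, {Handler, Memory}) ->
    {Result, NewMemory} = Handler:handle(Body, Memory),
    {Result, {Handler, NewMemory}}.

%% Shape of a handle_call/3 reply for the HTTP path.
via_http(Body, State) ->
    {Result, NewState} = dispatch(Body, State),
    {reply, {ok, Result}, NewState}.

%% Stand-in for the WebSocket path, which would send Result as a frame.
via_websocket(Body, State) ->
    {Result, NewState} = dispatch(Body, State),
    {send_frame, {text, Result}, NewState}.
```

Calling via_http/2 and then via_websocket/2 with the returned state increments the same calls counter, which shows how memory is shared regardless of the caller's transport.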

handle_cast/2

handle_cast(Msg::term(), State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> {noreply, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}

No asynchronous casts handled.

handle_info/2

handle_info(Info, State) -> any()

Handles incoming WebSocket frames from em_disco.

Only query frames are processed — registration ack frames (registered, agent_registered) are silently ignored.

init/1

init(X1::{atom(), module(), map(), string(), inet:port_number(), tcp | tls}) -> {ok, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}

Initialises the server state and schedules the first connection attempt.

Memory is loaded from ETS if memory => ets is configured, otherwise starts as an empty map. The actual WebSocket connection is deferred to the first handle_info(connect, ...) call.

start_link/5

start_link(AgentName::atom(), HandlerModule::module(), Config::map(), X4::{string(), inet:port_number(), tcp | tls}, Index::pos_integer()) -> {ok, pid()} | {error, term()}

Starts a server linked to one specific disco node.

Index controls the registered process name:

1 → <agent>_server
2, 3, … → <agent>_server_<N>

This ensures whereis(my_agent_server) works for the common single-node case while still supporting multi-node setups.
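
The naming rule could be expressed roughly as below. The helper name server_name/2 is hypothetical; only the resulting names follow the rule described above.

```erlang
-module(name_sketch).
-export([server_name/2]).

%% Index 1 keeps the short name; higher indices get a numeric suffix.
-spec server_name(atom(), pos_integer()) -> atom().
server_name(AgentName, 1) ->
    list_to_atom(atom_to_list(AgentName) ++ "_server");
server_name(AgentName, Index) when is_integer(Index), Index > 1 ->
    list_to_atom(atom_to_list(AgentName) ++ "_server_" ++ integer_to_list(Index)).
```

With this rule, server_name(my_agent, 1) gives my_agent_server and server_name(my_agent, 2) gives my_agent_server_2.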

terminate/2

terminate(Reason::term(), State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> ok

Cancels the reconnect timer, closes the Gun connection, and deletes the ETS memory table if one was created.


Generated by EDoc