Behaviours: gen_server.
Authors: Steve Roques.
WebSocket Client for em_disco Connectivity
Manages a single persistent WebSocket connection to ONE em_disco node. em_filter_sup starts one server per configured disco node.
1. init/1 sends self() ! connect and returns immediately.
2. handle_info(connect, ...) opens a Gun connection, upgrades to
WebSocket and completes the 2-step handshake.
3. On connection loss, a reconnect is scheduled after
reconnect_interval_ms milliseconds (application env).
The server process never stops due to transient network failures — reconnection is handled internally. ETS memory (if configured) survives reconnects because it is owned by the server process.
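The lifecycle above can be sketched as a small gen_server. This is a minimal illustration, not the module's actual code: ConnectFun is a hypothetical stand-in for the Gun open and WebSocket upgrade, the connection_lost message is an invented placeholder for whatever signals a dropped connection, and the 5000 ms default for reconnect_interval_ms is an assumption.

```erlang
-module(reconnect_sketch).
-behaviour(gen_server).

-export([start_link/1, status/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% ConnectFun (hypothetical) returns {ok, Conn} or {error, Reason}.
start_link(ConnectFun) ->
    gen_server:start_link(?MODULE, ConnectFun, []).

status(Pid) ->
    gen_server:call(Pid, status).

init(ConnectFun) ->
    %% Step 1: defer the connection so init/1 returns immediately.
    self() ! connect,
    {ok, #{connect_fun => ConnectFun, conn => undefined, timer => undefined}}.

handle_call(status, _From, #{conn := undefined} = State) ->
    {reply, disconnected, State};
handle_call(status, _From, State) ->
    {reply, connected, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Step 2: attempt the connection; a failure schedules a retry, not a crash.
handle_info(connect, #{connect_fun := ConnectFun} = State) ->
    case ConnectFun() of
        {ok, Conn} ->
            {noreply, State#{conn := Conn, timer := undefined}};
        {error, _Reason} ->
            {noreply, schedule_reconnect(State#{conn := undefined})}
    end;
%% Step 3: on connection loss, schedule a reconnect.
handle_info(connection_lost, State) ->
    {noreply, schedule_reconnect(State#{conn := undefined})};
handle_info(_Other, State) ->
    {noreply, State}.

terminate(_Reason, #{timer := Timer}) ->
    cancel(Timer),
    ok.

schedule_reconnect(#{timer := Old} = State) ->
    cancel(Old),
    %% The 5000 ms fallback is an assumption for this sketch.
    Interval = application:get_env(em_filter, reconnect_interval_ms, 5000),
    State#{timer := erlang:send_after(Interval, self(), connect)}.

cancel(undefined) -> ok;
cancel(Ref) -> _ = erlang:cancel_timer(Ref), ok.
```

Because the retry is driven by a timer message rather than a crash and supervisor restart, the process and anything it owns stay alive across reconnects.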
em_disco requires a JWT passed as ?token=<jwt> in the WebSocket
upgrade URL. The token is read from (in order of priority):
1. jwt_token key in the agent Config map
2. jwt_token application env in the em_filter application
If no token is configured, the upgrade is rejected with 401; the server logs a warning and schedules a reconnect.
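The lookup order could be expressed roughly as follows. This is a sketch under assumptions: the function names resolve_token/1 and upgrade_path/2 and the "/ws" base path used in the usage note are hypothetical, and the token is appended without percent-encoding because a JWT in compact form contains only base64url characters and dots.

```erlang
-module(token_sketch).
-export([resolve_token/1, upgrade_path/2]).

%% Priority 1: jwt_token in the agent Config map.
%% Priority 2: jwt_token in the em_filter application env.
resolve_token(Config) when is_map(Config) ->
    case maps:find(jwt_token, Config) of
        {ok, FromConfig} ->
            {ok, FromConfig};
        error ->
            case application:get_env(em_filter, jwt_token) of
                {ok, FromEnv} -> {ok, FromEnv};
                undefined -> none
            end
    end.

%% Appends ?token=<jwt> to the upgrade path when a token is available.
upgrade_path(BasePath, none) ->
    iolist_to_binary(BasePath);
upgrade_path(BasePath, {ok, Token}) ->
    iolist_to_binary([BasePath, "?token=", Token]).
```

For example, token_sketch:upgrade_path("/ws", token_sketch:resolve_token(Config)) yields the path to pass to the WebSocket upgrade call; with no token the bare path is used and em_disco is expected to answer 401.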
tcp: plain WebSocket (ws://)
tls: TLS WebSocket (wss://)
TLS uses verify_peer with the system CA store. SNI is set to the target host so wildcard certificates are validated correctly.
| code_change/3 | |
| handle_call/3 | Handle a direct HTTP query forwarded by em_filter_http. |
| handle_cast/2 | No asynchronous casts handled. |
| handle_info/2 | Handles incoming WebSocket frames from em_disco. |
| init/1 | Initialises the server state and schedules the first connection attempt. |
| start_link/5 | Starts a server linked to one specific disco node. |
| terminate/2 | Cancels the reconnect timer, closes the Gun connection, and deletes the ETS memory table if one was created. |
code_change(OldVsn, State, Extra) -> any()
handle_call(Req::term(), From::{pid(), term()}, State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> {reply, {ok, binary()} | {error, term()} | ok, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}
Handle a direct HTTP query forwarded by em_filter_http.
Delegates to the same dispatch/2 function used by the WebSocket
path. The agent's handler module and memory are therefore shared
between both transports — the caller's transport is invisible to
the handler.
Returns {ok, Result}, where Result is the JSON binary produced by
handler_module:handle/2. em_filter_http decodes this before
embedding it in its response body.
handle_cast(Msg::term(), State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> {noreply, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}
No asynchronous casts handled.
handle_info(Info, State) -> any()
Handles incoming WebSocket frames from em_disco.
Only query frames are processed; registration ack frames
(registered, agent_registered) are silently ignored.
init(X1::{atom(), module(), map(), string(), inet:port_number(), tcp | tls}) -> {ok, #state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}}
Initialises the server state and schedules the first connection attempt.
Memory is loaded from ETS if memory => ets is configured; otherwise it
starts as an empty map. The actual WebSocket connection is deferred to
the first handle_info(connect, ...) call.
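One way the memory initialisation might look is sketched below. The function name load_memory/2, the table options, and the assumption that the table stores plain {Key, Value} tuples are all illustrative and not confirmed by the module itself.

```erlang
-module(memory_sketch).
-export([load_memory/2]).

%% With memory => ets, reuse the named table if it exists, otherwise create
%% it, then copy its {Key, Value} rows into a map.
load_memory(#{memory := ets}, TableName) when is_atom(TableName) ->
    Table = case ets:whereis(TableName) of
        undefined -> ets:new(TableName, [named_table, set, protected]);
        _Tid -> TableName
    end,
    maps:from_list(ets:tab2list(Table));
%% Any other configuration starts with an empty in-process map.
load_memory(_Config, _TableName) ->
    #{}.
```

Since the calling process owns the table it creates, the table lives as long as that process does, which is consistent with memory surviving reconnects.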
start_link(AgentName::atom(), HandlerModule::module(), Config::map(), X4::{string(), inet:port_number(), tcp | tls}, Index::pos_integer()) -> {ok, pid()} | {error, term()}
Starts a server linked to one specific disco node.
Index controls the registered process name:
1 → <agent>_server
2, 3, … → <agent>_server_<N>
This ensures that whereis(my_agent_server) works for the common
single-node case while still supporting multi-node setups.
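The naming rule above can be written as a small helper. The name server_name/2 is hypothetical; only the resulting atoms follow the scheme described here.

```erlang
-module(name_sketch).
-export([server_name/2]).

%% Index 1 maps to <agent>_server.
server_name(AgentName, 1) when is_atom(AgentName) ->
    list_to_atom(atom_to_list(AgentName) ++ "_server");
%% Index 2, 3, ... maps to <agent>_server_<N>.
server_name(AgentName, Index)
  when is_atom(AgentName), is_integer(Index), Index > 1 ->
    list_to_atom(atom_to_list(AgentName) ++ "_server_" ++ integer_to_list(Index)).
```

For an agent called my_agent connected to two disco nodes, this gives my_agent_server and my_agent_server_2.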
terminate(Reason::term(), State::#state{agent_name = atom(), handler_module = module(), config = map(), host = string(), port = inet:port_number(), transport = tcp | tls, conn_pid = pid() | undefined, stream_ref = reference() | undefined, memory = map(), memory_table = atom() | undefined, reconnect_timer = reference() | undefined}) -> ok
Cancels the reconnect timer, closes the Gun connection, and deletes the ETS memory table if one was created.