# Reactor Module

The `erlang.reactor` module provides low-level, FD-based protocol handling for building custom servers. Python implements the protocol logic while Erlang handles I/O scheduling via `enif_select`.

## Overview

The reactor pattern separates I/O multiplexing (handled by Erlang) from protocol logic (handled by Python). This provides:

- **Efficient I/O** - Erlang's `enif_select` for event notification
- **Protocol flexibility** - Python implements the protocol state machine
- **Zero-copy buffers** - `ReactorBuffer` exposes read data through the buffer protocol without copying
- **Works with any fd** - TCP, UDP, Unix sockets, pipes, etc.

### Architecture

```
                          Reactor Architecture

  Erlang (BEAM)                          Python
  ─────────────                          ──────

  ┌─────────────────────┐                ┌─────────────────────────────┐
  │ py_reactor_context  │                │ erlang.reactor              │
  │                     │                │                             │
  │ accept() ───────────┼───fd_handoff──▶│ init_connection(fd, info)   │
  │  │                  │                │  │                          │
  │  ▼                  │                │  ▼                          │
  │ enif_select(READ)   │                │ Protocol.connection_made()  │
  │  │                  │                │                             │
  │  ▼                  │                │                             │
  │ {select, fd, READ} ─┼─on_read_ready─▶│ Protocol.data_received()    │
  │                     │                │  │                          │
  │ action =            │                │  │                          │
  │ "write_pending" ◀───┼────────────────┼──┘                          │
  │  │                  │                │                             │
  │  ▼                  │                │                             │
  │ enif_select(WRITE)  │                │                             │
  │  │                  │                │                             │
  │  ▼                  │                │                             │
  │ {select, fd, WRITE}─┼─on_write_ready▶│ Protocol.write_ready()      │
  └─────────────────────┘                └─────────────────────────────┘
```

## Protocol Base Class

The `Protocol` class is the base for implementing custom protocols:

```python
import erlang.reactor as reactor

class Protocol(reactor.Protocol):
    """Base protocol attributes and methods."""

    # Set by reactor on connection
    fd: int                  # File descriptor
    client_info: dict        # Connection metadata from Erlang
    write_buffer: bytearray  # Buffer for outgoing data
    closed: bool             # Whether connection is closed
```

### Lifecycle Methods

#### `connection_made(fd, client_info)`

Called when a file descriptor is handed off from Erlang.

```python
def connection_made(self, fd: int, client_info: dict):
    """Called when fd is handed off from Erlang.

    Args:
        fd: File descriptor for the connection
        client_info: Dict with connection metadata
            - 'addr': Client IP address
            - 'port': Client port
            - 'type': Connection type (tcp, udp, unix, etc.)
    """
    super().connection_made(fd, client_info)
    # Initialize per-connection state
    self.request_buffer = bytearray()
```

#### `data_received(data) -> action`

Called when data has been read from the fd. The `data` argument is a `ReactorBuffer`: a bytes-like object that supports zero-copy access via the buffer protocol.

```python
def data_received(self, data: bytes) -> str:
    """Handle received data.

    Args:
        data: A bytes-like object (ReactorBuffer) supporting:
            - Buffer protocol: memoryview(data) for zero-copy access
            - Indexing/slicing: data[0], data[0:10]
            - Bytes methods: data.startswith(), data.find(), data.decode()
            - Comparison: data == b'...'
            - Conversion: bytes(data) creates a copy

    Returns:
        Action string indicating what to do next
    """
    self.request_buffer.extend(data)

    # request_complete() and prepare_response() are your own helpers
    if self.request_complete():
        self.prepare_response()
        return "write_pending"
    return "continue"  # Need more data
```

#### `write_ready() -> action`

Called when the fd is ready for writing.

```python
def write_ready(self) -> str:
    """Handle write readiness.

    Returns:
        Action string indicating what to do next
    """
    if not self.write_buffer:
        return "read_pending"

    written = self.write(bytes(self.write_buffer))
    del self.write_buffer[:written]

    if self.write_buffer:
        return "continue"  # More to write
    return "read_pending"  # Done writing
```

#### `connection_lost()`

Called when the connection is closed.

```python
def connection_lost(self):
    """Called when connection closes.

    Override to perform cleanup.
    """
    # Clean up resources (cleanup() is your own helper)
    self.cleanup()
```

### I/O Helper Methods

#### `read(size) -> bytes`

Read up to `size` bytes from the file descriptor:

```python
data = self.read(65536)  # Read up to 64KB
if not data:
    return "close"  # EOF or error
```

#### `write(data) -> int`

Write to the file descriptor. The return value is the number of bytes actually written, which can be less than `len(data)`:

```python
written = self.write(response_bytes)
del self.write_buffer[:written]  # Remove written bytes
```

## Zero-Copy ReactorBuffer

The `data` argument passed to `data_received()` is a `ReactorBuffer`, a bytes-like type that provides zero-copy access to the data that was read. The NIF reads the data before acquiring the GIL and wraps it in a `ReactorBuffer` that exposes the memory via Python's buffer protocol.

### Benefits

- **No extra copies** - Data read by the NIF is exposed to Python directly, without an intermediate `bytes` object
- **Transparent compatibility** - `ReactorBuffer` behaves like `bytes` in the common operations listed below
- **Memory efficiency** - Large payloads don't require extra allocations

### Supported Operations

`ReactorBuffer` supports the common `bytes` operations:

```python
def data_received(self, data):
    # Buffer protocol - zero-copy access
    mv = memoryview(data)
    first_byte = mv[0]

    # Indexing and slicing
    header = data[0:4]
    last_byte = data[-1]

    # Bytes methods
    if data.startswith(b'GET'):
        method = 'GET'
    pos = data.find(b'\r\n')
    count = data.count(b'/')

    # String conversion
    text = data.decode('utf-8')

    # Comparison
    if data == b'PING':
        self.write_buffer.extend(b'PONG')

    # Convert to bytes (creates a copy)
    data_copy = bytes(data)

    # 'in' operator (handle_http() is your own helper)
    if b'HTTP' in data:
        self.handle_http()

    # Length
    size = len(data)

    # Extend bytearray (uses buffer protocol)
    self.request_buffer.extend(data)

    return "continue"
```

### Performance Considerations

The zero-copy benefit is in the NIF read path: data is read directly into a buffer that Python wraps without copying. This avoids the overhead of creating a Python `bytes` object for every read.
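The difference between a shared view and a copy can be reproduced with the standard library alone. The sketch below is illustrative only: it assumes a preallocated `bytearray` filled by `socket.recv_into()` as a stand-in for the NIF's read buffer, and plain `memoryview` objects in place of `ReactorBuffer`.

```python
import socket

# Stand-in for the NIF's read buffer (assumption: the real ReactorBuffer is
# created by the NIF, not constructed from Python like this).
buf = bytearray(65536)

a, b = socket.socketpair()
a.sendall(b"GET /index.html HTTP/1.0\r\n\r\n")

# recv_into() fills the existing buffer instead of allocating a new bytes object.
n = b.recv_into(buf)
view = memoryview(buf)[:n]   # zero-copy window onto the received bytes

# Slicing a memoryview yields another view of the same memory.
method = view[:3]
print(method == b"GET")      # content comparison with bytes works
print(method.obj is buf)     # the slice still references the original buffer

# bytes() makes an independent copy that survives later overwrites.
snapshot = bytes(view)
buf[0:3] = b"PUT"            # same-length write, allowed while views exist
print(bytes(method))         # the view sees the new contents
print(snapshot[:3])          # the copy keeps the old contents

method.release()
view.release()
a.close()
b.close()
```

In the reactor you never call `recv_into()` yourself: the NIF performs the read and `data_received()` receives the resulting buffer.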
- **NIF read path**: The buffer the NIF reads into is handed to Python without an intermediate copy
- **Parsing operations**: `startswith()`, `find()`, etc. are optimized C implementations
- **Direct memoryview access**: Use `data.memoryview()` for maximum zero-copy performance
- **Creating bytes**: Call `bytes(data)` only when you need a persistent copy

```python
# For maximum performance, use memoryview slicing for comparisons
mv = data.memoryview()
if mv[:3] == b'GET':
    ...  # Process GET request
```

## Action Return Values

Protocol methods return action strings that tell Erlang what to do next:

| Action | Description | Erlang Behavior |
|--------|-------------|-----------------|
| `"continue"` | Keep current mode | Re-register same event |
| `"write_pending"` | Ready to write | Switch to write mode (`enif_select` WRITE) |
| `"read_pending"` | Ready to read | Switch to read mode (`enif_select` READ) |
| `"close"` | Close connection | Close fd and call `connection_lost()` |

## Factory Pattern

Register a protocol factory to create a protocol instance for each connection:

```python
import erlang.reactor as reactor

class MyProtocol(reactor.Protocol):
    ...  # implementation

# Set the factory - called for each new connection
reactor.set_protocol_factory(MyProtocol)

# Get the protocol instance for an fd
proto = reactor.get_protocol(fd)
```

## Complete Example: Echo Protocol

Here's a complete echo server protocol:

```python
import erlang.reactor as reactor

class EchoProtocol(reactor.Protocol):
    """Simple echo protocol - sends back whatever it receives."""

    def connection_made(self, fd, client_info):
        super().connection_made(fd, client_info)
        print(f"Connection from {client_info.get('addr')}:{client_info.get('port')}")

    def data_received(self, data):
        """Echo received data back to client."""
        if not data:
            return "close"

        # Buffer the data for writing
        self.write_buffer.extend(data)
        return "write_pending"

    def write_ready(self):
        """Write buffered data."""
        if not self.write_buffer:
            return "read_pending"

        written = self.write(bytes(self.write_buffer))
        del self.write_buffer[:written]

        if self.write_buffer:
            return "continue"  # More data to write
        return "read_pending"  # Done, wait for more input

    def connection_lost(self):
        print(f"Connection closed: fd={self.fd}")

# Register the factory
reactor.set_protocol_factory(EchoProtocol)
```

## Example: HTTP Protocol (Simplified)

```python
import erlang.reactor as reactor

class SimpleHTTPProtocol(reactor.Protocol):
    """Minimal HTTP/1.0 protocol."""

    def __init__(self):
        super().__init__()
        self.request_buffer = bytearray()

    def data_received(self, data):
        self.request_buffer.extend(data)

        # Check for end of headers
        if b'\r\n\r\n' in self.request_buffer:
            self.handle_request()
            return "write_pending"
        return "continue"

    def handle_request(self):
        """Parse request and prepare response."""
        request = self.request_buffer.decode('utf-8', errors='replace')
        first_line = request.split('\r\n')[0]
        method, path, _ = first_line.split(' ', 2)

        # Simple response; Content-Length must count bytes, not characters
        body = f"Hello! You requested {path}".encode('utf-8')
        header = (
            f"HTTP/1.0 200 OK\r\n"
            f"Content-Length: {len(body)}\r\n"
            f"Content-Type: text/plain; charset=utf-8\r\n"
            f"\r\n"
        )
        self.write_buffer.extend(header.encode('ascii') + body)

    def write_ready(self):
        if not self.write_buffer:
            return "close"  # HTTP/1.0: close after response

        written = self.write(bytes(self.write_buffer))
        del self.write_buffer[:written]

        if self.write_buffer:
            return "continue"
        return "close"

reactor.set_protocol_factory(SimpleHTTPProtocol)
```

## Passing Sockets from Erlang to Python

### Method 1: Socket FD Handoff to Reactor

The most efficient approach is to hand the socket's file descriptor directly to the reactor:

```erlang
%% Erlang: Accept and hand off to Python reactor
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, {Addr, Port}} = inet:peername(ClientSock),

%% Get the raw file descriptor
{ok, Fd} = inet:getfd(ClientSock),

%% Hand off to Python - Erlang no longer owns this socket
py_reactor_context:handoff(Fd, #{
    addr => inet:ntoa(Addr),
    port => Port,
    type => tcp
}).
```

```python
# Python: Protocol handles the fd
import erlang.reactor as reactor

class MyProtocol(reactor.Protocol):
    def data_received(self, data):
        # self.fd is the socket fd from Erlang
        self.write_buffer.extend(b"Got: " + data)
        return "write_pending"

reactor.set_protocol_factory(MyProtocol)
```

### Method 2: Pass Socket FD to asyncio

For asyncio-based code, pass the fd to Python and wrap it there:

```erlang
%% Erlang: Get fd and pass to Python
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),

%% Call the synchronous wrapper; handle_connection itself is a coroutine
Ctx = py:context(1),
py:call(Ctx, my_handler, handle_connection_sync, [Fd]).
```

```python
# Python: Wrap fd in asyncio
import asyncio
import socket

async def handle_connection(fd: int):
    # Create socket from fd (Python takes ownership)
    sock = socket.socket(fileno=fd)
    sock.setblocking(False)

    # Use asyncio streams
    reader, writer = await asyncio.open_connection(sock=sock)

    data = await reader.read(1024)
    writer.write(b"Echo: " + data)
    await writer.drain()

    writer.close()
    await writer.wait_closed()

def handle_connection_sync(fd: int):
    """Sync wrapper for Erlang call."""
    asyncio.run(handle_connection(fd))
```

### Method 3: Pass Connection Info Instead of the Socket (Not Recommended)

Socket objects cannot be pickled, so this approach passes only the peer's address and has Python open a new connection. It does not reuse the accepted connection and is less efficient:

```erlang
%% Erlang: Pass connection info
{ok, {Addr, Port}} = inet:peername(ClientSock),
py:call(Ctx, my_handler, connect_to, [inet:ntoa(Addr), Port]).
```

```python
# Python: Create new connection (less efficient - new socket)
import socket

def connect_to(addr: str, port: int):
    sock = socket.create_connection((addr, port))
    # ... use socket
```

### Socket Ownership

When passing an fd from Erlang to Python, you must decide who owns it:

**Option 1: Transfer ownership to Python**

Erlang gives up the fd entirely. Don't close the Erlang socket.

```erlang
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),
py_reactor_context:handoff(Fd, #{type => tcp}).
%% Don't close ClientSock - Python owns the fd now
```

**Option 2: Duplicate the fd (recommended)**

Use `py:dup_fd/1` to create an independent copy. Each side then closes only its own fd.

```erlang
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),
{ok, DupFd} = py:dup_fd(Fd),
py_reactor_context:handoff(DupFd, #{type => tcp}),
gen_tcp:close(ClientSock).  %% Safe - Python has its own fd copy
```

This is safer because:

1. **Erlang controls its socket lifecycle** - Closing the Erlang socket (or the exit of its owning process) does not close Python's fd
2. **Python has its own fd** - Independent of Erlang's socket
3. **No double-close issues** - Each side manages its own fd

## Integration with Erlang

### From Erlang: Starting a Reactor Server

```erlang
%% In your Erlang code
-module(my_server).
-export([start/1]).

start(Port) ->
    %% Set up the Python protocol factory first
    Ctx = py:context(1),
    ok = py:exec(Ctx, <<"
import erlang.reactor as reactor
from my_protocols import MyProtocol
reactor.set_protocol_factory(MyProtocol)
">>),

    %% Start accepting connections
    {ok, ListenSock} = gen_tcp:listen(Port, [binary, {active, false}, {reuseaddr, true}]),
    accept_loop(ListenSock).

accept_loop(ListenSock) ->
    {ok, ClientSock} = gen_tcp:accept(ListenSock),
    {ok, {Addr, Port}} = inet:peername(ClientSock),

    %% Hand off to Python reactor
    {ok, Fd} = inet:getfd(ClientSock),
    py_reactor_context:handoff(Fd, #{
        addr => inet:ntoa(Addr),
        port => Port,
        type => tcp
    }),

    accept_loop(ListenSock).
```

### How FDs Are Passed from Erlang to Python

1. Erlang accepts a connection and gets the socket fd
2. Erlang calls `py_reactor_context:handoff(Fd, ClientInfo)`
3. The NIF calls Python's `reactor.init_connection(fd, client_info)`
4. The protocol factory creates a new `Protocol` instance
5. `enif_select` is registered for read events on the fd
6. When events occur, Python callbacks handle the protocol logic

## Module API Reference

### `set_protocol_factory(factory)`

Set the callable used to create a protocol instance for each new connection. The factory takes no arguments, so either a `Protocol` subclass or a zero-argument lambda works:

```python
reactor.set_protocol_factory(MyProtocol)

# or with a custom factory
reactor.set_protocol_factory(lambda: MyProtocol(custom_arg))
```

### `get_protocol(fd)`

Get the protocol instance for a file descriptor.

```python
proto = reactor.get_protocol(fd)
if proto is not None:
    print(f"Protocol state: {proto.client_info}")
```

### `init_connection(fd, client_info)`

Internal - called by the NIF on fd handoff.

### `on_read_ready(fd, data)`

Internal - called by the NIF when the fd is readable. The `data` argument is a `ReactorBuffer` containing the bytes read from the fd.
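The handoff steps above and the internal hooks (`init_connection`, `on_read_ready`, `on_write_ready`, `close_connection`) describe a small dispatch layer keyed by fd. The following is a pure-Python model of that layer for experimenting outside the BEAM, not the module's actual implementation. The `Protocol` stand-in, the module-level registry, and the `Echo` class are hypothetical and based only on the behavior described in this document, and a `socket.socketpair()` plays the role of the accepted connection.

```python
import os
import socket

class Protocol:
    """Hypothetical stand-in for erlang.reactor.Protocol with the documented attributes."""

    def __init__(self):
        self.fd = -1
        self.client_info = {}
        self.write_buffer = bytearray()
        self.closed = False

    def connection_made(self, fd, client_info):
        self.fd = fd
        self.client_info = client_info

    def data_received(self, data):
        return "continue"

    def write_ready(self):
        return "read_pending"

    def connection_lost(self):
        pass

    def read(self, size):
        return os.read(self.fd, size)

    def write(self, data):
        return os.write(self.fd, data)  # may be a partial write


# Hypothetical module-level state: one factory, one protocol instance per fd.
_factory = None
_protocols = {}

def set_protocol_factory(factory):
    global _factory
    _factory = factory

def get_protocol(fd):
    return _protocols.get(fd)

def init_connection(fd, client_info):
    # Handoff steps 3-4: build a protocol and announce the connection.
    proto = _factory()
    _protocols[fd] = proto
    proto.connection_made(fd, client_info)

def on_read_ready(fd, data):
    # The returned action tells Erlang which event to re-arm.
    return _protocols[fd].data_received(data)

def on_write_ready(fd):
    return _protocols[fd].write_ready()

def close_connection(fd):
    # Forget the protocol and notify it once; closing the fd is left to the caller.
    proto = _protocols.pop(fd, None)
    if proto is not None and not proto.closed:
        proto.closed = True
        proto.connection_lost()


class Echo(Protocol):
    def data_received(self, data):
        self.write_buffer.extend(data)
        return "write_pending"

    def write_ready(self):
        written = self.write(bytes(self.write_buffer))
        del self.write_buffer[:written]
        return "continue" if self.write_buffer else "read_pending"


# Play Erlang's role by hand: hand off one end of a socket pair, then
# deliver one read event and one write event.
peer, conn = socket.socketpair()
set_protocol_factory(Echo)
init_connection(conn.fileno(), {"type": "unix"})
print(on_read_ready(conn.fileno(), b"ping"))   # action after reading
print(on_write_ready(conn.fileno()))           # action after writing
print(peer.recv(16))                           # bytes echoed back to the peer
close_connection(conn.fileno())
peer.close()
conn.close()
```

In the real reactor, returning `"close"` from a callback makes Erlang close the fd and call `connection_lost()`, per the action table; here the socket objects are closed by hand instead.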
### `on_write_ready(fd)`

Internal - called by the NIF when the fd is writable.

### `close_connection(fd)`

Internal - called by the NIF to close the connection.

## Subinterpreter Support

The reactor supports isolated subinterpreters via `py_reactor_context`. Each subinterpreter has its own reactor module cache, so protocol factories are isolated between contexts.

```erlang
%% Create context with subinterpreter mode
{ok, Ctx1} = py_reactor_context:start_link(1, subinterp, #{
    setup_code => <<"
import erlang.reactor as reactor
from my_protocols import EchoProtocol
reactor.set_protocol_factory(EchoProtocol)
">>
}),

%% Create another context with a different protocol
{ok, Ctx2} = py_reactor_context:start_link(2, subinterp, #{
    setup_code => <<"
import erlang.reactor as reactor
from my_protocols import HttpProtocol
reactor.set_protocol_factory(HttpProtocol)
">>
}).
```

Each context runs in its own subinterpreter with an isolated protocol factory and connection state, so multiple protocol handlers can run in the same BEAM VM without interfering with each other.

## See Also

- [Asyncio](asyncio.md) - Higher-level asyncio event loop for Python
- [Security](security.md) - Security sandbox documentation
- [Getting Started](getting-started.md) - Basic usage guide