# Erlang-native asyncio Event Loop This guide covers the Erlang-native asyncio event loop implementation that provides high-performance async I/O for Python applications running within erlang_python. ## Overview The `ErlangEventLoop` is a custom asyncio event loop backed by Erlang's scheduler using `enif_select` for I/O multiplexing. This replaces Python's polling-based event loop with true event-driven callbacks integrated into the BEAM VM. All asyncio functionality is available through the unified `erlang` module: ```python import erlang # Preferred way to run async code erlang.run(main()) ``` ### Key Benefits - **Sub-millisecond latency** - Events are delivered immediately via Erlang messages instead of polling every 10ms - **Zero CPU usage when idle** - No busy-waiting or polling overhead - **Full GIL release during waits** - Python's Global Interpreter Lock is released while waiting for events - **Native Erlang scheduler integration** - I/O events are handled by BEAM's scheduler ### Architecture ``` ┌──────────────────────────────────────────────────────────────────────────────┐ │ ErlangEventLoop Architecture │ ├──────────────────────────────────────────────────────────────────────────────┤ │ │ │ Python (asyncio) Erlang (BEAM) │ │ ──────────────── ───────────── │ │ │ │ ┌──────────────────┐ ┌────────────────────────────────────┐ │ │ │ ErlangEventLoop │ │ py_event_worker │ │ │ │ │ │ │ │ │ │ call_later() ──┼─{timer,ms,id}─▶│ erlang:send_after(ms, self, {}) │ │ │ │ call_at() │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ add_reader() ──┼──{add_fd,fd}──▶│ enif_select(fd, READ) │ │ │ │ add_writer() │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │◀──{fd_ready}───│ handle_info({select, ...}) │ │ │ │ │◀──{timeout}────│ handle_info({timeout, ...}) │ │ │ │ │ │ │ │ │ │ _run_once() │ └────────────────────────────────────┘ │ │ │ │ │ │ │ │ ▼ │ ┌────────────────────────────────────┐ │ │ │ process pending │ │ │ │ │ │ callbacks │ │ │ │ │ └──────────────────┘ │ │ │ │ │ │ │ │ ┌──────────────────┐ └────────────────────────────────────┘ │ │ │ asyncio (via │ │ │ │ erlang.run()) │ ┌────────────────────────────────────┐ │ │ │ sleep() │ │ asyncio.sleep() uses call_later() │ │ │ │ gather() │─call_later()──▶│ which triggers erlang:send_after │ │ │ │ wait_for() │ │ │ │ │ │ create_task() │ └────────────────────────────────────┘ │ │ └──────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────────────────┘ ``` **Components:** | Component | Role | |-----------|------| | `ErlangEventLoop` | Python asyncio event loop using Erlang for I/O and timers | | `py_event_worker` | Erlang gen_server handling FDs, timers, and task processing | | `erlang.run()` | Entry point to run asyncio code with the Erlang event loop | ## Usage Patterns ### Pattern 1: `erlang.run()` (Recommended) The preferred way to run async code, matching uvloop's API: ```python import erlang async def main(): await asyncio.sleep(1.0) # Uses erlang:send_after internally print("Done!") # Simple and clean erlang.run(main()) ``` ### Pattern 2: With `asyncio.Runner` (Python 3.11+) ```python import asyncio import erlang with asyncio.Runner(loop_factory=erlang.new_event_loop) as runner: runner.run(main()) ``` ### Pattern 3: `erlang.install()` (Deprecated in Python 3.12+) This pattern installs the ErlangEventLoopPolicy globally. It's deprecated in Python 3.12+ because `asyncio.run()` no longer respects global policies: ```python import asyncio import erlang erlang.install() # Deprecated in 3.12+, use erlang.run() instead asyncio.run(main()) ``` ### Pattern 4: Manual Loop Management For cases where you need direct control: ```python import asyncio import erlang loop = erlang.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(main()) finally: loop.close() ``` ## Execution Mode Detection The `erlang` module can detect the Python execution mode: ```python from erlang import detect_mode, ExecutionMode mode = detect_mode() if mode == ExecutionMode.FREE_THREADED: print("Running in free-threaded mode (no GIL)") elif mode == ExecutionMode.SUBINTERP: print("Running in subinterpreter with per-interpreter GIL") else: print("Running with shared GIL") ``` **ExecutionMode values:** - `FREE_THREADED` - Python 3.13+ with `Py_GIL_DISABLED` (no GIL) - `SUBINTERP` - Python 3.12+ running in a subinterpreter - `SHARED_GIL` - Traditional Python with shared GIL ## TCP Support ### Client Connections Use `create_connection()` to establish TCP client connections: ```python import asyncio class EchoClientProtocol(asyncio.Protocol): def __init__(self, message, on_con_lost): self.message = message self.on_con_lost = on_con_lost def connection_made(self, transport): transport.write(self.message.encode()) def data_received(self, data): print(f'Received: {data.decode()}') def connection_lost(self, exc): self.on_con_lost.set_result(True) async def main(): loop = asyncio.get_running_loop() on_con_lost = loop.create_future() transport, protocol = await loop.create_connection( lambda: EchoClientProtocol('Hello, World!', on_con_lost), host='127.0.0.1', port=8888 ) try: await on_con_lost finally: transport.close() ``` ### TCP Servers Use `create_server()` to create TCP servers: ```python import asyncio class EchoServerProtocol(asyncio.Protocol): def connection_made(self, transport): peername = transport.get_extra_info('peername') print(f'Connection from {peername}') self.transport = transport def data_received(self, data): message = data.decode() print(f'Received: {message}') # Echo back self.transport.write(data) def connection_lost(self, exc): print('Connection closed') async def main(): loop = asyncio.get_running_loop() server = await loop.create_server( EchoServerProtocol, host='127.0.0.1', port=8888, reuse_address=True ) async with server: await server.serve_forever() ``` ### Transport Class The `_ErlangSocketTransport` class implements the asyncio Transport interface with these features: - Non-blocking I/O using Erlang's `enif_select` - Write buffering with automatic drain - Connection lifecycle management (`connection_made`, `connection_lost`, `eof_received`) - Extra info access via `get_extra_info()` (socket, sockname, peername) ## UDP/Datagram Support The event loop provides full UDP/datagram support through `create_datagram_endpoint()`. ### Creating UDP Endpoints ```python import asyncio class EchoUDPProtocol(asyncio.DatagramProtocol): def connection_made(self, transport): self.transport = transport print(f'Listening on {transport.get_extra_info("sockname")}') def datagram_received(self, data, addr): message = data.decode() print(f'Received {message!r} from {addr}') # Echo back to sender self.transport.sendto(data, addr) def error_received(self, exc): print(f'Error received: {exc}') def connection_lost(self, exc): print('Connection closed') async def main(): loop = asyncio.get_running_loop() # Create UDP server transport, protocol = await loop.create_datagram_endpoint( EchoUDPProtocol, local_addr=('127.0.0.1', 9999) ) try: await asyncio.sleep(3600) # Run for an hour finally: transport.close() ``` ### Parameters The `create_datagram_endpoint()` method accepts these parameters: | Parameter | Type | Description | |-----------|------|-------------| | `protocol_factory` | callable | Factory function returning a `DatagramProtocol` | | `local_addr` | tuple | Local `(host, port)` to bind to | | `remote_addr` | tuple | Remote `(host, port)` to connect to (optional) | | `family` | int | Socket family (`AF_INET` or `AF_INET6`) | | `proto` | int | Socket protocol number | | `flags` | int | `getaddrinfo` flags | | `reuse_address` | bool | Allow reuse of local address (`SO_REUSEADDR`) | | `reuse_port` | bool | Allow reuse of local port (`SO_REUSEPORT`) | | `allow_broadcast` | bool | Allow sending to broadcast addresses (`SO_BROADCAST`) | | `sock` | socket | Pre-existing socket to use (overrides other options) | ### DatagramProtocol Callbacks Implement these callbacks in your `DatagramProtocol`: ```python class MyDatagramProtocol(asyncio.DatagramProtocol): def connection_made(self, transport): """Called when the endpoint is ready.""" self.transport = transport def datagram_received(self, data, addr): """Called when a datagram is received. Args: data: The received bytes addr: The sender's (host, port) tuple """ pass def error_received(self, exc): """Called when a send or receive operation fails. Args: exc: The exception that occurred """ pass def connection_lost(self, exc): """Called when the transport is closed. Args: exc: Exception if abnormal close, None otherwise """ pass ``` ### Connected vs Unconnected UDP **Unconnected UDP** (default): Each datagram can be sent to any destination: ```python # Server can send to any client transport, protocol = await loop.create_datagram_endpoint( MyProtocol, local_addr=('0.0.0.0', 9999) ) # Send to different destinations transport.sendto(b'Hello', ('192.168.1.100', 5000)) transport.sendto(b'World', ('192.168.1.101', 5000)) ``` **Connected UDP**: The endpoint is bound to a specific remote address: ```python # Client connected to specific server transport, protocol = await loop.create_datagram_endpoint( MyProtocol, remote_addr=('127.0.0.1', 9999) ) # Can use sendto without address transport.sendto(b'Hello') # Goes to connected address ``` ### Example: UDP Echo Server and Client **Server:** ```python import asyncio from erlang import ErlangEventLoop class EchoServerProtocol(asyncio.DatagramProtocol): def connection_made(self, transport): self.transport = transport sockname = transport.get_extra_info('sockname') print(f'UDP Echo Server listening on {sockname}') def datagram_received(self, data, addr): print(f'Received {len(data)} bytes from {addr}') # Echo back self.transport.sendto(data, addr) async def run_server(): loop = asyncio.get_running_loop() transport, _ = await loop.create_datagram_endpoint( EchoServerProtocol, local_addr=('127.0.0.1', 9999) ) try: await asyncio.sleep(3600) finally: transport.close() asyncio.run(run_server()) ``` **Client:** ```python import asyncio class EchoClientProtocol(asyncio.DatagramProtocol): def __init__(self, message, on_response): self.message = message self.on_response = on_response self.transport = None def connection_made(self, transport): self.transport = transport print(f'Sending: {self.message}') transport.sendto(self.message.encode()) def datagram_received(self, data, addr): print(f'Received: {data.decode()} from {addr}') self.on_response.set_result(data) def error_received(self, exc): print(f'Error: {exc}') async def run_client(): loop = asyncio.get_running_loop() on_response = loop.create_future() transport, _ = await loop.create_datagram_endpoint( lambda: EchoClientProtocol('Hello UDP!', on_response), remote_addr=('127.0.0.1', 9999) ) try: response = await asyncio.wait_for(on_response, timeout=5.0) print(f'Echo response: {response.decode()}') finally: transport.close() asyncio.run(run_client()) ``` ### Broadcast UDP Enable broadcast for sending to broadcast addresses: ```python transport, protocol = await loop.create_datagram_endpoint( MyProtocol, local_addr=('0.0.0.0', 0), allow_broadcast=True ) # Send to broadcast address transport.sendto(b'Broadcast message', ('255.255.255.255', 9999)) ``` ## Performance The event loop includes several optimizations for high-throughput applications. ### Built-in Optimizations | Optimization | Description | Impact | |-------------|-------------|--------| | **Cached function lookups** | `ast.literal_eval` cached at module init | Avoids import per callback | | **O(1) timer cancellation** | Handle-to-callback reverse map | Was O(n) iteration | | **O(1) duplicate detection** | Hash set for pending events | Was O(n) linear scan | | **Lock-free event consumption** | Detach queue under lock, process outside | Reduced contention | | **Object pooling** | Reuse event structures via freelist | Fewer allocations | | **Deque method caching** | Pre-bound `append`/`popleft` methods | Avoids attribute lookup | ### Performance Build For maximum performance, rebuild with the `PERF_BUILD` cmake option: ```bash # Clean build with performance optimizations rm -rf _build/cmake mkdir -p _build/cmake && cd _build/cmake cmake ../../c_src -DPERF_BUILD=ON cmake --build . ``` This enables: - `-O3` optimization level - Link-Time Optimization (LTO) - `-march=native` (CPU-specific optimizations) - `-ffast-math` and `-funroll-loops` **Note:** Performance builds are not portable across different CPU architectures due to `-march=native`. ### Benchmarking Run the event loop benchmarks to measure performance: ```bash python3 examples/benchmark_event_loop.py ``` Example output: ``` Timer throughput: 150,000 timers/sec Callback dispatch: 200,000 callbacks/sec I/O ready detection: <1ms latency ``` ## Low-level APIs The event loop is backed by NIFs (Native Implemented Functions) that provide direct access to Erlang's event system. These are primarily for internal use and testing. ### Event Loop NIFs From Erlang, you can access the low-level event loop operations: ```erlang %% Create a new event loop instance {ok, LoopRef} = py_nif:event_loop_new(). %% Add a reader callback for a file descriptor {ok, FdRef} = py_nif:add_reader(LoopRef, Fd, CallbackId). %% Remove a reader ok = py_nif:remove_reader(LoopRef, FdRef). %% Poll for events (returns number of events ready) NumEvents = py_nif:poll_events(LoopRef, TimeoutMs). %% Get pending callback events Pending = py_nif:get_pending(LoopRef). %% Returns: [{CallbackId, read|write}, ...] %% Destroy the event loop py_nif:event_loop_destroy(LoopRef). ``` ### UDP Socket NIFs (for testing) ```erlang %% Create a UDP socket bound to a port {ok, {Fd, Port}} = py_nif:create_test_udp_socket(0). % 0 = ephemeral port %% Send data via UDP ok = py_nif:sendto_test_udp(Fd, <<"hello">>, <<"127.0.0.1">>, 9999). %% Receive data {ok, {Data, {Host, Port}}} = py_nif:recvfrom_test_udp(Fd, MaxBytes). %% Set broadcast option ok = py_nif:set_udp_broadcast(Fd, true). %% Close the socket py_nif:close_test_fd(Fd). ``` ## Integration with Erlang The event loop integrates with Erlang's message passing system through a worker process: ```erlang %% Start the event worker {ok, LoopRef} = py_nif:event_loop_new(), {ok, WorkerPid} = py_event_worker:start_link(<<"worker">>, LoopRef), ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid). ``` Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities. ## Event Loop Architecture Each `ErlangEventLoop` instance has its own isolated capsule with a dedicated pending queue. This ensures that timers and FD events are properly routed to the correct loop instance. ### Multi-threaded Example ```python from erlang import ErlangEventLoop import threading def run_tasks(loop_id): """Each thread gets its own event loop.""" loop = ErlangEventLoop() results = [] def callback(): results.append(loop_id) # Schedule callbacks - isolated to this loop loop.call_soon(callback) loop.call_later(0.01, callback) # Process events import time deadline = time.time() + 0.5 while time.time() < deadline and len(results) < 2: loop._run_once() time.sleep(0.01) loop.close() return results # Run in separate threads t1 = threading.Thread(target=run_tasks, args=('loop_a',)) t2 = threading.Thread(target=run_tasks, args=('loop_b',)) t1.start() t2.start() t1.join() t2.join() # Each thread only sees its own callbacks ``` ### Internal Architecture Each event loop has an associated worker process that handles timer and FD events: ``` ┌─────────────────────────────────────────────────────────────────┐ │ py_event_worker │ │ │ │ Receives: │ │ - Timer expirations from erlang:send_after │ │ - FD ready events from enif_select │ │ - task_ready messages for processing tasks │ │ │ │ Dispatches events to the loop's pending queue │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌───────────┐ │ Loop │ │ pending │ └───────────┘ ``` Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The worker dispatches timer, FD events, and tasks to the correct loop. ## Erlang Timer Integration When using `erlang.run()` to execute asyncio code, standard asyncio functions like `asyncio.sleep()` are automatically backed by Erlang's native timer system for maximum performance. ### Overview Unlike Python's standard polling-based event loop, the Erlang event loop uses `erlang:send_after/3` for timers and integrates directly with the BEAM scheduler. This eliminates Python event loop overhead (~0.5-1ms per operation) and provides more precise timing. ### Architecture ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ asyncio.sleep() via ErlangEventLoop │ │ │ │ Python Erlang │ │ ────── ────── │ │ │ │ ┌─────────────────┐ ┌─────────────────────────────────┐ │ │ │ asyncio.sleep │ │ py_event_worker │ │ │ │ (0.1) │ │ │ │ │ └────────┬────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ ErlangEventLoop │──{timer,100, │ erlang:send_after(100ms) │ │ │ │ call_later() │ Id}─────▶│ │ │ │ │ └────────┬────────┘ │ ▼ │ │ │ │ │ handle_info({timeout, ...}) │ │ │ ┌────────▼────────┐ │ │ │ │ │ │ Yield to event │ │ ▼ │ │ │ │ loop (dirty │ │ py_nif:dispatch_timer() │ │ │ │ scheduler │◀─────────────│ │ │ │ │ │ released) │ callback └─────────┼───────────────────────┘ │ │ └────────┬────────┘ │ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────┐ ┌─────────────────────────────────┐ │ │ │ Resume after │ │ Timer callback dispatched to │ │ │ │ timer fires │ │ Python pending queue │ │ │ └─────────────────┘ └─────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ``` **Key features:** - **Dirty scheduler released during sleep** - Python yields to event loop, freeing the dirty NIF thread - **BEAM scheduler integration** - Uses Erlang's native timer system - **Zero CPU usage** - No polling, event-driven callback - **Sub-millisecond precision** - Timers managed by BEAM scheduler ### Basic Usage ```python import erlang import asyncio async def my_handler(): # Sleep using Erlang's timer system await asyncio.sleep(0.1) # 100ms - uses erlang:send_after internally return "done" # Run a coroutine with Erlang event loop result = erlang.run(my_handler()) ``` ### API Reference When using `erlang.run()` or the Erlang event loop, all standard asyncio functions work seamlessly with Erlang's backend. #### erlang.sleep(seconds) Sleep for the specified duration. Works in both async and sync contexts. ```python import erlang # Async context - yields to event loop async def async_handler(): await erlang.sleep(0.1) # Uses asyncio.sleep() internally return "done" # Sync context - blocks Python, releases dirty scheduler def sync_handler(): erlang.sleep(0.1) # Suspends Erlang process via receive/after return "done" ``` **Behavior by Context:** | Context | Mechanism | Effect | |---------|-----------|--------| | Async (`await erlang.sleep()`) | `asyncio.sleep()` via `call_later()` | Yields to event loop, dirty scheduler released | | Sync (`erlang.sleep()`) | `erlang.call('_py_sleep')` with `receive/after` | Blocks Python, Erlang process suspends, dirty scheduler released | Both modes allow other Erlang processes and Python contexts to run during the sleep. #### asyncio.sleep(delay) Sleep for the specified delay. Uses Erlang's `erlang:send_after/3` internally. ```python import erlang import asyncio async def example(): # Simple sleep - uses Erlang timer system await asyncio.sleep(0.05) # 50ms erlang.run(example()) ``` #### erlang.run(coro) Run a coroutine to completion using an ErlangEventLoop. ```python import erlang import asyncio async def main(): await asyncio.sleep(0.01) return 42 result = erlang.run(main()) assert result == 42 ``` #### asyncio.gather(*coros, return_exceptions=False) Run coroutines concurrently and gather results. ```python import erlang import asyncio async def task(n): await asyncio.sleep(0.01) return n * 2 async def main(): results = await asyncio.gather(task(1), task(2), task(3)) assert results == [2, 4, 6] erlang.run(main()) ``` #### asyncio.wait_for(coro, timeout) Wait for a coroutine with a timeout. ```python import erlang import asyncio async def fast_task(): await asyncio.sleep(0.01) return 'done' async def main(): try: result = await asyncio.wait_for(fast_task(), timeout=1.0) except asyncio.TimeoutError: print("Task timed out") erlang.run(main()) ``` #### asyncio.create_task(coro, *, name=None) Create a task to run a coroutine in the background. ```python import erlang import asyncio async def background_work(): await asyncio.sleep(0.1) return 'background_done' async def main(): task = asyncio.create_task(background_work()) # Do other work while task runs await asyncio.sleep(0.05) # Wait for task to complete result = await task assert result == 'background_done' erlang.run(main()) ``` #### erlang.spawn_task(coro, *, name=None) Spawn an async task from both sync and async contexts. This is useful for fire-and-forget background work from synchronous Python code called by Erlang. ```python import erlang # From sync code called by Erlang def handle_request(data): # This works even though there's no running event loop erlang.spawn_task(process_async(data)) return 'ok' # From async code async def handler(): # Also works in async context erlang.spawn_task(background_work()) await other_work() async def process_async(data): await asyncio.sleep(0.1) # Do async processing... async def background_work(): await asyncio.sleep(0.1) # Do background work... ``` **Key features:** - Works in sync context where `asyncio.get_running_loop()` would fail - Returns `asyncio.Task` for optional await/cancel - Automatically wakes up the event loop to ensure the task runs promptly - Works with both ErlangEventLoop and standard asyncio loops #### asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED) Wait for multiple futures/tasks. ```python import erlang import asyncio async def main(): tasks = [ asyncio.create_task(asyncio.sleep(0.01)) for i in range(3) ] done, pending = await asyncio.wait( tasks, return_when=asyncio.ALL_COMPLETED ) assert len(done) == 3 assert len(pending) == 0 erlang.run(main()) ``` #### Event Loop Functions ```python import erlang import asyncio # Create a new Erlang-backed event loop loop = erlang.new_event_loop() # Set the current event loop asyncio.set_event_loop(loop) # Get the running loop (raises RuntimeError if none) loop = asyncio.get_running_loop() ``` #### Context Manager for Timeouts ```python import erlang import asyncio async def main(): async with asyncio.timeout(1.0): await slow_operation() # Raises TimeoutError if > 1s erlang.run(main()) ``` ### Performance Comparison | Operation | Standard asyncio | Erlang Event Loop | Improvement | |-----------|------------------|-------------------|-------------| | sleep(1ms) | ~1.5ms | ~1.1ms | ~27% faster | | Event loop overhead | ~0.5-1ms | ~0 | Erlang scheduler | | Timer precision | 10ms polling | Sub-ms | BEAM scheduler | | Idle CPU | Polling | Zero | Event-driven | ### When to Use Erlang Event Loop **Use `erlang.run()` when:** - You need precise sub-millisecond timing - Your app makes many small sleep calls - You want to eliminate Python event loop overhead - Your app is running inside erlang_python **Use standard `asyncio.run()` when:** - You're running outside the Erlang VM - Testing Python code in isolation ## Async Worker Backend (Internal) The `py:async_call/3,4` and `py:await/1,2` APIs use an event-driven backend based on `py_event_loop`. ### Architecture ``` ┌─────────────┐ ┌─────────────────┐ ┌──────────────────────┐ │ Erlang │ │ C NIF │ │ py_event_loop │ │ py:async_ │ │ (no thread) │ │ (Erlang process) │ │ call() │ │ │ │ │ └──────┬──────┘ └────────┬────────┘ └──────────┬───────────┘ │ │ │ │ 1. Message to │ │ │ event_loop │ │ │─────────────────────┼────────────────────────>│ │ │ │ │ 2. Return Ref │ │ │<────────────────────┼─────────────────────────│ │ │ │ │ │ enif_select (wait) │ │ │ ┌───────────────────┐ │ │ │ │ Run Python │ │ │ │ │ erlang.send(pid, │ │ │ │ │ result) │ │ │ │ └───────────────────┘ │ │ │ │ │ 3. {async_result} │ │ │<──────────────────────────────────────────────│ │ (direct erlang.send from Python) │ │ │ │ ``` ### Key Components | Component | Role | |-----------|------| | `py_event_loop_pool` | Pool manager for event loop-based async execution | | `run_async/2` (internal) | Submit coroutine to event loop | | `_run_and_send` | Python wrapper that sends result via `erlang.send()` | | `nif_event_loop_run_async` | NIF for direct coroutine submission | ### Performance Benefits | Aspect | Previous (pthread) | Current (event_loop) | |--------|-------------------|---------------------| | Latency | ~10-20ms polling | <1ms (enif_select) | | CPU idle | 100 wakeups/sec | Zero | | Threads | N pthreads | 0 extra threads | | GIL | Acquire/release in thread | Already held in callback | | Shutdown | pthread_join (blocking) | Clean Erlang messages | The event-driven model eliminates the polling overhead of the previous pthread+usleep implementation, resulting in significantly lower latency for async operations. ## Erlang Callbacks from Python Python code can call registered Erlang functions using `erlang.call()`. This enables Python handlers to leverage Erlang's concurrency and I/O capabilities. ### erlang.call() - Blocking Callbacks `erlang.call(name, *args)` calls a registered Erlang function and blocks until it returns. ```python import erlang def handler(): # Call Erlang function - blocks until complete result = erlang.call('my_callback', arg1, arg2) return process(result) ``` **Behavior:** - Blocks the current Python execution until the Erlang callback completes - Code executes exactly once (no replay) - The callback can release the dirty scheduler by using Erlang's `receive` (e.g., `erlang.sleep()`, `channel.receive()`) - Quick callbacks hold the dirty scheduler; callbacks that wait via `receive` release it ### Explicit Scheduling API For long-running operations or when you need to release the dirty scheduler, use the explicit scheduling functions. These return `ScheduleMarker` objects that **must be returned from your handler** to take effect. #### erlang.schedule(callback_name, *args) Release the dirty scheduler and continue via an Erlang callback. ```python import erlang # Register callback in Erlang: # py_callback:register(<<"compute">>, fun([X]) -> X * 2 end). def handler(x): # Returns ScheduleMarker - MUST be returned from handler return erlang.schedule('compute', x) # Nothing after this executes - Erlang callback continues ``` The result is transparent to the caller: ```erlang %% Caller just gets the callback result {ok, 10} = py:call('__main__', 'handler', [5]). ``` #### erlang.schedule_py(module, func, args=None, kwargs=None) Release the dirty scheduler and continue by calling a Python function. ```python import erlang def compute(x, multiplier=2): return x * multiplier def handler(x): # Schedule Python function - releases dirty scheduler return erlang.schedule_py('__main__', 'compute', [x], {'multiplier': 3}) ``` This is useful for: - Breaking up long computations - Allowing other Erlang processes to run - Cooperative multitasking #### erlang.schedule_inline(module, func, args=None, kwargs=None) Release the dirty scheduler and continue by calling a Python function via `enif_schedule_nif()` - bypassing Erlang messaging entirely. ```python import erlang def process_chunk(data, offset=0, results=None): """Process data in chunks with inline continuations.""" if results is None: results = [] chunk_end = min(offset + 100, len(data)) for i in range(offset, chunk_end): results.append(transform(data[i])) if chunk_end < len(data): # Continue inline - no Erlang messaging overhead return erlang.schedule_inline( '__main__', 'process_chunk', args=[data, chunk_end, results] ) return results ``` **When to use `schedule_inline` vs `schedule_py`:** | Aspect | `schedule_inline` | `schedule_py` | |--------|-------------------|---------------| | Flow | Python -> NIF -> enif_schedule_nif -> Python | Python -> NIF -> Erlang message -> Python | | Speed | ~3x faster for tight loops | Slower due to messaging | | Use case | Pure Python chains, no Erlang interaction | When you need Erlang messaging between steps | | Overhead | Minimal (direct NIF continuation) | Higher (gen_server call) | **Important:** `schedule_inline` captures the caller's globals/locals, ensuring correct namespace resolution even with subinterpreters. #### erlang.consume_time_slice(percent) Check if the NIF time slice is exhausted. Returns `True` if you should yield, `False` if more time remains. ```python import erlang def long_computation(items, start_idx=0): results = [] for i in range(start_idx, len(items)): results.append(process(items[i])) # Check if we should yield (1% of time slice per iteration) if erlang.consume_time_slice(1): # Time slice exhausted - save progress and reschedule return erlang.schedule_py( '__main__', 'long_computation', [items], {'start_idx': i + 1} ) return results ``` **Parameters:** - `percent` (1-100): How much of the time slice was consumed by recent work **Returns:** - `True`: Time slice exhausted, you should yield - `False`: More time remains, continue processing ### When to Use Each Pattern | Pattern | Use When | Dirty Scheduler | |---------|----------|-----------------| | `erlang.call()` | Quick operations or callbacks that use `receive` | Held (unless callback suspends via `receive`) | | `erlang.schedule()` | Need to call Erlang callback and always release scheduler | Released | | `erlang.schedule_py()` | Long Python computation, need Erlang interaction between steps | Released | | `erlang.schedule_inline()` | Tight Python loops, no Erlang interaction needed (~3x faster) | Released | | `consume_time_slice()` | Fine-grained control over yielding | N/A (checks time slice) | ### Example: Cooperative Long-Running Task ```python import erlang def process_batch(items, batch_size=100, offset=0): """Process items in batches, yielding between batches.""" end = min(offset + batch_size, len(items)) # Process this batch for i in range(offset, end): expensive_operation(items[i]) if end < len(items): # More work to do - yield and continue return erlang.schedule_py( '__main__', 'process_batch', [items], {'batch_size': batch_size, 'offset': end} ) return 'done' ``` ### Important Notes 1. **Must return the marker**: `schedule()` and `schedule_py()` return `ScheduleMarker` objects that must be returned from your handler function. Calling them without returning has no effect: ```python def wrong(): erlang.schedule('callback', arg) # No effect! return "oops" # This is returned instead def correct(): return erlang.schedule('callback', arg) # Works ``` 2. **Cannot be nested**: The schedule marker must be the direct return value. You cannot return it from a nested function: ```python def outer(): def inner(): return erlang.schedule('callback', arg) return inner() # Works - marker propagates up def broken(): def inner(): erlang.schedule('callback', arg) # Wrong - not returned inner() return "oops" ``` ## Limitations ### Subprocess Operations Not Supported The `ErlangEventLoop` does not support subprocess operations: ```python # These will raise NotImplementedError: loop.subprocess_shell(...) loop.subprocess_exec(...) # asyncio.create_subprocess_* will also fail await asyncio.create_subprocess_shell(...) await asyncio.create_subprocess_exec(...) ``` **Why?** Subprocess operations use `fork()` which would corrupt the Erlang VM. See [Security](security.md) for details. **Alternative:** Use Erlang ports (`open_port/2`) for subprocess management. You can register an Erlang function that runs shell commands and call it from Python via `erlang.call()`. ### Signal Handling Not Supported The `ErlangEventLoop` does not support signal handlers: ```python # These will raise NotImplementedError: loop.add_signal_handler(signal.SIGTERM, handler) loop.remove_signal_handler(signal.SIGTERM) ``` **Why?** Signal handling should be done at the Erlang VM level. The BEAM has its own signal handling infrastructure that's integrated with supervisors and the OTP design patterns. **Alternative:** Handle signals in Erlang using the `kernel` application's signal handling or write a port program that forwards signals to Erlang processes. ## Protocol-Based I/O For building custom servers with low-level protocol handling, see the [Reactor](reactor.md) module. The reactor provides FD-based protocol handling where Erlang manages I/O scheduling via `enif_select` and Python implements protocol logic. ## Async Task API (Erlang) The `py_event_loop` module provides a high-level API for submitting async Python tasks from Erlang. This API is inspired by uvloop and uses a thread-safe task queue, allowing task submission from any dirty scheduler without blocking. ### Architecture ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ Async Task Submission │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Erlang Process C NIF Layer py_event_worker │ │ ─────────────── ───────────── ───────────────── │ │ │ │ py_event_loop: nif_submit_task handle_info(task_ready) │ │ create_task(M,F,A) │ │ │ │ │ │ Thread-safe enqueue │ │ │ │──────────────────▶ (pthread_mutex) │ │ │ │ │ │ │ │ │ │ enif_send(task_ready)──▶ │ │ │ │ │ │ │ │ │ │ py_nif:process_ready │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ Run Python coro │ │ │ │ │ │ │ │ │◀─────────────────────────────────────────────────┘ │ │ │ {async_result, Ref, {ok, Result}} │ │ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘ ``` **Key Features:** - Thread-safe submission from any dirty scheduler via `enif_send` - Non-blocking task creation - Message-based result delivery - Fire-and-forget support ### API Reference #### py_event_loop:run/3,4 Blocking execution of an async Python function. Submits the task and waits for the result. ```erlang %% Basic usage {ok, Result} = py_event_loop:run(my_module, my_async_func, [arg1, arg2]). %% With options (timeout, kwargs) {ok, Result} = py_event_loop:run(aiohttp, get, [Url], #{ timeout => 10000, kwargs => #{headers => #{}} }). ``` **Parameters:** - `Module` - Python module name (atom or binary) - `Func` - Python function name (atom or binary) - `Args` - List of positional arguments - `Opts` - Options map (optional): - `timeout` - Timeout in milliseconds (default: 5000) - `kwargs` - Keyword arguments map (default: #{}) **Returns:** - `{ok, Result}` - Task completed successfully - `{error, Reason}` - Task failed or timed out #### py_event_loop:create_task/3,4 Non-blocking task submission. Returns immediately with a reference for awaiting the result later. ```erlang %% Submit task Ref = py_event_loop:create_task(my_module, my_async_func, [arg1]). %% Do other work while task runs... do_other_work(), %% Await result when needed {ok, Result} = py_event_loop:await(Ref). ``` **Parameters:** - `Module` - Python module name (atom or binary) - `Func` - Python function name (atom or binary) - `Args` - List of positional arguments - `Kwargs` - Keyword arguments map (optional, default: #{}) **Returns:** - `reference()` - Task reference for awaiting #### py_event_loop:await/1,2 Wait for an async task result. ```erlang %% Default timeout (5 seconds) {ok, Result} = py_event_loop:await(Ref). %% Custom timeout {ok, Result} = py_event_loop:await(Ref, 10000). %% Infinite timeout {ok, Result} = py_event_loop:await(Ref, infinity). ``` **Parameters:** - `Ref` - Task reference from `create_task` - `Timeout` - Timeout in milliseconds or `infinity` (optional, default: 5000) **Returns:** - `{ok, Result}` - Task completed successfully - `{error, Reason}` - Task failed with error - `{error, timeout}` - Timeout waiting for result #### py_event_loop:spawn_task/3,4 Fire-and-forget task execution. Submits the task but does not wait for or return the result. ```erlang %% Background logging ok = py_event_loop:spawn_task(logger, log_event, [EventData]). %% With kwargs ok = py_event_loop:spawn_task(metrics, record, [Name, Value], #{tags => Tags}). ``` **Parameters:** - `Module` - Python module name (atom or binary) - `Func` - Python function name (atom or binary) - `Args` - List of positional arguments - `Kwargs` - Keyword arguments map (optional, default: #{}) **Returns:** - `ok` - Task submitted (result is discarded) ### Example: Concurrent HTTP Requests ```erlang %% Submit multiple requests concurrently Refs = [ py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/users">>]), py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/posts">>]), py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/comments">>]) ], %% Await all results Results = [py_event_loop:await(Ref, 10000) || Ref <- Refs]. ``` ### Example: Background Processing ```erlang %% Fire-and-forget analytics handle_request(Request) -> %% Process request... Response = process(Request), %% Log analytics in background (don't wait) ok = py_event_loop:spawn_task(analytics, track_event, [ <<"page_view">>, #{path => Request#request.path, user_id => Request#request.user_id} ]), Response. ``` ### Thread Safety The async task API is fully thread-safe: - `create_task` and `spawn_task` can be called from any Erlang process, including processes running on dirty schedulers - Task submission uses `enif_send` which is safe to call from any thread - The task queue uses mutex protection for thread-safe enqueueing - Results are delivered via standard Erlang message passing This means you can safely call `py_event_loop:create_task` from within a callback that's already running on a dirty NIF scheduler. ## Event Loop Pool The `py_event_loop_pool` module provides a pool of event loops for parallel Python coroutine execution. Inspired by libuv's "one loop per thread" model, each loop has its own worker and maintains event ordering. ### Process Affinity All tasks from the same Erlang process are routed to the same event loop (via PID hash). This guarantees that timers and related async operations from a single process execute in order. ``` ┌─► [loop_1] ──► [worker_1] ──► ordered execution [process] ──► [hash(PID)] ─┼─► [loop_2] ──► [worker_2] ──► ordered execution └─► [loop_N] ──► [worker_N] ──► ordered execution ``` ### API The pool provides the same API as `py_event_loop`, but with automatic load distribution: ```erlang %% Get event loop for current process (always the same loop for same PID) {ok, LoopRef} = py_event_loop_pool:get_loop(). %% Submit task and await result Ref = py_event_loop_pool:create_task(math, sqrt, [16.0]), {ok, 4.0} = py_event_loop_pool:await(Ref). %% Blocking call {ok, 4.0} = py_event_loop_pool:run(math, sqrt, [16.0]). %% Fire-and-forget ok = py_event_loop_pool:spawn_task(logger, info, [<<"message">>]). %% Pool statistics #{num_loops := N, supported := true} = py_event_loop_pool:get_stats(). ``` ### Configuration Configure the pool size via application environment: ```erlang %% sys.config [ {erlang_python, [ %% Number of event loops (default: erlang:system_info(schedulers)) {event_loop_pool_size, 8} ]} ]. ``` ### When to Use | Use Case | Module | |----------|--------| | Single caller, ordered tasks | `py_event_loop` | | Multiple callers, parallel execution | `py_event_loop_pool` | | High throughput, many concurrent processes | `py_event_loop_pool` | ### Performance Benchmarks on 14-core system with Python 3.14: | Pattern | Throughput | |---------|------------| | Sequential (single loop) | 83k tasks/sec | | Sequential (pool) | 150k tasks/sec | | Concurrent (50 processes) | 164k tasks/sec | | Fire-and-collect (10k tasks) | 417k tasks/sec | ### Example: Parallel Processing ```erlang %% Process items in parallel using multiple Erlang processes %% Each process gets its own event loop for ordered execution process_batch(Items) -> Parent = self(), Pids = [spawn_link(fun() -> Results = [begin Ref = py_event_loop_pool:create_task(processor, handle, [Item]), py_event_loop_pool:await(Ref) end || Item <- Chunk], Parent ! {done, self(), Results} end) || Chunk <- partition(Items, 100)], [receive {done, Pid, R} -> R end || Pid <- Pids]. ``` ## See Also - [Reactor](reactor.md) - Low-level FD-based protocol handling - [Security](security.md) - Sandbox and blocked operations - [Threading](threading.md) - For `erlang.async_call()` in asyncio contexts - [Streaming](streaming.md) - For working with Python generators - [Getting Started](getting-started.md) - Basic usage guide