From 95fedda53469d65fd4cff0a04fdc7c9477b46322 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 20 Mar 2026 09:26:58 +0100 Subject: [PATCH] Remove legacy py_event_router module The py_event_worker now handles all event loop functionality including FD events, timers, and task processing. This simplifies the architecture by consolidating event handling into a single worker process. Changes: - Delete src/py_event_router.erl - Remove router_pid from py_event_loop state record - Remove set_shared_router NIF from py_nif.erl and C code - Simplify C code to always use worker_pid instead of router fallback - Update all tests to use py_event_worker - Update asyncio.md documentation --- CHANGELOG.md | 7 ++ c_src/py_event_loop.c | 183 ++++++++++++----------------------- c_src/py_event_loop.h | 16 +-- c_src/py_nif.c | 1 - docs/asyncio.md | 38 ++++---- src/py_event_loop.erl | 24 +---- src/py_event_router.erl | 168 -------------------------------- src/py_nif.erl | 10 +- test/py_event_loop_SUITE.erl | 116 +++++++++++----------- test/py_udp_e2e_SUITE.erl | 6 +- 10 files changed, 161 insertions(+), 408 deletions(-) delete mode 100644 src/py_event_router.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index 964cc0f..f58a856 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,13 @@ contexts uses `thread_worker_call()` rather than suspension/resume protocol; re-entrant calls to the same OWN_GIL context are not supported +### Changed + +- **Removed py_event_router** - Removed legacy `py_event_router` module. The `py_event_worker` + now handles all event loop functionality including FD events, timers, and task processing. + This simplifies the architecture by consolidating event handling into a single worker process. + The `py_nif:set_shared_router/1` function has been removed. + ### Added - **Event Loop Pool** - Pool of event loops for parallel Python coroutine execution diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index f560655..fb6ff3d 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -30,7 +30,7 @@ * 1. Python calls add_reader(fd, callback) -> enif_select(fd, READ) * 2. Erlang scheduler monitors fd * 3. When fd is ready, Erlang sends {select, Res, Ref, ready_input} - * 4. py_event_router receives message, calls dispatch_callback NIF + * 4. py_event_worker receives message, calls dispatch_callback NIF * 5. Python callback is invoked */ @@ -108,12 +108,6 @@ typedef struct { /** @brief Event loop for this interpreter */ erlang_event_loop_t *event_loop; - /** @brief Shared router PID for loops created via _loop_new() */ - ErlNifPid shared_router; - - /** @brief Whether shared_router has been set */ - bool shared_router_valid; - /** @brief Isolation mode: 0=global, 1=per_loop */ int isolation_mode; @@ -133,21 +127,13 @@ typedef struct { } py_event_loop_module_state_t; /* ============================================================================ - * Global Shared Router + * Global Shared Worker * ============================================================================ * - * A global shared router that can be used by all interpreters (main and sub). - * This is separate from the per-module state to allow subinterpreters to - * access the router even when their module state doesn't have it set. - */ -static ErlNifPid g_global_shared_router; -static bool g_global_shared_router_valid = false; -static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER; - -/* Global shared worker for scalable I/O model. + * Global shared worker for scalable I/O model. * Used by dispatch_timer to send task_ready, ensuring process_ready_tasks * is called after timer events. This centralizes the wakeup mechanism - * so both router-dispatched and worker-dispatched timers work correctly. + * so worker-dispatched timers work correctly. */ static ErlNifPid g_global_shared_worker; static bool g_global_shared_worker_valid = false; @@ -250,34 +236,34 @@ static bool callable_cache_insert(erlang_event_loop_t *loop, PyObject *callable); /** - * Try to acquire a router for the event loop. + * Try to acquire a worker for the event loop. * - * If the loop doesn't have a router/worker configured, check the global - * shared router and use it if available. This allows subinterpreters - * to use the main interpreter's router. + * If the loop doesn't have a worker configured, check the global + * shared worker and use it if available. This allows subinterpreters + * to use the main interpreter's worker. * * @param loop Event loop to check/update - * @return true if a router/worker is available, false otherwise + * @return true if a worker is available, false otherwise */ -static bool event_loop_ensure_router(erlang_event_loop_t *loop) { +static bool event_loop_ensure_worker(erlang_event_loop_t *loop) { if (loop == NULL) { return false; } - /* Already have a router or worker */ - if (loop->has_router || loop->has_worker) { + /* Already have a worker */ + if (loop->has_worker) { return true; } - /* Try to get the global shared router */ - pthread_mutex_lock(&g_global_router_mutex); - if (g_global_shared_router_valid) { - loop->router_pid = g_global_shared_router; - loop->has_router = true; + /* Try to get the global shared worker */ + pthread_mutex_lock(&g_global_worker_mutex); + if (g_global_shared_worker_valid) { + loop->worker_pid = g_global_shared_worker; + loop->has_worker = true; } - pthread_mutex_unlock(&g_global_router_mutex); + pthread_mutex_unlock(&g_global_worker_mutex); - return loop->has_router || loop->has_worker; + return loop->has_worker; } /** @@ -1303,10 +1289,10 @@ ERL_NIF_TERM nif_add_reader(ErlNifEnv *env, int argc, } /* Scalable I/O: prefer worker, fall back to router */ - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { return make_error(env, "no_router"); } - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; /* Allocate fd resource */ fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, @@ -1415,10 +1401,10 @@ ERL_NIF_TERM nif_add_writer(ErlNifEnv *env, int argc, } /* Scalable I/O: prefer worker, fall back to router */ - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { return make_error(env, "no_router"); } - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; /* Allocate fd resource */ fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, @@ -1529,10 +1515,10 @@ ERL_NIF_TERM nif_call_later(ErlNifEnv *env, int argc, } /* Scalable I/O: prefer worker, fall back to router */ - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { return make_error(env, "no_router"); } - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; /* Create timer reference */ ERL_NIF_TERM timer_ref = enif_make_ref(env); @@ -1569,10 +1555,10 @@ ERL_NIF_TERM nif_cancel_timer(ErlNifEnv *env, int argc, ERL_NIF_TERM timer_ref = argv[1]; /* Scalable I/O: prefer worker, fall back to router */ - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { return make_error(env, "no_router"); } - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; /* Send message to target: {cancel_timer, TimerRef} */ ERL_NIF_TERM msg = enif_make_tuple2(env, ATOM_CANCEL_TIMER, timer_ref); @@ -1789,7 +1775,7 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc, /** * dispatch_callback(LoopRef, CallbackId, Type) -> ok * - * Called by py_event_router when an event occurs. + * Called by py_event_worker when an event occurs. */ ERL_NIF_TERM nif_dispatch_callback(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -1860,7 +1846,7 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, * * Handles a select event by dispatching callback to pending queue. * This combines get_fd_callback_id + dispatch_callback into one NIF call. - * Called by py_event_router when receiving {select, FdRes, Ref, ready_input/output}. + * Called by py_event_worker when receiving {select, FdRes, Ref, ready_input/output}. * * NOTE: Does NOT auto-reselect to avoid infinite loops with level-triggered FDs. * Python should call start_reader/start_writer after processing the callback @@ -1971,7 +1957,7 @@ ERL_NIF_TERM nif_handle_fd_event_and_reselect(ErlNifEnv *env, int argc, /* Immediately reselect for next event. * Use ATOM_UNDEFINED instead of enif_make_ref to avoid per-event allocation. * The ref is ignored by the worker anyway. */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int select_flags = is_read ? ERL_NIF_SELECT_READ : ERL_NIF_SELECT_WRITE; enif_select(env, (ErlNifEvent)fd_res->fd, select_flags, fd_res, target_pid, ATOM_UNDEFINED); @@ -3809,7 +3795,7 @@ ERL_NIF_TERM nif_reselect_reader(ErlNifEnv *env, int argc, /* Re-register with Erlang scheduler for read monitoring. * Use ATOM_UNDEFINED instead of enif_make_ref to avoid per-event allocation. */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); @@ -3851,7 +3837,7 @@ ERL_NIF_TERM nif_reselect_writer(ErlNifEnv *env, int argc, /* Re-register with Erlang scheduler for write monitoring. * Use ATOM_UNDEFINED instead of enif_make_ref to avoid per-event allocation. */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); @@ -3888,13 +3874,13 @@ ERL_NIF_TERM nif_reselect_reader_fd(ErlNifEnv *env, int argc, /* Use the loop stored in the fd resource */ erlang_event_loop_t *loop = fd_res->loop; - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { return make_error(env, "no_loop"); } /* Re-register with Erlang scheduler for read monitoring. * Use ATOM_UNDEFINED instead of enif_make_ref to avoid per-event allocation. */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); @@ -3931,13 +3917,13 @@ ERL_NIF_TERM nif_reselect_writer_fd(ErlNifEnv *env, int argc, /* Use the loop stored in the fd resource */ erlang_event_loop_t *loop = fd_res->loop; - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { return make_error(env, "no_loop"); } /* Re-register with Erlang scheduler for write monitoring. * Use ATOM_UNDEFINED instead of enif_make_ref to avoid per-event allocation. */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); @@ -4004,13 +3990,13 @@ ERL_NIF_TERM nif_start_reader(ErlNifEnv *env, int argc, } erlang_event_loop_t *loop = fd_res->loop; - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { return make_error(env, "no_loop"); } /* Register with Erlang scheduler for read monitoring */ /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); @@ -4079,13 +4065,13 @@ ERL_NIF_TERM nif_start_writer(ErlNifEnv *env, int argc, } erlang_event_loop_t *loop = fd_res->loop; - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { return make_error(env, "no_loop"); } /* Register with Erlang scheduler for write monitoring */ /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); @@ -5755,39 +5741,6 @@ ERL_NIF_TERM nif_set_isolation_mode(ErlNifEnv *env, int argc, return make_error(env, "invalid_mode"); } -/** - * Set the shared router PID for per-loop created loops. - * This router will be used by all loops created via _loop_new(). - * Stores in both module state (for the current interpreter) and - * global variable (for subinterpreters). - */ -ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - (void)argc; - - ErlNifPid router_pid; - if (!enif_get_local_pid(env, argv[0], &router_pid)) { - return make_error(env, "invalid_pid"); - } - - /* Store in global variable (accessible from all interpreters) */ - pthread_mutex_lock(&g_global_router_mutex); - g_global_shared_router = router_pid; - g_global_shared_router_valid = true; - pthread_mutex_unlock(&g_global_router_mutex); - - /* Also store in module state for backward compatibility */ - PyGILState_STATE gstate = PyGILState_Ensure(); - py_event_loop_module_state_t *state = get_module_state(); - if (state != NULL) { - state->shared_router = router_pid; - state->shared_router_valid = true; - } - PyGILState_Release(gstate); - - return ATOM_OK; -} - /** * Set the shared worker PID for task_ready notifications. * This worker receives task_ready messages from dispatch_timer and other @@ -6023,7 +5976,7 @@ static PyObject *py_add_reader(PyObject *self, PyObject *args) { fd_res->reader_active = true; fd_res->writer_active = false; /* Use worker_pid when available for scalable I/O */ - fd_res->owner_pid = loop->has_worker ? loop->worker_pid : loop->router_pid; + fd_res->owner_pid = loop->worker_pid; /* Initialize lifecycle management fields */ atomic_store(&fd_res->closing_state, FD_STATE_OPEN); @@ -6032,7 +5985,7 @@ static PyObject *py_add_reader(PyObject *self, PyObject *args) { /* Register with enif_select using the loop's persistent msg_env */ /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); @@ -6099,7 +6052,7 @@ static PyObject *py_add_writer(PyObject *self, PyObject *args) { fd_res->reader_active = false; fd_res->writer_active = true; /* Use worker_pid when available for scalable I/O */ - fd_res->owner_pid = loop->has_worker ? loop->worker_pid : loop->router_pid; + fd_res->owner_pid = loop->worker_pid; /* Initialize lifecycle management fields */ atomic_store(&fd_res->closing_state, FD_STATE_OPEN); @@ -6108,7 +6061,7 @@ static PyObject *py_add_writer(PyObject *self, PyObject *args) { /* Register with enif_select using the loop's persistent msg_env */ /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); @@ -6156,7 +6109,7 @@ static PyObject *py_schedule_timer(PyObject *self, PyObject *args) { /* Use per-interpreter event loop lookup */ erlang_event_loop_t *loop = get_interpreter_event_loop(); - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); return NULL; } @@ -6180,7 +6133,7 @@ static PyObject *py_schedule_timer(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int send_result = enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); @@ -6203,7 +6156,7 @@ static PyObject *py_cancel_timer(PyObject *self, PyObject *args) { /* Use per-interpreter event loop lookup */ erlang_event_loop_t *loop = get_interpreter_event_loop(); - if (loop == NULL || !event_loop_ensure_router(loop)) { + if (loop == NULL || !event_loop_ensure_worker(loop)) { Py_RETURN_NONE; } @@ -6220,7 +6173,7 @@ static PyObject *py_cancel_timer(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); Py_RETURN_NONE; @@ -6511,13 +6464,6 @@ static PyObject *py_loop_new(PyObject *self, PyObject *args) { loop->interp_id = 0; /* Main interpreter */ #endif - /* Use shared router if available from module state (for per-loop mode) */ - py_event_loop_module_state_t *state = get_module_state(); - if (state != NULL && state->shared_router_valid) { - loop->router_pid = state->shared_router; - loop->has_router = true; - } - /* Create a capsule wrapping the loop pointer */ PyObject *capsule = PyCapsule_New(loop, LOOP_CAPSULE_NAME, loop_capsule_destructor); if (capsule == NULL) { @@ -6839,7 +6785,7 @@ static PyObject *py_add_reader_for(PyObject *self, PyObject *args) { return NULL; } - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop has no router or worker"); return NULL; } @@ -6857,13 +6803,13 @@ static PyObject *py_add_reader_for(PyObject *self, PyObject *args) { fd_res->reader_active = true; fd_res->writer_active = false; /* Use worker_pid when available for scalable I/O */ - fd_res->owner_pid = loop->has_worker ? loop->worker_pid : loop->router_pid; + fd_res->owner_pid = loop->worker_pid; atomic_store(&fd_res->closing_state, FD_STATE_OPEN); fd_res->monitor_active = false; fd_res->owns_fd = false; /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); @@ -6920,7 +6866,7 @@ static PyObject *py_add_writer_for(PyObject *self, PyObject *args) { return NULL; } - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop has no router or worker"); return NULL; } @@ -6938,13 +6884,13 @@ static PyObject *py_add_writer_for(PyObject *self, PyObject *args) { fd_res->reader_active = false; fd_res->writer_active = true; /* Use worker_pid when available for scalable I/O */ - fd_res->owner_pid = loop->has_worker ? loop->worker_pid : loop->router_pid; + fd_res->owner_pid = loop->worker_pid; atomic_store(&fd_res->closing_state, FD_STATE_OPEN); fd_res->monitor_active = false; fd_res->owns_fd = false; /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); @@ -7154,7 +7100,7 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { target_loop = loop; } - if (!event_loop_ensure_router(target_loop)) { + if (!event_loop_ensure_worker(target_loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop has no router or worker"); return NULL; } @@ -7182,7 +7128,7 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = target_loop->has_worker ? &target_loop->worker_pid : &target_loop->router_pid; + ErlNifPid *target_pid = &target_loop->worker_pid; int send_result = enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); @@ -7210,7 +7156,7 @@ static PyObject *py_cancel_timer_for(PyObject *self, PyObject *args) { Py_RETURN_NONE; } - if (!event_loop_ensure_router(loop)) { + if (!event_loop_ensure_worker(loop)) { Py_RETURN_NONE; } @@ -7226,7 +7172,7 @@ static PyObject *py_cancel_timer_for(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = &loop->worker_pid; enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); Py_RETURN_NONE; @@ -7430,7 +7376,6 @@ int create_py_event_loop_module(void) { py_event_loop_module_state_t *state = PyModule_GetState(module); if (state != NULL) { state->event_loop = NULL; - state->shared_router_valid = false; state->isolation_mode = 0; /* global mode by default */ /* Initialize reactor cache (will be populated lazily) */ state->reactor_module = NULL; @@ -7515,13 +7460,13 @@ int create_default_event_loop(ErlNifEnv *env) { loop->interp_id = 0; /* Main interpreter */ #endif - /* Try to use the global shared router if available (for subinterpreters) */ - pthread_mutex_lock(&g_global_router_mutex); - if (g_global_shared_router_valid) { - loop->router_pid = g_global_shared_router; - loop->has_router = true; + /* Try to use the global shared worker if available (for subinterpreters) */ + pthread_mutex_lock(&g_global_worker_mutex); + if (g_global_shared_worker_valid) { + loop->worker_pid = g_global_shared_worker; + loop->has_worker = true; } - pthread_mutex_unlock(&g_global_router_mutex); + pthread_mutex_unlock(&g_global_worker_mutex); /* Store in module state for Python code to access */ set_interpreter_event_loop(loop); diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 2ff1210..52a355f 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -264,10 +264,10 @@ typedef struct { * - Synchronization primitives */ typedef struct erlang_event_loop { - /** @brief PID of the py_event_router gen_server (legacy) */ + /** @brief Legacy field - kept for binary compatibility */ ErlNifPid router_pid; - /** @brief Whether router_pid has been set */ + /** @brief Legacy field - kept for binary compatibility */ bool has_router; /** @brief PID of the py_event_worker gen_server (scalable I/O model) */ @@ -594,9 +594,9 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); /** - * @brief Dispatch a callback from the router + * @brief Dispatch a callback from the worker * - * Called by py_event_router when an event occurs. + * Called by py_event_worker when an event occurs. * * NIF: dispatch_callback(LoopRef, CallbackId, Type) -> ok */ @@ -978,14 +978,6 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop); ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); -/** - * @brief Set the shared router PID for per-loop created loops - * - * NIF: set_shared_router(RouterPid) -> ok | {error, Reason} - */ -ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]); - /** * @brief Set the shared worker PID for task_ready notifications * diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 917a116..a3d9764 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -6568,7 +6568,6 @@ static ErlNifFunc nif_funcs[] = { /* Python event loop integration */ {"set_python_event_loop", 1, nif_set_python_event_loop, 0}, {"set_isolation_mode", 1, nif_set_isolation_mode, 0}, - {"set_shared_router", 1, nif_set_shared_router, 0}, {"set_shared_worker", 1, nif_set_shared_worker, 0}, /* ASGI optimizations */ diff --git a/docs/asyncio.md b/docs/asyncio.md index 7243273..998445f 100644 --- a/docs/asyncio.md +++ b/docs/asyncio.md @@ -47,10 +47,10 @@ erlang.run(main()) │ │ _run_once() │ └────────────────────────────────────┘ │ │ │ │ │ │ │ │ ▼ │ ┌────────────────────────────────────┐ │ -│ │ process pending │ │ py_event_router │ │ +│ │ process pending │ │ │ │ │ │ callbacks │ │ │ │ -│ └──────────────────┘ │ Routes events to correct loop │ │ -│ │ based on resource backref │ │ +│ └──────────────────┘ │ │ │ +│ │ │ │ │ ┌──────────────────┐ └────────────────────────────────────┘ │ │ │ asyncio (via │ │ │ │ erlang.run()) │ ┌────────────────────────────────────┐ │ @@ -68,8 +68,7 @@ erlang.run(main()) | Component | Role | |-----------|------| | `ErlangEventLoop` | Python asyncio event loop using Erlang for I/O and timers | -| `py_event_worker` | Erlang gen_server managing FDs and timers for a Python context | -| `py_event_router` | Routes timer/FD events to the correct event loop instance | +| `py_event_worker` | Erlang gen_server handling FDs, timers, and task processing | | `erlang.run()` | Entry point to run asyncio code with the Erlang event loop | ## Usage Patterns @@ -541,13 +540,13 @@ py_nif:close_test_fd(Fd). ## Integration with Erlang -The event loop integrates with Erlang's message passing system through a router process: +The event loop integrates with Erlang's message passing system through a worker process: ```erlang -%% Start the event router +%% Start the event worker {ok, LoopRef} = py_nif:event_loop_new(), -{ok, RouterPid} = py_event_router:start_link(LoopRef), -ok = py_nif:event_loop_set_router(LoopRef, RouterPid). +{ok, WorkerPid} = py_event_worker:start_link(<<"worker">>, LoopRef), +ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid). ``` Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities. @@ -598,27 +597,28 @@ t2.join() ### Internal Architecture -A shared router process handles timer and FD events for all loops: +Each event loop has an associated worker process that handles timer and FD events: ``` ┌─────────────────────────────────────────────────────────────────┐ -│ py_event_router (shared) │ +│ py_event_worker │ │ │ │ Receives: │ │ - Timer expirations from erlang:send_after │ │ - FD ready events from enif_select │ +│ - task_ready messages for processing tasks │ │ │ -│ Dispatches to correct loop via resource backref │ +│ Dispatches events to the loop's pending queue │ └─────────────────────────────────────────────────────────────────┘ - ▲ ▲ ▲ - │ │ │ - ┌────┴────┐ ┌────┴────┐ ┌────┴────┐ - │ Loop A │ │ Loop B │ │ Loop C │ - │ pending │ │ pending │ │ pending │ - └─────────┘ └─────────┘ └─────────┘ + │ + ▼ + ┌───────────┐ + │ Loop │ + │ pending │ + └───────────┘ ``` -Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The shared router dispatches timer and FD events to the correct loop based on the capsule backref. +Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The worker dispatches timer, FD events, and tasks to the correct loop. ## Erlang Timer Integration diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 7a04e18..65093f9 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -54,8 +54,7 @@ -record(state, { loop_ref :: reference() | undefined, worker_pid :: pid() | undefined, - worker_id :: binary(), - router_pid :: pid() | undefined + worker_id :: binary() }). %% ============================================================================ @@ -326,10 +325,6 @@ init([]) -> %% Set global shared worker for dispatch_timer task_ready notifications ok = py_nif:set_shared_worker(WorkerPid), - %% Also start legacy router for backward compatibility - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:set_shared_router(RouterPid), - %% Make the event loop available to Python ok = py_nif:set_python_event_loop(LoopRef), %% Set ErlangEventLoop as the default asyncio policy @@ -337,8 +332,7 @@ init([]) -> {ok, #state{ loop_ref = LoopRef, worker_pid = WorkerPid, - worker_id = WorkerId, - router_pid = RouterPid + worker_id = WorkerId }}; {error, Reason} -> {stop, {event_loop_init_failed, Reason}} @@ -392,14 +386,11 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) -> ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), ok = py_nif:event_loop_set_id(LoopRef, WorkerId), ok = py_nif:set_shared_worker(WorkerPid), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:set_shared_router(RouterPid), ok = py_nif:set_python_event_loop(LoopRef), NewState = State#state{ loop_ref = LoopRef, worker_pid = WorkerPid, - worker_id = WorkerId, - router_pid = RouterPid + worker_id = WorkerId }, {reply, {ok, LoopRef}, NewState}; {error, _} = Error -> @@ -418,19 +409,14 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid, router_pid = RouterPid}) -> +terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid}) -> %% Reset asyncio policy back to default before destroying the loop reset_default_policy(), - %% Clean up worker (scalable I/O model) + %% Clean up worker case WorkerPid of undefined -> ok; WPid -> py_event_worker:stop(WPid) end, - %% Clean up legacy router - case RouterPid of - undefined -> ok; - RPid -> py_event_router:stop(RPid) - end, %% Clean up event loop case LoopRef of undefined -> ok; diff --git a/src/py_event_router.erl b/src/py_event_router.erl deleted file mode 100644 index 844a0c5..0000000 --- a/src/py_event_router.erl +++ /dev/null @@ -1,168 +0,0 @@ -%% Copyright 2026 Benoit Chesneau -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - -%% @doc Event router for Erlang-native asyncio event loop. -%% -%% This gen_server receives: -%% - `{select, FdRes, Ref, ready_input|ready_output}' from enif_select -%% - `{start_timer, DelayMs, CallbackId, TimerRef}' from call_later NIF -%% - Timer expiration messages from erlang:send_after -%% -%% It dispatches these events to the Python event loop via dispatch_callback NIFs. --module(py_event_router). --behaviour(gen_server). - -%% API --export([ - start_link/1, - start_link/2, - stop/1 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --record(state, { - loop_ref :: reference(), - %% Map of TimerRef -> {LoopRef, ErlangTimerRef, CallbackId} - %% LoopRef is included to dispatch to the correct loop for per-loop timers - timers = #{} :: #{non_neg_integer() => {reference(), reference(), non_neg_integer()}} -}). - -%% ============================================================================ -%% API -%% ============================================================================ - -%% @doc Start the event router with a loop reference. --spec start_link(reference()) -> {ok, pid()} | {error, term()}. -start_link(LoopRef) -> - start_link(LoopRef, []). - -%% @doc Start the event router with a loop reference and options. --spec start_link(reference(), list()) -> {ok, pid()} | {error, term()}. -start_link(LoopRef, Opts) -> - case proplists:get_value(name, Opts) of - undefined -> - gen_server:start_link(?MODULE, [LoopRef], []); - Name -> - gen_server:start_link({local, Name}, ?MODULE, [LoopRef], []) - end. - -%% @doc Stop the event router. --spec stop(pid()) -> ok. -stop(Pid) -> - gen_server:stop(Pid). - -%% ============================================================================ -%% gen_server callbacks -%% ============================================================================ - -init([LoopRef]) -> - process_flag(message_queue_data, off_heap), - process_flag(trap_exit, true), - {ok, #state{loop_ref = LoopRef}}. - -handle_call(_Request, _From, State) -> - {reply, {error, unknown_request}, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -%% Handle enif_select messages for read readiness -handle_info({select, FdRes, _Ref, ready_input}, State) -> - %% Combined dispatch + reselect in single NIF call - py_nif:handle_fd_event_and_reselect(FdRes, read), - {noreply, State}; - -%% Handle enif_select messages for write readiness -handle_info({select, FdRes, _Ref, ready_output}, State) -> - %% Combined dispatch + reselect in single NIF call - py_nif:handle_fd_event_and_reselect(FdRes, write), - {noreply, State}; - -%% Handle timer start request from call_later NIF (new format with LoopRef) -handle_info({start_timer, LoopRef, DelayMs, CallbackId, TimerRef}, State) -> - #state{timers = Timers} = State, - %% Create the actual Erlang timer - ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}), - %% Store LoopRef so we dispatch to the correct loop - NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers), - {noreply, State#state{timers = NewTimers}}; - -%% Handle timer start request (legacy format without LoopRef - uses state's loop_ref) -handle_info({start_timer, DelayMs, CallbackId, TimerRef}, State) -> - #state{loop_ref = LoopRef, timers = Timers} = State, - %% Create the actual Erlang timer - ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}), - NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers), - {noreply, State#state{timers = NewTimers}}; - -%% Handle timer cancellation -handle_info({cancel_timer, TimerRef}, State) -> - #state{timers = Timers} = State, - case maps:get(TimerRef, Timers, undefined) of - undefined -> - {noreply, State}; - {_LoopRef, ErlTimerRef, _CallbackId} -> - erlang:cancel_timer(ErlTimerRef), - NewTimers = maps:remove(TimerRef, Timers), - {noreply, State#state{timers = NewTimers}} - end; - -%% Handle timer expiration -handle_info({timeout, TimerRef}, State) -> - #state{timers = Timers} = State, - case maps:get(TimerRef, Timers, undefined) of - undefined -> - %% Timer was cancelled - {noreply, State}; - {LoopRef, _ErlTimerRef, CallbackId} -> - %% Dispatch the timer callback to the correct loop - py_nif:dispatch_timer(LoopRef, CallbackId), - %% Remove from active timers - NewTimers = maps:remove(TimerRef, Timers), - {noreply, State#state{timers = NewTimers}} - end; - -%% Handle select stop notifications -handle_info({select, _FdRes, _Ref, cancelled}, State) -> - %% fd monitoring was cancelled, nothing to do - {noreply, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, #state{timers = Timers}) -> - %% Cancel all pending timers - maps:foreach(fun(_TimerRef, {_LoopRef, ErlTimerRef, _CallbackId}) -> - erlang:cancel_timer(ErlTimerRef) - end, Timers), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ============================================================================ -%% Internal functions -%% ============================================================================ - -%% Note: get_fd_callback_id is no longer needed locally since handle_fd_event -%% combines get_callback_id + dispatch + auto-reselect in a single NIF call. diff --git a/src/py_nif.erl b/src/py_nif.erl index b4bc958..5aca11e 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -152,7 +152,6 @@ %% Python event loop integration set_python_event_loop/1, set_isolation_mode/1, - set_shared_router/1, set_shared_worker/1, %% ASGI optimizations asgi_build_scope/1, @@ -913,7 +912,7 @@ reselect_writer_fd(_FdRes) -> %%% ============================================================================ %% @doc Handle a select event (dispatch + auto-reselect). -%% Called by py_event_router when receiving {select, FdRes, Ref, ready_input/output}. +%% Called by py_event_worker when receiving {select, FdRes, Ref, ready_input/output}. %% This combines get_fd_callback_id + dispatch_callback + reselect into one NIF call. %% Type: read | write -spec handle_fd_event(reference(), read | write) -> ok | {error, term()}. @@ -1069,13 +1068,6 @@ set_python_event_loop(_LoopRef) -> set_isolation_mode(_Mode) -> ?NIF_STUB. -%% @doc Set the shared router PID for per-loop created loops. -%% All loops created via _loop_new() in Python will use this router -%% for FD monitoring and timer operations. --spec set_shared_router(pid()) -> ok | {error, term()}. -set_shared_router(_RouterPid) -> - ?NIF_STUB. - %% @doc Set the shared worker PID for task_ready notifications. %% The worker receives task_ready messages from dispatch_timer and other %% event sources to trigger process_ready_tasks. diff --git a/test/py_event_loop_SUITE.erl b/test/py_event_loop_SUITE.erl index 4cb7c79..86ff2dd 100644 --- a/test/py_event_loop_SUITE.erl +++ b/test/py_event_loop_SUITE.erl @@ -16,7 +16,7 @@ -export([ test_event_loop_create_destroy/1, - test_event_loop_set_router/1, + test_event_loop_set_worker/1, test_add_remove_reader/1, test_add_remove_writer/1, test_call_later_basic/1, @@ -42,7 +42,7 @@ all() -> [ test_event_loop_create_destroy, - test_event_loop_set_router, + test_event_loop_set_worker, test_add_remove_reader, test_add_remove_writer, test_call_later_basic, @@ -126,18 +126,18 @@ test_event_loop_create_destroy(_Config) -> ok = py_nif:event_loop_destroy(LoopRef), ok. -test_event_loop_set_router(_Config) -> +test_event_loop_set_worker(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - %% Start a router process - {ok, RouterPid} = py_event_router:start_link(LoopRef), - true = is_pid(RouterPid), + %% Start a worker process + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + true = is_pid(WorkerPid), - %% Set router - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + %% Set worker + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Cleanup - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. @@ -147,8 +147,8 @@ test_event_loop_set_router(_Config) -> test_add_remove_reader(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a pipe for testing (pipes work properly with enif_select) {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -163,14 +163,14 @@ test_add_remove_reader(_Config) -> %% Cleanup py_nif:close_test_fd(ReadFd), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_add_remove_writer(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -182,14 +182,14 @@ test_add_remove_writer(_Config) -> py_nif:close_test_fd(ReadFd), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_fd_read_callback(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create pipe - read end for monitoring, write end for triggering {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -201,7 +201,7 @@ test_fd_read_callback(_Config) -> %% Write data to the write end to trigger read readiness on read end ok = py_nif:write_test_fd(WriteFd, <<"hello">>), - %% Wait for callback dispatch (router receives {select, ...}) + %% Wait for callback dispatch (worker receives {select, ...}) timer:sleep(100), %% Get pending events @@ -218,7 +218,7 @@ test_fd_read_callback(_Config) -> ok = py_nif:remove_reader(LoopRef, FdRef), py_nif:close_test_fd(ReadFd), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. @@ -228,8 +228,8 @@ test_fd_read_callback(_Config) -> test_call_later_basic(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), CallbackId = 100, DelayMs = 50, @@ -255,14 +255,14 @@ test_call_later_basic(_Config) -> Elapsed = T2 - T1, true = Elapsed >= DelayMs, - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_call_later_cancel(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), CallbackId = 101, {ok, TimerRef} = py_nif:call_later(LoopRef, 200, CallbackId), @@ -281,14 +281,14 @@ test_call_later_cancel(_Config) -> ), false = HasTimerCallback, - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_multiple_timers(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create timers with different delays %% Use larger delays for CI reliability (especially on free-threaded Python) @@ -307,7 +307,7 @@ test_multiple_timers(_Config) -> true = lists:member(2, CallbackIds), true = lists:member(3, CallbackIds), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. @@ -317,8 +317,8 @@ test_multiple_timers(_Config) -> test_poll_events_timeout(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Poll with short timeout, no events T1 = erlang:monotonic_time(millisecond), @@ -330,14 +330,14 @@ test_poll_events_timeout(_Config) -> Elapsed = T2 - T1, true = Elapsed >= 40, %% Allow some tolerance - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_event_loop_wakeup(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Spawn a process to wait Self = self(), @@ -362,14 +362,14 @@ test_event_loop_wakeup(_Config) -> ct:fail(poll_not_woken_up) end, - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_get_pending_clears_queue(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a timer that fires quickly %% Use 20ms minimum for CI reliability @@ -384,14 +384,14 @@ test_get_pending_clears_queue(_Config) -> Pending2 = py_nif:get_pending(LoopRef), 0 = length(Pending2), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_concurrent_fd_and_timer(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Set up pipe and timer {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -431,7 +431,7 @@ test_concurrent_fd_and_timer(_Config) -> ok = py_nif:remove_reader(LoopRef, FdRef), py_nif:close_test_fd(ReadFd), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. @@ -474,8 +474,8 @@ test_tcp_connect_accept(_Config) -> test_tcp_read_write_callback(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create listener and connect {ok, {ListenFd, Port}} = py_nif:create_test_tcp_listener(0), @@ -508,14 +508,14 @@ test_tcp_read_write_callback(_Config) -> py_nif:close_test_fd(ServerFd), py_nif:close_test_fd(ClientFd), py_nif:close_test_fd(ListenFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_tcp_echo(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create listener and connect {ok, {ListenFd, Port}} = py_nif:create_test_tcp_listener(0), @@ -569,7 +569,7 @@ test_tcp_echo(_Config) -> py_nif:close_test_fd(ServerFd), py_nif:close_test_fd(ClientFd), py_nif:close_test_fd(ListenFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. @@ -580,8 +580,8 @@ test_tcp_echo(_Config) -> test_cancel_reader_keeps_fd_open(_Config) -> %% Test that stop_reader stops monitoring but keeps FD open {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a pipe {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -612,15 +612,15 @@ test_cancel_reader_keeps_fd_open(_Config) -> %% Cleanup - use close_fd since we stopped (remove_reader won't work) ok = py_nif:close_fd(FdRef), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_cancel_writer_keeps_fd_open(_Config) -> %% Test that stop_writer stops monitoring but keeps FD open {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a pipe {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -642,15 +642,15 @@ test_cancel_writer_keeps_fd_open(_Config) -> %% Cleanup ok = py_nif:close_fd(FdRef), py_nif:close_test_fd(ReadFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_double_close_is_safe(_Config) -> %% Test that calling close_fd twice is idempotent (no crash) {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a pipe {ok, {ReadFd, _WriteFd}} = py_nif:create_test_pipe(), @@ -670,15 +670,15 @@ test_double_close_is_safe(_Config) -> %% Note: WriteFd is owned by us and should still work %% But we won't test that since the pipe's read end is closed - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. test_cancel_then_reselect(_Config) -> %% Test that we can stop and then start (resume) monitoring {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create a pipe {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), @@ -709,6 +709,6 @@ test_cancel_then_reselect(_Config) -> ok = py_nif:remove_reader(LoopRef, FdRef), py_nif:close_test_fd(ReadFd), py_nif:close_test_fd(WriteFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok. diff --git a/test/py_udp_e2e_SUITE.erl b/test/py_udp_e2e_SUITE.erl index 370e7ea..ba8977d 100644 --- a/test/py_udp_e2e_SUITE.erl +++ b/test/py_udp_e2e_SUITE.erl @@ -202,8 +202,8 @@ test_udp_broadcast_option(_Config) -> test_udp_read_callback(_Config) -> {ok, LoopRef} = py_nif:event_loop_new(), - {ok, RouterPid} = py_event_router:start_link(LoopRef), - ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + {ok, WorkerPid} = py_event_worker:start_link(<<"test">>, LoopRef), + ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), %% Create UDP sockets {ok, {ServerFd, ServerPort}} = py_nif:create_test_udp_socket(0), @@ -233,6 +233,6 @@ test_udp_read_callback(_Config) -> ok = py_nif:remove_reader(LoopRef, FdRef), py_nif:close_test_fd(ServerFd), py_nif:close_test_fd(ClientFd), - py_event_router:stop(RouterPid), + py_event_worker:stop(WorkerPid), py_nif:event_loop_destroy(LoopRef), ok.