Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
contexts uses `thread_worker_call()` rather than suspension/resume protocol;
re-entrant calls to the same OWN_GIL context are not supported

### Changed

- **Removed py_event_router** - Removed legacy `py_event_router` module. The `py_event_worker`
now handles all event loop functionality including FD events, timers, and task processing.
This simplifies the architecture by consolidating event handling into a single worker process.
The `py_nif:set_shared_router/1` function has been removed.

### Added

- **Event Loop Pool** - Pool of event loops for parallel Python coroutine execution
Expand Down
183 changes: 64 additions & 119 deletions c_src/py_event_loop.c

Large diffs are not rendered by default.

16 changes: 4 additions & 12 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ typedef struct {
* - Synchronization primitives
*/
typedef struct erlang_event_loop {
/** @brief PID of the py_event_router gen_server (legacy) */
/** @brief Legacy field - kept for binary compatibility */
ErlNifPid router_pid;

/** @brief Whether router_pid has been set */
/** @brief Legacy field - kept for binary compatibility */
bool has_router;

/** @brief PID of the py_event_worker gen_server (scalable I/O model) */
Expand Down Expand Up @@ -594,9 +594,9 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Dispatch a callback from the router
* @brief Dispatch a callback from the worker
*
* Called by py_event_router when an event occurs.
* Called by py_event_worker when an event occurs.
*
* NIF: dispatch_callback(LoopRef, CallbackId, Type) -> ok
*/
Expand Down Expand Up @@ -978,14 +978,6 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop);
ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Set the shared router PID for per-loop created loops
*
* NIF: set_shared_router(RouterPid) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Set the shared worker PID for task_ready notifications
*
Expand Down
1 change: 0 additions & 1 deletion c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -6568,7 +6568,6 @@ static ErlNifFunc nif_funcs[] = {
/* Python event loop integration */
{"set_python_event_loop", 1, nif_set_python_event_loop, 0},
{"set_isolation_mode", 1, nif_set_isolation_mode, 0},
{"set_shared_router", 1, nif_set_shared_router, 0},
{"set_shared_worker", 1, nif_set_shared_worker, 0},

/* ASGI optimizations */
Expand Down
38 changes: 19 additions & 19 deletions docs/asyncio.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ erlang.run(main())
│ │ _run_once() │ └────────────────────────────────────┘ │
│ │ │ │ │
│ │ ▼ │ ┌────────────────────────────────────┐ │
│ │ process pending │ │ py_event_router │ │
│ │ process pending │ │ │ │
│ │ callbacks │ │ │ │
│ └──────────────────┘ │ Routes events to correct loop │ │
│ │ based on resource backref │ │
│ └──────────────────┘ │ │ │
│ │ │ │
│ ┌──────────────────┐ └────────────────────────────────────┘ │
│ │ asyncio (via │ │
│ │ erlang.run()) │ ┌────────────────────────────────────┐ │
Expand All @@ -68,8 +68,7 @@ erlang.run(main())
| Component | Role |
|-----------|------|
| `ErlangEventLoop` | Python asyncio event loop using Erlang for I/O and timers |
| `py_event_worker` | Erlang gen_server managing FDs and timers for a Python context |
| `py_event_router` | Routes timer/FD events to the correct event loop instance |
| `py_event_worker` | Erlang gen_server handling FDs, timers, and task processing |
| `erlang.run()` | Entry point to run asyncio code with the Erlang event loop |

## Usage Patterns
Expand Down Expand Up @@ -541,13 +540,13 @@ py_nif:close_test_fd(Fd).

## Integration with Erlang

The event loop integrates with Erlang's message passing system through a router process:
The event loop integrates with Erlang's message passing system through a worker process:

```erlang
%% Start the event router
%% Start the event worker
{ok, LoopRef} = py_nif:event_loop_new(),
{ok, RouterPid} = py_event_router:start_link(LoopRef),
ok = py_nif:event_loop_set_router(LoopRef, RouterPid).
{ok, WorkerPid} = py_event_worker:start_link(<<"worker">>, LoopRef),
ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid).
```

Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities.
Expand Down Expand Up @@ -598,27 +597,28 @@ t2.join()

### Internal Architecture

A shared router process handles timer and FD events for all loops:
Each event loop has an associated worker process that handles timer and FD events:

```
┌─────────────────────────────────────────────────────────────────┐
py_event_router (shared)
py_event_worker
│ │
│ Receives: │
│ - Timer expirations from erlang:send_after │
│ - FD ready events from enif_select │
│ - task_ready messages for processing tasks │
│ │
│ Dispatches to correct loop via resource backref
│ Dispatches events to the loop's pending queue
└─────────────────────────────────────────────────────────────────┘
▲ ▲
│ │
┌────┴────┐ ┌────┴────┐ ┌────────┐
│ Loop A │ │ Loop B │ Loop C
│ pending │ │ pending │ pending │
└─────────┘ └─────────┘ └─────────┘
┌───────────┐
Loop
pending
───────────┘
```

Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The shared router dispatches timer and FD events to the correct loop based on the capsule backref.
Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The worker dispatches timer, FD events, and tasks to the correct loop.

## Erlang Timer Integration

Expand Down
24 changes: 5 additions & 19 deletions src/py_event_loop.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
-record(state, {
loop_ref :: reference() | undefined,
worker_pid :: pid() | undefined,
worker_id :: binary(),
router_pid :: pid() | undefined
worker_id :: binary()
}).

%% ============================================================================
Expand Down Expand Up @@ -326,19 +325,14 @@ init([]) ->
%% Set global shared worker for dispatch_timer task_ready notifications
ok = py_nif:set_shared_worker(WorkerPid),

%% Also start legacy router for backward compatibility
{ok, RouterPid} = py_event_router:start_link(LoopRef),
ok = py_nif:set_shared_router(RouterPid),

%% Make the event loop available to Python
ok = py_nif:set_python_event_loop(LoopRef),
%% Set ErlangEventLoop as the default asyncio policy
ok = set_default_policy(),
{ok, #state{
loop_ref = LoopRef,
worker_pid = WorkerPid,
worker_id = WorkerId,
router_pid = RouterPid
worker_id = WorkerId
}};
{error, Reason} ->
{stop, {event_loop_init_failed, Reason}}
Expand Down Expand Up @@ -392,14 +386,11 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) ->
ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid),
ok = py_nif:event_loop_set_id(LoopRef, WorkerId),
ok = py_nif:set_shared_worker(WorkerPid),
{ok, RouterPid} = py_event_router:start_link(LoopRef),
ok = py_nif:set_shared_router(RouterPid),
ok = py_nif:set_python_event_loop(LoopRef),
NewState = State#state{
loop_ref = LoopRef,
worker_pid = WorkerPid,
worker_id = WorkerId,
router_pid = RouterPid
worker_id = WorkerId
},
{reply, {ok, LoopRef}, NewState};
{error, _} = Error ->
Expand All @@ -418,19 +409,14 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid, router_pid = RouterPid}) ->
terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid}) ->
%% Reset asyncio policy back to default before destroying the loop
reset_default_policy(),
%% Clean up worker (scalable I/O model)
%% Clean up worker
case WorkerPid of
undefined -> ok;
WPid -> py_event_worker:stop(WPid)
end,
%% Clean up legacy router
case RouterPid of
undefined -> ok;
RPid -> py_event_router:stop(RPid)
end,
%% Clean up event loop
case LoopRef of
undefined -> ok;
Expand Down
168 changes: 0 additions & 168 deletions src/py_event_router.erl

This file was deleted.

Loading
Loading