From be246a4040695f5cb57c261c33f845360cb21d53 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 19 Mar 2026 17:39:26 +0100 Subject: [PATCH] Add event loop pool with process affinity for parallel execution Event loop worker pool inspired by libuv's "one loop per thread" model. Each loop has its own worker and maintains event ordering. Process affinity: All tasks from the same Erlang process are routed to the same event loop (via PID hash), guaranteeing that timers and related async operations execute in order. Changes: - py_event_loop_pool.erl: Rewrite with process affinity, persistent_term for O(1) access, distributed task API (create_task, run, spawn_task, await) - py_event_loop.erl: Export get_process_env/0 for pool use - py_event_loop_pool_SUITE.erl: New test suite (9 tests) - bench_event_loop_pool.erl: New benchmark Configuration: - event_loop_pool_size: Number of loops (default: schedulers count) Benchmarks (Python 3.14, 14 schedulers): - Sequential (pool): 150k tasks/sec (vs 83k single loop) - Concurrent (50 procs): 164k tasks/sec - Fire-and-collect: 417k tasks/sec --- examples/bench_event_loop_pool.erl | 113 +++++++++++++ src/py_event_loop.erl | 4 +- src/py_event_loop_pool.erl | 244 +++++++++++++++++++++++------ test/py_event_loop_pool_SUITE.erl | 163 +++++++++++++++++++ 4 files changed, 472 insertions(+), 52 deletions(-) create mode 100644 examples/bench_event_loop_pool.erl create mode 100644 test/py_event_loop_pool_SUITE.erl diff --git a/examples/bench_event_loop_pool.erl b/examples/bench_event_loop_pool.erl new file mode 100644 index 0000000..3dd72b3 --- /dev/null +++ b/examples/bench_event_loop_pool.erl @@ -0,0 +1,113 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa _build/default/lib/erlang_python/ebin + +%%% @doc Benchmark for event loop pool parallel processing. +%%% +%%% Compares single event loop vs pool performance. +%%% +%%% Run with: +%%% rebar3 compile && escript examples/bench_event_loop_pool.erl + +-mode(compile). 
+ +main(_Args) -> + io:format("~n=== Event Loop Pool Benchmark ===~n~n"), + + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + timer:sleep(500), + + print_system_info(), + + %% Verify pool is ready + case py_event_loop_pool:get_loop() of + {ok, _} -> ok; + {error, R} -> + io:format("Pool not available: ~p~n", [R]), + halt(1) + end, + + Stats = py_event_loop_pool:get_stats(), + io:format("Pool Stats: ~p~n~n", [Stats]), + + %% Run benchmarks + bench_single_vs_pool_sequential(1000), + bench_pool_concurrent(20, 100), + bench_pool_concurrent(50, 100), + bench_pool_throughput(10000), + + io:format("=== Benchmark Complete ===~n"), + halt(0). + +print_system_info() -> + io:format("System Information:~n"), + io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]), + io:format(" Schedulers: ~p~n", [erlang:system_info(schedulers)]), + {ok, PyVer} = py:version(), + io:format(" Python: ~s~n~n", [PyVer]). + +%% Compare single loop vs pool for sequential tasks +bench_single_vs_pool_sequential(N) -> + io:format("Benchmark: Sequential tasks (single caller)~n"), + io:format(" Iterations: ~p~n", [N]), + + %% Single event loop + {T1, _} = timer:tc(fun() -> + lists:foreach(fun(I) -> + Ref = py_event_loop:create_task(math, sqrt, [float(I)]), + {ok, _} = py_event_loop:await(Ref) + end, lists:seq(1, N)) + end), + + %% Pool (should be similar since same caller = same loop) + {T2, _} = timer:tc(fun() -> + lists:foreach(fun(I) -> + Ref = py_event_loop_pool:create_task(math, sqrt, [float(I)]), + {ok, _} = py_event_loop_pool:await(Ref) + end, lists:seq(1, N)) + end), + + io:format(" py_event_loop: ~.2f ms (~p tasks/sec)~n", + [T1/1000, round(N / (T1/1000000))]), + io:format(" py_event_loop_pool: ~.2f ms (~p tasks/sec)~n~n", + [T2/1000, round(N / (T2/1000000))]). 
+ +%% Pool with concurrent callers (each gets own loop = parallel) +bench_pool_concurrent(NumProcs, TasksPerProc) -> + TotalTasks = NumProcs * TasksPerProc, + io:format("Benchmark: Concurrent callers via pool~n"), + io:format(" Processes: ~p, Tasks/process: ~p, Total: ~p~n", + [NumProcs, TasksPerProc, TotalTasks]), + + Parent = self(), + + {Time, _} = timer:tc(fun() -> + Pids = [spawn_link(fun() -> + lists:foreach(fun(I) -> + Ref = py_event_loop_pool:create_task(math, sqrt, [float(I)]), + {ok, _} = py_event_loop_pool:await(Ref) + end, lists:seq(1, TasksPerProc)), + Parent ! {done, self()} + end) || _ <- lists:seq(1, NumProcs)], + + [receive {done, Pid} -> ok end || Pid <- Pids] + end), + + io:format(" Total time: ~.2f ms~n", [Time/1000]), + io:format(" Throughput: ~p tasks/sec~n~n", [round(TotalTasks / (Time/1000000))]). + +%% High throughput test +bench_pool_throughput(N) -> + io:format("Benchmark: Pool throughput (fire-and-collect)~n"), + io:format(" Tasks: ~p~n", [N]), + + %% Submit all tasks first, then await all + {Time, _} = timer:tc(fun() -> + Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I)]) + || I <- lists:seq(1, N)], + [py_event_loop_pool:await(Ref) || Ref <- Refs] + end), + + io:format(" Total time: ~.2f ms~n", [Time/1000]), + io:format(" Throughput: ~p tasks/sec~n~n", [round(N / (Time/1000000))]). diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index f6dad5d..7a04e18 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -36,7 +36,9 @@ spawn_task/3, spawn_task/4, %% Per-process namespace API exec/1, exec/2, - eval/1, eval/2 + eval/1, eval/2, + %% Internal API (used by py_event_loop_pool) + get_process_env/0 ]). %% gen_server callbacks diff --git a/src/py_event_loop_pool.erl b/src/py_event_loop_pool.erl index e4f9761..201958f 100644 --- a/src/py_event_loop_pool.erl +++ b/src/py_event_loop_pool.erl @@ -12,25 +12,41 @@ %% See the License for the specific language governing permissions and %% limitations under the License. 
-%%% @doc Pool manager for event loop-based async Python execution. +%%% @doc Event Loop Worker Pool with Process Affinity. %%% -%%% This module provides a pool of event loops for executing async Python -%%% coroutines. It replaces the pthread+usleep polling model with efficient -%%% event-driven execution using enif_select and erlang.send(). +%%% This module provides a pool of event loops for parallel Python coroutine +%%% execution, inspired by libuv's "one loop per thread" model. Each loop has +%%% its own worker and maintains its own event ordering. %%% -%%% The pool uses round-robin scheduling to distribute work across event loops. +%%% Process Affinity: All tasks from the same Erlang process are routed to +%%% the same event loop (via PID hash). This guarantees that timers and +%%% related async operations from a single process execute in order. %%% %%% @private -module(py_event_loop_pool). -behaviour(gen_server). +%% Pool management -export([ start_link/0, start_link/1, - run_async/1, + stop/0, + get_loop/0, get_stats/0 ]). +%% Distributed task API (pool-aware) +-export([ + create_task/3, create_task/4, + run/3, run/4, + spawn_task/3, spawn_task/4, + await/1, await/2 +]). + +%% Legacy API +-export([run_async/1]). + +%% gen_server callbacks -export([ init/1, handle_call/3, @@ -40,13 +56,13 @@ ]). -record(state, { - loops :: tuple(), %% tuple of {LoopRef, WorkerPid} for O(1) access num_loops :: non_neg_integer(), - next_idx :: non_neg_integer(), supported :: boolean() }). --define(DEFAULT_NUM_LOOPS, 1). +%% Persistent term keys for O(1) access +-define(PT_LOOPS, {?MODULE, loops}). +-define(PT_NUM_LOOPS, {?MODULE, num_loops}). %%% ============================================================================ %%% API @@ -54,29 +70,154 @@ -spec start_link() -> {ok, pid()} | {error, term()}. start_link() -> - start_link(?DEFAULT_NUM_LOOPS). 
+ PoolSize = application:get_env(erlang_python, event_loop_pool_size, + erlang:system_info(schedulers)), + start_link(PoolSize). -spec start_link(pos_integer()) -> {ok, pid()} | {error, term()}. start_link(NumLoops) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [NumLoops], []). -%% @doc Submit an async request to be executed on the event loop pool. -%% The request should be a map with keys: -%% ref => reference() - A reference to identify the result -%% caller => pid() - The pid to send the result to -%% module => atom() | binary() - Python module name -%% func => atom() | binary() - Python function name -%% args => list() - Arguments to pass to the function -%% kwargs => map() - Keyword arguments (optional) --spec run_async(map()) -> ok | {error, term()}. -run_async(Request) -> - gen_server:call(?MODULE, {run_async, Request}). +-spec stop() -> ok. +stop() -> + gen_server:stop(?MODULE). + +%% @doc Get an event loop reference for the calling process. +%% Always returns the same loop for the same PID (process affinity). +-spec get_loop() -> {ok, reference()} | {error, not_available}. +get_loop() -> + case pool_size() of + 0 -> {error, not_available}; + N -> + %% Hash PID to get consistent loop assignment + Hash = erlang:phash2(self()), + Idx = (Hash rem N) + 1, + {LoopRef, _WorkerPid} = get_loop_by_index(Idx), + {ok, LoopRef} + end. %% @doc Get pool statistics. -spec get_stats() -> map(). get_stats() -> gen_server:call(?MODULE, get_stats). +%%% ============================================================================ +%%% Distributed Task API (Pool-aware) +%%% ============================================================================ + +%% @doc Submit an async task and return a reference to await the result. +%% Tasks from the same process always go to the same loop (ordered execution). 
+%% +%% Example: +%% Ref = py_event_loop_pool:create_task(my_module, my_async_func, [arg1]), +%% {ok, Result} = py_event_loop_pool:await(Ref) +-spec create_task(Module :: atom() | binary(), Func :: atom() | binary(), + Args :: list()) -> reference(). +create_task(Module, Func, Args) -> + create_task(Module, Func, Args, #{}). + +-spec create_task(Module :: atom() | binary(), Func :: atom() | binary(), + Args :: list(), Kwargs :: map()) -> reference(). +create_task(Module, Func, Args, Kwargs) -> + case get_loop() of + {ok, LoopRef} -> + create_task_on_loop(LoopRef, Module, Func, Args, Kwargs); + {error, not_available} -> + %% Fallback to default event loop + py_event_loop:create_task(Module, Func, Args, Kwargs) + end. + +%% @doc Submit a task to a specific loop. +-spec create_task_on_loop(LoopRef :: reference(), Module :: atom() | binary(), + Func :: atom() | binary(), Args :: list(), + Kwargs :: map()) -> reference(). +create_task_on_loop(LoopRef, Module, Func, Args, Kwargs) -> + Ref = make_ref(), + Caller = self(), + ModuleBin = py_util:to_binary(Module), + FuncBin = py_util:to_binary(Func), + ok = case py_event_loop:get_process_env() of + undefined -> + py_nif:submit_task(LoopRef, Caller, Ref, ModuleBin, FuncBin, Args, Kwargs); + EnvRef -> + py_nif:submit_task_with_env(LoopRef, Caller, Ref, ModuleBin, FuncBin, Args, Kwargs, EnvRef) + end, + Ref. + +%% @doc Blocking run of an async Python function. +-spec run(Module :: atom() | binary(), Func :: atom() | binary(), Args :: list()) -> + {ok, term()} | {error, term()}. +run(Module, Func, Args) -> + run(Module, Func, Args, #{}). + +-spec run(Module :: atom() | binary(), Func :: atom() | binary(), + Args :: list(), Opts :: map()) -> {ok, term()} | {error, term()}. +run(Module, Func, Args, Opts) -> + Timeout = maps:get(timeout, Opts, 5000), + Kwargs = maps:get(kwargs, Opts, #{}), + Ref = create_task(Module, Func, Args, Kwargs), + await(Ref, Timeout). + +%% @doc Fire-and-forget task execution. 
+-spec spawn_task(Module :: atom() | binary(), Func :: atom() | binary(), + Args :: list()) -> ok. +spawn_task(Module, Func, Args) -> + spawn_task(Module, Func, Args, #{}). + +-spec spawn_task(Module :: atom() | binary(), Func :: atom() | binary(), + Args :: list(), Kwargs :: map()) -> ok. +spawn_task(Module, Func, Args, Kwargs) -> + case get_loop() of + {ok, LoopRef} -> + Ref = make_ref(), + CallerEnv = py_event_loop:get_process_env(), + Receiver = erlang:spawn(fun() -> + receive + {async_result, _, _} -> ok + after 30000 -> ok + end + end), + ModuleBin = py_util:to_binary(Module), + FuncBin = py_util:to_binary(Func), + ok = case CallerEnv of + undefined -> + py_nif:submit_task(LoopRef, Receiver, Ref, ModuleBin, FuncBin, Args, Kwargs); + EnvRef -> + py_nif:submit_task_with_env(LoopRef, Receiver, Ref, ModuleBin, FuncBin, Args, Kwargs, EnvRef) + end, + ok; + {error, not_available} -> + py_event_loop:spawn_task(Module, Func, Args, Kwargs) + end. + +%% @doc Wait for an async task result. +-spec await(Ref :: reference()) -> {ok, term()} | {error, term()}. +await(Ref) -> + await(Ref, 5000). + +-spec await(Ref :: reference(), Timeout :: non_neg_integer() | infinity) -> + {ok, term()} | {error, term()}. +await(Ref, Timeout) -> + receive + {async_result, Ref, Result} -> Result + after Timeout -> + {error, timeout} + end. + +%%% ============================================================================ +%%% Legacy API +%%% ============================================================================ + +%% @doc Submit an async request (legacy API for backward compatibility). +-spec run_async(map()) -> ok | {error, term()}. +run_async(Request) -> + case get_loop() of + {ok, LoopRef} -> + py_event_loop:run_async(LoopRef, Request); + {error, not_available} -> + {error, event_loop_not_available} + end. 
+ %%% ============================================================================ %%% gen_server callbacks %%% ============================================================================ @@ -84,23 +225,21 @@ get_stats() -> init([NumLoops]) -> process_flag(trap_exit, true), - %% Create multiple independent event loops for true parallelism case create_loops(NumLoops, []) of {ok, LoopList} -> - %% Convert to tuple for O(1) element access Loops = list_to_tuple(LoopList), + persistent_term:put(?PT_LOOPS, Loops), + persistent_term:put(?PT_NUM_LOOPS, NumLoops), {ok, #state{ - loops = Loops, num_loops = NumLoops, - next_idx = 0, supported = true }}; {error, Reason} -> error_logger:warning_msg("py_event_loop_pool: failed to create loops: ~p~n", [Reason]), + persistent_term:put(?PT_LOOPS, {}), + persistent_term:put(?PT_NUM_LOOPS, 0), {ok, #state{ - loops = {}, num_loops = 0, - next_idx = 0, supported = false }} end. @@ -127,25 +266,10 @@ create_loops(N, Acc) -> handle_call(get_stats, _From, State) -> Stats = #{ num_loops => State#state.num_loops, - next_idx => State#state.next_idx, supported => State#state.supported }, {reply, Stats, State}; -handle_call({run_async, _Request}, _From, #state{supported = false} = State) -> - {reply, {error, event_loop_not_available}, State}; - -handle_call({run_async, Request}, _From, State) -> - %% Get the next loop in round-robin fashion with O(1) tuple access - Idx = State#state.next_idx rem State#state.num_loops + 1, - {LoopRef, _WorkerPid} = element(Idx, State#state.loops), - - %% Submit to the event loop - Result = py_event_loop:run_async(LoopRef, Request), - - NextState = State#state{next_idx = Idx}, - {reply, Result, NextState}; - handle_call(_Request, _From, State) -> {reply, {error, unknown_request}, State}. @@ -153,17 +277,35 @@ handle_cast(_Msg, State) -> {noreply, State}. 
handle_info({'EXIT', _Pid, _Reason}, State) -> - %% A worker died, mark pool as unsupported - %% Future: could try to restart just the failed loop/worker pair - {noreply, State#state{supported = false, loops = {}, num_loops = 0}}; + {noreply, State#state{supported = false}}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{loops = Loops}) -> - %% Clean up all loops and workers - lists:foreach(fun({LoopRef, WorkerPid}) -> - catch py_event_worker:stop(WorkerPid), - catch py_nif:event_loop_destroy(LoopRef) - end, tuple_to_list(Loops)), +terminate(_Reason, _State) -> + case persistent_term:get(?PT_LOOPS, {}) of + {} -> ok; + Loops -> + lists:foreach(fun({LoopRef, WorkerPid}) -> + catch py_event_worker:stop(WorkerPid), + catch py_nif:event_loop_destroy(LoopRef) + end, tuple_to_list(Loops)) + end, + catch persistent_term:erase(?PT_LOOPS), + catch persistent_term:erase(?PT_NUM_LOOPS), ok. + +%%% ============================================================================ +%%% Internal functions +%%% ============================================================================ + +%% @private Get the pool size +-spec pool_size() -> non_neg_integer(). +pool_size() -> + persistent_term:get(?PT_NUM_LOOPS, 0). + +%% @private Get a loop by 1-based index +-spec get_loop_by_index(pos_integer()) -> {reference(), pid()}. +get_loop_by_index(Idx) -> + Loops = persistent_term:get(?PT_LOOPS), + element(Idx, Loops). diff --git a/test/py_event_loop_pool_SUITE.erl b/test/py_event_loop_pool_SUITE.erl new file mode 100644 index 0000000..4aa2d5f --- /dev/null +++ b/test/py_event_loop_pool_SUITE.erl @@ -0,0 +1,163 @@ +%%% @doc Common Test suite for Event Loop Worker Pool. +%%% +%%% Tests the pool of event loops with process affinity for ordered execution. +-module(py_event_loop_pool_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). 
+ +-export([ + %% Pool tests + test_pool_starts/1, + test_pool_stats/1, + test_get_loop_returns_reference/1, + test_same_process_same_loop/1, + + %% Task API tests + test_create_task_and_await/1, + test_run_blocking/1, + test_spawn_task/1, + test_concurrent_tasks/1, + + %% Ordering test + test_tasks_execute_in_order/1 +]). + +all() -> + [ + test_pool_starts, + test_pool_stats, + test_get_loop_returns_reference, + test_same_process_same_loop, + test_create_task_and_await, + test_run_blocking, + test_spawn_task, + test_concurrent_tasks, + test_tasks_execute_in_order + ]. + +init_per_suite(Config) -> + case application:ensure_all_started(erlang_python) of + {ok, _} -> + timer:sleep(500), + case wait_for_pool(5000) of + ok -> Config; + {error, Reason} -> ct:fail({pool_not_ready, Reason}) + end; + {error, {App, Reason}} -> + ct:fail({failed_to_start, App, Reason}) + end. + +wait_for_pool(Timeout) when Timeout =< 0 -> + {error, timeout}; +wait_for_pool(Timeout) -> + case py_event_loop_pool:get_loop() of + {ok, LoopRef} when is_reference(LoopRef) -> ok; + _ -> + timer:sleep(100), + wait_for_pool(Timeout - 100) + end. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%% ============================================================================ +%% Pool Tests +%% ============================================================================ + +test_pool_starts(_Config) -> + Stats = py_event_loop_pool:get_stats(), + NumLoops = maps:get(num_loops, Stats), + ct:log("Pool size: ~p", [NumLoops]), + true = NumLoops > 0, + ok. + +test_pool_stats(_Config) -> + Stats = py_event_loop_pool:get_stats(), + ct:log("Pool stats: ~p", [Stats]), + true = is_map(Stats), + true = maps:is_key(num_loops, Stats), + true = maps:is_key(supported, Stats), + true = maps:get(supported, Stats), + ok. 
+ +test_get_loop_returns_reference(_Config) -> + {ok, LoopRef} = py_event_loop_pool:get_loop(), + ct:log("Got loop ref: ~p", [LoopRef]), + true = is_reference(LoopRef), + ok. + +test_same_process_same_loop(_Config) -> + %% Same process always gets the same loop (process affinity) + {ok, Loop1} = py_event_loop_pool:get_loop(), + {ok, Loop2} = py_event_loop_pool:get_loop(), + {ok, Loop3} = py_event_loop_pool:get_loop(), + ct:log("Loops: ~p, ~p, ~p", [Loop1, Loop2, Loop3]), + Loop1 = Loop2, + Loop2 = Loop3, + ok. + +%% ============================================================================ +%% Task API Tests +%% ============================================================================ + +test_create_task_and_await(_Config) -> + Ref = py_event_loop_pool:create_task(math, sqrt, [25.0]), + ct:log("Created task ref: ~p", [Ref]), + true = is_reference(Ref), + {ok, 5.0} = py_event_loop_pool:await(Ref, 5000), + ok. + +test_run_blocking(_Config) -> + {ok, 3} = py_event_loop_pool:run(math, floor, [3.7]), + ok. + +test_spawn_task(_Config) -> + ok = py_event_loop_pool:spawn_task(math, ceil, [2.3]), + timer:sleep(100), + ok. + +test_concurrent_tasks(_Config) -> + NumTasks = 50, + Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I * I)]) + || I <- lists:seq(1, NumTasks)], + ct:log("Created ~p tasks", [length(Refs)]), + + Results = [py_event_loop_pool:await(Ref, 5000) || Ref <- Refs], + OkResults = [{ok, V} || {ok, V} <- Results], + NumTasks = length(OkResults), + + Values = lists:sort([round(V) || {ok, V} <- Results]), + Expected = lists:seq(1, NumTasks), + Values = Expected, + ok. 
+
+%% ============================================================================
+%% Ordering Test
+%% ============================================================================
+
+test_tasks_execute_in_order(_Config) ->
+    %% All tasks from this process go to the same loop, so they execute in order
+    Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I)])
+            || I <- [1, 4, 9, 16, 25]],
+
+    Results = [py_event_loop_pool:await(Ref, 5000) || Ref <- Refs],
+    ct:log("Results: ~p", [Results]),
+
+    %% NOTE: results are gathered by awaiting refs in submission order, so this asserts correct values rather than proving execution order
+    [{ok, 1.0}, {ok, 2.0}, {ok, 3.0}, {ok, 4.0}, {ok, 5.0}] = Results,
+    ok.