#!/usr/bin/env escript
%% -*- erlang -*-
%%! -pa _build/default/lib/erlang_python/ebin

%%% @doc Benchmark for event loop pool parallel processing.
%%%
%%% Compares single event loop vs pool performance.
%%%
%%% Run with:
%%%   rebar3 compile && escript examples/bench_event_loop_pool.erl

-mode(compile).

%% Entry point: boot the application, verify the pool is usable, run the
%% benchmark suite, then exit with status 0 (or 1 if the pool is missing).
main(_Args) ->
    io:format("~n=== Event Loop Pool Benchmark ===~n~n"),

    {ok, _} = application:ensure_all_started(erlang_python),
    {ok, _} = py:start_contexts(),
    timer:sleep(500),

    print_system_info(),

    %% Bail out early when the pool never came up.
    case py_event_loop_pool:get_loop() of
        {ok, _} ->
            ok;
        {error, R} ->
            io:format("Pool not available: ~p~n", [R]),
            halt(1)
    end,

    PoolStats = py_event_loop_pool:get_stats(),
    io:format("Pool Stats: ~p~n~n", [PoolStats]),

    %% Run benchmarks
    bench_single_vs_pool_sequential(1000),
    bench_pool_concurrent(20, 100),
    bench_pool_concurrent(50, 100),
    bench_pool_throughput(10000),

    io:format("=== Benchmark Complete ===~n"),
    halt(0).

%% Print runtime details so benchmark numbers can be compared across hosts.
print_system_info() ->
    io:format("System Information:~n"),
    io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]),
    io:format(" Schedulers: ~p~n", [erlang:system_info(schedulers)]),
    {ok, PythonVersion} = py:version(),
    io:format(" Python: ~s~n~n", [PythonVersion]).
%% Compare single loop vs pool for sequential tasks issued by one caller.
%% Same caller => same pool loop, so the two timings should be close.
bench_single_vs_pool_sequential(N) ->
    io:format("Benchmark: Sequential tasks (single caller)~n"),
    io:format(" Iterations: ~p~n", [N]),

    %% Drive N sqrt tasks through an arbitrary create/await pair.
    RunSequential = fun(CreateTask, Await) ->
        lists:foreach(
            fun(I) ->
                TaskRef = CreateTask(math, sqrt, [float(I)]),
                {ok, _} = Await(TaskRef)
            end,
            lists:seq(1, N))
    end,

    %% Single event loop
    {T1, _} = timer:tc(fun() ->
        RunSequential(fun py_event_loop:create_task/3,
                      fun py_event_loop:await/1)
    end),

    %% Pool (should be similar since same caller = same loop)
    {T2, _} = timer:tc(fun() ->
        RunSequential(fun py_event_loop_pool:create_task/3,
                      fun py_event_loop_pool:await/1)
    end),

    io:format(" py_event_loop: ~.2f ms (~p tasks/sec)~n",
              [T1 / 1000, round(N / (T1 / 1000000))]),
    io:format(" py_event_loop_pool: ~.2f ms (~p tasks/sec)~n~n",
              [T2 / 1000, round(N / (T2 / 1000000))]).

%% Pool with concurrent callers: each spawned process hashes to its own
%% loop, so tasks can execute in parallel across loops.
bench_pool_concurrent(NumProcs, TasksPerProc) ->
    TotalTasks = NumProcs * TasksPerProc,
    io:format("Benchmark: Concurrent callers via pool~n"),
    io:format(" Processes: ~p, Tasks/process: ~p, Total: ~p~n",
              [NumProcs, TasksPerProc, TotalTasks]),

    Parent = self(),
    WorkerBody = fun() ->
        lists:foreach(
            fun(I) ->
                TaskRef = py_event_loop_pool:create_task(math, sqrt, [float(I)]),
                {ok, _} = py_event_loop_pool:await(TaskRef)
            end,
            lists:seq(1, TasksPerProc)),
        Parent ! {done, self()}
    end,

    {Elapsed, _} = timer:tc(fun() ->
        Workers = [spawn_link(WorkerBody) || _ <- lists:seq(1, NumProcs)],
        %% Collect one completion message per worker.
        [receive {done, Pid} -> ok end || Pid <- Workers]
    end),

    io:format(" Total time: ~.2f ms~n", [Elapsed / 1000]),
    io:format(" Throughput: ~p tasks/sec~n~n",
              [round(TotalTasks / (Elapsed / 1000000))]).
+ +%% High throughput test +bench_pool_throughput(N) -> + io:format("Benchmark: Pool throughput (fire-and-collect)~n"), + io:format(" Tasks: ~p~n", [N]), + + %% Submit all tasks first, then await all + {Time, _} = timer:tc(fun() -> + Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I)]) + || I <- lists:seq(1, N)], + [py_event_loop_pool:await(Ref) || Ref <- Refs] + end), + + io:format(" Total time: ~.2f ms~n", [Time/1000]), + io:format(" Throughput: ~p tasks/sec~n~n", [round(N / (Time/1000000))]). diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index f6dad5d..7a04e18 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -36,7 +36,9 @@ spawn_task/3, spawn_task/4, %% Per-process namespace API exec/1, exec/2, - eval/1, eval/2 + eval/1, eval/2, + %% Internal API (used by py_event_loop_pool) + get_process_env/0 ]). %% gen_server callbacks diff --git a/src/py_event_loop_pool.erl b/src/py_event_loop_pool.erl index e4f9761..201958f 100644 --- a/src/py_event_loop_pool.erl +++ b/src/py_event_loop_pool.erl @@ -12,25 +12,41 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%%% @doc Pool manager for event loop-based async Python execution. +%%% @doc Event Loop Worker Pool with Process Affinity. %%% -%%% This module provides a pool of event loops for executing async Python -%%% coroutines. It replaces the pthread+usleep polling model with efficient -%%% event-driven execution using enif_select and erlang.send(). +%%% This module provides a pool of event loops for parallel Python coroutine +%%% execution, inspired by libuv's "one loop per thread" model. Each loop has +%%% its own worker and maintains its own event ordering. %%% -%%% The pool uses round-robin scheduling to distribute work across event loops. +%%% Process Affinity: All tasks from the same Erlang process are routed to +%%% the same event loop (via PID hash). 
This guarantees that timers and +%%% related async operations from a single process execute in order. %%% %%% @private -module(py_event_loop_pool). -behaviour(gen_server). +%% Pool management -export([ start_link/0, start_link/1, - run_async/1, + stop/0, + get_loop/0, get_stats/0 ]). +%% Distributed task API (pool-aware) +-export([ + create_task/3, create_task/4, + run/3, run/4, + spawn_task/3, spawn_task/4, + await/1, await/2 +]). + +%% Legacy API +-export([run_async/1]). + +%% gen_server callbacks -export([ init/1, handle_call/3, @@ -40,13 +56,13 @@ ]). -record(state, { - loops :: tuple(), %% tuple of {LoopRef, WorkerPid} for O(1) access num_loops :: non_neg_integer(), - next_idx :: non_neg_integer(), supported :: boolean() }). --define(DEFAULT_NUM_LOOPS, 1). +%% Persistent term keys for O(1) access +-define(PT_LOOPS, {?MODULE, loops}). +-define(PT_NUM_LOOPS, {?MODULE, num_loops}). %%% ============================================================================ %%% API @@ -54,29 +70,154 @@ -spec start_link() -> {ok, pid()} | {error, term()}. start_link() -> - start_link(?DEFAULT_NUM_LOOPS). + PoolSize = application:get_env(erlang_python, event_loop_pool_size, + erlang:system_info(schedulers)), + start_link(PoolSize). -spec start_link(pos_integer()) -> {ok, pid()} | {error, term()}. start_link(NumLoops) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [NumLoops], []). -%% @doc Submit an async request to be executed on the event loop pool. -%% The request should be a map with keys: -%% ref => reference() - A reference to identify the result -%% caller => pid() - The pid to send the result to -%% module => atom() | binary() - Python module name -%% func => atom() | binary() - Python function name -%% args => list() - Arguments to pass to the function -%% kwargs => map() - Keyword arguments (optional) --spec run_async(map()) -> ok | {error, term()}. -run_async(Request) -> - gen_server:call(?MODULE, {run_async, Request}). +-spec stop() -> ok. 
stop() ->
    gen_server:stop(?MODULE).

%% @doc Get an event loop reference for the calling process.
%% Always returns the same loop for the same PID (process affinity).
-spec get_loop() -> {ok, reference()} | {error, not_available}.
get_loop() ->
    case pool_size() of
        0 ->
            {error, not_available};
        N ->
            %% Hash PID to get consistent loop assignment
            Hash = erlang:phash2(self()),
            Idx = (Hash rem N) + 1,
            {LoopRef, _WorkerPid} = get_loop_by_index(Idx),
            {ok, LoopRef}
    end.

%% @doc Get pool statistics.
-spec get_stats() -> map().
get_stats() ->
    gen_server:call(?MODULE, get_stats).

%%% ============================================================================
%%% Distributed Task API (Pool-aware)
%%% ============================================================================

%% @doc Submit an async task and return a reference to await the result.
%% Tasks from the same process always go to the same loop (ordered execution).
%%
%% Example:
%%   Ref = py_event_loop_pool:create_task(my_module, my_async_func, [arg1]),
%%   {ok, Result} = py_event_loop_pool:await(Ref)
-spec create_task(Module :: atom() | binary(), Func :: atom() | binary(),
                  Args :: list()) -> reference().
create_task(Module, Func, Args) ->
    create_task(Module, Func, Args, #{}).

-spec create_task(Module :: atom() | binary(), Func :: atom() | binary(),
                  Args :: list(), Kwargs :: map()) -> reference().
create_task(Module, Func, Args, Kwargs) ->
    case get_loop() of
        {ok, LoopRef} ->
            create_task_on_loop(LoopRef, Module, Func, Args, Kwargs);
        {error, not_available} ->
            %% Fallback to default event loop
            py_event_loop:create_task(Module, Func, Args, Kwargs)
    end.

%% @doc Submit a task to a specific loop. The result is delivered to the
%% calling process as {async_result, Ref, Result}.
-spec create_task_on_loop(LoopRef :: reference(), Module :: atom() | binary(),
                          Func :: atom() | binary(), Args :: list(),
                          Kwargs :: map()) -> reference().
create_task_on_loop(LoopRef, Module, Func, Args, Kwargs) ->
    submit_to_loop(LoopRef, self(), Module, Func, Args, Kwargs).

%% @doc Blocking run of an async Python function.
%% Opts may contain timeout (default 5000 ms) and kwargs (default #{}).
-spec run(Module :: atom() | binary(), Func :: atom() | binary(), Args :: list()) ->
    {ok, term()} | {error, term()}.
run(Module, Func, Args) ->
    run(Module, Func, Args, #{}).

-spec run(Module :: atom() | binary(), Func :: atom() | binary(),
          Args :: list(), Opts :: map()) -> {ok, term()} | {error, term()}.
run(Module, Func, Args, Opts) ->
    Timeout = maps:get(timeout, Opts, 5000),
    Kwargs = maps:get(kwargs, Opts, #{}),
    Ref = create_task(Module, Func, Args, Kwargs),
    await(Ref, Timeout).

%% @doc Fire-and-forget task execution. The result is routed to a
%% short-lived receiver process and discarded.
-spec spawn_task(Module :: atom() | binary(), Func :: atom() | binary(),
                 Args :: list()) -> ok.
spawn_task(Module, Func, Args) ->
    spawn_task(Module, Func, Args, #{}).

-spec spawn_task(Module :: atom() | binary(), Func :: atom() | binary(),
                 Args :: list(), Kwargs :: map()) -> ok.
spawn_task(Module, Func, Args, Kwargs) ->
    case get_loop() of
        {ok, LoopRef} ->
            %% Discard the result via a temporary receiver so the caller's
            %% mailbox is never touched; the receiver gives up after 30s.
            Receiver = erlang:spawn(fun() ->
                receive
                    {async_result, _, _} -> ok
                after 30000 -> ok
                end
            end),
            _Ref = submit_to_loop(LoopRef, Receiver, Module, Func, Args, Kwargs),
            ok;
        {error, not_available} ->
            py_event_loop:spawn_task(Module, Func, Args, Kwargs)
    end.

%% @private Shared submission path (previously duplicated in
%% create_task_on_loop/5 and spawn_task/4): normalizes module/function
%% names to binaries, attaches the caller's per-process Python env when
%% one exists, and hands the task to the NIF. Results are sent to ReplyTo
%% as {async_result, Ref, Result}.
-spec submit_to_loop(reference(), pid(), atom() | binary(), atom() | binary(),
                     list(), map()) -> reference().
submit_to_loop(LoopRef, ReplyTo, Module, Func, Args, Kwargs) ->
    Ref = make_ref(),
    ModuleBin = py_util:to_binary(Module),
    FuncBin = py_util:to_binary(Func),
    %% Note: get_process_env/0 is evaluated in the submitting process, so
    %% the env of the caller (not the receiver) is attached.
    ok = case py_event_loop:get_process_env() of
        undefined ->
            py_nif:submit_task(LoopRef, ReplyTo, Ref, ModuleBin, FuncBin,
                               Args, Kwargs);
        EnvRef ->
            py_nif:submit_task_with_env(LoopRef, ReplyTo, Ref, ModuleBin,
                                        FuncBin, Args, Kwargs, EnvRef)
    end,
    Ref.
%% @doc Wait for an async task result (default timeout: 5000 ms).
-spec await(Ref :: reference()) -> {ok, term()} | {error, term()}.
await(Ref) ->
    await(Ref, 5000).

%% @doc Wait for an async task result.
%% NOTE(review): a timeout does not cancel the Python task — a late
%% {async_result, Ref, _} message may still arrive in the caller's
%% mailbox afterwards; callers should tolerate that.
-spec await(Ref :: reference(), Timeout :: non_neg_integer() | infinity) ->
    {ok, term()} | {error, term()}.
await(Ref, Timeout) ->
    receive
        {async_result, Ref, Result} ->
            Result
    after Timeout ->
        {error, timeout}
    end.

%%% ============================================================================
%%% Legacy API
%%% ============================================================================

%% @doc Submit an async request (legacy API for backward compatibility).
-spec run_async(map()) -> ok | {error, term()}.
run_async(Request) ->
    case get_loop() of
        {error, not_available} ->
            {error, event_loop_not_available};
        {ok, LoopRef} ->
            py_event_loop:run_async(LoopRef, Request)
    end.

%%% ============================================================================
%%% gen_server callbacks
%%% ============================================================================

init([NumLoops]) ->
    process_flag(trap_exit, true),
    case create_loops(NumLoops, []) of
        {ok, LoopList} ->
            %% Publish the loop table via persistent_term so callers can
            %% pick a loop without a round-trip through this gen_server.
            persistent_term:put(?PT_LOOPS, list_to_tuple(LoopList)),
            persistent_term:put(?PT_NUM_LOOPS, NumLoops),
            {ok, #state{num_loops = NumLoops, supported = true}};
        {error, Reason} ->
            error_logger:warning_msg(
                "py_event_loop_pool: failed to create loops: ~p~n", [Reason]),
            %% Publish an empty table so get_loop/0 reports not_available.
            persistent_term:put(?PT_LOOPS, {}),
            persistent_term:put(?PT_NUM_LOOPS, 0),
            {ok, #state{num_loops = 0, supported = false}}
    end.
handle_call(get_stats, _From, State) ->
    Stats = #{
        num_loops => State#state.num_loops,
        supported => State#state.supported
    },
    {reply, Stats, State};

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% A loop worker died. In addition to marking the pool unsupported in the
%% gen_server state, reset the persistent-term pool size so get_loop/0 —
%% which reads only persistent_term — stops routing callers to a pool
%% that contains a dead worker. Callers then fall back to py_event_loop.
%% (Previously only the state flag changed, so get_loop/0 kept handing
%% out stale loop refs after a worker crash.)
handle_info({'EXIT', _Pid, _Reason}, State) ->
    persistent_term:put(?PT_NUM_LOOPS, 0),
    {noreply, State#state{supported = false}};

handle_info(_Info, State) ->
    {noreply, State}.

%% Tear down every loop/worker pair and erase the persistent terms.
terminate(_Reason, _State) ->
    case persistent_term:get(?PT_LOOPS, {}) of
        {} ->
            ok;
        Loops ->
            lists:foreach(fun({LoopRef, WorkerPid}) ->
                catch py_event_worker:stop(WorkerPid),
                catch py_nif:event_loop_destroy(LoopRef)
            end, tuple_to_list(Loops))
    end,
    catch persistent_term:erase(?PT_LOOPS),
    catch persistent_term:erase(?PT_NUM_LOOPS),
    ok.
%%% ============================================================================
%%% Internal functions
%%% ============================================================================

%% @private Current pool size; 0 when the pool never initialized.
-spec pool_size() -> non_neg_integer().
pool_size() ->
    persistent_term:get(?PT_NUM_LOOPS, 0).

%% @private Fetch the {LoopRef, WorkerPid} pair at a 1-based index.
%% Callers must check pool_size/0 first: the ?PT_LOOPS key only exists
%% once init/1 has run.
-spec get_loop_by_index(pos_integer()) -> {reference(), pid()}.
get_loop_by_index(Idx) ->
    element(Idx, persistent_term:get(?PT_LOOPS)).

%% ----------------------------------------------------------------------------
%% New file: test/py_event_loop_pool_SUITE.erl
%% ----------------------------------------------------------------------------

%%% @doc Common Test suite for Event Loop Worker Pool.
%%%
%%% Tests the pool of event loops with process affinity for ordered execution.
-module(py_event_loop_pool_SUITE).

-include_lib("common_test/include/ct.hrl").

-export([
    all/0,
    init_per_suite/1,
    end_per_suite/1,
    init_per_testcase/2,
    end_per_testcase/2
]).

-export([
    %% Pool tests
    test_pool_starts/1,
    test_pool_stats/1,
    test_get_loop_returns_reference/1,
    test_same_process_same_loop/1,

    %% Task API tests
    test_create_task_and_await/1,
    test_run_blocking/1,
    test_spawn_task/1,
    test_concurrent_tasks/1,

    %% Ordering test
    test_tasks_execute_in_order/1
]).

all() ->
    [
        test_pool_starts,
        test_pool_stats,
        test_get_loop_returns_reference,
        test_same_process_same_loop,
        test_create_task_and_await,
        test_run_blocking,
        test_spawn_task,
        test_concurrent_tasks,
        test_tasks_execute_in_order
    ].

%% Start the application, then block until the pool is actually serving
%% loops (pool creation is asynchronous relative to application start).
init_per_suite(Config) ->
    case application:ensure_all_started(erlang_python) of
        {ok, _Started} ->
            timer:sleep(500),
            case wait_for_pool(5000) of
                ok -> Config;
                {error, Reason} -> ct:fail({pool_not_ready, Reason})
            end;
        {error, {App, Reason}} ->
            ct:fail({failed_to_start, App, Reason})
    end.
%% Poll the pool until it hands out a loop reference, giving up after
%% roughly Timeout ms (polled in 100 ms steps).
wait_for_pool(Timeout) when Timeout =< 0 ->
    {error, timeout};
wait_for_pool(Timeout) ->
    case py_event_loop_pool:get_loop() of
        {ok, LoopRef} when is_reference(LoopRef) ->
            ok;
        _NotReady ->
            timer:sleep(100),
            wait_for_pool(Timeout - 100)
    end.

end_per_suite(_Config) ->
    ok = application:stop(erlang_python),
    ok.

init_per_testcase(_TestCase, Config) ->
    Config.

end_per_testcase(_TestCase, _Config) ->
    ok.

%% ============================================================================
%% Pool Tests
%% ============================================================================

%% The pool must report at least one loop.
test_pool_starts(_Config) ->
    Stats = py_event_loop_pool:get_stats(),
    NumLoops = maps:get(num_loops, Stats),
    ct:log("Pool size: ~p", [NumLoops]),
    true = NumLoops > 0,
    ok.

%% The stats map exposes num_loops and supported, and supported is true.
test_pool_stats(_Config) ->
    Stats = py_event_loop_pool:get_stats(),
    ct:log("Pool stats: ~p", [Stats]),
    true = is_map(Stats),
    true = maps:is_key(num_loops, Stats),
    true = maps:is_key(supported, Stats),
    true = maps:get(supported, Stats),
    ok.

test_get_loop_returns_reference(_Config) ->
    {ok, LoopRef} = py_event_loop_pool:get_loop(),
    ct:log("Got loop ref: ~p", [LoopRef]),
    true = is_reference(LoopRef),
    ok.

%% Same process always gets the same loop (process affinity).
test_same_process_same_loop(_Config) ->
    {ok, LoopA} = py_event_loop_pool:get_loop(),
    {ok, LoopB} = py_event_loop_pool:get_loop(),
    {ok, LoopC} = py_event_loop_pool:get_loop(),
    ct:log("Loops: ~p, ~p, ~p", [LoopA, LoopB, LoopC]),
    LoopA = LoopB,
    LoopB = LoopC,
    ok.

%% ============================================================================
%% Task API Tests
%% ============================================================================

%% create_task returns a reference; awaiting it yields the coroutine result.
test_create_task_and_await(_Config) ->
    Ref = py_event_loop_pool:create_task(math, sqrt, [25.0]),
    ct:log("Created task ref: ~p", [Ref]),
    true = is_reference(Ref),
    {ok, 5.0} = py_event_loop_pool:await(Ref, 5000),
    ok.
%% run/3 blocks until completion; Python math.floor(3.7) maps to 3.
test_run_blocking(_Config) ->
    {ok, 3} = py_event_loop_pool:run(math, floor, [3.7]),
    ok.

%% Fire-and-forget: only the synchronous 'ok' submission result is
%% observable; the short sleep just lets the task run before teardown.
test_spawn_task(_Config) ->
    ok = py_event_loop_pool:spawn_task(math, ceil, [2.3]),
    timer:sleep(100),
    ok.

%% Submit many tasks up front, then await them all; every task must
%% succeed and the collected results must be exactly 1..NumTasks.
test_concurrent_tasks(_Config) ->
    NumTasks = 50,
    Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I * I)])
            || I <- lists:seq(1, NumTasks)],
    ct:log("Created ~p tasks", [length(Refs)]),

    Results = [py_event_loop_pool:await(Ref, 5000) || Ref <- Refs],
    OkResults = [{ok, V} || {ok, V} <- Results],
    NumTasks = length(OkResults),

    Values = lists:sort([round(V) || {ok, V} <- Results]),
    Expected = lists:seq(1, NumTasks),
    Values = Expected,
    ok.

%% ============================================================================
%% Ordering Test
%% ============================================================================

%% All tasks from this process hash to the same loop; awaiting the refs
%% in submission order must therefore yield results in submission order.
test_tasks_execute_in_order(_Config) ->
    Inputs = [1, 4, 9, 16, 25],
    Refs = [py_event_loop_pool:create_task(math, sqrt, [float(I)])
            || I <- Inputs],

    Results = [py_event_loop_pool:await(Ref, 5000) || Ref <- Refs],
    ct:log("Results: ~p", [Results]),

    [{ok, 1.0}, {ok, 2.0}, {ok, 3.0}, {ok, 4.0}, {ok, 5.0}] = Results,
    ok.