Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -6154,6 +6154,219 @@
return map;
}

/**
* @brief NIF: Create OWN_GIL session for event loop pool
*
* Creates a new namespace in a worker thread for a calling process.
* Uses the worker_hint for worker assignment (typically loop index).
*
* Returns {ok, WorkerId, HandleId} on success.
*/
static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 1) {
return enif_make_badarg(env);
}

if (!subinterp_thread_pool_is_ready()) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "pool_not_ready"));
}

unsigned int worker_hint;
if (!enif_get_uint(env, argv[0], &worker_hint)) {
return enif_make_badarg(env);
}

/* Use worker_hint to select worker (modulo num_workers for safety) */
int num_workers = g_thread_pool.num_workers;
if (num_workers <= 0) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "no_workers"));
}

int worker_id = worker_hint % num_workers;
uint64_t handle_id = atomic_fetch_add(&g_thread_pool.next_handle_id, 1);

/* Send create namespace request to worker */
subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];

pthread_mutex_lock(&w->dispatch_mutex);

uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
owngil_header_t header = {
.magic = OWNGIL_MAGIC,
.version = OWNGIL_PROTOCOL_VERSION,
.msg_type = MSG_REQUEST,
.req_type = REQ_CREATE_NS,
.request_id = request_id,
.handle_id = handle_id,
.payload_len = 0,
};

/* Write header */
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header)) {
pthread_mutex_unlock(&w->dispatch_mutex);
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "write_failed"));
}

/* Wait for response */
owngil_header_t resp;
if (read(w->result_pipe[0], &resp, sizeof(resp)) != sizeof(resp)) {
pthread_mutex_unlock(&w->dispatch_mutex);
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "read_failed"));
}

pthread_mutex_unlock(&w->dispatch_mutex);

if (resp.msg_type != MSG_RESPONSE) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "create_failed"));
}

return enif_make_tuple3(env, ATOM_OK,
enif_make_uint(env, worker_id),
enif_make_uint64(env, handle_id));
}

/**
* @brief NIF: Submit async task to OWN_GIL worker
*
* Submits a task to run in the worker's asyncio event loop.
* Result is sent to CallerPid as {async_result, Ref, Result}.
*/
static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 7) {
return enif_make_badarg(env);
}

if (!subinterp_thread_pool_is_ready()) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "pool_not_ready"));
}

unsigned int worker_id;
ErlNifUInt64 handle_id;
ErlNifPid caller_pid;

if (!enif_get_uint(env, argv[0], &worker_id) ||
!enif_get_uint64(env, argv[1], &handle_id) ||
!enif_get_local_pid(env, argv[2], &caller_pid)) {
return enif_make_badarg(env);
}

if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "invalid_worker"));
}

/* Build payload tuple: {Module, Func, Args, Kwargs, CallerPid, Ref} */
ERL_NIF_TERM caller_pid_term = enif_make_pid(env, &caller_pid);
ERL_NIF_TERM kwargs = enif_make_new_map(env);
ERL_NIF_TERM payload_tuple = enif_make_tuple6(env,
argv[4], /* Module */
argv[5], /* Func */
argv[6], /* Args */
kwargs, /* Kwargs */
caller_pid_term,
argv[3] /* Ref */
);

/* Serialize to ETF */
ErlNifBinary payload_bin;
if (!enif_term_to_binary(env, payload_tuple, &payload_bin)) {
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "serialization_failed"));
}

subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];

pthread_mutex_lock(&w->dispatch_mutex);

uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
owngil_header_t header = {
.magic = OWNGIL_MAGIC,
.version = OWNGIL_PROTOCOL_VERSION,
.msg_type = MSG_REQUEST,
.req_type = REQ_ASYNC_CALL,
.request_id = request_id,
.handle_id = handle_id,
.payload_len = payload_bin.size,
};

/* Write header and payload */
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header) ||
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size) != (ssize_t)payload_bin.size) {
pthread_mutex_unlock(&w->dispatch_mutex);
enif_release_binary(&payload_bin);
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "write_failed"));
}

enif_release_binary(&payload_bin);
pthread_mutex_unlock(&w->dispatch_mutex);

/* For async, we don't wait for response - worker sends directly to caller */
return ATOM_OK;
}

/**
* @brief NIF: Destroy OWN_GIL session
*
* Cleans up the namespace in the worker thread.
*/
static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
if (argc != 2) {
return enif_make_badarg(env);
}

if (!subinterp_thread_pool_is_ready()) {
return ATOM_OK; /* Nothing to clean up */
}

unsigned int worker_id;
ErlNifUInt64 handle_id;

if (!enif_get_uint(env, argv[0], &worker_id) ||
!enif_get_uint64(env, argv[1], &handle_id)) {
return enif_make_badarg(env);
}

if (worker_id >= (unsigned int)g_thread_pool.num_workers) {
return ATOM_OK; /* Invalid worker, nothing to do */
}

subinterp_thread_worker_t *w = &g_thread_pool.workers[worker_id];

pthread_mutex_lock(&w->dispatch_mutex);

uint64_t request_id = atomic_fetch_add(&g_thread_pool.next_request_id, 1);
owngil_header_t header = {
.magic = OWNGIL_MAGIC,
.version = OWNGIL_PROTOCOL_VERSION,
.msg_type = MSG_REQUEST,
.req_type = REQ_DESTROY_NS,
.request_id = request_id,
.handle_id = handle_id,
.payload_len = 0,
};

/* Write header */
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
/* Wait for response */
owngil_header_t resp;
read(w->result_pipe[0], &resp, sizeof(resp));

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / Documentation

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / Lint

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]

Check warning on line 6362 in c_src/py_nif.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

ignoring return value of ‘read’ declared with attribute ‘warn_unused_result’ [-Wunused-result]
}

pthread_mutex_unlock(&w->dispatch_mutex);

return ATOM_OK;
}

#else /* !HAVE_SUBINTERPRETERS */

/* Stub implementations for Python < 3.12 */
Expand Down Expand Up @@ -6232,6 +6445,27 @@
return map;
}

/* OWN_GIL session stubs for non-subinterpreter builds */
static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc; (void)argv;
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "not_supported"));
}

static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc; (void)argv;
return enif_make_tuple2(env, ATOM_ERROR,
enif_make_atom(env, "not_supported"));
}

static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc; (void)argv;
return ATOM_OK;
}

#endif /* HAVE_SUBINTERPRETERS */

/* ============================================================================
Expand Down Expand Up @@ -6486,6 +6720,11 @@
{"subinterp_thread_cast", 4, nif_subinterp_thread_cast, 0},
{"subinterp_thread_async_call", 6, nif_subinterp_thread_async_call, 0},

/* OWN_GIL session management for event loop pool */
{"owngil_create_session", 1, nif_owngil_create_session, 0},
{"owngil_submit_task", 7, nif_owngil_submit_task, 0},
{"owngil_destroy_session", 2, nif_owngil_destroy_session, 0},

/* Execution mode info */
{"execution_mode", 0, nif_execution_mode, 0},
{"num_executors", 0, nif_num_executors, 0},
Expand Down
Loading
Loading