diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index d7b365e451..c3f8f967ee 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -116,6 +116,10 @@ def _cleanup(self): if not self._thread: return + # Stop the prepare watcher first to prevent race conditions + if self._preparer: + self._preparer.stop() + for conn in self._live_conns | self._new_conns | self._closed_conns: conn.close() for watcher in (conn._write_watcher, conn._read_watcher): @@ -125,8 +129,9 @@ def _cleanup(self): self.notify() # wake the timer watcher # PYTHON-752 Thread might have just been created and not started + # Use longer timeout to allow proper cleanup with self._lock_thread: - self._thread.join(timeout=1.0) + self._thread.join(timeout=5.0) if self._thread.is_alive(): log.warning( diff --git a/cassandra/io/libevwrapper.c b/cassandra/io/libevwrapper.c index f32504fa34..fc25f9ceba 100644 --- a/cassandra/io/libevwrapper.c +++ b/cassandra/io/libevwrapper.c @@ -118,7 +118,13 @@ IO_dealloc(libevwrapper_IO *self) { static void io_callback(struct ev_loop *loop, ev_io *watcher, int revents) { libevwrapper_IO *self = watcher->data; PyObject *result; - PyGILState_STATE gstate = PyGILState_Ensure(); + PyGILState_STATE gstate; + + if (!self || !self->callback) { + return; // Skip callback if object is being destroyed + } + + gstate = PyGILState_Ensure(); if (revents & EV_ERROR && errno) { result = PyObject_CallFunction(self->callback, "Obi", self, revents, errno); } else { @@ -354,6 +360,10 @@ static void prepare_callback(struct ev_loop *loop, ev_prepare *watcher, int reve PyObject *result = NULL; PyGILState_STATE gstate; + if (!self || !self->callback) { + return; // Skip callback if object is being destroyed + } + gstate = PyGILState_Ensure(); result = PyObject_CallFunction(self->callback, "O", self); if (!result) { @@ -473,6 +483,10 @@ static void timer_callback(struct ev_loop *loop, ev_timer *watcher, int revents) PyObject *result = NULL; PyGILState_STATE gstate; + if (!self || !self->callback) { + return; // Skip callback if object is being destroyed + } + gstate = PyGILState_Ensure(); result = PyObject_CallFunction(self->callback, NULL); if (!result) {