-
Notifications
You must be signed in to change notification settings - Fork 50
Fix AsyncioConnection race conditions causing EBADF errors (#614) #697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| import errno | ||
| import threading | ||
|
|
||
| from cassandra.connection import Connection, ConnectionShutdown | ||
|
|
@@ -12,6 +13,24 @@ | |
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| # Errno values that indicate the remote peer has disconnected. | ||
| _PEER_DISCONNECT_ERRNOS = frozenset(( | ||
| errno.ENOTCONN, errno.ESHUTDOWN, | ||
| errno.ECONNRESET, errno.ECONNABORTED, | ||
| errno.EBADF, | ||
| )) | ||
|
Comment on lines
+16
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't it be a ConnectionError in all those cases? |
||
|
|
||
| # Windows winerror codes for the same conditions: | ||
| # 10053 = WSAECONNABORTED, 10054 = WSAECONNRESET | ||
| _PEER_DISCONNECT_WINERRORS = frozenset((10053, 10054)) | ||
|
|
||
|
|
||
| def _is_peer_disconnect(err): | ||
| """Return True if *err* indicates the remote peer closed the connection.""" | ||
| return (isinstance(err, ConnectionError) | ||
| or getattr(err, 'winerror', None) in _PEER_DISCONNECT_WINERRORS | ||
| or getattr(err, 'errno', None) in _PEER_DISCONNECT_ERRNOS) | ||
|
|
||
|
|
||
| # This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and | ||
| # ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the | ||
|
|
@@ -140,8 +159,7 @@ def close(self): | |
| return | ||
| self.is_closed = True | ||
|
|
||
| # close from the loop thread to avoid races when removing file | ||
| # descriptors | ||
| # Schedule async cleanup (cancel watchers, error pending requests) | ||
| asyncio.run_coroutine_threadsafe( | ||
| self._close(), loop=self._loop | ||
| ) | ||
|
|
@@ -153,11 +171,46 @@ async def _close(self): | |
| if self._read_watcher: | ||
| self._read_watcher.cancel() | ||
| if self._socket: | ||
| self._loop.remove_writer(self._socket.fileno()) | ||
| self._loop.remove_reader(self._socket.fileno()) | ||
| self._socket.close() | ||
|
|
||
| log.debug("Closed socket to %s" % (self.endpoint,)) | ||
| fd = self._socket.fileno() | ||
| if fd >= 0: | ||
|
Comment on lines
+174
to
+175
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? When can fd be 0? |
||
| try: | ||
| self._loop.remove_writer(fd) | ||
| except NotImplementedError: | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| # NotImplementedError: remove_reader/remove_writer are not | ||
| # supported on Windows ProactorEventLoop (default since | ||
| # Python 3.10). ProactorEventLoop uses completion-based | ||
| # IOCP, which has no concept of "watching a fd for | ||
| # readiness" to remove. | ||
| pass | ||
| except Exception: | ||
| # It is not critical if it fails, driver can keep working, | ||
| # but it should not be happening, so logged as error | ||
| log.error("Unexpected error removing writer for %s", | ||
| self.endpoint, exc_info=True) | ||
| try: | ||
| self._loop.remove_reader(fd) | ||
| except NotImplementedError: | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| # NotImplementedError: remove_reader/remove_writer are not | ||
| # supported on Windows ProactorEventLoop (default since | ||
| # Python 3.10). ProactorEventLoop uses completion-based | ||
| # IOCP, which has no concept of "watching a fd for | ||
| # readiness" to remove. | ||
| pass | ||
| except Exception: | ||
| # It is not critical if it fails, driver can keep working, | ||
| # but it should not be happening, so logged as error | ||
| log.error("Unexpected error removing reader for %s", | ||
| self.endpoint, exc_info=True) | ||
|
|
||
|
Comment on lines
+199
to
+204
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In previous version you were catching |
||
| try: | ||
| self._socket.close() | ||
| except OSError: | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| # Ignore if socket is already closed | ||
| pass | ||
|
Comment on lines
+205
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would the socket be already closed? |
||
| except Exception: | ||
| log.debug("Unexpected error closing socket to %s", | ||
| self.endpoint, exc_info=True) | ||
| log.debug("Closed socket to %s" % (self.endpoint,)) | ||
|
|
||
| if not self.is_defunct: | ||
| msg = "Connection to %s was closed" % self.endpoint | ||
|
|
@@ -168,6 +221,9 @@ async def _close(self): | |
| self.connected_event.set() | ||
|
|
||
| def push(self, data): | ||
| if self.is_closed or self.is_defunct: | ||
| raise ConnectionShutdown( | ||
| "Connection to %s is already closed" % self.endpoint) | ||
|
Comment on lines
+224
to
+226
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can tell, So what does this check give us? |
||
| buff_size = self.out_buffer_size | ||
| if len(data) > buff_size: | ||
| chunks = [] | ||
|
|
@@ -196,43 +252,61 @@ async def _push_msg(self, chunks): | |
|
|
||
|
|
||
| async def handle_write(self): | ||
| while True: | ||
| try: | ||
| exc = None | ||
| try: | ||
| while True: | ||
| next_msg = await self._write_queue.get() | ||
| if next_msg: | ||
| await self._loop.sock_sendall(self._socket, next_msg) | ||
| except socket.error as err: | ||
| except asyncio.CancelledError: | ||
| pass | ||
| except Exception as err: | ||
| if _is_peer_disconnect(err): | ||
| log.debug("Connection %s closed by peer during write: %s", | ||
| self, err) | ||
| else: | ||
| exc = err | ||
| log.debug("Exception in send for %s: %s", self, err) | ||
| self.defunct(err) | ||
| return | ||
| except asyncio.CancelledError: | ||
| return | ||
| finally: | ||
| self.defunct(exc or ConnectionShutdown( | ||
| "Connection to %s was closed" % self.endpoint)) | ||
|
|
||
| async def handle_read(self): | ||
| while True: | ||
| try: | ||
| buf = await self._loop.sock_recv(self._socket, self.in_buffer_size) | ||
| self._iobuf.write(buf) | ||
| # sock_recv expects EWOULDBLOCK if socket provides no data, but | ||
| # nonblocking ssl sockets raise these instead, so we handle them | ||
| # ourselves by yielding to the event loop, where the socket will | ||
| # get the reading/writing it "wants" before retrying | ||
| except (ssl.SSLWantWriteError, ssl.SSLWantReadError): | ||
| # Apparently the preferred way to yield to the event loop from within | ||
| # a native coroutine based on https://github.com/python/asyncio/issues/284 | ||
| await asyncio.sleep(0) | ||
| continue | ||
| except socket.error as err: | ||
| log.debug("Exception during socket recv for %s: %s", | ||
| exc = None | ||
| try: | ||
| while True: | ||
| try: | ||
| buf = await self._loop.sock_recv(self._socket, self.in_buffer_size) | ||
| self._iobuf.write(buf) | ||
| # sock_recv expects EWOULDBLOCK if socket provides no data, but | ||
| # nonblocking ssl sockets raise these instead, so we handle them | ||
| # ourselves by yielding to the event loop, where the socket will | ||
| # get the reading/writing it "wants" before retrying | ||
| except (ssl.SSLWantWriteError, ssl.SSLWantReadError): | ||
| # Apparently the preferred way to yield to the event loop from within | ||
| # a native coroutine based on https://github.com/python/asyncio/issues/284 | ||
| await asyncio.sleep(0) | ||
| continue | ||
|
|
||
| if buf and self._iobuf.tell(): | ||
| self.process_io_buffer() | ||
| else: | ||
| log.debug("Connection %s closed by server", self) | ||
| exc = ConnectionShutdown( | ||
| "Connection to %s was closed by server" % self.endpoint) | ||
| return | ||
| except asyncio.CancelledError: | ||
github-code-quality[bot] marked this conversation as resolved.
Fixed
Show fixed
Hide fixed
|
||
| # Task cancellation is treated as a normal connection shutdown; | ||
| # cleanup and marking the connection as defunct are handled in finally. | ||
| pass | ||
| except Exception as err: | ||
| if _is_peer_disconnect(err): | ||
| log.debug("Connection %s closed by peer during read: %s", | ||
| self, err) | ||
| self.defunct(err) | ||
| return # leave the read loop | ||
| except asyncio.CancelledError: | ||
| return | ||
|
|
||
| if buf and self._iobuf.tell(): | ||
| self.process_io_buffer() | ||
| else: | ||
| log.debug("Connection %s closed by server", self) | ||
| self.close() | ||
| return | ||
| exc = err | ||
| log.debug("Exception during socket recv for %s: %s", | ||
| self, err) | ||
| finally: | ||
| self.defunct(exc or ConnectionShutdown( | ||
| "Connection to %s was closed" % self.endpoint)) | ||
Uh oh!
There was an error while loading. Please reload this page.