From d137c0a1b63941c5aaface5cdcc22c53c0e81f13 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Sun, 19 Nov 2023 04:47:17 +0900 Subject: [PATCH 1/3] Revert "Fix the slow performance on Windows (#101)" This reverts commit c91a9152ffb0fb2d372a43ada58c2f2772867c14. --- pyproject.toml | 3 - qasync/__init__.py | 6 +- qasync/_windows.py | 133 ++++++++++++++++++++++++--------------- tests/test_qeventloop.py | 8 --- 4 files changed, 84 insertions(+), 66 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index acf6949..1689732 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,3 @@ build-backend = "poetry.core.masonry.api" [tool.pytest] addopts = "-n auto" testpaths = ["tests"] - -[tool.pytest.ini_options] -markers = ["raises"] diff --git a/qasync/__init__.py b/qasync/__init__.py index 3363e97..d9dc8aa 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -373,10 +373,10 @@ def run_forever(self): self.__log_debug("Starting Qt event loop") asyncio.events._set_running_loop(self) rslt = -1 - if hasattr(self.__app, "exec"): - rslt = self.__app.exec() - else: + if hasattr(self.__app, "exec_"): rslt = self.__app.exec_() + else: + rslt = self.__app.exec() self.__log_debug("Qt event loop ended with result %s", rslt) return rslt finally: diff --git a/qasync/_windows.py b/qasync/_windows.py index 6150435..e11a027 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -61,6 +61,7 @@ class _IocpProactor(windows_events.IocpProactor): def __init__(self): self.__events = [] super(_IocpProactor, self).__init__() + self._lock = QtCore.QMutex() def select(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -71,36 +72,60 @@ def select(self, timeout=None): return tmp def close(self): - if self._iocp is None: - # already closed - return - - # Cancel remaining registered operations. - for fut, ov, obj, callback in list(self._cache.values()): - if fut.cancelled(): - # Nothing to do with cancelled futures - pass - elif isinstance(fut, windows_events._WaitCancelFuture): - # _WaitCancelFuture must not be cancelled - pass - else: - try: - fut.cancel() - except OSError as exc: - if self._loop is not None: - context = { - "message": "Cancelling a future failed", - "exception": exc, - "future": fut, - } - if fut._source_traceback: - context["source_traceback"] = fut._source_traceback - self._loop.call_exception_handler(context) - - self._results = [] - - _winapi.CloseHandle(self._iocp) - self._iocp = None + self._logger.debug("Closing") + super(_IocpProactor, self).close() + + # Wrap all I/O submission methods to acquire the internal lock first; listed + # in the order they appear in the base class source code. + + def recv(self, conn, nbytes, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recv(conn, nbytes, flags) + + def recv_into(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recv_into(conn, buf, flags) + + def recvfrom(self, conn, nbytes, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) + + def recvfrom_into(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) + + def sendto(self, conn, buf, flags=0, addr=None): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).sendto(conn, buf, flags, addr) + + def send(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).send(conn, buf, flags) + + def accept(self, listener): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).accept(listener) + + def connect(self, conn, address): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).connect(conn, address) + + def sendfile(self, sock, file, offset, count): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).sendfile(sock, file, offset, count) + + def accept_pipe(self, pipe): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).accept_pipe(pipe) + + # connect_pipe() does not actually use the delayed completion machinery. + + # This takes care of wait_for_handle() too. + def _wait_for_handle(self, handle, timeout, _is_cancel): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self)._wait_for_handle( + handle, timeout, _is_cancel + ) def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -115,29 +140,30 @@ def _poll(self, timeout=None): if ms >= UINT32_MAX: raise ValueError("timeout too big") - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break - ms = 0 + with QtCore.QMutexLocker(self._lock): + while True: + # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( + # ms, threading.get_ident())) + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + ms = 0 - err, transferred, key, address = status - try: - f, ov, obj, callback = self._cache.pop(address) - except KeyError: - # key is either zero, or it is used to return a pipe - # handle which should be closed to avoid a leak. - if key not in (0, _overlapped.INVALID_HANDLE_VALUE): - _winapi.CloseHandle(key) - continue - - if obj in self._stopped_serving: - f.cancel() - # Futures might already be resolved or cancelled - elif not f.done(): - self.__events.append((f, callback, transferred, key, ov)) + err, transferred, key, address = status + try: + f, ov, obj, callback = self._cache.pop(address) + except KeyError: + # key is either zero, or it is used to return a pipe + # handle which should be closed to avoid a leak. + if key not in (0, _overlapped.INVALID_HANDLE_VALUE): + _winapi.CloseHandle(key) + continue + + if obj in self._stopped_serving: + f.cancel() + # Futures might already be resolved or cancelled + elif not f.done(): + self.__events.append((f, callback, transferred, key, ov)) # Remove unregistered futures for ov in self._unregistered: @@ -153,9 +179,11 @@ def __init__(self, proactor, parent): self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events + self.__semaphore = QtCore.QSemaphore() def start(self): super().start() + self.__semaphore.acquire() def stop(self): self.__stop = True @@ -164,6 +192,7 @@ def stop(self): def run(self): self._logger.debug("Thread started") + self.__semaphore.release() while not self.__stop: events = self.__proactor.select(0.01) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index 2c076cd..68ba88b 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -555,14 +555,6 @@ def test_regression_bug13(loop, sock_pair): c_sock, s_sock = sock_pair client_done, server_done = asyncio.Future(), asyncio.Future() - if os.name == "nt": - # On Windows, `loop.add_reader` and `loop.add_writer` - # are not supported by Python's `asyncio` due to platform limitations. - # Though `qasync` does provide those methods on Windows, - # it doesn't guarantee safety against race conditions like on Unix. - # https://docs.python.org/3/library/asyncio-platforms.html - return - async def server_coro(): s_reader, s_writer = await asyncio.open_connection(sock=s_sock) From 3055d63a0d71dccbd0b37cc50686306de3b0b620 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Sun, 19 Nov 2023 04:48:17 +0900 Subject: [PATCH 2/3] Make performant on Windows, while keeping mutexes --- qasync/_windows.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index e11a027..b90747c 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -140,15 +140,13 @@ def _poll(self, timeout=None): if ms >= UINT32_MAX: raise ValueError("timeout too big") - with QtCore.QMutexLocker(self._lock): - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break - ms = 0 + while True: + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + ms = 0 + with QtCore.QMutexLocker(self._lock): err, transferred, key, address = status try: f, ov, obj, callback = self._cache.pop(address) From 4ddde58a29d951e6709a5079f24857cf46c8cfde Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Sun, 19 Nov 2023 04:54:12 +0900 Subject: [PATCH 3/3] Organize pytest a bit --- pyproject.toml | 3 +++ qasync/__init__.py | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1689732..acf6949 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,3 +51,6 @@ build-backend = "poetry.core.masonry.api" [tool.pytest] addopts = "-n auto" testpaths = ["tests"] + +[tool.pytest.ini_options] +markers = ["raises"] diff --git a/qasync/__init__.py b/qasync/__init__.py index d9dc8aa..3363e97 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -373,10 +373,10 @@ def run_forever(self): self.__log_debug("Starting Qt event loop") asyncio.events._set_running_loop(self) rslt = -1 - if hasattr(self.__app, "exec_"): - rslt = self.__app.exec_() - else: + if hasattr(self.__app, "exec"): rslt = self.__app.exec() + else: + rslt = self.__app.exec_() self.__log_debug("Qt event loop ended with result %s", rslt) return rslt finally: