Skip to content

Commit

Permalink
Fix the slow performance on Windows (#101)
Browse files Browse the repository at this point in the history
* Remove unneeded mutex and semaphore

* Remove unneeded methods

* Remove more methods

* Remove Windows-specific code

* Revert "Remove Windows-specific code"

This reverts commit b0ff584.

* Restore semaphore

* Revert "Restore semaphore"

This reverts commit abac70a.

* Do not perform regression test on Windows

* Use `exec()` over `exec_()`

* Fix hanging

* Add comments

* Register a pytest marker to avoid warning
  • Loading branch information
temeddix authored Nov 16, 2023
1 parent ee9b0a3 commit c91a915
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 84 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest]
addopts = "-n auto"
testpaths = ["tests"]

[tool.pytest.ini_options]
markers = ["raises"]
6 changes: 3 additions & 3 deletions qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ def run_forever(self):
self.__log_debug("Starting Qt event loop")
asyncio.events._set_running_loop(self)
rslt = -1
if hasattr(self.__app, "exec_"):
rslt = self.__app.exec_()
else:
if hasattr(self.__app, "exec"):
rslt = self.__app.exec()
else:
rslt = self.__app.exec_()
self.__log_debug("Qt event loop ended with result %s", rslt)
return rslt
finally:
Expand Down
133 changes: 52 additions & 81 deletions qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()

def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -72,60 +71,36 @@ def select(self, timeout=None):
return tmp

def close(self):
self._logger.debug("Closing")
super(_IocpProactor, self).close()

# Wrap all I/O submission methods to acquire the internal lock first; listed
# in the order they appear in the base class source code.

def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)

def recv_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv_into(conn, buf, flags)

def recvfrom(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)

def recvfrom_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)

def sendto(self, conn, buf, flags=0, addr=None):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendto(conn, buf, flags, addr)

def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)

def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)

def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)

def sendfile(self, sock, file, offset, count):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendfile(sock, file, offset, count)

def accept_pipe(self, pipe):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept_pipe(pipe)

# connect_pipe() does not actually use the delayed completion machinery.

# This takes care of wait_for_handle() too.
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(
handle, timeout, _is_cancel
)
if self._iocp is None:
# already closed
return

# Cancel remaining registered operations.
for fut, ov, obj, callback in list(self._cache.values()):
if fut.cancelled():
# Nothing to do with cancelled futures
pass
elif isinstance(fut, windows_events._WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
else:
try:
fut.cancel()
except OSError as exc:
if self._loop is not None:
context = {
"message": "Cancelling a future failed",
"exception": exc,
"future": fut,
}
if fut._source_traceback:
context["source_traceback"] = fut._source_traceback
self._loop.call_exception_handler(context)

self._results = []

_winapi.CloseHandle(self._iocp)
self._iocp = None

def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -140,30 +115,29 @@ def _poll(self, timeout=None):
if ms >= UINT32_MAX:
raise ValueError("timeout too big")

with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

# Remove unregistered futures
for ov in self._unregistered:
Expand All @@ -179,11 +153,9 @@ def __init__(self, proactor, parent):
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()

def start(self):
super().start()
self.__semaphore.acquire()

def stop(self):
self.__stop = True
Expand All @@ -192,7 +164,6 @@ def stop(self):

def run(self):
self._logger.debug("Thread started")
self.__semaphore.release()

while not self.__stop:
events = self.__proactor.select(0.01)
Expand Down
8 changes: 8 additions & 0 deletions tests/test_qeventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ def test_regression_bug13(loop, sock_pair):
c_sock, s_sock = sock_pair
client_done, server_done = asyncio.Future(), asyncio.Future()

if os.name == "nt":
# On Windows, `loop.add_reader` and `loop.add_writer`
# are not supported by Python's `asyncio` due to platform limitations.
# Though `qasync` does provide those methods on Windows,
# it doesn't guarantee safety against race conditions like on Unix.
# https://docs.python.org/3/library/asyncio-platforms.html
return

async def server_coro():
s_reader, s_writer = await asyncio.open_connection(sock=s_sock)

Expand Down

0 comments on commit c91a915

Please sign in to comment.