Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore mutex without compromising performance, on Windows #104

Merged
merged 3 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 74 additions & 47 deletions qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()

def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -71,36 +72,60 @@ def select(self, timeout=None):
return tmp

def close(self):
if self._iocp is None:
# already closed
return

# Cancel remaining registered operations.
for fut, ov, obj, callback in list(self._cache.values()):
if fut.cancelled():
# Nothing to do with cancelled futures
pass
elif isinstance(fut, windows_events._WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
else:
try:
fut.cancel()
except OSError as exc:
if self._loop is not None:
context = {
"message": "Cancelling a future failed",
"exception": exc,
"future": fut,
}
if fut._source_traceback:
context["source_traceback"] = fut._source_traceback
self._loop.call_exception_handler(context)

self._results = []

_winapi.CloseHandle(self._iocp)
self._iocp = None
self._logger.debug("Closing")
super(_IocpProactor, self).close()

# Wrap all I/O submission methods to acquire the internal lock first; listed
# in the order they appear in the base class source code.

def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)

def recv_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv_into(conn, buf, flags)

def recvfrom(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)

def recvfrom_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)

def sendto(self, conn, buf, flags=0, addr=None):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendto(conn, buf, flags, addr)

def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)

def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)

def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)

def sendfile(self, sock, file, offset, count):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendfile(sock, file, offset, count)

def accept_pipe(self, pipe):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept_pipe(pipe)

# connect_pipe() does not actually use the delayed completion machinery.

# This takes care of wait_for_handle() too.
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(
handle, timeout, _is_cancel
)

def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -116,28 +141,27 @@ def _poll(self, timeout=None):
raise ValueError("timeout too big")

while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
with QtCore.QMutexLocker(self._lock):
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

# Remove unregistered futures
for ov in self._unregistered:
Expand All @@ -153,9 +177,11 @@ def __init__(self, proactor, parent):
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()

def start(self):
super().start()
self.__semaphore.acquire()

def stop(self):
self.__stop = True
Expand All @@ -164,6 +190,7 @@ def stop(self):

def run(self):
self._logger.debug("Thread started")
self.__semaphore.release()

while not self.__stop:
events = self.__proactor.select(0.01)
Expand Down
8 changes: 0 additions & 8 deletions tests/test_qeventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,6 @@ def test_regression_bug13(loop, sock_pair):
c_sock, s_sock = sock_pair
client_done, server_done = asyncio.Future(), asyncio.Future()

if os.name == "nt":
# On Windows, `loop.add_reader` and `loop.add_writer`
# are not supported by Python's `asyncio` due to platform limitations.
# Though `qasync` does provide those methods on Windows,
# it doesn't guarantee safety against race conditions like on Unix.
# https://docs.python.org/3/library/asyncio-platforms.html
return

async def server_coro():
s_reader, s_writer = await asyncio.open_connection(sock=s_sock)

Expand Down