Skip to content

Commit 99dc18b

Browse files
authored
Merge pull request #104 from temeddix/restore-mutex
Restore mutex without compromising performance, on Windows
2 parents 1e26094 + 4ddde58 commit 99dc18b

File tree

2 files changed

+74
-55
lines changed

2 files changed

+74
-55
lines changed

qasync/_windows.py

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class _IocpProactor(windows_events.IocpProactor):
6161
def __init__(self):
6262
self.__events = []
6363
super(_IocpProactor, self).__init__()
64+
self._lock = QtCore.QMutex()
6465

6566
def select(self, timeout=None):
6667
"""Override in order to handle events in a threadsafe manner."""
@@ -71,36 +72,60 @@ def select(self, timeout=None):
7172
return tmp
7273

7374
def close(self):
74-
if self._iocp is None:
75-
# already closed
76-
return
77-
78-
# Cancel remaining registered operations.
79-
for fut, ov, obj, callback in list(self._cache.values()):
80-
if fut.cancelled():
81-
# Nothing to do with cancelled futures
82-
pass
83-
elif isinstance(fut, windows_events._WaitCancelFuture):
84-
# _WaitCancelFuture must not be cancelled
85-
pass
86-
else:
87-
try:
88-
fut.cancel()
89-
except OSError as exc:
90-
if self._loop is not None:
91-
context = {
92-
"message": "Cancelling a future failed",
93-
"exception": exc,
94-
"future": fut,
95-
}
96-
if fut._source_traceback:
97-
context["source_traceback"] = fut._source_traceback
98-
self._loop.call_exception_handler(context)
99-
100-
self._results = []
101-
102-
_winapi.CloseHandle(self._iocp)
103-
self._iocp = None
75+
self._logger.debug("Closing")
76+
super(_IocpProactor, self).close()
77+
78+
# Wrap all I/O submission methods to acquire the internal lock first; listed
79+
# in the order they appear in the base class source code.
80+
81+
def recv(self, conn, nbytes, flags=0):
82+
with QtCore.QMutexLocker(self._lock):
83+
return super(_IocpProactor, self).recv(conn, nbytes, flags)
84+
85+
def recv_into(self, conn, buf, flags=0):
86+
with QtCore.QMutexLocker(self._lock):
87+
return super(_IocpProactor, self).recv_into(conn, buf, flags)
88+
89+
def recvfrom(self, conn, nbytes, flags=0):
90+
with QtCore.QMutexLocker(self._lock):
91+
return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)
92+
93+
def recvfrom_into(self, conn, buf, flags=0):
94+
with QtCore.QMutexLocker(self._lock):
95+
return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)
96+
97+
def sendto(self, conn, buf, flags=0, addr=None):
98+
with QtCore.QMutexLocker(self._lock):
99+
return super(_IocpProactor, self).sendto(conn, buf, flags, addr)
100+
101+
def send(self, conn, buf, flags=0):
102+
with QtCore.QMutexLocker(self._lock):
103+
return super(_IocpProactor, self).send(conn, buf, flags)
104+
105+
def accept(self, listener):
106+
with QtCore.QMutexLocker(self._lock):
107+
return super(_IocpProactor, self).accept(listener)
108+
109+
def connect(self, conn, address):
110+
with QtCore.QMutexLocker(self._lock):
111+
return super(_IocpProactor, self).connect(conn, address)
112+
113+
def sendfile(self, sock, file, offset, count):
114+
with QtCore.QMutexLocker(self._lock):
115+
return super(_IocpProactor, self).sendfile(sock, file, offset, count)
116+
117+
def accept_pipe(self, pipe):
118+
with QtCore.QMutexLocker(self._lock):
119+
return super(_IocpProactor, self).accept_pipe(pipe)
120+
121+
# connect_pipe() does not actually use the delayed completion machinery.
122+
123+
# This takes care of wait_for_handle() too.
124+
def _wait_for_handle(self, handle, timeout, _is_cancel):
125+
with QtCore.QMutexLocker(self._lock):
126+
return super(_IocpProactor, self)._wait_for_handle(
127+
handle, timeout, _is_cancel
128+
)
104129

105130
def _poll(self, timeout=None):
106131
"""Override in order to handle events in a threadsafe manner."""
@@ -116,28 +141,27 @@ def _poll(self, timeout=None):
116141
raise ValueError("timeout too big")
117142

118143
while True:
119-
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
120-
# ms, threading.get_ident()))
121144
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
122145
if status is None:
123146
break
124147
ms = 0
125148

126-
err, transferred, key, address = status
127-
try:
128-
f, ov, obj, callback = self._cache.pop(address)
129-
except KeyError:
130-
# key is either zero, or it is used to return a pipe
131-
# handle which should be closed to avoid a leak.
132-
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
133-
_winapi.CloseHandle(key)
134-
continue
135-
136-
if obj in self._stopped_serving:
137-
f.cancel()
138-
# Futures might already be resolved or cancelled
139-
elif not f.done():
140-
self.__events.append((f, callback, transferred, key, ov))
149+
with QtCore.QMutexLocker(self._lock):
150+
err, transferred, key, address = status
151+
try:
152+
f, ov, obj, callback = self._cache.pop(address)
153+
except KeyError:
154+
# key is either zero, or it is used to return a pipe
155+
# handle which should be closed to avoid a leak.
156+
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
157+
_winapi.CloseHandle(key)
158+
continue
159+
160+
if obj in self._stopped_serving:
161+
f.cancel()
162+
# Futures might already be resolved or cancelled
163+
elif not f.done():
164+
self.__events.append((f, callback, transferred, key, ov))
141165

142166
# Remove unregistered futures
143167
for ov in self._unregistered:
@@ -153,9 +177,11 @@ def __init__(self, proactor, parent):
153177
self.__stop = False
154178
self.__proactor = proactor
155179
self.__sig_events = parent.sig_events
180+
self.__semaphore = QtCore.QSemaphore()
156181

157182
def start(self):
158183
super().start()
184+
self.__semaphore.acquire()
159185

160186
def stop(self):
161187
self.__stop = True
@@ -164,6 +190,7 @@ def stop(self):
164190

165191
def run(self):
166192
self._logger.debug("Thread started")
193+
self.__semaphore.release()
167194

168195
while not self.__stop:
169196
events = self.__proactor.select(0.01)

tests/test_qeventloop.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -555,14 +555,6 @@ def test_regression_bug13(loop, sock_pair):
555555
c_sock, s_sock = sock_pair
556556
client_done, server_done = asyncio.Future(), asyncio.Future()
557557

558-
if os.name == "nt":
559-
# On Windows, `loop.add_reader` and `loop.add_writer`
560-
# are not supported by Python's `asyncio` due to platform limitations.
561-
# Though `qasync` does provide those methods on Windows,
562-
# it doesn't guarantee safety against race conditions like on Unix.
563-
# https://docs.python.org/3/library/asyncio-platforms.html
564-
return
565-
566558
async def server_coro():
567559
s_reader, s_writer = await asyncio.open_connection(sock=s_sock)
568560

0 commit comments

Comments
 (0)