Skip to content

Commit

Permalink
pythongh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (p…
Browse files Browse the repository at this point in the history
…ython#108513)

This fixes issue python#105829, python#105829

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Chris Withers <[email protected]>
Co-authored-by: Thomas Moreau <[email protected]>
  • Loading branch information
5 people authored and csm10495 committed Sep 28, 2023
1 parent 1a54072 commit 9a05c03
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 4 deletions.
18 changes: 15 additions & 3 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def __init__(self):
self._reader, self._writer = mp.Pipe(duplex=False)

def close(self):
# Please note that we do not take the shutdown lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
Expand Down Expand Up @@ -426,8 +431,12 @@ def wait_result_broken_or_wakeup(self):
elif wakeup_reader in ready:
is_broken = False

with self.shutdown_lock:
self.thread_wakeup.clear()
# No need to hold the _shutdown_lock here because:
# 1. we're the only thread to use the wakeup reader
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
self.thread_wakeup.clear()

return result_item, is_broken, cause

Expand Down Expand Up @@ -717,7 +726,10 @@ def __init__(self, max_workers=None, mp_context=None,
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
# .wakeup(). Care must also be taken to not call clear or close from
# more than one thread since _ThreadWakeup.clear() is not protected by
# the _shutdown_lock
self._executor_manager_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
Expand Down
72 changes: 71 additions & 1 deletion Lib/test/test_concurrent_futures/test_deadlock.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

from test import support

Expand Down Expand Up @@ -241,6 +244,73 @@ def test_crash_big_data(self):

executor.shutdown(wait=True)

def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829

# Lots of cargo culting while writing this test, apologies if
# something is really stupid...

self.executor.shutdown(wait=True)

if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")

def timeout(_signum, _frame):
import faulthandler
faulthandler.dump_traceback()

raise RuntimeError("timed out while submitting jobs?")

thread_run = futures.process._ExecutorManagerThread.run
def mock_run(self):
# Delay thread startup so the wakeup pipe can fill up and block
time.sleep(3)
thread_run(self)

class MockWakeup(_ThreadWakeup):
"""Mock wakeup object to force the wakeup to block"""
def __init__(self):
super().__init__()
self._dummy_queue = queue.Queue(maxsize=1)

def wakeup(self):
self._dummy_queue.put(None, block=True)
super().wakeup()

def clear(self):
try:
while True:
self._dummy_queue.get_nowait()
except queue.Empty:
super().clear()

with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
'run', mock_run),
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
MockWakeup)):
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

job_num = 100
job_data = range(job_num)

# Need to use sigalarm for timeout detection because
# Executor.submit is not guarded by any timeout (both
# self._work_ids.put(self._queue_count) and
# self._executor_manager_thread_wakeup.wakeup() might
# timeout, maybe more?). In this specific case it was
# the wakeup call that deadlocked on a blocking pipe.
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(int(self.TIMEOUT))
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix concurrent.futures.ProcessPoolExecutor deadlock

0 comments on commit 9a05c03

Please sign in to comment.