Skip to content

Commit

Permalink
Internally call shutdown to prevent a resource leak when calling term…
Browse files Browse the repository at this point in the history
…inate_workers
  • Loading branch information
csm10495 committed Dec 20, 2024
1 parent a878221 commit 794ee25
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
7 changes: 4 additions & 3 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

Attempt to terminate all living worker processes immediately by sending
each of them the given signal. If the signal is not specified, the default
signal :data:`signal.SIGTERM` is used.
signal :data:`signal.SIGTERM` is used. Internally, it will also call
:meth:`Executor.shutdown` to ensure that all other resources associated with
the executor are freed.

After calling this method the caller should no longer submit tasks to the
executor. It is also recommended to still call :meth:`Executor.shutdown`
to ensure that all other resources associated with the executor are freed.
executor.

.. versionadded:: next

Expand Down
12 changes: 10 additions & 2 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,18 @@ def terminate_workers(self, signal=signal.SIGTERM):
signal: The signal to send to each worker process. Defaults to
signal.SIGTERM.
"""
if not self._processes:
processes = {}
if self._processes:
processes = self._processes.copy()

# shutdown will invalidate ._processes, so we copy it right before calling.
# If we waited here, we would deadlock if a process decides not to exit.
self.shutdown(wait=False, cancel_futures=True)

if not processes:
return

for pid, proc in self._processes.items():
for pid, proc in processes.items():
try:
if not proc.is_alive():
continue
Expand Down
9 changes: 5 additions & 4 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ def test_process_pool_executor_terminate_workers_dead_workers(self):
# Patching in here instead of at the function level since we only want
# to patch it for this function call, not other parts of the flow.
with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill:
executor.terminate_workers()
with unittest.mock.patch.object(executor, 'shutdown') as mock_shutdown:
executor.terminate_workers()
mock_shutdown.assert_called_once_with(wait=False, cancel_futures=True)

mock_kill.assert_not_called()

Expand All @@ -269,18 +271,17 @@ def test_process_pool_executor_terminate_workers_stops_pool(self):

executor.terminate_workers()

future = executor.submit(time.sleep, 0)
self.assertRaises(BrokenProcessPool, future.result)
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

worker_process = list(executor._processes.values())[0]
executor.terminate_workers(signal.SIGABRT)

worker_process = list(executor._processes.values())[0]
mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT)

def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):
Expand Down

0 comments on commit 794ee25

Please sign in to comment.