Skip to content

Commit

Permalink
Add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
csm10495 committed Dec 17, 2024
1 parent 3bf5464 commit 4b285b8
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import multiprocessing
import os
import queue
import sys
import threading
import time
import unittest
import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

Expand All @@ -22,6 +25,12 @@ def __init__(self, mgr):
def __del__(self):
self.event.set()

def _put_sleep_put(queue):
""" Used as part of test_process_pool_executor_terminate_workers """
queue.put('started')
time.sleep(2)
queue.put('finished')


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down Expand Up @@ -218,6 +227,60 @@ def mock_start_new_thread(func, *args, **kwargs):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_process_pool_executor_terminate_workers(self):
manager = multiprocessing.Manager()
q = manager.Queue()

with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(_put_sleep_put, q)

# We should get started, but not finished since we'll terminate the workers just after
self.assertEqual(q.get(timeout=1), 'started')

executor.terminate_workers()

try:
q.get(timeout=1)
raise RuntimeError("Queue should not have gotten a second value")
except queue.Empty:
pass

def test_process_pool_executor_terminate_workers_dead_workers(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
executor.submit(os._exit, 1).result()
except BrokenProcessPool:
# BrokenProcessPool will be raised by our call to .result() since the worker will die
pass

# The worker has been killed already, terminate_workers should basically no-op
executor.terminate_workers()

def test_process_pool_executor_terminate_workers_not_started_yet(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
# The worker has not been started yet, terminate_workers should basically no-op
executor.terminate_workers()

def test_process_pool_executor_terminate_workers_stops_pool(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()

executor.terminate_workers()

try:
executor.submit(time.sleep, 0).result()
raise RuntimeError("Should have raised BrokenProcessPool")
except BrokenProcessPool:
pass

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()

executor.terminate_workers(9)
mock_kill.assert_called_once_with(list(executor._processes.values())[0].pid, 9)


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down

0 comments on commit 4b285b8

Please sign in to comment.