From 584c1283af7f9028ae5dfa93b85e650a7acfe866 Mon Sep 17 00:00:00 2001 From: Alexander Turenko Date: Wed, 29 May 2024 13:29:07 +0300 Subject: [PATCH] dispatcher: lift pipe buffer size restriction A task queue dispatcher puts all the tasks to the task queue at startup. Then workers are started and are taking the tasks from it. If there are many tasks in a task group (which roughly corresponds to a test suite), we can reach the pipe buffer size on putting into the queue, because `multiprocessing.SimpleQueue` uses a pipe under the hood. The solution is to use `multiprocessing.Queue`, which has an intermediate buffer before the underlying pipe and writes to the pipe in a background thread, without blocking a thread that calls `.put()`. The `Queue` API is a superset of the `SimpleQueue` API, so we can just replace the implementation. Let's also use `Queue` for the worker's output queue to be on the safe side and for consistency. Fixes #287 --- dispatcher.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/dispatcher.py b/dispatcher.py index c9ce386b..00d4b53a 100644 --- a/dispatcher.py +++ b/dispatcher.py @@ -8,30 +8,23 @@ import multiprocessing -# SimpleQueue is available from multiprocessing.queues on -# all Python versions known at the moment of writting the code -# (up to 3.9). -# -# It was additionally exposed directly from the multiprocessing -# module since Python 3.3 ([1]). +# Queue is available from multiprocessing.queues on all Python +# versions known at the moment of writting the code (up to 3.12). # # However the mandatory argument 'ctx' # (see multiprocessing.get_context()) was added to the constructor -# of SimpleQueue from multiprocessing.queues since Python 3.4 -# ([2]). +# of Queue from multiprocessing.queues since Python 3.4 ([1]). # -# So we should import SimpleQueue from multiprocessing on -# Python 3.3+ (and must to do so on Python 3.4+) to uniformly -# instantiate it (without constructor arguments). +# So we should import Queue from multiprocessing on Python 3.4+ +# to uniformly instantiate it (without constructor arguments). # -# [1]: https://bugs.python.org/issue11836 -# [2]: https://bugs.python.org/issue18999 +# [1]: https://bugs.python.org/issue18999 try: - # Python 3.3+ - from multiprocessing import SimpleQueue + # Python 3.4+ + from multiprocessing import Queue except ImportError: # Python 2 - from multiprocessing.queues import SimpleQueue + from multiprocessing.queues import Queue from lib import Options from lib.sampler import sampler @@ -363,8 +356,8 @@ def __init__(self, key, task_group, randomize): random.shuffle(self.task_ids) else: self.randomize = False - self.result_queue = SimpleQueue() - self.task_queue = SimpleQueue() + self.result_queue = Queue() + self.task_queue = Queue() # Don't expose queues file descriptors over Popen to, say, tarantool # running tests.