Skip to content

Commit

Permalink
dispatcher: lift pipe buffer size restriction
Browse files Browse the repository at this point in the history
A task queue dispatcher puts all the tasks to the task queue at startup.
Then workers are started and are taking the tasks from it.

If there are many tasks in a task group (which roughly corresponds to a
test suite), we can reach the pipe buffer size on putting into the
queue, because `multiprocessing.SimpleQueue` uses a pipe under the hood.

The solution is to use `multiprocessing.Queue`, which has an
intermediate buffer before the underlying pipe and writes to the pipe in
a background thread, without blocking a thread that calls
`<queue>.put()`.

The `Queue` API is a superset of the `SimpleQueue` API, so we can just
replace the implementation.

Let's also use `Queue` for the worker's output queue to be on the safe
side and for consistency.

Fixes #287
  • Loading branch information
Totktonada committed May 29, 2024
1 parent 1037299 commit 584c128
Showing 1 changed file with 11 additions and 18 deletions.
29 changes: 11 additions & 18 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,23 @@

import multiprocessing

# SimpleQueue is available from multiprocessing.queues on
# all Python versions known at the moment of writting the code
# (up to 3.9).
#
# It was additionally exposed directly from the multiprocessing
# module since Python 3.3 ([1]).
# Queue is available from multiprocessing.queues on all Python
# versions known at the moment of writting the code (up to 3.12).
#
# However the mandatory argument 'ctx'
# (see multiprocessing.get_context()) was added to the constructor
# of SimpleQueue from multiprocessing.queues since Python 3.4
# ([2]).
# of Queue from multiprocessing.queues since Python 3.4 ([1]).
#
# So we should import SimpleQueue from multiprocessing on
# Python 3.3+ (and must to do so on Python 3.4+) to uniformly
# instantiate it (without constructor arguments).
# So we should import Queue from multiprocessing on Python 3.4+
# to uniformly instantiate it (without constructor arguments).
#
# [1]: https://bugs.python.org/issue11836
# [2]: https://bugs.python.org/issue18999
# [1]: https://bugs.python.org/issue18999
try:
# Python 3.3+
from multiprocessing import SimpleQueue
# Python 3.4+
from multiprocessing import Queue
except ImportError:
# Python 2
from multiprocessing.queues import SimpleQueue
from multiprocessing.queues import Queue

from lib import Options
from lib.sampler import sampler
Expand Down Expand Up @@ -363,8 +356,8 @@ def __init__(self, key, task_group, randomize):
random.shuffle(self.task_ids)
else:
self.randomize = False
self.result_queue = SimpleQueue()
self.task_queue = SimpleQueue()
self.result_queue = Queue()
self.task_queue = Queue()

# Don't expose queues file descriptors over Popen to, say, tarantool
# running tests.
Expand Down

0 comments on commit 584c128

Please sign in to comment.