diff --git a/mxdx/_mxdx.py b/mxdx/_mxdx.py index 09010de..6b5e62f 100644 --- a/mxdx/_mxdx.py +++ b/mxdx/_mxdx.py @@ -8,6 +8,7 @@ import glob import re import time +from multiprocessing.synchronize import SEM_VALUE_MAX from ._io import IO, MuxFile from ._constants import (INTERLEAVE, R1ONLY, R2ONLY, SEQUENTIAL, @@ -138,7 +139,7 @@ def start(self): """Start the Multiplexing.""" ctx = mp.get_context('spawn') self.buffered_queue = BufferedQueue(ctx) - self.msg_queue = ctx.Queue() + self.msg_queue = ctx.Queue(maxsize=16) reader = ctx.Process(target=self.read) writer = ctx.Process(target=self.write) @@ -176,11 +177,10 @@ class BufferedQueue: To adjust for this, we're increasing the amount of data which an individual queue item can hold. """ - - BUFSIZE = 1024 + BUFSIZE = 128 def __init__(self, ctx): - self._queue = ctx.Queue() + self._queue = ctx.Queue(maxsize=min(32767, SEM_VALUE_MAX)) self._buf = None self._init_buf() @@ -242,7 +242,7 @@ def start(self): """Start the Demultiplexing.""" ctx = mp.get_context('spawn') self.buffered_queue = BufferedQueue(ctx) - self.msg_queue = ctx.Queue() + self.msg_queue = ctx.Queue(maxsize=16) reader = ctx.Process(target=self.read) writer = ctx.Process(target=self.write)