diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 39a9f7048..7b374e78e 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -2,6 +2,7 @@ # pylint: disable=logging-fstring-interpolation +import ctypes import logging import logging.handlers import multiprocessing @@ -20,18 +21,20 @@ from logprep.util.configuration import Configuration from logprep.util.logging import LogprepMPQueueListener, logqueue +libc = ctypes.CDLL("libc.so.6") + logger = logging.getLogger("Manager") class ThrottlingQueue(multiprocessing.queues.Queue): """A queue that throttles the number of items that can be put into it.""" - wait_time = 0.0000000000000000001 + wait_time = 5 @property def consumed_percent(self) -> int: """Return the percentage of items consumed.""" - return int(self.qsize() / self.capacity * 100) + return int((self.qsize() / self.capacity) * 100) def __init__(self, ctx, maxsize): super().__init__(ctx=ctx, maxsize=maxsize) @@ -40,12 +43,16 @@ def __init__(self, ctx, maxsize): def throttle(self, batch_size=1): """Throttle put by sleeping.""" - time.sleep((self.wait_time**self.consumed_percent) / batch_size) + if self.consumed_percent > 90: + sleep_time = max( + self.wait_time, int(self.wait_time * self.consumed_percent / batch_size) + ) + # sleep times in microseconds + libc.usleep(sleep_time) def put(self, obj, block=True, timeout=None, batch_size=1): """Put an obj into the queue.""" - if self.consumed_percent >= 90: - self.throttle(batch_size) + self.throttle(batch_size) super().put(obj, block=block, timeout=timeout)