Skip to content

Commit

Permalink
use libc.usleep for better precision
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Aug 27, 2024
1 parent aa66542 commit 22fb9a2
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# pylint: disable=logging-fstring-interpolation

import ctypes
import logging
import logging.handlers
import multiprocessing
Expand All @@ -20,18 +21,20 @@
from logprep.util.configuration import Configuration
from logprep.util.logging import LogprepMPQueueListener, logqueue

libc = ctypes.CDLL("libc.so.6")

logger = logging.getLogger("Manager")


class ThrottlingQueue(multiprocessing.queues.Queue):
"""A queue that throttles the number of items that can be put into it."""

wait_time = 0.0000000000000000001
wait_time = 5

@property
def consumed_percent(self) -> int:
"""Return the percentage of items consumed."""
return int(self.qsize() / self.capacity * 100)
return int((self.qsize() / self.capacity) * 100)

def __init__(self, ctx, maxsize):
super().__init__(ctx=ctx, maxsize=maxsize)
Expand All @@ -40,12 +43,16 @@ def __init__(self, ctx, maxsize):

def throttle(self, batch_size=1):
"""Throttle put by sleeping."""
time.sleep((self.wait_time**self.consumed_percent) / batch_size)
if self.consumed_percent > 90:
sleep_time = max(
self.wait_time, int(self.wait_time * self.consumed_percent / batch_size)
)
# sleep times in microseconds
libc.usleep(sleep_time)

def put(self, obj, block=True, timeout=None, batch_size=1):
"""Put an obj into the queue."""
if self.consumed_percent >= 90:
self.throttle(batch_size)
self.throttle(batch_size)
super().put(obj, block=block, timeout=timeout)


Expand Down

0 comments on commit 22fb9a2

Please sign in to comment.