Skip to content

Commit

Permalink
add throtteling queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Aug 16, 2024
1 parent d61552b commit 6becffe
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
8 changes: 6 additions & 2 deletions examples/exampledata/config/http_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ metrics:
server_header: false
date_header: false
workers: 1
ws: none
interface: asgi3
backlog: 16384
timeout_keep_alive: 65
input:
httpinput:
type: http_input
message_backlog_size: 15000000
message_backlog_size: 150000
collect_meta: true
metafield_name: "@metadata"
uvicorn_config:
host: 0.0.0.0
port: 9000
workers: 2
access_log: false
access_log: true
server_header: false
date_header: false
ws: none
Expand Down
20 changes: 19 additions & 1 deletion logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@
logger = logging.getLogger("Manager")


class ThrottlingQueue(multiprocessing.queues.Queue):
"""A queue that throttles the number of items that can be put into it."""

wait_time_max = 10000000000000000000000000

@property
def wait_time(self) -> float:
return float(self.qsize() * self._maxsize) / self.wait_time_max

def put(self, obj, block=True, timeout=None):
if self.qsize() >= self._maxsize / 2:
# logger.warning("Too many requests, waiting for %s seconds", self.wait_time)
time.sleep(self.wait_time)
super().put(obj, block=block, timeout=timeout)


class PipelineManager:
"""Manage pipelines via multi-processing."""

Expand Down Expand Up @@ -88,7 +104,9 @@ def _set_http_input_queue(self, configuration):
if not is_http_input and HttpInput.messages is not None:
return
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpInput.messages = multiprocessing.Queue(maxsize=message_backlog_size)
HttpInput.messages = ThrottlingQueue(
maxsize=message_backlog_size, ctx=multiprocessing.get_context()
)

def set_count(self, count: int):
"""Set the pipeline count.
Expand Down

0 comments on commit 6becffe

Please sign in to comment.