From 756783747fab18046dc8ada71e4bf564c06d3bf3 Mon Sep 17 00:00:00 2001
From: Konstantin Neumann
Date: Mon, 4 Nov 2024 12:03:45 +0100
Subject: [PATCH] Logprep on Mac Testing

---
 .gitignore                                    | 1 +
 examples/exampledata/config/http_pipeline.yml | 3 ++-
 examples/exampledata/config/pipeline.yml      | 4 ++--
 logprep/connector/http/input.py               | 2 +-
 logprep/framework/pipeline.py                 | 2 +-
 logprep/framework/pipeline_manager.py         | 2 +-
 logprep/runner.py                             | 4 ++--
 logprep/util/logging.py                       | 6 ++++++
 8 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/.gitignore b/.gitignore
index 2fe9ccc0e..403c87af8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,4 @@ examples/k8s/charts
 target
 wheelhouse
 requirements.*
+.DS_Store
diff --git a/examples/exampledata/config/http_pipeline.yml b/examples/exampledata/config/http_pipeline.yml
index 2bd2df9b3..bb6892b24 100644
--- a/examples/exampledata/config/http_pipeline.yml
+++ b/examples/exampledata/config/http_pipeline.yml
@@ -3,6 +3,7 @@ process_count: 4
 config_refresh_interval: 5
 profile_pipelines: false
 restart_count: 3
+error_backlog_size: 150
 logger:
   level: INFO
   loggers:
@@ -29,7 +30,7 @@ metrics:
 input:
   httpinput:
     type: http_input
-    message_backlog_size: 1500000
+    message_backlog_size: 150
     collect_meta: true
     metafield_name: "@metadata"
     uvicorn_config:
diff --git a/examples/exampledata/config/pipeline.yml b/examples/exampledata/config/pipeline.yml
index ce91f79b8..7207ae21d 100644
--- a/examples/exampledata/config/pipeline.yml
+++ b/examples/exampledata/config/pipeline.yml
@@ -3,9 +3,9 @@ process_count: 2
 timeout: 0.1
 restart_count: 2
 config_refresh_interval: 5
-error_backlog_size: 1500000
+error_backlog_size: 150
 logger:
-  level: INFO
+  level: DEBUG
   format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
   datefmt: "%Y-%m-%d %H:%M:%S"
   loggers:
diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py
index 2f97f8d9a..8428840b4 100644
--- a/logprep/connector/http/input.py
+++ b/logprep/connector/http/input.py
@@ -479,7 +479,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:
 
     def _get_event(self, timeout: float) -> Tuple:
         """Returns the first message from the queue"""
-        self.metrics.message_backlog_size += self.messages.qsize()
+        #self.metrics.message_backlog_size += self.messages.qsize()
         try:
             message = self.messages.get(timeout=timeout)
             raw_message = str(message).encode("utf8")
diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py
index 04bd44a9e..4dc74a294 100644
--- a/logprep/framework/pipeline.py
+++ b/logprep/framework/pipeline.py
@@ -336,7 +336,7 @@ def _drain_input_queues(self) -> None:
         if not hasattr(self._input, "messages"):
             return
         if isinstance(self._input.messages, multiprocessing.queues.Queue):
-            while self._input.messages.qsize():
+            while not self._input.messages.empty():
                 self.process_pipeline()
 
     def stop(self) -> None:
diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py
index a0a3034be..72f41e8a5 100644
--- a/logprep/framework/pipeline_manager.py
+++ b/logprep/framework/pipeline_manager.py
@@ -52,7 +52,7 @@ def throttle(self, batch_size=1):
 
     def put(self, obj, block=True, timeout=None, batch_size=1):
         """Put an obj into the queue."""
-        self.throttle(batch_size)
+        #self.throttle(batch_size)
         super().put(obj, block=block, timeout=timeout)
diff --git a/logprep/runner.py b/logprep/runner.py
index 634b6b2dd..62d950813 100644
--- a/logprep/runner.py
+++ b/logprep/runner.py
@@ -174,8 +174,8 @@ def _iterate(self):
                 self.exit_code = EXITCODES.PIPELINE_ERROR.value
                 self._logger.error("Restart count exceeded. Exiting.")
                 sys.exit(self.exit_code)
-            if self._manager.error_queue is not None:
-                self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
+            #if self._manager.error_queue is not None:
+            #    self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
             self._manager.restart_failed_pipeline()
 
     def reload_configuration(self):
diff --git a/logprep/util/logging.py b/logprep/util/logging.py
index 475baab19..6709743c3 100644
--- a/logprep/util/logging.py
+++ b/logprep/util/logging.py
@@ -4,6 +4,12 @@
 import multiprocessing as mp
 from logging.handlers import QueueListener
 from socket import gethostname
+import platform
+if platform.system() != "Linux":
+    from multiprocessing import set_start_method
+    set_start_method("fork")
+
+
 logqueue = mp.Queue(-1)
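
Below the patch, a minimal standalone sketch (not part of the patch itself) of the start-method switch that the logprep/util/logging.py hunk introduces: on macOS Python defaults to the "spawn" start method since 3.8, and set_start_method() raises RuntimeError if a start method was already fixed, so the force=True shown here is an illustrative assumption rather than what the patch does.

# Illustrative sketch only: platform-dependent multiprocessing start method,
# mirroring the change in logprep/util/logging.py.
import multiprocessing as mp
import platform

if platform.system() != "Linux":
    # On macOS the default is "spawn"; force=True is an assumption here to
    # avoid a RuntimeError if the start method was already set elsewhere.
    mp.set_start_method("fork", force=True)

if __name__ == "__main__":
    print(mp.get_start_method())  # prints "fork" on macOS after the override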