diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16796df86..35253ea4d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,8 @@
 * Fix writing time measurements into the event after the deleter has deleted the event. The bug
   only happened when the `metrics.measure_time.append_to_event` configuration was set to `true`.
+* Fix memory leak by removing the log aggregation capability
+
 
 ## v6.8.0
 
 ### Features
diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py
index c0075cc34..debd22bad 100644
--- a/logprep/run_logprep.py
+++ b/logprep/run_logprep.py
@@ -16,7 +16,6 @@
 from logprep._version import get_versions
 from logprep.processor.base.rule import Rule
 from logprep.runner import Runner
-from logprep.util.aggregating_logger import AggregatingLogger
 from logprep.util.auto_rule_tester.auto_rule_corpus_tester import RuleCorpusTester
 from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester
 from logprep.util.configuration import Configuration, InvalidConfigurationError
@@ -25,6 +24,14 @@
 from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
 from logprep.util.time_measurement import TimeMeasurement
 
+from logging import (
+    getLogger,
+    basicConfig,
+    Logger,
+)
+from logging.handlers import SysLogHandler
+
+
 warnings.simplefilter("always", DeprecationWarning)
 logging.captureWarnings(True)
 
@@ -137,10 +144,15 @@ def get_versions_string(args) -> str:
     return version_string
 
 
-def _setup_logger(args, config):
+def _setup_logger(args, config: Configuration):
     try:
-        AggregatingLogger.setup(config, logger_disabled=args.disable_logging)
-        logger = AggregatingLogger.create("Logprep")
+        log_config = config.get("logger", {})
+        log_level = log_config.get("level", "INFO")
+        basicConfig(
+            level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
+        )
+        logger = logging.getLogger("Logprep")
+        logger.info(f"Log level set to '{log_level}'")
         for version in get_versions_string(args).split("\n"):
             logger.info(version)
     except BaseException as error:  # pylint: disable=broad-except
diff --git a/logprep/runner.py b/logprep/runner.py
index 0f11bd9d9..697764cf8 100644
--- a/logprep/runner.py
+++ b/logprep/runner.py
@@ -184,6 +184,7 @@ def start(self):
         self._continue_iterating.value = True
         self._schedule_config_refresh_job()
         self._logger.info("Startup complete")
+        self._logger.debug("Runner iterating")
        for _ in self._keep_iterating():
             self._loop()
         self.stop()
@@ -194,7 +195,6 @@
 
     def _loop(self):
         self.scheduler.run_pending()
-        self._logger.debug("Runner iterating")
         self._manager.restart_failed_pipeline()
         # Note: We are waiting half the timeout because when shutting down, we also have to
         # wait for the logprep's timeout before the shutdown is actually initiated.
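The rewritten `_setup_logger` drops the custom aggregation machinery and leans on the standard library alone: `basicConfig` configures the root logger once, and the named "Logprep" logger inherits its level and handler. A minimal sketch of the resulting behavior — illustration only, not part of the patch; the `config` dict literal stands in for a parsed Logprep `Configuration`:

```python
import logging
from logging import basicConfig

# Stand-in for a parsed Logprep configuration (assumed shape, for illustration).
config = {"logger": {"level": "DEBUG"}}

log_config = config.get("logger", {})
log_level = log_config.get("level", "INFO")

# basicConfig accepts a level name string such as "DEBUG", and it is a no-op
# if the root logger already has handlers configured.
basicConfig(level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s")

logger = logging.getLogger("Logprep")
logger.info("Log level set to '%s'", log_level)
logger.debug("visible because the root logger was configured with DEBUG")
```

One behavioral difference worth noting: the removed `AggregatingLogger.setup` silently fell back to `INFO` for unknown level names, whereas `basicConfig` raises `ValueError` for an unrecognized level string.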
diff --git a/logprep/util/aggregating_logger.py b/logprep/util/aggregating_logger.py
deleted file mode 100644
index 4d550c248..000000000
--- a/logprep/util/aggregating_logger.py
+++ /dev/null
@@ -1,96 +0,0 @@
-"""This module create a logger that is able to aggregate log messages."""
-
-from logging import (
-    getLogger,
-    CRITICAL,
-    FATAL,
-    ERROR,
-    WARNING,
-    INFO,
-    DEBUG,
-    NOTSET,
-    basicConfig,
-    Logger,
-)
-from logging.handlers import SysLogHandler
-from os import path
-
-from logprep.util.log_aggregator import Aggregator
-
-name_to_level = {
-    "CRITICAL": CRITICAL,
-    "FATAL": FATAL,
-    "ERROR": ERROR,
-    "WARN": WARNING,
-    "WARNING": WARNING,
-    "INFO": INFO,
-    "DEBUG": DEBUG,
-    "NOTSET": NOTSET,
-}
-
-
-class AggregatingLogger:
-    """Used to create logger that aggregates log messages."""
-
-    logger_config = None
-    level_str = None
-    log_level = None
-
-    @classmethod
-    def setup(cls, config: dict, logger_disabled: bool = False):
-        """Setup aggregating logger.
-
-        Parameters
-        ----------
-        config : dict
-            Logprep configuration
-        logger_disabled : bool
-            Defines if aggregating loggers are enabled or not
-
-        """
-        cls.logger_disabled = logger_disabled
-        cls.logger_config = config.get("logger", {})
-
-        cls.level_str = cls.logger_config.get("level", "INFO")
-        if cls.level_str is None:
-            cls.level_str = "INFO"
-        cls.log_level = name_to_level.get(cls.level_str.upper(), INFO)
-        basicConfig(
-            level=cls.log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
-        )
-
-        Aggregator.count_threshold = cls.logger_config.get("aggregation_threshold", 4)
-        Aggregator.log_period = cls.logger_config.get("aggregation_period", 30)
-        Aggregator.start_timer()
-
-    @classmethod
-    def create(cls, name: str) -> Logger:
-        """Create aggregating logger.
-
-        Parameters
-        ----------
-        name : str
-            Name for aggregating logger.
-
-        Returns
-        -------
-        logger : logging.Logger
-            Logger with aggregating filter
-
-        """
-        logger = getLogger(name)
-        logger.disabled = cls.logger_disabled
-
-        if path.exists("/dev/log"):
-            logger.handlers = []
-            logger.addHandler(SysLogHandler(address="/dev/log"))
-
-        if cls.level_str.upper() not in name_to_level:
-            logger.info(f"Invalid log level '{cls.level_str.upper()}', defaulting to 'INFO'")
-        else:
-            logger.setLevel(cls.log_level)
-            logger.info(f"Log level set to '{cls.level_str.upper()}'")
-
-        logger.addFilter(Aggregator("Agregator"))
-
-        return logger
- - """ - cls.count_threshold = count - cls.log_period = period - cls.logs.clear() - - @classmethod - def start_timer(cls): - """Start repeating timer for aggregation.""" - cls.timer_thread = threading.Timer(cls.log_period, cls._log_aggregated) - cls.timer_thread.daemon = True - cls.timer_thread.start() - - @classmethod - def _aggregate(cls, record: LogRecord) -> bool: - log_id = "{0[levelname]}:{0[name]}:{0[msg]}".format(record.__dict__) - if log_id not in cls.logs: - cls.logs[log_id] = { - "cnt": 1, - "first_record": record, - "last_record": None, - "cnt_passed": 0, - "aggregate": False, - } - else: - cls.logs[log_id]["cnt"] += 1 - cls.logs[log_id]["last_record"] = record - - if record.created - cls.logs[log_id]["last_record"].created < cls.log_period: - if cls.logs[log_id]["cnt"] > cls.count_threshold or cls.logs[log_id]["aggregate"]: - return False - - cls.logs[log_id]["aggregate"] = False - cls.logs[log_id]["first_record"] = record - cls.logs[log_id]["cnt_passed"] += 1 - - return True - - @classmethod - def _log_aggregated(cls): - while True: - cls._perform_logging_if_possible() - sleep(cls.log_period) - - @classmethod - def _perform_logging_if_possible(cls): - for log_id, data in list(cls.logs.items()): - count = data["cnt"] - data["cnt_passed"] - if count > 1 and data["last_record"]: - time_passed = round(time() - data["first_record"].created, 1) - time_passed = min(time_passed, cls.log_period) - if time_passed < 60: - period = f"{time_passed} sek" - else: - period = f"{time_passed / 60.0:.1f} min" - last_record = data["last_record"] - last_record.msg = f"{last_record.msg} ({count} in ~{period})" - logging.getLogger(last_record.name).log(last_record.levelno, last_record.msg) - - cls.logs[log_id]["first_record"] = data["last_record"] - cls.logs[log_id]["last_record"] = None - cls.logs[log_id]["cnt"] = 0 - cls.logs[log_id]["cnt_passed"] = 0 - cls.logs[log_id]["aggregate"] = True - else: - if time() - cls.logs[log_id]["first_record"].created >= cls.log_period: - cls.logs[log_id]["aggregate"] = False - - def filter(self, record: LogRecord) -> bool: - """Print aggregation if it is ready via a Logger filter.""" - return Aggregator._aggregate(record) diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py index 4b3cb3ad3..f53ed6c60 100644 --- a/tests/unit/test_run_logprep.py +++ b/tests/unit/test_run_logprep.py @@ -233,7 +233,7 @@ def test_main_calls_runner_stop_on_any_exception(self, mock_stop, mock_start): mock_stop.assert_called() def test_logprep_exits_if_logger_can_not_be_created(self): - with mock.patch("logprep.run_logprep.AggregatingLogger.create") as mock_create: + with mock.patch("logging.getLogger") as mock_create: mock_create.side_effect = BaseException config_path = "quickstart/exampledata/config/pipeline.yml" with mock.patch("sys.argv", ["logprep", config_path]): diff --git a/tests/unit/util/test_log_aggregator.py b/tests/unit/util/test_log_aggregator.py deleted file mode 100644 index 2d23e3254..000000000 --- a/tests/unit/util/test_log_aggregator.py +++ /dev/null @@ -1,171 +0,0 @@ -# pylint: disable=missing-docstring -# pylint: disable=protected-access -from logging import makeLogRecord -from random import randint -from unittest import mock - -import pytest - -from logprep.util.log_aggregator import Aggregator - - -@pytest.fixture(autouse=True) -def clear_aggregator(): - yield - Aggregator.logs.clear() - - -class TestAggregator: - def test_initialized(self): - Aggregator.setup(5, 6) - assert Aggregator.count_threshold == 5 - assert Aggregator.log_period 
diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py
index 4b3cb3ad3..f53ed6c60 100644
--- a/tests/unit/test_run_logprep.py
+++ b/tests/unit/test_run_logprep.py
@@ -233,7 +233,7 @@ def test_main_calls_runner_stop_on_any_exception(self, mock_stop, mock_start):
         mock_stop.assert_called()
 
     def test_logprep_exits_if_logger_can_not_be_created(self):
-        with mock.patch("logprep.run_logprep.AggregatingLogger.create") as mock_create:
+        with mock.patch("logging.getLogger") as mock_create:
             mock_create.side_effect = BaseException
             config_path = "quickstart/exampledata/config/pipeline.yml"
             with mock.patch("sys.argv", ["logprep", config_path]):
diff --git a/tests/unit/util/test_log_aggregator.py b/tests/unit/util/test_log_aggregator.py
deleted file mode 100644
index 2d23e3254..000000000
--- a/tests/unit/util/test_log_aggregator.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# pylint: disable=missing-docstring
-# pylint: disable=protected-access
-from logging import makeLogRecord
-from random import randint
-from unittest import mock
-
-import pytest
-
-from logprep.util.log_aggregator import Aggregator
-
-
-@pytest.fixture(autouse=True)
-def clear_aggregator():
-    yield
-    Aggregator.logs.clear()
-
-
-class TestAggregator:
-    def test_initialized(self):
-        Aggregator.setup(5, 6)
-        assert Aggregator.count_threshold == 5
-        assert Aggregator.log_period == 6
-        assert len(Aggregator.logs) == 0
-
-    def test_one_log_without_aggregation(self):
-        Aggregator._aggregate(makeLogRecord({"msg": "Test log"}))
-        assert len(Aggregator.logs) == 1
-        self.assert_count(1)
-
-    def test_one_log_many_times_without_aggregation(self):
-        log_cnt = 10
-        for _ in range(log_cnt):
-            Aggregator._aggregate(makeLogRecord({"msg": "Test log"}))
-        assert len(Aggregator.logs) == 1
-        self.assert_count(log_cnt)
-
-    def test_many_logs_many_times_without_aggregation(self):
-        log_cnt = 10
-        for _ in range(log_cnt):
-            Aggregator._aggregate(makeLogRecord({"msg": "Test log 1"}))
-            Aggregator._aggregate(makeLogRecord({"msg": "Test log 2"}))
-        assert len(Aggregator.logs) == 2
-        self.assert_count(log_cnt)
-
-    @mock.patch("logprep.util.log_aggregator.time", return_value=0.0)
-    def test_aggregation_no_new_aggregation_after_period_if_no_new_logs_at_print(self, mock_time):
-        cnt_threshold = 3
-        period = 0.25
-        Aggregator.setup(cnt_threshold, period)
-        log_cnt = 10
-
-        should_print = self.add_log_n_times(log_cnt)
-        assert should_print == [True] * cnt_threshold + [False] * (log_cnt - cnt_threshold)
-        assert len(Aggregator.logs) == 1
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == cnt_threshold
-            assert log["cnt"] == 10
-            assert log["first_record"].msg == "Test log"
-            assert log["last_record"].msg == "Test log"
-
-        Aggregator._perform_logging_if_possible()
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == 0
-            assert log["cnt"] == 0
-            assert log["first_record"].msg == f"Test log ({log_cnt - cnt_threshold} in ~0.0 sek)"
-            assert log["last_record"] is None
-            assert log["aggregate"] is True
-
-        mock_time.return_value = mock_time.return_value + period + 0.05
-        Aggregator._perform_logging_if_possible()
-
-        for log in Aggregator.logs.values():
-            assert log["aggregate"] is False
-
-    @mock.patch("logprep.util.log_aggregator.time", return_value=0.0)
-    def test_aggregation_converts_to_minutes(self, mock_time):
-        cnt_threshold = 3
-        period = 300
-        Aggregator.setup(cnt_threshold, period)
-        log_cnt = 10
-
-        should_print = self.add_log_n_times(log_cnt)
-        assert should_print == [True] * cnt_threshold + [False] * (log_cnt - cnt_threshold)
-        assert len(Aggregator.logs) == 1
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == cnt_threshold
-            assert log["cnt"] == 10
-            assert log["first_record"].msg == "Test log"
-            assert log["last_record"].msg == "Test log"
-
-        mock_time.return_value = mock_time.return_value + 61
-
-        Aggregator._perform_logging_if_possible()
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == 0
-            assert log["cnt"] == 0
-            assert log["first_record"].msg == f"Test log ({log_cnt - cnt_threshold} in ~1.0 min)"
-            assert log["last_record"] is None
-            assert log["aggregate"] is True
-
-    @mock.patch("logprep.util.log_aggregator.time", return_value=0.0)
-    @mock.patch("logging.getLogger")
-    def test_aggregation_keep_aggregating_on_consecutive_periods(
-        self, logging_get_logger, mock_time
-    ):
-        cnt_threshold = 3
-        period = 0.25
-        Aggregator.setup(cnt_threshold, period)
-        log_cnt = 10
-
-        # It should log the first cnt_threshold logs normally
-        should_print = self.add_log_n_times(log_cnt)
-        assert should_print == [True] * cnt_threshold + [False] * (log_cnt - cnt_threshold)
-        assert len(Aggregator.logs) == 1
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == cnt_threshold
-            assert log["cnt"] == 10
-            assert log["first_record"].msg == "Test log"
-            assert log["last_record"].msg == "Test log"
-
-        Aggregator._perform_logging_if_possible()
-
-        for log in Aggregator.logs.values():
-            assert log["cnt_passed"] == 0
-            assert log["cnt"] == 0
-            assert log["first_record"].msg == f"Test log ({log_cnt - cnt_threshold} in ~0.0 sek)"
-            assert log["last_record"] is None
-            assert log["aggregate"] is True
-
-        # It should only print aggregated if there were still too many logs after
-        # the last aggregation unless the next period passed
-        additional_aggregation_cnt = 3
-        for _ in range(additional_aggregation_cnt):
-            mock_time.return_value = mock_time.return_value + period + 0.05
-            random_log_cnt = log_cnt + randint(1, 10)
-            should_print = self.add_log_n_times(random_log_cnt)
-            assert should_print == [False] * random_log_cnt
-
-            self.assert_count(random_log_cnt)
-
-            Aggregator._perform_logging_if_possible()
-
-            for log in Aggregator.logs.values():
-                assert log["cnt_passed"] == 0
-                assert log["cnt"] == 0
-                assert log["first_record"].msg == f"Test log ({random_log_cnt} in ~{period} sek)"
-                assert log["last_record"] is None
-                assert log["aggregate"] is True
-
-        assert logging_get_logger.call_count == additional_aggregation_cnt + 1
-
-    @staticmethod
-    def add_log_n_times(log_cnt):
-        should_print = []
-        for _ in range(log_cnt):
-            should_print.append(
-                Aggregator().filter(
-                    makeLogRecord({"msg": "Test log", "levelno": 20, "created": 0.0})
-                )
-            )
-        return should_print
-
-    @staticmethod
-    def assert_count(log_cnt):
-        for log in Aggregator.logs.values():
-            assert log["cnt"] == log_cnt
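For deployments that relied on the removed `aggregation_threshold` and `aggregation_period` settings, similar de-duplication can be recovered with a small stdlib `logging.Filter`. The sketch below is one possible stand-in, not part of this change; the class name, defaults, and pruning limit are arbitrary, and unlike the removed filter it evicts stale entries so its bookkeeping stays bounded:

```python
import logging
import time

class RateLimitFilter(logging.Filter):
    """Let the first `threshold` occurrences of a message through per period."""

    def __init__(self, threshold: int = 4, period: float = 30.0):
        super().__init__()
        self.threshold = threshold
        self.period = period
        self.seen = {}  # msg -> (window_start, count)

    def filter(self, record: logging.LogRecord) -> bool:
        now = time.monotonic()
        start, count = self.seen.get(record.msg, (now, 0))
        if now - start >= self.period:
            start, count = now, 0  # window expired: start counting again
        self.seen[record.msg] = (start, count + 1)
        if len(self.seen) > 1024:  # prune expired entries so the dict stays bounded
            self.seen = {m: v for m, v in self.seen.items() if now - v[0] < self.period}
        return count < self.threshold

logging.getLogger("Logprep").addFilter(RateLimitFilter())
```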