From b78dc65e9c233b15dc564f0dc149775787544ac1 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Mon, 9 Oct 2023 15:02:29 +0000 Subject: [PATCH 1/3] remove MultiprocessLogHandler --- logprep/abc/processor.py | 10 +- logprep/framework/pipeline.py | 81 ++++---------- logprep/framework/pipeline_manager.py | 38 +++---- logprep/run_logprep.py | 1 - logprep/runner.py | 49 ++------ logprep/util/multiprocessing_log_handler.py | 28 ----- tests/acceptance/util.py | 1 - tests/unit/framework/test_pipeline.py | 68 ++++-------- tests/unit/framework/test_pipeline_manager.py | 74 ++---------- tests/unit/processor/test_process.py | 12 +- tests/unit/test_runner.py | 53 +++------ tests/util/__init__.py | 0 tests/util/testhelpers.py | 105 ------------------ 13 files changed, 99 insertions(+), 421 deletions(-) delete mode 100644 logprep/util/multiprocessing_log_handler.py delete mode 100644 tests/util/__init__.py delete mode 100644 tests/util/testhelpers.py diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index be780900c..a06e8d9e7 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -291,15 +291,9 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: L self._generic_tree.add_rule(rule, self._logger) if self._logger.isEnabledFor(DEBUG): # pragma: no cover number_specific_rules = self._specific_tree.metrics.number_of_rules - self._logger.debug( - f"{self.describe()} loaded {number_specific_rules} " - f"specific rules ({current_process().name})" - ) + self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules") number_generic_rules = self._generic_tree.metrics.number_of_rules - self._logger.debug( - f"{self.describe()} loaded {number_generic_rules} generic rules " - f"generic rules ({current_process().name})" - ) + self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules") @staticmethod def _field_exists(event: dict, dotted_field: str) -> bool: diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 146a5abad..1f0efabdd 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -5,13 +5,15 @@ """ import copy +import logging +import logging.handlers +import multiprocessing # pylint: disable=logging-fstring-interpolation import queue import warnings from ctypes import c_bool, c_ulonglong from functools import cached_property -from logging import INFO, NOTSET, Handler, Logger from multiprocessing import Lock, Process, Value, current_process from typing import Any, List, Tuple @@ -42,24 +44,11 @@ from logprep.metrics.metric import Metric, calculate_new_average from logprep.metrics.metric_exposer import MetricExposer from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning -from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler from logprep.util.pipeline_profiler import PipelineProfiler from logprep.util.prometheus_exporter import PrometheusStatsExporter from logprep.util.time_measurement import TimeMeasurement -class PipelineError(BaseException): - """Base class for Pipeline related exceptions.""" - - -class MustProvideALogHandlerError(PipelineError): - """Raise if no log handler was provided.""" - - -class MustProvideAnMPLogHandlerError(BaseException): - """Raise if no multiprocessing log handler was provided.""" - - class SharedCounter: """A shared counter for multiprocessing pipelines.""" @@ -67,9 +56,9 @@ def __init__(self): self._val = Value(c_ulonglong, 0) self._printed = Value(c_bool, False) 
self._lock = None - self._logger = None self._period = None self.scheduler = Scheduler() + self._logger = logging.getLogger("Logprep SharedCounter") def _init_timer(self, period: float): if self._period is None: @@ -79,17 +68,9 @@ def _init_timer(self, period: float): self.scheduler.every(int(self._period)).seconds.do(self.print_value) self.scheduler.every(int(self._period + 1)).seconds.do(self.reset_printed) - def _create_logger(self, log_handler: Handler): - if self._logger is None: - logger = Logger("Processing Counter", level=log_handler.level) - for handler in logger.handlers: - logger.removeHandler(handler) - logger.addHandler(log_handler) - self._logger = logger - - def setup(self, print_processed_period: float, log_handler: Handler, lock: Lock): + def setup(self, print_processed_period: float, lock: Lock): """Setup shared counter for multiprocessing pipeline.""" - self._create_logger(log_handler) + self._init_timer(print_processed_period) self._lock = lock @@ -194,7 +175,7 @@ def update_mean_processing_time_per_event(self, new_sample): _logprep_config: dict """ the logprep configuration dict """ - _log_handler: Handler + _log_queue: multiprocessing.Queue """ the handler for the logs """ _continue_iterating: Value @@ -220,17 +201,17 @@ def __init__( config: dict, pipeline_index: int = None, counter: "SharedCounter" = None, - log_handler: Handler = None, + log_queue: multiprocessing.Queue = None, lock: Lock = None, shared_dict: dict = None, used_server_ports: dict = None, prometheus_exporter: PrometheusStatsExporter = None, ) -> None: - if log_handler and not isinstance(log_handler, Handler): - raise MustProvideALogHandlerError + self._log_queue = log_queue + self.logger = logging.getLogger(f"Logprep Pipeline {pipeline_index}") + self.logger.addHandler(logging.handlers.QueueHandler(log_queue)) self._logprep_config = config self._timeout = config.get("timeout") - self._log_handler = log_handler self._continue_iterating = Value(c_bool) self._lock = lock @@ -238,7 +219,7 @@ def __init__( self._processing_counter = counter if self._processing_counter: print_processed_period = self._logprep_config.get("print_processed_period", 300) - self._processing_counter.setup(print_processed_period, log_handler, lock) + self._processing_counter.setup(print_processed_period, lock) self._used_server_ports = used_server_ports self._prometheus_exporter = prometheus_exporter self.pipeline_index = pipeline_index @@ -285,7 +266,7 @@ def metrics(self) -> PipelineMetrics: def _pipeline(self) -> tuple: self.logger.debug(f"Building '{self._process_name}'") pipeline = [self._create_processor(entry) for entry in self._logprep_config.get("pipeline")] - self.logger.debug(f"Finished building pipeline ({self._process_name})") + self.logger.debug("Finished building pipeline") return pipeline @cached_property @@ -313,28 +294,14 @@ def _input(self) -> Input: ) return Factory.create(input_connector_config, self.logger) - @cached_property - def logger(self) -> Logger: - """the pipeline logger""" - if self._log_handler is None: - return Logger("Pipeline") - if self._log_handler.level == NOTSET: - self._log_handler.level = INFO - logger = Logger("Pipeline", level=self._log_handler.level) - for handler in logger.handlers: - logger.removeHandler(handler) - logger.addHandler(self._log_handler) - - return logger - @_handle_pipeline_error def _setup(self): - self.logger.debug(f"Creating connectors ({self._process_name})") + self.logger.debug("Creating connectors") for _, output in self._output.items(): output.input_connector = 
self._input self.logger.debug( f"Created connectors -> input: '{self._input.describe()}'," - f" output -> '{[output.describe() for _, output in self._output.items()]}' ({self._process_name})" + f" output -> '{[output.describe() for _, output in self._output.items()]}'" ) self._input.pipeline_index = self.pipeline_index self._input.setup() @@ -345,10 +312,10 @@ def _setup(self): while self._input.server.config.port in self._used_server_ports: self._input.server.config.port += 1 self._used_server_ports.update({self._input.server.config.port: self._process_name}) - self.logger.debug(f"Finished creating connectors ({self._process_name})") - self.logger.info(f"Start building pipeline ({self._process_name})") + self.logger.debug("Finished creating connectors") + self.logger.info("Start building pipeline") _ = self._pipeline - self.logger.info(f"Finished building pipeline ({self._process_name})") + self.logger.info("Finished building pipeline") def _create_processor(self, entry: dict) -> "Processor": processor_name = list(entry.keys())[0] @@ -357,7 +324,7 @@ def _create_processor(self, entry: dict) -> "Processor": processor.setup() if self.metrics: self.metrics.pipeline.append(processor.metrics) - self.logger.debug(f"Created '{processor}' processor ({self._process_name})") + self.logger.debug(f"Created '{processor}' processor") return processor def run(self) -> None: @@ -369,7 +336,7 @@ def run(self) -> None: with warnings.catch_warnings(): warnings.simplefilter("default") self._setup() - self.logger.debug(f"Start iterating ({self._process_name})") + self.logger.debug("Start iterating") if hasattr(self._input, "server"): with self._input.server.run_in_thread(): while self._iterate(): @@ -490,6 +457,7 @@ def _drain_input_queues(self) -> None: def stop(self) -> None: """Stop processing processors in the Pipeline.""" + self.logger.debug(f"Stopping pipeline ({self._process_name})") with self._continue_iterating.get_lock(): self._continue_iterating.value = False @@ -503,15 +471,12 @@ def __init__( self, pipeline_index: int, config: dict, - log_handler: Handler, + log_queue: multiprocessing.Queue, lock: Lock, shared_dict: dict, used_server_ports: dict, prometheus_exporter: PrometheusStatsExporter = None, ) -> None: - if not isinstance(log_handler, MultiprocessingLogHandler): - raise MustProvideAnMPLogHandlerError - self._profile = config.get("profile_pipelines", False) Pipeline.__init__( @@ -519,7 +484,7 @@ def __init__( pipeline_index=pipeline_index, config=config, counter=self.processed_counter, - log_handler=log_handler, + log_queue=log_queue, lock=lock, shared_dict=shared_dict, used_server_ports=used_server_ports, diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 317602862..0d9203fb5 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -1,16 +1,16 @@ """This module contains functionality to manage pipelines via multi-processing.""" +# pylint: disable=logging-fstring-interpolation -from logging import DEBUG, Logger -from multiprocessing import Lock, Manager -from queue import Empty +import logging +import logging.handlers +import multiprocessing from logprep.framework.pipeline import MultiprocessingPipeline from logprep.util.configuration import Configuration -from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler from logprep.util.prometheus_exporter import PrometheusStatsExporter -class PipelineManagerError(BaseException): +class PipelineManagerError(Exception): """Base class for 
pipeline related exceptions.""" @@ -24,15 +24,17 @@ def __init__(self, what_failed: str): class PipelineManager: """Manage pipelines via multi-processing.""" - def __init__(self, logger: Logger): - self._logger = logger + def __init__(self): self.prometheus_exporter = None - self._log_handler = MultiprocessingLogHandler(self._logger.level) + self._logger = logging.getLogger("Logprep PipelineManager") + self.log_queue = multiprocessing.Queue(-1) + self._queue_listener = logging.handlers.QueueListener(self.log_queue) + self._queue_listener.start() self._pipelines = [] self._configuration = None - self._lock = Lock() + self._lock = multiprocessing.Lock() self._shared_dict = None self._used_server_ports = None @@ -40,7 +42,7 @@ def set_configuration(self, configuration: Configuration): """set the verified config""" self._configuration = configuration - manager = Manager() + manager = multiprocessing.Manager() self._shared_dict = manager.dict() self._used_server_ports = manager.dict() for idx in range(configuration.get("process_count", 1)): @@ -58,8 +60,7 @@ def get_count(self) -> int: The pipeline count will be incrementally changed until it reaches this value. """ - if self._logger.isEnabledFor(DEBUG): # pragma: no cover - self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}") + self._logger.debug(f"Getting pipeline count: {len(self._pipelines)}") return len(self._pipelines) def set_count(self, count: int): @@ -101,18 +102,11 @@ def restart_failed_pipeline(self): self.set_count(self._configuration.get("process_count")) self._logger.warning(f"Restarted {len(failed_pipelines)} failed pipeline(s)") - def handle_logs_into_logger(self, logger: Logger, timeout: float): - """Handle logs.""" - try: - logger.handle(self._log_handler.get(timeout)) - while True: - logger.handle(self._log_handler.get(0.0)) - except Empty: - pass - def stop(self): """Stop processing any pipelines by reducing the pipeline count to zero.""" self._decrease_to_count(0) + while len(self._pipelines) > 0: + self._logger.debug(f"Waiting for {len(self._pipelines)} pipelines to stop") def _create_pipeline(self, index) -> MultiprocessingPipeline: if self._configuration is None: @@ -122,7 +116,7 @@ def _create_pipeline(self, index) -> MultiprocessingPipeline: return MultiprocessingPipeline( pipeline_index=index, config=self._configuration, - log_handler=self._log_handler, + log_queue=self.log_queue, lock=self._lock, shared_dict=self._shared_dict, used_server_ports=self._used_server_ports, diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py index 7ffdad94a..33cb3dc6c 100644 --- a/logprep/run_logprep.py +++ b/logprep/run_logprep.py @@ -93,7 +93,6 @@ def _run_logprep(arguments, logger: logging.Logger): runner = None try: runner = Runner.get_runner() - runner.set_logger(logger) runner.load_configuration(arguments.config) logger.debug("Configuration loaded") runner.start() diff --git a/logprep/runner.py b/logprep/runner.py index b1064b22a..4d69311f4 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -1,8 +1,9 @@ """This module contains the logprep runner and is responsible for signal handling.""" +# pylint: disable=logging-fstring-interpolation +import logging import signal from ctypes import c_bool -from logging import Logger from multiprocessing import Value, current_process import requests @@ -10,10 +11,9 @@ from logprep.framework.pipeline_manager import PipelineManager from logprep.util.configuration import Configuration, InvalidConfigurationError -from logprep.util.multiprocessing_log_handler import 
MultiprocessingLogHandler -class RunnerError(BaseException): +class RunnerError(Exception): """Base class for Runner related exceptions.""" @@ -25,10 +25,6 @@ class NotALoggerError(RunnerError): """Raise if the logger was assigned a non-logger object .""" -class MustNotSetLoggerTwiceError(RunnerError): - """Raise if a logger has been set more than once.""" - - class MustConfigureALoggerError(RunnerError): """Raise if no logger has been configured.""" @@ -60,7 +56,7 @@ class Runner: to start processing. The Runner should only raise exceptions derived from RunnerError but other components may raise - exceptions that are not catched by it. Hence, we recommend to simply catch BaseException and + exceptions that are not catched by it. Hence, we recommend to simply catch Exception and log it as an unhandled exception. Example @@ -91,8 +87,7 @@ def get_runner(): def __init__(self, bypass_check_to_obtain_non_singleton_instance=False): self._configuration = None self._yaml_path = None - self._logger = None - self._log_handler = None + self._logger = logging.getLogger("Logprep Runner") self._config_refresh_interval = None self._manager = None @@ -105,29 +100,6 @@ def __init__(self, bypass_check_to_obtain_non_singleton_instance=False): if not bypass_check_to_obtain_non_singleton_instance: raise UseGetRunnerToCreateRunnerSingleton - def set_logger(self, logger: Logger): - """Setup logging for any "known" errors from any part of the software. - - Parameters - ---------- - logger: Logger - An instance of logging.Logger. - - Raises - ------ - NotALoggerError - If 'logger' is not an instance of Logger. - MustNotSetLoggerTwiceError - If 'self._logger' was already set. - """ - if not isinstance(logger, Logger): - raise NotALoggerError - if self._logger is not None: - raise MustNotSetLoggerTwiceError - - self._logger = logger - self._log_handler = MultiprocessingLogHandler(logger.level) - def load_configuration(self, yaml_file: str): """Load the configuration from a YAML file (cf. documentation). @@ -196,9 +168,6 @@ def start(self): def _loop(self): self.scheduler.run_pending() self._manager.restart_failed_pipeline() - # Note: We are waiting half the timeout because when shutting down, we also have to - # wait for the logprep's timeout before the shutdown is actually initiated. - self._manager.handle_logs_into_logger(self._logger, self._configuration["timeout"] / 2.0) def reload_configuration(self, refresh=False): """Reload the configuration from the configured yaml path. 
@@ -248,10 +217,8 @@ def reload_configuration(self, refresh=False): ) except InvalidConfigurationError as error: self._logger.error( - "Invalid configuration, leaving old configuration in place: " - + self._yaml_path - + ": " - + str(error) + "Invalid configuration, leaving old" + f" configuration in place: {self._yaml_path}: {str(error)}" ) def _schedule_config_refresh_job(self): @@ -267,7 +234,7 @@ def _schedule_config_refresh_job(self): def _create_manager(self): if self._manager is not None: raise MustNotCreateMoreThanOneManagerError - self._manager = PipelineManager(self._logger) + self._manager = PipelineManager() def stop(self): """Stop the current process""" diff --git a/logprep/util/multiprocessing_log_handler.py b/logprep/util/multiprocessing_log_handler.py deleted file mode 100644 index fc249a012..000000000 --- a/logprep/util/multiprocessing_log_handler.py +++ /dev/null @@ -1,28 +0,0 @@ -"""This module is used to handle logging with multi processing.""" - -from logging import Handler, LogRecord -from multiprocessing import Queue - - -class MultiprocessingLogHandler(Handler): - """Log handles with multi processing capabilities.""" - - def __init__(self, log_level: int): - self._queue = Queue() - super().__init__(log_level) - - def handle(self, record: LogRecord): - self._queue.put(record, block=True) - - def get(self, timeout: float) -> LogRecord: - """Get log event from multi processing queue. - - Parameters - ---------- - timeout : float - Time after which the blocking queue should timeout. - - """ - if timeout <= 0.0: - return self._queue.get(block=False) - return self._queue.get(block=True, timeout=timeout) diff --git a/tests/acceptance/util.py b/tests/acceptance/util.py index 03fe70dab..9c0d4dc49 100644 --- a/tests/acceptance/util.py +++ b/tests/acceptance/util.py @@ -165,7 +165,6 @@ def get_patched_runner(config_path, logger): The patched logprep runner """ runner = Runner(bypass_check_to_obtain_non_singleton_instance=True) - runner.set_logger(logger) runner.load_configuration(config_path) # patch runner to stop on empty pipeline diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 0ce7fa04a..efb8dca53 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -5,7 +5,7 @@ import re import time from copy import deepcopy -from logging import DEBUG, WARNING, getLogger +from logging import DEBUG, getLogger from multiprocessing import Lock, active_children from unittest import mock @@ -29,17 +29,10 @@ ) from logprep.abc.processor import Processor from logprep.factory import Factory -from logprep.framework.pipeline import ( - MultiprocessingPipeline, - MustProvideALogHandlerError, - MustProvideAnMPLogHandlerError, - Pipeline, - SharedCounter, -) +from logprep.framework.pipeline import MultiprocessingPipeline, Pipeline, SharedCounter from logprep.processor.base.exceptions import ProcessingCriticalError, ProcessingWarning from logprep.processor.deleter.rule import DeleterRule from logprep.util.getter import GetterFactory -from logprep.util.multiprocessing_log_handler import MultiprocessingLogHandler original_create = Factory.create @@ -54,7 +47,6 @@ class ConfigurationForTests: "pipeline": [{"mock_processor1": {"proc": "conf"}}, {"mock_processor2": {"proc": "conf"}}], "metrics": {"period": 300, "enabled": False}, } - log_handler = MultiprocessingLogHandler(WARNING) lock = Lock() shared_dict = {} counter = SharedCounter() @@ -69,27 +61,13 @@ def setup_method(self): pipeline_index=1, 
config=self.logprep_config, counter=self.counter, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, shared_dict=self.shared_dict, used_server_ports=mock.MagicMock(), prometheus_exporter=mock.MagicMock(), ) - def test_fails_if_log_handler_is_not_of_type_loghandler(self, _): - for not_a_log_handler in [123, 45.67, TestPipeline()]: - with raises(MustProvideALogHandlerError): - _ = Pipeline( - pipeline_index=1, - config=self.logprep_config, - counter=self.counter, - log_handler=not_a_log_handler, - lock=self.lock, - shared_dict=self.shared_dict, - used_server_ports=mock.MagicMock(), - prometheus_exporter=mock.MagicMock(), - ) - def test_pipeline_property_returns_pipeline(self, mock_create): assert len(self.pipeline._pipeline) == 2 assert mock_create.call_count == 4 # 2 processors, 1 input, 1 output @@ -772,30 +750,16 @@ def test_process_pipeline_raises_assertion_when_no_input_connector_is_set(self): class TestMultiprocessingPipeline(ConfigurationForTests): - def setup_class(self): - self.log_handler = MultiprocessingLogHandler(DEBUG) - - def test_fails_if_log_handler_is_not_a_multiprocessing_log_handler(self): - for not_a_log_handler in [None, 123, 45.67, TestMultiprocessingPipeline()]: - with raises(MustProvideAnMPLogHandlerError): - MultiprocessingPipeline( - pipeline_index=1, - config=self.logprep_config, - log_handler=not_a_log_handler, - lock=self.lock, - used_server_ports=mock.MagicMock(), - shared_dict=self.shared_dict, - ) - def test_does_not_fail_if_log_handler_is_a_multiprocessing_log_handler(self): try: MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) except MustProvideAnMPLogHandlerError: fail("Must not raise this error for a correct handler!") @@ -806,10 +770,11 @@ def test_creates_a_new_process(self): MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) ) @@ -820,10 +785,11 @@ def test_stop_terminates_the_process(self): MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) ) children_after = active_children() @@ -834,10 +800,11 @@ def test_enable_iteration_sets_iterate_to_true_stop_to_false(self): pipeline = MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) assert not pipeline._iterate() @@ -851,10 +818,11 @@ def test_graceful_shutdown_of_pipeline_on_source_disconnected_error(self, capfd) pipeline = MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) pipeline._input = mock.MagicMock() pipeline._input.get_next = mock.MagicMock() @@ -873,10 +841,11 @@ def test_graceful_shutdown_of_pipeline_on_fata_input_error(self, capfd): pipeline = 
MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) pipeline._input = mock.MagicMock() pipeline._input.get_next = mock.MagicMock() @@ -895,10 +864,11 @@ def test_graceful_shutdown_of_pipeline_on_fatal_output_error(self, capfd): pipeline = MultiprocessingPipeline( pipeline_index=1, config=self.logprep_config, - log_handler=self.log_handler, + log_queue=mock.MagicMock(), lock=self.lock, used_server_ports=mock.MagicMock(), shared_dict=self.shared_dict, + prometheus_exporter=mock.MagicMock(), ) pipeline._output = mock.MagicMock() pipeline._output.store = mock.MagicMock() @@ -908,7 +878,7 @@ def test_graceful_shutdown_of_pipeline_on_fatal_output_error(self, capfd): pipeline.start() pipeline.stop() pipeline.join() - out, err = capfd.readouterr() + _, err = capfd.readouterr() assert "AttributeError: 'bool' object has no attribute 'get_lock'" not in err @staticmethod @@ -930,7 +900,7 @@ def test_shared_counter_prints_value_after_configured_period(self, caplog): shared_counter = SharedCounter() print_period = 1 shared_counter._logger = self.test_logger - shared_counter.setup(print_period, None, Lock()) + shared_counter.setup(print_period, Lock()) test_counter = 0 test_counter_limit = 100 start_time = time.time() diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 5afe68f74..4cbe685ff 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -1,8 +1,7 @@ # pylint: disable=missing-docstring # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init -from logging import ERROR, INFO, WARNING, Logger -from time import sleep, time +from logging import Logger from unittest import mock from pytest import raises @@ -14,11 +13,6 @@ ) from logprep.util.configuration import Configuration from tests.testdata.metadata import path_to_config -from tests.util.testhelpers import ( - AssertEmitsLogMessage, - AssertEmitsLogMessages, - HandlerStub, -) class MultiprocessingPipelineMock(MultiprocessingPipeline): @@ -59,15 +53,13 @@ def _create_pipeline(self, index): class TestPipelineManager: def setup_class(self): self.config = Configuration.create_from_yaml(path_to_config) - self.handler = HandlerStub() self.logger = Logger("test") - self.logger.addHandler(self.handler) - self.manager = PipelineManagerForTesting(self.logger) + self.manager = PipelineManagerForTesting() self.manager.set_configuration(self.config) def test_create_pipeline_fails_if_config_is_unset(self): - manager = PipelineManager(self.logger) + manager = PipelineManager() with raises( MustSetConfigurationFirstError, @@ -144,61 +136,13 @@ def test_remove_failed_pipelines_removes_terminated_pipelines(self): assert not failed_pipeline in self.manager._pipelines - def test_remove_failed_pipelines_logs_warning_for_removed_failed_pipelines(self): + @mock.patch("logging.Logger.warning") + def test_remove_failed_pipelines_logs_warning_for_removed_failed_pipelines(self, logger_mock): self.manager.set_count(2) failed_pipeline = self.manager._pipelines[-1] failed_pipeline.process_is_alive = False - - with AssertEmitsLogMessage(self.handler, WARNING, message="Restarted 1 failed pipeline(s)"): - self.manager.restart_failed_pipeline() - - def test_handle_logs_into_logger_returns_after_timeout(self): - self.manager.set_count(1) 
- timeout = 0.1 - - start = time() - self.manager.handle_logs_into_logger(self.logger, timeout=timeout) - duration = time() - start - - assert duration >= timeout - assert duration <= (1.5 * timeout) - - def test_handle_logs_into_logger_forwards_log_record_to_logger(self): - self.manager.set_count(1) - timeout = 0.1 - - handler = HandlerStub() - - logger_in = Logger("test_handle_logs_into_logger_forwards_log_record_to_logger") - logger_in.addHandler(self.manager._log_handler) - logger_in.error("this is a test") - - logger_out = Logger("test_handle_logs_into_logger_forwards_log_record_to_logger") - logger_out.addHandler(handler) - - with AssertEmitsLogMessage(handler, ERROR, "this is a test"): - self.manager.handle_logs_into_logger(logger_out, timeout=timeout) - - def test_handle_logs_into_logger_retrieves_all_logs_with_a_single_call(self): - self.manager.set_count(1) - timeout = 0.1 - - handler = HandlerStub() - - logger_in = Logger("test_handle_logs_into_logger_forwards_log_record_to_logger") - logger_in.addHandler(self.manager._log_handler) - logger_in.error("msg1") - logger_in.warning("msg2") - logger_in.info("msg3") - - logger_out = Logger("test_handle_logs_into_logger_forwards_log_record_to_logger") - logger_out.addHandler(handler) - # NOTE: This test failed once in a while (fewer messages received than expected), - # this sleep seems to have fixed it, try adjusting, if the test fails randomly. - sleep(0.01) # nosemgrep - - with AssertEmitsLogMessages(handler, [ERROR, WARNING, INFO], ["msg1", "msg2", "msg3"]): - self.manager.handle_logs_into_logger(logger_out, timeout=timeout) + self.manager.restart_failed_pipeline() + logger_mock.assert_called_with("Restarted 1 failed pipeline(s)") def test_stop_terminates_processes_created(self): self.manager.set_count(3) @@ -219,7 +163,7 @@ def test_restart_failed_pipelines_removes_metrics_database_if_prometheus_target_ failed_pipeline.is_alive = mock.MagicMock() # nosemgrep failed_pipeline.is_alive.return_value = False # nosemgrep failed_pipeline.pid = 42 - manager = PipelineManager(self.logger) + manager = PipelineManager() manager.set_configuration({"metrics": {"enabled": True}, "process_count": 2}) manager.prometheus_exporter = prometheus_exporter_mock manager._pipelines = [failed_pipeline] @@ -234,7 +178,7 @@ def test_restart_failed_pipelines_skips_removal_of_metrics_database_if_prometheu failed_pipeline = mock.MagicMock() failed_pipeline.is_alive = mock.MagicMock() # nosemgrep failed_pipeline.is_alive.return_value = False # nosemgrep - manager = PipelineManager(self.logger) + manager = PipelineManager() manager._pipelines = [failed_pipeline] manager._configuration = {"process_count": 2} manager.restart_failed_pipeline() diff --git a/tests/unit/processor/test_process.py b/tests/unit/processor/test_process.py index 6739473a3..c94abf447 100644 --- a/tests/unit/processor/test_process.py +++ b/tests/unit/processor/test_process.py @@ -123,7 +123,10 @@ def test_strategy_applies_rules_in_deterministic_order(self, execution_number): processor.process(event=event) mock_callback.assert_has_calls(expected_call_order, any_order=False) - def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules(self, capsys): + @mock.patch("logging.Logger.warning") + def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules( + self, mock_warning + ): config = { "pipeline": [ {"adder": {"type": "generic_adder", "specific_rules": [], "generic_rules": []}} @@ -160,6 +163,9 @@ def 
test_strategy_processes_generic_rules_after_processor_error_in_specific_rule pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_two) pipeline._pipeline[0]._specific_tree.add_rule(specific_rule_one) pipeline.process_event(event) - captured = capsys.readouterr() - assert re.match("FieldExistsWarning in GenericAdder.*first", captured.err) + assert ( + "The following fields could not be written, " + "because one or more subfields existed and could not be extended: first" + in mock_warning.call_args[0][0] + ) assert event == expected_event diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py index 5b1679c8c..83d6bbd4e 100644 --- a/tests/unit/test_runner.py +++ b/tests/unit/test_runner.py @@ -6,7 +6,7 @@ import json from copy import deepcopy from functools import partial -from logging import ERROR, INFO, Logger +from logging import Logger from os.path import join, split from unittest import mock @@ -16,12 +16,9 @@ from logprep.processor.labeler.labeling_schema import LabelingSchemaError from logprep.runner import ( CannotReloadWhenConfigIsUnsetError, - MustConfigureALoggerError, MustConfigureBeforeRunningError, MustNotConfigureTwiceError, MustNotCreateMoreThanOneManagerError, - MustNotSetLoggerTwiceError, - NotALoggerError, Runner, UseGetRunnerToCreateRunnerSingleton, ) @@ -35,7 +32,6 @@ path_to_schema2, ) from tests.unit.framework.test_pipeline_manager import PipelineManagerForTesting -from tests.util.testhelpers import AssertEmitsLogMessage, HandlerStub class RunnerForTesting(Runner): @@ -43,17 +39,14 @@ def __init__(self): super().__init__(bypass_check_to_obtain_non_singleton_instance=True) def _create_manager(self): - self._manager = PipelineManagerForTesting(self._logger) + self._manager = PipelineManagerForTesting() class LogprepRunnerTest: def setup_method(self, _): - self.handler = HandlerStub() self.logger = Logger("test") - self.logger.addHandler(self.handler) self.runner = RunnerForTesting() - self.runner.set_logger(self.logger) self.runner._create_manager() @@ -69,7 +62,6 @@ def test_init_fails_when_bypass_check_flag_is_not_set(self): def test_fails_when_calling_create_manager_more_than_once(self): runner = Runner(bypass_check_to_obtain_non_singleton_instance=True) - runner.set_logger(self.logger) runner.load_configuration(path_to_config) runner._create_manager() @@ -90,22 +82,6 @@ def test_fails_when_called_without_configuring_first(self): with raises(MustConfigureBeforeRunningError): self.runner.start() - def test_fails_when_setting_logger_to_non_logger_object(self): - for non_logger in [None, "string", 123, 45.67, TestRunner()]: - with raises(NotALoggerError): - self.runner.set_logger(non_logger) - - def test_fails_when_setting_logger_twice(self): - with raises(MustNotSetLoggerTwiceError): - self.runner.set_logger(Logger("test")) - - def test_fails_when_starting_without_setting_logger_first(self): - self.runner.load_configuration(path_to_config) - self.runner._logger = None - - with raises(MustConfigureALoggerError): - self.runner.start() - def test_fails_when_rules_are_invalid(self): with raises( InvalidConfigurationErrors, @@ -142,12 +118,9 @@ def test_fails_when_calling_reload_configuration_when_config_is_unset(self): class TestRunner(LogprepRunnerTest): def setup_method(self, _): - self.handler = HandlerStub() self.logger = Logger("test") - self.logger.addHandler(self.handler) self.runner = RunnerForTesting() - self.runner.set_logger(self.logger) self.runner.load_configuration(path_to_config) self.runner._create_manager() @@ -157,9 +130,10 @@ def 
test_get_runner_returns_the_same_runner_on_all_calls(self): for _ in range(10): assert runner == Runner.get_runner() - def test_reload_configuration_logs_info_when_reloading_config_was_successful(self): - with AssertEmitsLogMessage(self.handler, INFO, "Successfully reloaded configuration"): - self.runner.reload_configuration() + @mock.patch("logging.Logger.info") + def test_reload_configuration_logs_info_when_reloading_config_was_successful(self, mock_info): + self.runner.reload_configuration() + mock_info.assert_has_calls([mock.call("Successfully reloaded configuration")]) def test_reload_configuration_reduces_logprep_instance_count_to_new_value(self): self.runner._manager.set_count(3) @@ -176,15 +150,14 @@ def test_reload_configuration_leaves_old_configuration_in_place_if_new_config_is assert self.runner._configuration == old_configuration - def test_reload_configuration_logs_error_when_new_configuration_is_invalid(self): + @mock.patch("logging.Logger.error") + def test_reload_configuration_logs_error_when_new_configuration_is_invalid(self, mock_error): self.runner._yaml_path = path_to_invalid_config - - with AssertEmitsLogMessage( - self.handler, - ERROR, - prefix="Invalid configuration, leaving old configuration in place: ", - ): - self.runner.reload_configuration() + self.runner.reload_configuration() + assert ( + "Invalid configuration, leaving old configuration in place:" + in mock_error.call_args[0][0] + ) def test_reload_configuration_creates_new_logprep_instances_with_new_configuration(self): self.runner._manager.set_count(3) diff --git a/tests/util/__init__.py b/tests/util/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/util/testhelpers.py b/tests/util/testhelpers.py deleted file mode 100644 index ae706b529..000000000 --- a/tests/util/testhelpers.py +++ /dev/null @@ -1,105 +0,0 @@ -from logging import Handler, INFO, getLevelName -from queue import Empty - -from pytest import fail - - -class HandlerStub(Handler): - def __init__(self): - self.logs = [] - super().__init__(INFO) - - def emit(self, record): - self.logs.append(record) - - def clear(self): - del self.logs[:] - - def get(self, timeout=None): - return self.logs.pop(0) - - -## -# Check whether several messages were received. -# The log level must be provided for each message, you may use None to indicate -# that a prefix or message check should be skipped for a log record. 
-class AssertEmitsLogMessages: - def __init__(self, log_handler, log_levels, messages=[], prefixes=[], contains=[]): - self._log_handler_stub = log_handler - self._expected_levels = log_levels - self._expected_messages = messages - self._expected_prefixes = prefixes - self._expected_contains = contains - - while len(self._expected_messages) < len(self._expected_levels): - self._expected_messages.append(None) - while len(self._expected_prefixes) < len(self._expected_levels): - self._expected_prefixes.append(None) - while len(self._expected_contains) < len(self._expected_levels): - self._expected_contains.append(None) - - def __enter__(self): - pass - - def __exit__(self, exc_type, exc_val, exc_tb): - records = self._retrieve_records() - - if len(records) <= 0: - fail("Did not emit any log message.") - elif len(records) < len(self._expected_levels): - fail( - "Expected {} log messages but only {} message(s) were emitted.".format( - len(self._expected_levels), len(records) - ) - ) - - for offset in range(len(self._expected_levels)): - if records[offset].levelno != self._expected_levels[offset]: - fail( - "Message {}: Expected log level {}, have {}: {}".format( - offset, - getLevelName(self._expected_levels[offset]), - getLevelName(records[offset].levelno), - records[offset].msg, - ) - ) - - if (not self._expected_messages[offset] is None) and ( - records[offset].msg != self._expected_messages[offset] - ): - fail( - 'Expected message "{}" but got: {}'.format( - self._expected_prefixes[offset], records[offset].msg - ) - ) - if (not self._expected_prefixes[offset] is None) and ( - records[offset].msg[: len(self._expected_prefixes[offset])] - != self._expected_prefixes[offset] - ): - fail( - 'Message does not start with prefix "{}": {}'.format( - self._expected_prefixes[offset], records[offset].msg - ) - ) - if (not self._expected_contains[offset] is None) and ( - not self._expected_contains[offset] in records[offset].msg - ): - fail( - 'Expected message "{}" not found in: {}'.format( - self._expected_contains[offset], records[offset].msg - ) - ) - - def _retrieve_records(self): - records = [] - while len(records) < len(self._expected_levels): - try: - records.append(self._log_handler_stub.get(timeout=0.01)) - except (IndexError, Empty): - break - return records - - -class AssertEmitsLogMessage(AssertEmitsLogMessages): - def __init__(self, log_handler, log_level, message=None, prefix=None, contains=None): - super().__init__(log_handler, [log_level], [message], [prefix], [contains]) From acbb4f6bd114addd74b98be6d4fae4772a86648e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:17:46 +0000 Subject: [PATCH 2/3] fix review remarks --- logprep/framework/pipeline_manager.py | 2 -- tests/acceptance/util.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 0d9203fb5..c411aa116 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -105,8 +105,6 @@ def restart_failed_pipeline(self): def stop(self): """Stop processing any pipelines by reducing the pipeline count to zero.""" self._decrease_to_count(0) - while len(self._pipelines) > 0: - self._logger.debug(f"Waiting for {len(self._pipelines)} pipelines to stop") def _create_pipeline(self, index) -> MultiprocessingPipeline: if self._configuration is None: diff --git a/tests/acceptance/util.py b/tests/acceptance/util.py index 
9c0d4dc49..b7158dc83 100644 --- a/tests/acceptance/util.py +++ b/tests/acceptance/util.py @@ -147,7 +147,7 @@ def get_runner_outputs(patched_runner) -> list: return parsed_outputs -def get_patched_runner(config_path, logger): +def get_patched_runner(config_path): """ Creates a patched runner that bypasses check to obtain non singleton instance and the runner won't continue iterating on an empty pipeline. From 9137b307e986229994fb312471360c0f4a6caf60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:33:00 +0000 Subject: [PATCH 3/3] fix acceptance tests --- tests/acceptance/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/acceptance/util.py b/tests/acceptance/util.py index b7158dc83..cbfa6e418 100644 --- a/tests/acceptance/util.py +++ b/tests/acceptance/util.py @@ -179,7 +179,7 @@ def keep_iterating(): def get_test_output(config_path): - patched_runner = get_patched_runner(config_path, logger) + patched_runner = get_patched_runner(config_path) return get_runner_outputs(patched_runner=patched_runner)
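
For reference, below is a minimal standalone sketch of the stdlib logging.handlers.QueueHandler / QueueListener pattern that this patch series switches to: child processes only put log records on a shared multiprocessing.Queue, and a listener thread in the parent process forwards them to real handlers. The worker function, logger name, and handler wiring here are illustrative assumptions for demonstration only, not logprep code.

# Standalone illustration of the QueueHandler/QueueListener pattern; not part of logprep.
import logging
import logging.handlers
import multiprocessing


def worker(log_queue: multiprocessing.Queue) -> None:
    # In the child process: attach only a QueueHandler, so records are
    # serialized onto the shared queue instead of being written directly.
    logger = logging.getLogger("Logprep Pipeline 0")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.info("Finished building pipeline")


if __name__ == "__main__":
    log_queue = multiprocessing.Queue(-1)
    # In the parent process: a QueueListener thread drains the queue and
    # hands each record to the configured handler(s).
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()
    process = multiprocessing.Process(target=worker, args=(log_queue,))
    process.start()
    process.join()
    listener.stop()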