From e053fdedd3838451ebd720da798e5d879869959b Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 11:51:18 +0200 Subject: [PATCH 1/7] Refactor kafka startup command in docker-compose.yml --- examples/compose/docker-compose.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/examples/compose/docker-compose.yml b/examples/compose/docker-compose.yml index 4cb8e7b4d..6cd312788 100644 --- a/examples/compose/docker-compose.yml +++ b/examples/compose/docker-compose.yml @@ -62,7 +62,13 @@ services: - ALLOW_PLAINTEXT_LISTENER=yes volumes: - /var/run/docker.sock:/var/run/docker.sock - command: sh -c "((sleep 15 && echo 'kafka up' && kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer)&) && /opt/bitnami/scripts/kafka/run.sh" + command: | + sh -c + "((sleep 15 && echo 'kafka up' && + kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic consumer && + kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic errors && + kafka-topics.sh --create --if-not-exists --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 4 --topic producer)&) && + /opt/bitnami/scripts/kafka/run.sh" healthcheck: test: [ From 491a1ca3814a5b1affdf6ee185b45c7d7fb400c9 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 11:52:26 +0200 Subject: [PATCH 2/7] Add config refresh interval to compose example setup --- examples/exampledata/config/http_pipeline.yml | 6 ++++-- examples/exampledata/config/pipeline.yml | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/exampledata/config/http_pipeline.yml b/examples/exampledata/config/http_pipeline.yml index 909b28499..7eb3a7432 100644 --- a/examples/exampledata/config/http_pipeline.yml +++ b/examples/exampledata/config/http_pipeline.yml @@ -1,7 +1,8 @@ version: 2 -process_count: 8 -config_refresh_interval: 300 +process_count: 4 +config_refresh_interval: 5 profile_pipelines: false +restart_count: 3 logger: level: INFO loggers: @@ -47,6 +48,7 @@ input: /json: json /lab/123/(ABC|DEF)/pl.*: plaintext /lab/123/ABC/auditlog: jsonl + output: kafka: type: confluentkafka_output diff --git a/examples/exampledata/config/pipeline.yml b/examples/exampledata/config/pipeline.yml index c670d11a5..ffed82e28 100644 --- a/examples/exampledata/config/pipeline.yml +++ b/examples/exampledata/config/pipeline.yml @@ -2,6 +2,7 @@ version: 1 process_count: 2 timeout: 0.1 restart_count: 2 +config_refresh_interval: 5 logger: level: INFO format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s" From ce14489ae003880dbd8d914d4754fa6bc7366792 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 11:53:30 +0200 Subject: [PATCH 3/7] make process joining in LogprepMPQueueListener more robust --- logprep/util/logging.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logprep/util/logging.py b/logprep/util/logging.py index aaa4d7ea8..475baab19 100644 --- a/logprep/util/logging.py +++ b/logprep/util/logging.py @@ -47,5 +47,6 @@ def start(self): def stop(self): self.enqueue_sentinel() - self._process.join() + if self._process and hasattr(self._process, "join"): + self._process.join() self._process = None From 06424be97722c814e930822511c8b15157c09eb1 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 11:55:26 +0200 Subject: [PATCH 4/7] registers atexit ThreadingHTTPServer shutdown 
--- logprep/util/http.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logprep/util/http.py b/logprep/util/http.py index 22afe9328..d0e8f583b 100644 --- a/logprep/util/http.py +++ b/logprep/util/http.py @@ -1,5 +1,6 @@ """logprep http utils""" +import atexit import inspect import json import logging @@ -52,7 +53,7 @@ def __init__( logger_name: str Name of the logger instance """ - + atexit.register(self.shut_down, wait=0.1) if ( hasattr(self, "thread") and self.thread is not None From 718a3536462e571463e715fb667a487f6cc4f24f Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 12:00:09 +0200 Subject: [PATCH 5/7] Fix restart of exporter on logprep start --- logprep/framework/pipeline_manager.py | 56 +++++++++++++------ logprep/metrics/exporter.py | 7 ++- logprep/run_logprep.py | 3 + logprep/runner.py | 35 +++++------- tests/acceptance/test_config_refresh.py | 44 ++++++++++++--- tests/acceptance/test_full_configuration.py | 1 + tests/unit/framework/test_pipeline_manager.py | 44 ++++++++------- tests/unit/test_runner.py | 44 ++++++++------- 8 files changed, 146 insertions(+), 88 deletions(-) diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 5889c87b0..d6b89e88f 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -8,7 +8,6 @@ import multiprocessing.managers import multiprocessing.queues import random -import signal import time from attr import define, field @@ -89,18 +88,21 @@ def __init__(self, configuration: Configuration): self.restart_count = 0 self.restart_timeout_ms = random.randint(100, 1000) self.metrics = self.Metrics(labels={"component": "manager"}) - self.loghandler = None + self.loghandler: LogprepMPQueueListener = None + self._error_queue: multiprocessing.Queue | None = None + self._configuration: Configuration = configuration + self._pipelines: list[multiprocessing.Process] = [] + self.prometheus_exporter: PrometheusExporter | None = None if multiprocessing.current_process().name == "MainProcess": - self._set_http_input_queue(configuration) self._setup_logging() - self._pipelines: list[multiprocessing.Process] = [] - self._configuration = configuration + self._setup_prometheus_exporter() + self._set_http_input_queue() + def _setup_prometheus_exporter(self): prometheus_config = self._configuration.metrics - if prometheus_config.enabled: + if prometheus_config.enabled and not self.prometheus_exporter: self.prometheus_exporter = PrometheusExporter(prometheus_config) - else: - self.prometheus_exporter = None + self.prometheus_exporter.prepare_multiprocessing() def _setup_logging(self): console_logger = logging.getLogger("console") @@ -109,12 +111,13 @@ def _setup_logging(self): self.loghandler = LogprepMPQueueListener(logqueue, console_handler) self.loghandler.start() - def _set_http_input_queue(self, configuration): + def _set_http_input_queue(self): """ this workaround has to be done because the queue size is not configurable after initialization and the queue has to be shared between the multiple processes """ - input_config = next(iter(configuration.input.values())) + input_config = list(self._configuration.input.values()) + input_config = input_config[0] if input_config else {} is_http_input = input_config.get("type") == "http_input" if not is_http_input and HttpInput.messages is not None: return @@ -169,27 +172,39 @@ def restart_failed_pipeline(self): self._pipelines.insert(index, self._create_pipeline(pipeline_index)) exit_code = failed_pipeline.exitcode 
logger.warning( - "Restarting failed pipeline on index %s " "with exit code: %s", + "Restarting failed pipeline on index %s with exit code: %s", pipeline_index, exit_code, ) if self._configuration.restart_count < 0: return + self._wait_to_restart() + + def _wait_to_restart(self): self.restart_count += 1 time.sleep(self.restart_timeout_ms / 1000) self.restart_timeout_ms = self.restart_timeout_ms * 2 def stop(self): """Stop processing any pipelines by reducing the pipeline count to zero.""" - self._decrease_to_count(0) + self.set_count(0) if self.prometheus_exporter: - self.prometheus_exporter.server.server.handle_exit(signal.SIGTERM, None) + self.prometheus_exporter.server.shut_down() self.prometheus_exporter.cleanup_prometheus_multiprocess_dir() + logger.info("Shutdown complete") + if self.loghandler is not None: + self.loghandler.stop() + + def start(self): + """Start processing.""" + self.set_count(self._configuration.process_count) - def restart(self, daemon=True): + def restart(self): """Restarts all pipelines""" - if self.prometheus_exporter: - self.prometheus_exporter.run(daemon=daemon) + self.stop() + self.start() + + def reload(self): self.set_count(0) self.set_count(self._configuration.process_count) @@ -204,3 +219,12 @@ def _create_pipeline(self, index) -> multiprocessing.Process: process.start() logger.info("Created new pipeline") return process + + def should_exit(self) -> bool: + """Check if the manager should exit.""" + return all( + ( + self._configuration.restart_count >= 0, + self.restart_count >= self._configuration.restart_count, + ) + ) diff --git a/logprep/metrics/exporter.py b/logprep/metrics/exporter.py index 1cdaa7c71..411a47c7e 100644 --- a/logprep/metrics/exporter.py +++ b/logprep/metrics/exporter.py @@ -53,7 +53,7 @@ def __init__(self, configuration: MetricsConfig): self.healthcheck_functions = None self._multiprocessing_prepared = False - def _prepare_multiprocessing(self): + def prepare_multiprocessing(self): """ Sets up the proper metric registry for multiprocessing and handles the necessary temporary multiprocessing directory that the prometheus client expects. 
@@ -93,7 +93,7 @@ def run(self, daemon=True): return port = self.configuration.port self.init_server(daemon=daemon) - self._prepare_multiprocessing() + self.prepare_multiprocessing() self.server.start() logger.info("Prometheus Exporter started on port %s", port) @@ -116,6 +116,7 @@ def restart(self): def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None: """Updates the healthcheck functions""" self.healthcheck_functions = healthcheck_functions - self.server.shut_down() + if self.server and self.server.thread and self.server.thread.is_alive(): + self.server.shut_down() self.init_server(daemon=daemon) self.run() diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py index 03542be38..83f77d881 100644 --- a/logprep/run_logprep.py +++ b/logprep/run_logprep.py @@ -82,6 +82,9 @@ def run(configs: tuple[str], version=None) -> None: runner = Runner.get_runner(configuration) logger.debug("Configuration loaded") runner.start() + except SystemExit as error: + logger.error(f"Error during setup: error code {error.code}") + sys.exit(error.code) # pylint: disable=broad-except except Exception as error: if os.environ.get("DEBUG", False): diff --git a/logprep/runner.py b/logprep/runner.py index 8b2bf711f..0448c8eda 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -2,6 +2,7 @@ # pylint: disable=logging-fstring-interpolation +import atexit import logging import sys from importlib.metadata import version @@ -43,8 +44,6 @@ class Runner: """ - scheduler: Scheduler - _runner = None _configuration: Configuration @@ -130,11 +129,12 @@ def get_runner(configuration: Configuration) -> "Runner": # For production, use the get_runner method to create/get access to a singleton! def __init__(self, configuration: Configuration) -> None: + self._manager: PipelineManager | None = None + atexit.register(self.stop_and_exit) self.exit_code = EXITCODES.SUCCESS self._configuration = configuration self.metrics = self.Metrics(labels={"logprep": "unset", "config": "unset"}) self._logger = logging.getLogger("Runner") - self._manager = PipelineManager(configuration) self.scheduler = Scheduler() @@ -144,46 +144,37 @@ def start(self): This runs until an SIGTERM, SIGINT or KeyboardInterrupt signal is received, or an unhandled error occurs. """ - self._set_version_info_metric() self._schedule_config_refresh_job() - self._manager.restart() + self._manager.start() self._logger.info("Startup complete") self._logger.debug("Runner iterating") self._iterate() + + def stop_and_exit(self): + """Stop the runner and exit the process.""" self._logger.info("Shutting down") - self._manager.stop() - self._logger.info("Shutdown complete") - if self._manager.loghandler is not None: - self._manager.loghandler.stop() - sys.exit(self.exit_code.value) + if self._manager: + self._manager.stop() def _iterate(self): for _ in self._keep_iterating(): if self._exit_received: break self.scheduler.run_pending() - if self._should_exit(): - self.exit_code = EXITCODES.PIPELINE_ERROR + if self._manager.should_exit(): + self.exit_code = EXITCODES.PIPELINE_ERROR.value self._logger.error("Restart count exceeded. 
Exiting.") - break + sys.exit(self.exit_code) self._manager.restart_failed_pipeline() - def _should_exit(self): - return all( - ( - self._configuration.restart_count >= 0, - self._manager.restart_count >= self._configuration.restart_count, - ) - ) - def reload_configuration(self): """Reloads the configuration""" try: self._configuration.reload() self._logger.info("Successfully reloaded configuration") self.metrics.number_of_config_refreshes += 1 - self._manager.restart() + self._manager.reload() self._schedule_config_refresh_job() self._logger.info(f"Configuration version: {self._configuration.version}") self._set_version_info_metric() diff --git a/tests/acceptance/test_config_refresh.py b/tests/acceptance/test_config_refresh.py index 2ab257e4a..b9785caf6 100644 --- a/tests/acceptance/test_config_refresh.py +++ b/tests/acceptance/test_config_refresh.py @@ -1,6 +1,8 @@ # pylint: disable=missing-docstring +import tempfile from pathlib import Path +import pytest from ruamel.yaml import YAML from logprep.util.configuration import Configuration @@ -9,17 +11,46 @@ yaml = YAML(typ="safe", pure=True) +@pytest.fixture(name="config") +def get_config(): + input_file = tempfile.mkstemp(suffix=".input.log")[1] + + config_dict = { + "version": "1", + "process_count": 1, + "timeout": 0.1, + "profile_pipelines": False, + "config_refresh_interval": 5, + "metrics": {"enabled": False}, + "pipeline": [], + "input": { + "file_input": { + "type": "file_input", + "logfile_path": input_file, + "start": "begin", + "interval": 1, + "watch_file": True, + } + }, + "output": { + "jsonl_output": { + "type": "dummy_output", + } + }, + } + + return Configuration(**config_dict) + + def teardown_function(): Path("generated_config.yml").unlink(missing_ok=True) stop_logprep() -def test_two_times_config_refresh_after_5_seconds(tmp_path): - config = Configuration.from_sources(["tests/testdata/config/config.yml"]) - config.config_refresh_interval = 5 - config.metrics = {"enabled": False} +def test_two_times_config_refresh_after_5_seconds(tmp_path, config): config_path = tmp_path / "generated_config.yml" config_path.write_text(config.as_json()) + config = Configuration.from_sources([str(config_path)]) proc = start_logprep(config_path) wait_for_output(proc, "Config refresh interval is set to: 5 seconds", test_timeout=5) config.version = "2" @@ -27,11 +58,10 @@ def test_two_times_config_refresh_after_5_seconds(tmp_path): wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12) config.version = "other version" config_path.write_text(config.as_json()) - wait_for_output(proc, "Successfully reloaded configuration", test_timeout=12) + wait_for_output(proc, "Successfully reloaded configuration", test_timeout=20) -def test_no_config_refresh_after_5_seconds(tmp_path): - config = Configuration.from_sources(["tests/testdata/config/config.yml"]) +def test_no_config_refresh_after_5_seconds(tmp_path, config): config.config_refresh_interval = 5 config.metrics = {"enabled": False} config_path = tmp_path / "generated_config.yml" diff --git a/tests/acceptance/test_full_configuration.py b/tests/acceptance/test_full_configuration.py index 5175c31ab..5ccf14a09 100644 --- a/tests/acceptance/test_full_configuration.py +++ b/tests/acceptance/test_full_configuration.py @@ -166,6 +166,7 @@ def test_logprep_exposes_prometheus_metrics(tmp_path): assert "error" not in output.lower(), "error message" assert "critical" not in output.lower(), "error message" assert "exception" not in output.lower(), "error message" + assert not 
re.search("Shutting down", output) if "Startup complete" in output: break time.sleep(2) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index e2f552b7d..731cb61fd 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -147,7 +147,8 @@ def test_stop_calls_prometheus_cleanup_method(self, tmpdir): def test_prometheus_exporter_is_instanciated_if_metrics_enabled(self): config = deepcopy(self.config) config.metrics = MetricsConfig(enabled=True, port=8000) - manager = PipelineManager(config) + with mock.patch("logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing"): + manager = PipelineManager(config) assert isinstance(manager.prometheus_exporter, PrometheusExporter) def test_set_count_increases_number_of_pipeline_starts_metric(self): @@ -167,15 +168,7 @@ def test_restart_calls_set_count(self): mock_set_count.assert_called() assert mock_set_count.call_count == 2 - def test_restart_calls_prometheus_exporter_run(self): - config = deepcopy(self.config) - config.metrics = MetricsConfig(enabled=True, port=666) - pipeline_manager = PipelineManager(config) - pipeline_manager.prometheus_exporter = mock.MagicMock() - pipeline_manager.restart() - pipeline_manager.prometheus_exporter.run.assert_called() - - def test_restart_sets_deterministic_pipline_index(self): + def test_restart_sets_deterministic_pipeline_index(self): config = deepcopy(self.config) config.metrics = MetricsConfig(enabled=False, port=666) pipeline_manager = PipelineManager(config) @@ -207,7 +200,7 @@ def test_pipeline_manager_sets_queue_size_for_http_input(self): }, } } - PipelineManager(config) + PipelineManager(config).start() assert HttpInput.messages._maxsize == 100 http_input = Factory.create(config.input) assert http_input.messages._maxsize == 100 @@ -215,6 +208,7 @@ def test_pipeline_manager_sets_queue_size_for_http_input(self): def test_pipeline_manager_setups_logging(self): dictConfig(DEFAULT_LOG_CONFIG) manager = PipelineManager(self.config) + manager.start() assert manager.loghandler is not None assert manager.loghandler.queue == logqueue assert manager.loghandler._thread is None @@ -262,14 +256,26 @@ def test_restart_injects_healthcheck_functions(self): pipeline_manager.restart() pipeline_manager.prometheus_exporter.update_healthchecks.assert_called() - def test_restart_ensures_prometheus_exporter_is_running(self): - config = deepcopy(self.config) - config.metrics = MetricsConfig(enabled=True, port=666) - pipeline_manager = PipelineManager(config) - pipeline_manager.prometheus_exporter._prepare_multiprocessing = mock.MagicMock() - with mock.patch("logprep.util.http.ThreadingHTTPServer"): - pipeline_manager.restart() - pipeline_manager.prometheus_exporter.server.start.assert_called() + def test_reload_calls_set_count_twice(self): + with mock.patch.object(self.manager, "set_count") as mock_set_count: + self.manager.reload() + # drains pipelines down to 0 and scales up to 3 afterwards + mock_set_count.assert_has_calls([mock.call(0), mock.call(3)]) + + def test_should_exit_returns_bool_based_on_restart_count(self): + self.config.restart_count = 2 + manager = PipelineManager(self.config) + assert not manager.should_exit() + manager.restart_count = 1 + assert not manager.should_exit() + manager.restart_count = 2 + assert manager.should_exit() + + def test_stop_calls_stop_on_loghandler(self): + manager = PipelineManager(self.config) + manager.loghandler = mock.MagicMock() + manager.stop() + 
manager.loghandler.stop.assert_called() class TestThrottlingQueue: diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py index 4bd9827d4..3ff790807 100644 --- a/tests/unit/test_runner.py +++ b/tests/unit/test_runner.py @@ -103,9 +103,9 @@ def test_reload_configuration_leaves_old_configuration_in_place_if_new_config_is runner.reload_configuration() assert runner._configuration.version == "1" - def test_reload_invokes_manager_restart_on_config_change(self, runner: Runner): + def test_reload_invokes_manager_reload_on_config_change(self, runner: Runner): runner._configuration.version = "very old version" - with mock.patch.object(runner._manager, "restart") as mock_restart: + with mock.patch.object(runner._manager, "reload") as mock_restart: runner.reload_configuration() mock_restart.assert_called() @@ -117,8 +117,7 @@ def test_set_config_refresh_interval(self, new_value, expected_value, runner): with mock.patch.object(runner, "_manager"): runner._config_refresh_interval = new_value runner._exit_received = True - with pytest.raises(SystemExit, match=str(EXITCODES.SUCCESS.value)): - runner.start() + runner.start() if expected_value is None: assert len(runner.scheduler.jobs) == 0 else: @@ -128,11 +127,19 @@ def test_set_config_refresh_interval(self, new_value, expected_value, runner): def test_iteration_calls_run_pending(self, mock_run_pending, runner): with mock.patch.object(runner, "_manager") as mock_manager: mock_manager.restart_count = 0 - runner._keep_iterating = partial(mock_keep_iterating, 3) - with pytest.raises(SystemExit, match=str(EXITCODES.SUCCESS.value)): + mock_manager.should_exit.side_effect = [False, False, True] + with pytest.raises(SystemExit): runner.start() mock_run_pending.call_count = 3 + def test_iteration_calls_should_exit(self, runner): + with mock.patch.object(runner, "_manager") as mock_manager: + mock_manager.restart_count = 0 + mock_manager.should_exit.side_effect = [False, False, True] + with pytest.raises(SystemExit): + runner.start() + mock_manager.should_exit.call_count = 3 + def test_reload_configuration_schedules_job_if_config_refresh_interval_is_set( self, runner: Runner, configuration: Configuration, config_path: Path ): @@ -271,8 +278,7 @@ def test_start_sets_version_metric(self, runner: Runner): runner._exit_received = True with mock.patch("logprep.metrics.metrics.GaugeMetric.add_with_labels") as mock_add: with mock.patch.object(runner, "_manager"): - with pytest.raises(SystemExit, match=str(EXITCODES.SUCCESS.value)): - runner.start() + runner.start() mock_add.assert_called() mock_add.assert_has_calls( ( @@ -286,14 +292,19 @@ def test_start_sets_version_metric(self, runner: Runner): ) ) - def test_start_calls_manager_stop_after_breaking_the_loop(self, runner: Runner): + def test_stop_and_exit_calls_manager_stop(self, runner: Runner): + runner._exit_received = True + runner.start() with mock.patch.object(runner, "_manager") as mock_manager: - runner._exit_received = True - with pytest.raises(SystemExit, match=str(EXITCODES.SUCCESS.value)): - runner.start() + runner.stop_and_exit() mock_manager.stop.assert_called() mock_manager.restart_failed_pipeline.assert_not_called() + def test_stop_and_exit_is_register_atexit(self, configuration): + with mock.patch("atexit.register") as mock_register: + runner = Runner(configuration) + mock_register.assert_called_with(runner.stop_and_exit) + def test_metric_labels_returns_versions(self, runner: Runner): assert runner._metric_labels == { "logprep": f"{version('logprep')}", @@ -307,12 +318,3 @@ def 
test_runner_exits_with_pipeline_error_exitcode_if_restart_count_exceeded( mock_manager.restart_count = 5 with pytest.raises(SystemExit, match=str(EXITCODES.PIPELINE_ERROR.value)): runner.start() - - def test_runner_does_not_exits_on_negative_restart_count_parameter(self, runner: Runner): - with mock.patch.object(runner, "_manager") as mock_manager: - runner._keep_iterating = partial(mock_keep_iterating, 3) - mock_manager.restart_count = 5 - runner._configuration.restart_count = -1 - with pytest.raises(SystemExit, match=str(EXITCODES.SUCCESS.value)): - runner.start() - mock_manager.restart_failed_pipeline.call_count = 3 From 02f4183692fc4d6501f1e4457c634552ec457646 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 12:09:25 +0200 Subject: [PATCH 6/7] fix mock --- tests/unit/metrics/test_exporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/metrics/test_exporter.py b/tests/unit/metrics/test_exporter.py index 742b8edc5..4ff498888 100644 --- a/tests/unit/metrics/test_exporter.py +++ b/tests/unit/metrics/test_exporter.py @@ -15,7 +15,7 @@ @mock.patch( - "logprep.metrics.exporter.PrometheusExporter._prepare_multiprocessing", + "logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing", new=lambda *args, **kwargs: None, ) class TestPrometheusExporter: @@ -107,7 +107,7 @@ def test_is_running_returns_true_when_server_thread_is_alive(self): @mock.patch( - "logprep.metrics.exporter.PrometheusExporter._prepare_multiprocessing", + "logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing", new=lambda *args, **kwargs: None, ) class TestHealthEndpoint: From 4be3e2da4a315dc65438db618932e5ea259192ca Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Sat, 28 Sep 2024 12:33:20 +0200 Subject: [PATCH 7/7] fix acceptance tests by ensuring runner stops and exits --- logprep/connector/jsonl/output.py | 2 +- tests/acceptance/util.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/logprep/connector/jsonl/output.py b/logprep/connector/jsonl/output.py index 03b400f3a..fa3e9eb9e 100644 --- a/logprep/connector/jsonl/output.py +++ b/logprep/connector/jsonl/output.py @@ -52,7 +52,7 @@ class Config(Output.Config): failed_events: list __slots__ = [ - "ast_timeout", + "last_timeout", "events", "failed_events", ] diff --git a/tests/acceptance/util.py b/tests/acceptance/util.py index eafa7ed90..9e8704ca7 100644 --- a/tests/acceptance/util.py +++ b/tests/acceptance/util.py @@ -141,9 +141,9 @@ def get_runner_outputs(patched_runner: Runner) -> list: try: patched_runner.start() + patched_runner.stop_and_exit() except SystemExit as error: assert not error.code, f"Runner exited with code {error.code}" - for index, output_path in enumerate(output_paths): parsed_outputs[index] = parse_jsonl(output_path) remove_file_if_exists(output_path)
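
Note on the shutdown pattern used in patches 4 and 5: cleanup is registered with
atexit instead of being called at the end of Runner.start(), so it runs exactly
once whether the main loop returns normally or raises SystemExit. A minimal,
self-contained sketch of that pattern follows; the names in it (DummyManager,
max_restarts, main) are illustrative stand-ins, not logprep code.

import atexit
import sys


class DummyManager:
    """Stand-in for PipelineManager: counts restarts and knows when to give up."""

    def __init__(self, max_restarts: int) -> None:
        self.max_restarts = max_restarts
        self.restart_count = 0
        self.stopped = False

    def should_exit(self) -> bool:
        # mirrors PipelineManager.should_exit(): never exit for a negative limit
        return self.max_restarts >= 0 and self.restart_count >= self.max_restarts

    def restart_failed_pipeline(self) -> None:
        self.restart_count += 1

    def stop(self) -> None:
        if not self.stopped:  # idempotent, so a second call from atexit is harmless
            self.stopped = True
            print("manager stopped")


def main() -> None:
    manager = DummyManager(max_restarts=2)
    # registered before the loop starts, so it also runs when sys.exit() is
    # raised inside the loop (SystemExit unwinds, then atexit handlers fire)
    atexit.register(manager.stop)
    while True:
        if manager.should_exit():
            print("Restart count exceeded. Exiting.")
            sys.exit(1)
        manager.restart_failed_pipeline()


if __name__ == "__main__":
    main()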