diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index d09aa8010..7ebbb0b53 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -191,11 +191,10 @@ def stop(self): def restart(self, daemon=True): """Restarts all pipelines""" + if self.prometheus_exporter: + self.prometheus_exporter.run(daemon=daemon) self.set_count(0) self.set_count(self._configuration.process_count) - if not self.prometheus_exporter: - return - self.prometheus_exporter.run(daemon=daemon) def _create_pipeline(self, index) -> multiprocessing.Process: pipeline = Pipeline(pipeline_index=index, config=self._configuration) diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 279e51474..b0d609b59 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -262,6 +262,15 @@ def test_restart_injects_healthcheck_functions(self): pipeline_manager.restart() pipeline_manager.prometheus_exporter.update_healthchecks.assert_called() + def test_restart_ensures_prometheus_exporter_is_running(self): + config = deepcopy(self.config) + config.metrics = MetricsConfig(enabled=True, port=666) + pipeline_manager = PipelineManager(config) + pipeline_manager.prometheus_exporter._prepare_multiprocessing = mock.MagicMock() + with mock.patch("logprep.util.http.ThreadingHTTPServer"): + pipeline_manager.restart() + pipeline_manager.prometheus_exporter.server.start.assert_called() + class TestThrottlingQueue: