diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 414091732..589519984 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -175,6 +175,8 @@ def restart_failed_pipeline(self): pipeline_index, exit_code, ) + if self.prometheus_exporter: + self.prometheus_exporter.restart() if self._configuration.restart_count < 0: return self.restart_count += 1 @@ -193,8 +195,7 @@ def restart(self): self.set_count(self._configuration.process_count) if not self.prometheus_exporter: return - if not self.prometheus_exporter.is_running: - self.prometheus_exporter.run() + self.prometheus_exporter.restart() def _create_pipeline(self, index) -> multiprocessing.Process: pipeline = Pipeline(pipeline_index=index, config=self._configuration) diff --git a/logprep/metrics/exporter.py b/logprep/metrics/exporter.py index 6f43ca785..b9f5691eb 100644 --- a/logprep/metrics/exporter.py +++ b/logprep/metrics/exporter.py @@ -42,7 +42,6 @@ class PrometheusExporter: """Used to control the prometheus exporter and to manage the metrics""" def __init__(self, configuration: MetricsConfig): - self.is_running = False logger.debug("Initializing Prometheus Exporter") self.configuration = configuration self.server = None @@ -106,12 +105,11 @@ def mark_process_dead(self, pid): def run(self): """Starts the default prometheus http endpoint""" - port = self.configuration.port self.init_server() self._prepare_multiprocessing() self.server.start() + port = self.configuration.port logger.info("Prometheus Exporter started on port %s", port) - self.is_running = True def init_server(self) -> None: """Initializes the server""" diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 64754b071..b1a19b97c 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -126,6 +126,21 @@ def test_restart_failed_pipelines_calls_prometheus_cleanup_method(self, tmpdir): prometheus_exporter_mock.mark_process_dead.assert_called() prometheus_exporter_mock.mark_process_dead.assert_called_with(42) + def test_restart_failed_pipelines_restarts_prometheus_server(self, tmpdir): + with mock.patch("os.environ", new={"PROMETHEUS_MULTIPROC_DIR": str(tmpdir)}): + failed_pipeline = mock.MagicMock() + failed_pipeline.is_alive = mock.MagicMock() + failed_pipeline.is_alive.return_value = False + failed_pipeline.pid = 42 + config = deepcopy(self.config) + config.metrics = {"enabled": True, "port": 1234} + config.process_count = 2 + manager = PipelineManager(config) + manager._pipelines = [failed_pipeline] + assert manager.prometheus_exporter.server is None + manager.restart_failed_pipeline() + assert manager.prometheus_exporter.server.thread.is_alive() + def test_restart_failed_pipelines_increases_number_of_failed_pipelines_metrics(self): failed_pipeline = mock.MagicMock() failed_pipeline.is_alive = mock.MagicMock() @@ -173,7 +188,6 @@ def test_restart_calls_prometheus_exporter_run(self): config = deepcopy(self.config) config.metrics = MetricsConfig(enabled=True, port=666) pipeline_manager = PipelineManager(config) - pipeline_manager.prometheus_exporter.is_running = False with mock.patch.object(pipeline_manager.prometheus_exporter, "run") as mock_run: pipeline_manager.restart() mock_run.assert_called()