Skip to content

Commit

Permalink
fix exporter not listening afert pipeline restart
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Sep 12, 2024
1 parent 171b2d3 commit e821ae5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
5 changes: 3 additions & 2 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def restart_failed_pipeline(self):
pipeline_index,
exit_code,
)
if self.prometheus_exporter:
self.prometheus_exporter.restart()
if self._configuration.restart_count < 0:
return
self.restart_count += 1
Expand All @@ -193,8 +195,7 @@ def restart(self):
self.set_count(self._configuration.process_count)
if not self.prometheus_exporter:
return
if not self.prometheus_exporter.is_running:
self.prometheus_exporter.run()
self.prometheus_exporter.restart()

def _create_pipeline(self, index) -> multiprocessing.Process:
pipeline = Pipeline(pipeline_index=index, config=self._configuration)
Expand Down
4 changes: 1 addition & 3 deletions logprep/metrics/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class PrometheusExporter:
"""Used to control the prometheus exporter and to manage the metrics"""

def __init__(self, configuration: MetricsConfig):
self.is_running = False
logger.debug("Initializing Prometheus Exporter")
self.configuration = configuration
self.server = None
Expand Down Expand Up @@ -106,12 +105,11 @@ def mark_process_dead(self, pid):

def run(self):
"""Starts the default prometheus http endpoint"""
port = self.configuration.port
self.init_server()
self._prepare_multiprocessing()
self.server.start()
port = self.configuration.port
logger.info("Prometheus Exporter started on port %s", port)
self.is_running = True

def init_server(self) -> None:
"""Initializes the server"""
Expand Down
16 changes: 15 additions & 1 deletion tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ def test_restart_failed_pipelines_calls_prometheus_cleanup_method(self, tmpdir):
prometheus_exporter_mock.mark_process_dead.assert_called()
prometheus_exporter_mock.mark_process_dead.assert_called_with(42)

def test_restart_failed_pipelines_restarts_prometheus_server(self, tmpdir):
with mock.patch("os.environ", new={"PROMETHEUS_MULTIPROC_DIR": str(tmpdir)}):
failed_pipeline = mock.MagicMock()
failed_pipeline.is_alive = mock.MagicMock()
failed_pipeline.is_alive.return_value = False
failed_pipeline.pid = 42
config = deepcopy(self.config)
config.metrics = {"enabled": True, "port": 1234}
config.process_count = 2
manager = PipelineManager(config)
manager._pipelines = [failed_pipeline]
assert manager.prometheus_exporter.server is None
manager.restart_failed_pipeline()
assert manager.prometheus_exporter.server.thread.is_alive()

def test_restart_failed_pipelines_increases_number_of_failed_pipelines_metrics(self):
failed_pipeline = mock.MagicMock()
failed_pipeline.is_alive = mock.MagicMock()
Expand Down Expand Up @@ -173,7 +188,6 @@ def test_restart_calls_prometheus_exporter_run(self):
config = deepcopy(self.config)
config.metrics = MetricsConfig(enabled=True, port=666)
pipeline_manager = PipelineManager(config)
pipeline_manager.prometheus_exporter.is_running = False
with mock.patch.object(pipeline_manager.prometheus_exporter, "run") as mock_run:
pipeline_manager.restart()
mock_run.assert_called()
Expand Down

0 comments on commit e821ae5

Please sign in to comment.