diff --git a/examples/exampledata/config/grafana/dashboards/logprep-dashboard.json b/examples/exampledata/config/grafana/dashboards/logprep-dashboard.json index 6e978c600..fe6084347 100644 --- a/examples/exampledata/config/grafana/dashboards/logprep-dashboard.json +++ b/examples/exampledata/config/grafana/dashboards/logprep-dashboard.json @@ -18,7 +18,6 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 2, "links": [ { "asDropdown": true, @@ -114,9 +113,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -196,9 +197,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -278,9 +281,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -360,9 +365,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -442,9 +449,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -524,9 +533,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -606,9 +617,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -676,9 +689,11 @@ "fields": "/^logprep$/", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -761,9 +776,11 @@ "fields": "/^config$/", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -845,9 +862,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -934,9 +953,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -1013,9 +1034,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -1096,9 +1119,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -1184,9 +1209,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1252,9 +1279,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1335,9 +1364,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -1402,9 +1433,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1422,7 +1455,6 @@ } ], "title": "Config Refresh Interval", - "transformations": [], "type": "stat" }, { @@ -1473,9 +1505,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1493,7 +1527,6 @@ } ], "title": "Config Reloads", - "transformations": [], "type": "stat" }, { @@ -1542,9 +1575,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1575,7 +1610,6 @@ "y": 7 }, "id": 131, - "links": [], "options": { "alertInstanceLabelFilter": "", "alertName": "", @@ -1655,9 +1689,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1736,9 +1772,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1817,9 +1855,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1890,9 +1930,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -1973,9 +2015,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -2001,7 +2045,7 @@ "type": "prometheus", "uid": "promz2394" }, - "description": "Number of objects that were collected by the garbage collector.", + "description": "Number of events in error queue", "fieldConfig": { "defaults": { "color": { @@ -2055,9 +2099,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -2067,7 +2113,7 @@ }, "editorMode": "code", "exemplar": false, - "expr": "sum(rate(python_gc_objects_collected_total[1m]))", + "expr": "sum(logprep_number_of_events_in_error_queue)", "hide": false, "instant": false, "legendFormat": "__auto", @@ -2075,7 +2121,7 @@ "refId": "A" } ], - "title": "collected/s", + "title": "error queue", "type": "stat" }, { @@ -2122,9 +2168,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "targets": [ { "datasource": { @@ -2223,9 +2271,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -2301,9 +2351,11 @@ "fields": "", "values": false }, - "textMode": "auto" + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true }, - "pluginVersion": "10.1.4", + "pluginVersion": "10.4.2", "repeatDirection": "h", "targets": [ { @@ -2349,6 +2401,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -2459,6 +2512,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -2576,6 +2630,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -4104,8 +4159,7 @@ } ], "refresh": "5s", - "schemaVersion": 38, - "style": "dark", + "schemaVersion": 39, "tags": [ "App" ], @@ -4122,4 +4176,4 @@ "uid": "Ijmjihx7z", "version": 1, "weekStart": "" -} +} \ No newline at end of file diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index 85dc8fcf2..f054c9182 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -180,7 +180,7 @@ def __init__(self, configuration: Configuration): self.restart_timeout_ms: int = random.randint(100, 1000) self.metrics = self.Metrics(labels={"component": "manager"}) self.loghandler: LogprepMPQueueListener = None - self._error_queue: multiprocessing.Queue | None = None + self.error_queue: multiprocessing.Queue | None = None self._error_listener: ComponentQueueListener | None = None self._configuration: Configuration = configuration self._pipelines: list[multiprocessing.Process] = [] @@ -200,15 +200,15 @@ def _setup_prometheus_exporter(self): def _setup_error_queue(self): if not self._configuration.error_output: return - self._error_queue = ThrottlingQueue( + self.error_queue = ThrottlingQueue( multiprocessing.get_context(), self._configuration.error_backlog_size ) self._error_listener = ComponentQueueListener( - self._error_queue, "store", self._configuration.error_output + self.error_queue, "store", self._configuration.error_output ) self._error_listener.start() # wait for the error listener to be ready before starting the pipelines - if self._error_queue.get(block=True) is None: + if self.error_queue.get(block=True) is None: self.stop() sys.exit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value) @@ -325,7 +325,7 @@ def _create_pipeline(self, index) -> multiprocessing.Process: pipeline = Pipeline( pipeline_index=index, config=self._configuration, - error_queue=self._error_queue, + error_queue=self.error_queue, ) if pipeline.pipeline_index == 1 and self.prometheus_exporter: self.prometheus_exporter.update_healthchecks(pipeline.get_health_functions()) diff --git a/logprep/runner.py b/logprep/runner.py index 0448c8eda..634b6b2dd 100644 --- a/logprep/runner.py +++ b/logprep/runner.py @@ -58,6 +58,14 @@ class Runner: class Metrics(Component.Metrics): """Metrics for the Logprep Runner.""" + number_of_events_in_error_queue: GaugeMetric = field( + factory=lambda: GaugeMetric( + description="Current number of events in error queue", + name="number_of_events_in_error_queue", + ) + ) + """Current size of the error queue.""" + version_info: GaugeMetric = field( factory=lambda: GaugeMetric( description="Logprep version information", @@ -166,6 +174,8 @@ def _iterate(self): self.exit_code = EXITCODES.PIPELINE_ERROR.value self._logger.error("Restart count exceeded. Exiting.") sys.exit(self.exit_code) + if self._manager.error_queue is not None: + self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize() self._manager.restart_failed_pipeline() def reload_configuration(self): diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 0297dfe69..c8e5198ec 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -292,14 +292,14 @@ def test_setup_error_queue_sets_error_queue_and_starts_listener(self): with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue: mock_queue.get.return_value = "not null" manager = PipelineManager(self.config) - assert manager._error_queue is not None + assert manager.error_queue is not None assert manager._error_listener is not None manager._error_listener.start.assert_called() # pylint: disable=no-member def test_setup_does_not_sets_error_queue_if_no_error_output(self): self.config.error_output = {} manager = PipelineManager(self.config) - assert manager._error_queue is None + assert manager.error_queue is None assert manager._error_listener is None def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self): @@ -310,18 +310,6 @@ def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self): with pytest.raises(SystemExit, match="4"): PipelineManager(self.config) - def test_should_exit_returns_bool_based_on_restart_count(self): - self.config.restart_count = 2 - with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): - with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue") as mock_queue: - mock_queue.get.return_value = "not null" - manager = PipelineManager(self.config) - assert not manager.should_exit() - manager.restart_count = 1 - assert not manager.should_exit() - manager.restart_count = 2 - assert manager.should_exit() - def test_stop_calls_stop_on_error_listener(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): @@ -331,16 +319,6 @@ def test_stop_calls_stop_on_error_listener(self): manager.stop() manager._error_listener.stop.assert_called() # pylint: disable=no-member - def test_stop_calls_stop_on_loghandler(self): - self.config.error_output = {"dummy": {"type": "dummy_output"}} - with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): - with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get: - mock_get.return_value = "not None" - manager = PipelineManager(self.config) - manager.loghandler = mock.MagicMock() - manager.stop() - manager.loghandler.stop.assert_called() - def test_restart_with_error_output_calls_pipeline_with_error_queue(self): self.config.error_output = {"dummy": {"type": "dummy_output"}} with mock.patch("multiprocessing.Process"): @@ -356,7 +334,7 @@ def test_restart_with_error_output_calls_pipeline_with_error_queue(self): mock_pipeline.assert_called_with( pipeline_index=3, # last call index config=manager._configuration, - error_queue=manager._error_queue, + error_queue=manager.error_queue, ) def test_restart_without_error_output_calls_pipeline_with_error_queue(self): @@ -377,12 +355,6 @@ def test_restart_without_error_output_calls_pipeline_with_error_queue(self): error_queue=None, ) - def test_reload_calls_set_count_twice(self): - with mock.patch.object(self.manager, "set_count") as mock_set_count: - self.manager.reload() - # drains pipelines down to 0 and scales up to 3 afterwards - mock_set_count.assert_has_calls([mock.call(0), mock.call(3)]) - class TestThrottlingQueue: diff --git a/tests/unit/test_runner.py b/tests/unit/test_runner.py index 0fa9776b3..7c533bd49 100644 --- a/tests/unit/test_runner.py +++ b/tests/unit/test_runner.py @@ -131,6 +131,18 @@ def test_iteration_calls_run_pending(self, mock_run_pending, runner): runner.start() mock_run_pending.call_count = 3 + def test_iteration_sets_error_queue_size(self, runner): + with mock.patch.object(runner, "_manager") as mock_manager: + mock_manager.restart_count = 0 + runner.metrics.number_of_events_in_error_queue = 0 + mock_manager.should_exit.side_effect = [False, False, True] + mock_manager.error_queue.qsize.return_value = 42 + with pytest.raises(SystemExit): + runner.start() + assert ( + runner.metrics.number_of_events_in_error_queue == 84 + ) # because of mocking with int + def test_iteration_calls_should_exit(self, runner): with mock.patch.object(runner, "_manager") as mock_manager: mock_manager.restart_count = 0