From 0f183a1ac5fe2eea638b8379fdf97746a84bff55 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Wed, 9 Oct 2024 18:27:41 +0200 Subject: [PATCH] fix failed event metric * move `number_of_failed_events` metric from connector to component * count this metric for input and output criticals in pipeline.py enqueue_error * count this metric for processing criticals in logprep.abc.processor.Processor.apply_rules_wrapper --- logprep/abc/component.py | 10 +++++++++- logprep/abc/connector.py | 8 -------- logprep/abc/input.py | 1 + logprep/abc/output.py | 1 + logprep/abc/processor.py | 1 + logprep/framework/pipeline.py | 10 +++++++++- tests/unit/component/base.py | 4 ++++ tests/unit/processor/amides/test_amides.py | 1 + .../processor/domain_resolver/test_domain_resolver.py | 1 + .../unit/processor/pseudonymizer/test_pseudonymizer.py | 1 + 10 files changed, 28 insertions(+), 10 deletions(-) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index c988458ce..9f5a9ebe9 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -14,7 +14,7 @@ from attrs import asdict from schedule import Scheduler -from logprep.metrics.metrics import Metric +from logprep.metrics.metrics import CounterMetric, Metric from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES from logprep.util.helper import camel_to_snake @@ -47,6 +47,14 @@ class Metrics: _labels: dict + number_of_failed_events: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of failed events", + name="number_of_failed_events", + ) + ) + """Number of failed events""" + def __attrs_post_init__(self): for attribute in asdict(self): attribute = getattr(self, attribute) diff --git a/logprep/abc/connector.py b/logprep/abc/connector.py index 28e03feec..92bb8a17f 100644 --- a/logprep/abc/connector.py +++ b/logprep/abc/connector.py @@ -21,14 +21,6 @@ class Metrics(Component.Metrics): ) """Number of successful events""" - number_of_failed_events: CounterMetric = field( - factory=lambda: CounterMetric( - description="Number of failed events", - name="number_of_failed_events", - ) - ) - """Number of failed events""" - processing_time_per_event: HistogramMetric = field( factory=lambda: HistogramMetric( description="Time in seconds that it took to store an event", diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 8237ccb96..54ac099c1 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -27,6 +27,7 @@ class InputError(LogprepException): def __init__(self, input_connector: "Input", message: str) -> None: input_connector.metrics.number_of_errors += 1 + self.input = input_connector super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}") diff --git a/logprep/abc/output.py b/logprep/abc/output.py index ab5994c20..ad9e2edac 100644 --- a/logprep/abc/output.py +++ b/logprep/abc/output.py @@ -17,6 +17,7 @@ class OutputError(LogprepException): def __init__(self, output: "Output", message: str) -> None: output.metrics.number_of_errors += 1 + self.output = output super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}") diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 8f28b021d..c70a157c4 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -272,6 +272,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"): except ProcessingWarning as error: self._handle_warning_error(event, rule, error) except ProcessingCriticalError as error: + self.metrics.number_of_failed_events += 1 self.result.errors.append(error) # is needed to prevent wrapping it in itself event.clear() except Exception as error: # pylint: disable=broad-except diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index b68dc1de8..f293b80f2 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -371,6 +371,7 @@ def enqueue_error( "errors": ", ".join((str(error.message) for error in errors)), } case CriticalInputError(): + item.input.metrics.number_of_failed_events += 1 event = {"event": str(item.raw_input), "errors": str(item.message)} case list(): event = [{"event": str(i), "errors": "Unknown error"} for i in item] @@ -400,14 +401,21 @@ def _get_output_error_event(self, item: CriticalOutputError) -> dict: event = [ {"event": str(i["event"]), "errors": str(i["errors"])} for i in item.raw_input ] + item.output.metrics.number_of_failed_events += len(event) return event case CriticalOutputError({"errors": error, "event": event}): + item.output.metrics.number_of_failed_events += 1 return {"event": str(event), "errors": str(error)} case CriticalOutputError(raw_input) if isinstance(raw_input, dict): + item.output.metrics.number_of_failed_events += 1 return {"event": str(raw_input), "errors": str(item.message)} case CriticalOutputError(raw_input) if isinstance(raw_input, (list, tuple)): - return [{"event": str(i), "errors": str(item.message)} for i in raw_input] + event = [{"event": str(i), "errors": str(item.message)} for i in raw_input] + item.output.metrics.number_of_failed_events += len(event) + return event case CriticalOutputError(raw_input) if isinstance(raw_input, (str, bytes)): + item.output.metrics.number_of_failed_events += 1 return {"event": str(raw_input), "errors": str(item.message)} case _: + item.output.metrics.number_of_failed_events += 1 return {"event": str(item.raw_input), "errors": str(item.message)} diff --git a/tests/unit/component/base.py b/tests/unit/component/base.py index c6cbe87fa..8c26f3b8f 100644 --- a/tests/unit/component/base.py +++ b/tests/unit/component/base.py @@ -31,6 +31,10 @@ class BaseComponentTestCase(ABC): metric_attributes: dict + expected_metrics = [ + "logprep_number_of_failed_events", + ] + def setup_method(self) -> None: config = {"Test Instance Name": self.CONFIG} self.object = Factory.create(configuration=config) diff --git a/tests/unit/processor/amides/test_amides.py b/tests/unit/processor/amides/test_amides.py index 8ef9e27b0..11a969187 100644 --- a/tests/unit/processor/amides/test_amides.py +++ b/tests/unit/processor/amides/test_amides.py @@ -25,6 +25,7 @@ class TestAmides(BaseProcessorTestCase): } expected_metrics = [ + "logprep_number_of_failed_events", "logprep_amides_total_cmdlines", "logprep_amides_new_results", "logprep_amides_cached_results", diff --git a/tests/unit/processor/domain_resolver/test_domain_resolver.py b/tests/unit/processor/domain_resolver/test_domain_resolver.py index bb9ea89ba..8e9918d0d 100644 --- a/tests/unit/processor/domain_resolver/test_domain_resolver.py +++ b/tests/unit/processor/domain_resolver/test_domain_resolver.py @@ -34,6 +34,7 @@ class TestDomainResolver(BaseProcessorTestCase): expected_metrics = [ "logprep_domain_resolver_total_urls", + "logprep_number_of_failed_events", "logprep_domain_resolver_resolved_new", "logprep_domain_resolver_resolved_cached", "logprep_domain_resolver_timeouts", diff --git a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py index 1878885af..0746d120f 100644 --- a/tests/unit/processor/pseudonymizer/test_pseudonymizer.py +++ b/tests/unit/processor/pseudonymizer/test_pseudonymizer.py @@ -715,6 +715,7 @@ class TestPseudonymizer(BaseProcessorTestCase): } expected_metrics = [ + "logprep_number_of_failed_events", "logprep_pseudonymizer_pseudonymized_urls", "logprep_pseudonymizer_new_results", "logprep_pseudonymizer_cached_results",