Skip to content

Commit

Permalink
fix failed event metric
Browse files Browse the repository at this point in the history
* move `number_of_failed_events` metric from connector to component
* count this metric for input and output criticals in pipeline.py enqueue_error
* count this metric for processing criticals in logprep.abc.processor.Processor.apply_rules_wrapper
  • Loading branch information
ekneg54 committed Oct 9, 2024
1 parent 81ababa commit 0f183a1
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 10 deletions.
10 changes: 9 additions & 1 deletion logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from attrs import asdict
from schedule import Scheduler

from logprep.metrics.metrics import Metric
from logprep.metrics.metrics import CounterMetric, Metric
from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES
from logprep.util.helper import camel_to_snake

Expand Down Expand Up @@ -47,6 +47,14 @@ class Metrics:

_labels: dict

number_of_failed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of failed events",
name="number_of_failed_events",
)
)
"""Number of failed events"""

def __attrs_post_init__(self):
for attribute in asdict(self):
attribute = getattr(self, attribute)
Expand Down
8 changes: 0 additions & 8 deletions logprep/abc/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ class Metrics(Component.Metrics):
)
"""Number of successful events"""

number_of_failed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of failed events",
name="number_of_failed_events",
)
)
"""Number of failed events"""

processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to store an event",
Expand Down
1 change: 1 addition & 0 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class InputError(LogprepException):

def __init__(self, input_connector: "Input", message: str) -> None:
input_connector.metrics.number_of_errors += 1
self.input = input_connector
super().__init__(f"{self.__class__.__name__} in {input_connector.describe()}: {message}")


Expand Down
1 change: 1 addition & 0 deletions logprep/abc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class OutputError(LogprepException):

def __init__(self, output: "Output", message: str) -> None:
output.metrics.number_of_errors += 1
self.output = output
super().__init__(f"{self.__class__.__name__} in {output.describe()}: {message}")


Expand Down
1 change: 1 addition & 0 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def _apply_rules_wrapper(self, event: dict, rule: "Rule"):
except ProcessingWarning as error:
self._handle_warning_error(event, rule, error)
except ProcessingCriticalError as error:
self.metrics.number_of_failed_events += 1
self.result.errors.append(error) # is needed to prevent wrapping it in itself
event.clear()
except Exception as error: # pylint: disable=broad-except
Expand Down
10 changes: 9 additions & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ def enqueue_error(
"errors": ", ".join((str(error.message) for error in errors)),
}
case CriticalInputError():
item.input.metrics.number_of_failed_events += 1
event = {"event": str(item.raw_input), "errors": str(item.message)}
case list():
event = [{"event": str(i), "errors": "Unknown error"} for i in item]
Expand Down Expand Up @@ -400,14 +401,21 @@ def _get_output_error_event(self, item: CriticalOutputError) -> dict:
event = [
{"event": str(i["event"]), "errors": str(i["errors"])} for i in item.raw_input
]
item.output.metrics.number_of_failed_events += len(event)
return event
case CriticalOutputError({"errors": error, "event": event}):
item.output.metrics.number_of_failed_events += 1
return {"event": str(event), "errors": str(error)}
case CriticalOutputError(raw_input) if isinstance(raw_input, dict):
item.output.metrics.number_of_failed_events += 1
return {"event": str(raw_input), "errors": str(item.message)}
case CriticalOutputError(raw_input) if isinstance(raw_input, (list, tuple)):
return [{"event": str(i), "errors": str(item.message)} for i in raw_input]
event = [{"event": str(i), "errors": str(item.message)} for i in raw_input]
item.output.metrics.number_of_failed_events += len(event)
return event
case CriticalOutputError(raw_input) if isinstance(raw_input, (str, bytes)):
item.output.metrics.number_of_failed_events += 1
return {"event": str(raw_input), "errors": str(item.message)}
case _:
item.output.metrics.number_of_failed_events += 1
return {"event": str(item.raw_input), "errors": str(item.message)}
4 changes: 4 additions & 0 deletions tests/unit/component/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class BaseComponentTestCase(ABC):

metric_attributes: dict

expected_metrics = [
"logprep_number_of_failed_events",
]

def setup_method(self) -> None:
config = {"Test Instance Name": self.CONFIG}
self.object = Factory.create(configuration=config)
Expand Down
1 change: 1 addition & 0 deletions tests/unit/processor/amides/test_amides.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class TestAmides(BaseProcessorTestCase):
}

expected_metrics = [
"logprep_number_of_failed_events",
"logprep_amides_total_cmdlines",
"logprep_amides_new_results",
"logprep_amides_cached_results",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class TestDomainResolver(BaseProcessorTestCase):

expected_metrics = [
"logprep_domain_resolver_total_urls",
"logprep_number_of_failed_events",
"logprep_domain_resolver_resolved_new",
"logprep_domain_resolver_resolved_cached",
"logprep_domain_resolver_timeouts",
Expand Down
1 change: 1 addition & 0 deletions tests/unit/processor/pseudonymizer/test_pseudonymizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ class TestPseudonymizer(BaseProcessorTestCase):
}

expected_metrics = [
"logprep_number_of_failed_events",
"logprep_pseudonymizer_pseudonymized_urls",
"logprep_pseudonymizer_new_results",
"logprep_pseudonymizer_cached_results",
Expand Down

0 comments on commit 0f183a1

Please sign in to comment.