diff --git a/logprep/abc/component.py b/logprep/abc/component.py
index 5332e9063..2d88306e8 100644
--- a/logprep/abc/component.py
+++ b/logprep/abc/component.py
@@ -44,6 +44,7 @@ class Metrics:
         )
         """Number of events that were send to error output"""
 
+        _processing_time_per_event_target: Callable = field(default=None)
         processing_time_per_event: HistogramMetric = field(
             factory=lambda: HistogramMetric(
                 description="Time in seconds that it took to process an event",
@@ -57,6 +58,7 @@ def __attrs_post_init__(self):
             attribute = getattr(self, attribute)
             if isinstance(attribute, Metric):
                 attribute.labels = self._labels
+                attribute.target = self._processing_time_per_event_target
                 attribute.tracker = attribute.init_tracker()
 
     # __dict__ is added to support functools.cached_property
diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index 6cf46cc27..be18d83a1 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -1,7 +1,7 @@
 """Abstract module for processors"""
 import time
 from abc import abstractmethod
-from functools import reduce
+from functools import partial, reduce
 from logging import Logger
 from pathlib import Path
 from typing import TYPE_CHECKING, List, Optional
@@ -166,9 +166,11 @@ def process(self, event: dict):
         self._process_rule_tree(event, self._specific_tree)
         self._process_rule_tree(event, self._generic_tree)
 
+    @TimeMeasurement.measure_time("RuleTree processing")
     def _process_rule_tree(self, event: dict, tree: "RuleTree"):
         applied_rules = set()
 
+        @TimeMeasurement.measure_time("Rule processing")
        def _process_rule(event, rule):
             begin = time.time()
             self._apply_rules_wrapper(event, rule)
diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py
index 45d4bf45c..d4f7f66e5 100644
--- a/logprep/framework/pipeline.py
+++ b/logprep/framework/pipeline.py
@@ -129,11 +129,6 @@ class Pipeline:
     class Metrics(Component.Metrics):
         """Tracks statistics about a pipeline"""
-        input: Connector.ConnectorMetrics
-        """Input metrics"""
-        output: List[Connector.ConnectorMetrics]
-        """Output metrics"""
-
         kafka_offset: int = 0
         """The current offset of the kafka input reader"""
         mean_processing_time_per_event: float = 0.0
         """Mean processing time for one event"""
@@ -193,6 +188,9 @@ def __init__(
         self.pipeline_index = pipeline_index
         self._encoder = msgspec.msgpack.Encoder()
         self._decoder = msgspec.msgpack.Decoder()
+        self.metrics = self.Metrics(
+            labels=self.metric_labels,
+        )
 
     @cached_property
     def _process_name(self) -> str:
@@ -210,17 +208,6 @@ def metric_labels(self) -> dict:
         """Return the metric labels for this component."""
         return {"component": "pipeline"}
 
-    @cached_property
-    def metrics(self) -> Metrics:
-        """The pipeline metrics object"""
-        if self._prometheus_exporter is None:
-            return None
-        return self.Metrics(
-            input=self._input.metrics,
-            output=[self._output.get(output).metrics for output in self._output],
-            labels=self.metric_labels,
-        )
-
     @cached_property
     def _pipeline(self) -> tuple:
         self.logger.debug(f"Building '{self._process_name}'")
@@ -349,9 +336,25 @@ def _get_event(self) -> dict:
                 pass
         return event
 
-    @TimeMeasurement.measure_time("pipeline")
+    @TimeMeasurement.measure_time(name="process_event")
     def process_event(self, event: dict):
         """process all processors for one event"""
+
+        """
+        ToDos:
+            - TimeTracking
+            - Processor Specific Metrics (Pseudonymizer, Amides, DomainResolver)
+            - Fix Pseudonymizer str has no match
+            - count number warnings/errors separately or delete them from all metrics?
+            - Tests
+            - delete metric exposer
+            - delete SharedCounter (Events in last 5 min: n)
+            - create Grafana Dashboards
+            - add pipelinemanager metrics (pipeline restarts)
+            - clean up PrometheusExporter ("remove stale metric files" still needed?)
+            -
+        """
+
         event_received = self._encoder.encode(event)
         extra_outputs = []
         for processor in self._pipeline:
diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py
index 0cb4fd174..0695bddf7 100644
--- a/logprep/metrics/metrics.py
+++ b/logprep/metrics/metrics.py
@@ -1,5 +1,6 @@
 """This module tracks, calculates, exposes and resets logprep metrics"""
 from abc import ABC, abstractmethod
+from typing import Callable
 
 from attr import asdict, define, field, validators
 from prometheus_client import Counter, Histogram
@@ -49,9 +50,11 @@ class Metric(ABC):
         default={},
     )
     tracker: object = field(default=None)
+    target: Callable = field(default=None)
     _prefix: str = "logprep_"
 
     def init_tracker(self):
+        tracker = None
         if isinstance(self, CounterMetric):
             tracker = Counter(
                 name=f"{self._prefix}{self.name}",
@@ -64,7 +67,7 @@ def init_tracker(self):
                 name=f"{self._prefix}{self.name}",
                 documentation=self.description,
                 labelnames=self.labels.keys(),
-                buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1, float("inf")),
+                buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
                 registry=None,
             )
         tracker.labels(**self.labels)
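A note on the bucket change in init_tracker above: prometheus_client appends the terminal "+Inf" bucket automatically whenever the last explicit bucket is finite, so dropping float("inf") from the tuple does not change the exposed histogram. A minimal self-contained sketch of the tracker this builds (the metric name and label value below are illustrative, not taken from the diff):

    # Sketch: roughly what init_tracker constructs for a HistogramMetric.
    # prometheus_client requires sorted buckets and appends the final
    # +Inf bucket itself.
    from prometheus_client import CollectorRegistry, Histogram

    registry = CollectorRegistry()
    tracker = Histogram(
        name="logprep_processing_time_per_event",
        documentation="Time in seconds that it took to process an event",
        labelnames=["component"],
        buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
        registry=registry,
    )
    # Buckets are cumulative: this observation is counted in every bucket
    # with an upper bound >= 0.0042, i.e. le=0.01, le=0.1, le=1 and le=+Inf.
    tracker.labels(component="pipeline").observe(0.0042)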
diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py
index 33cb3dc6c..5a295c395 100644
--- a/logprep/run_logprep.py
+++ b/logprep/run_logprep.py
@@ -101,7 +101,8 @@ def _run_logprep(arguments, logger: logging.Logger):
         logger.critical(f"A critical error occurred: {error}")
         if runner:
             runner.stop()
-        sys.exit(1)
+        raise error
+        # sys.exit(1)
 
 
 # pylint: enable=broad-except
diff --git a/logprep/util/time_measurement.py b/logprep/util/time_measurement.py
index 481510612..92ba79a4f 100644
--- a/logprep/util/time_measurement.py
+++ b/logprep/util/time_measurement.py
@@ -1,10 +1,15 @@
 """This module is used to measure the execution time of functions and add the results to events."""
+import logging
 from socket import gethostname
 from time import time
+from typing import TYPE_CHECKING
 
 from logprep.util.helper import camel_to_snake
 
+if TYPE_CHECKING:
+    from logprep.processor.base.rule import Rule
+
 
 class TimeMeasurement:
     """Measures the execution time of functions and adds the results to events via a decorator."""
 
@@ -29,15 +34,17 @@ def inner(*args, **kwargs):  # nosemgrep
             if TimeMeasurement.TIME_MEASUREMENT_ENABLED:
                 caller = args[0]
                 first_argument = args[1]
+                second_argument = args[2] if len(args) > 2 else None
                 begin = time()
                 result = func(*args, **kwargs)
                 end = time()
 
                 processing_time = end - begin
-
-                if hasattr(caller, "metrics"):
-                    if hasattr(caller.metrics, "update_mean_processing_time_per_event"):
-                        caller.metrics.update_mean_processing_time_per_event(processing_time)
+                if name in ("Rule processing",):
+                    caller = first_argument
+                if name in ("RuleTree processing",):
+                    caller = second_argument
+                caller.metrics.processing_time_per_event += processing_time
 
                 if TimeMeasurement.APPEND_TO_EVENT and isinstance(first_argument, dict):
                     add_processing_times_to_event(first_argument, processing_time, caller, name)
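The rewritten inner function above picks the metrics owner by decorator name, because for the rule-level measurements the object carrying the metrics is not the decorated callable's first argument. A simplified standalone sketch of that pattern (the Metrics and Owner classes and the demo call are made up; the TIME_MEASUREMENT_ENABLED and APPEND_TO_EVENT handling is omitted):

    # Simplified sketch of the caller-rebinding pattern in measure_time.
    # Only the name-based rebinding mirrors the diff; the rest is a stub.
    from functools import wraps
    from time import time


    class Metrics:
        processing_time_per_event: float = 0.0


    class Owner:
        metrics = Metrics()


    def measure_time(name: str = ""):
        def decorator(func):
            @wraps(func)
            def inner(*args, **kwargs):
                begin = time()
                result = func(*args, **kwargs)
                processing_time = time() - begin
                # Pick the object that carries the metrics. For the
                # rule-level measurements it is not args[0]:
                #   _process_rule(event, rule)            -> the rule is args[1]
                #   _process_rule_tree(self, event, tree) -> the tree is args[2]
                caller = args[0]
                if name == "Rule processing":
                    caller = args[1]
                if name == "RuleTree processing" and len(args) > 2:
                    caller = args[2]
                caller.metrics.processing_time_per_event += processing_time
                return result

            return inner

        return decorator


    @measure_time(name="process_event")
    def process_event(owner: Owner, event: dict):
        return event


    process_event(Owner(), {"message": "test"})

One consequence worth flagging: the hasattr guards from the removed code are gone, so every measured caller must now expose a metrics attribute with a processing_time_per_event member, and a caller without one fails with an AttributeError instead of being skipped silently.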
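The += on processing_time_per_event also implies that Metric objects support addition; that implementation is not part of this diff. A plausible sketch, assuming the histogram case simply forwards the observed duration to its prometheus tracker (hypothetical code, not the actual logprep implementation):

    # Hypothetical sketch of the __add__ that "metric += duration" requires.
    # Not part of this diff; shown only to make the implied contract explicit.
    from attr import define, field
    from prometheus_client import CollectorRegistry, Histogram


    @define(kw_only=True)
    class HistogramMetricSketch:
        name: str
        description: str
        labels: dict = field(factory=dict)
        tracker: Histogram = field(default=None)

        def init_tracker(self):
            self.tracker = Histogram(
                name=f"logprep_{self.name}",
                documentation=self.description,
                labelnames=list(self.labels.keys()),
                registry=CollectorRegistry(),
            )
            return self.tracker

        def __add__(self, other):
            # "metric += 0.0042" observes the duration under the metric's
            # labels and returns the metric itself, so the attribute keeps
            # pointing at the Metric object after the augmented assignment.
            self.tracker.labels(**self.labels).observe(other)
            return self


    metric = HistogramMetricSketch(
        name="processing_time_per_event",
        description="Time in seconds that it took to process an event",
        labels={"component": "pipeline"},
    )
    metric.init_tracker()
    metric += 0.0042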