From 35afb4d080d2e1211a82ed2efe68f76d1e40ea83 Mon Sep 17 00:00:00 2001
From: ekneg54
Date: Mon, 16 Oct 2023 11:35:48 +0000
Subject: [PATCH] WIP

---
 logprep/abc/component.py                    | 23 ------
 logprep/abc/connector.py                    | 46 ++++++-----
 logprep/abc/processor.py                    | 36 +++++++--
 logprep/connector/confluent_kafka/input.py  |  2 +-
 logprep/connector/confluent_kafka/output.py |  2 +-
 logprep/framework/rule_tree/rule_tree.py    | 18 +++--
 logprep/metrics/metrics.py                  | 67 +++++++--------
 logprep/processor/base/rule.py              | 39 ++++++---
 logprep/run_logprep.py                      |  3 -
 logprep/util/time_measurement.py            | 56 ++++---------
 tests/unit/metrics/test_metrics.py          | 60 +++++++++++---
 tests/unit/processor/base.py                | 60 +++-----------
 tests/unit/util/test_time_measurement.py    | 90 ---------------------
 13 files changed, 204 insertions(+), 298 deletions(-)

diff --git a/logprep/abc/component.py b/logprep/abc/component.py
index 229cd9259..2a113e6f7 100644
--- a/logprep/abc/component.py
+++ b/logprep/abc/component.py
@@ -28,30 +28,7 @@ class Metrics:
 
         _labels: dict
 
-        number_of_processed_events: CounterMetric = field(
-            factory=lambda: CounterMetric(
-                description="Number of events that were processed",
-                name="number_of_processed_events",
-            )
-        )
-        """Number of events that were processed"""
-
-        number_of_failed_events: CounterMetric = field(
-            factory=lambda: CounterMetric(
-                description="Number of events that were send to error output",
-                name="number_of_failed_events",
-            )
-        )
-        """Number of events that were send to error output"""
-
         _processing_time_per_event_target: Callable = field(default=None)
-        processing_time_per_event: HistogramMetric = field(
-            factory=lambda: HistogramMetric(
-                description="Time in seconds that it took to process an event",
-                name="processing_time_per_event",
-            )
-        )
-        """Time in seconds that it took to process an event"""
 
         def __attrs_post_init__(self):
             for attribute in asdict(self):
diff --git a/logprep/abc/connector.py b/logprep/abc/connector.py
index 9482af737..b2aea14dc 100644
--- a/logprep/abc/connector.py
+++ b/logprep/abc/connector.py
@@ -1,36 +1,46 @@
 """ abstract module for connectors"""
-from attr import define
+from attrs import define, field
 
 from logprep.abc.component import Component
-from logprep.metrics.metrics import calculate_new_average
+from logprep.metrics.metrics import CounterMetric, HistogramMetric
 
 
 class Connector(Component):
     """Abstract Connector Class to define the Interface"""
 
     @define(kw_only=True)
-    class ConnectorMetrics(Component.Metrics):
+    class Metrics(Component.Metrics):
         """Tracks statistics about this connector"""
 
-        mean_processing_time_per_event: float = 0.0
-        """Mean processing time for one event"""
-        _mean_processing_time_sample_counter: int = 0
-        """Helper to calculate mean processing time"""
+        number_of_processed_events: CounterMetric = field(
+            factory=lambda: CounterMetric(
+                description="Number of events that were processed",
+                name="number_of_processed_events",
+            )
+        )
+        """Number of events that were processed"""
+
+        number_of_failed_events: CounterMetric = field(
+            factory=lambda: CounterMetric(
+                description="Number of events that were sent to error output",
+                name="number_of_failed_events",
+            )
+        )
+        """Number of events that were sent to error output"""
+
+        processing_time_per_event: HistogramMetric = field(
+            factory=lambda: HistogramMetric(
+                description="Time in seconds that it took to process an event",
+                name="processing_time_per_event",
+            )
+        )
+        """Time in seconds that it took to process an event"""
+
         number_of_warnings: int = 0
         """Number of warnings that occurred while processing events"""
         number_of_errors: int = 0
         """Number of errors that occurred while processing events"""
 
-        def update_mean_processing_time_per_event(self, new_sample):
-            """Updates the mean processing time per event"""
-            new_avg, new_sample_counter = calculate_new_average(
-                self.mean_processing_time_per_event,
-                new_sample,
-                self._mean_processing_time_sample_counter,
-            )
-            self.mean_processing_time_per_event = new_avg
-            self._mean_processing_time_sample_counter = new_sample_counter
-
     __slots__ = ["metrics"]
 
-    metrics: ConnectorMetrics
+    metrics: Metrics
diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index 877015413..2fc2ee1fb 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -2,7 +2,7 @@
 import time
 from abc import abstractmethod
 from functools import partial, reduce
-from logging import Logger
+from logging import DEBUG, Logger
 from pathlib import Path
 from typing import TYPE_CHECKING, List, Optional
 
@@ -10,6 +10,7 @@
 
 from logprep.abc.component import Component
 from logprep.framework.rule_tree.rule_tree import RuleTree, RuleTreeType
+from logprep.metrics.metrics import HistogramMetric
 from logprep.processor.base.exceptions import (
     FieldExistsWarning,
     ProcessingCriticalError,
@@ -73,9 +74,17 @@ class Config(Component.Config):
     class Metrics(Component.Metrics):
         """Tracks statistics about this processor"""
 
-        mean_processing_time_per_event: float = 0.0
-        """Mean processing time for one event"""
-        _mean_processing_time_sample_counter: int = 0
+        number_of_processed_events = field(default=None)
+        number_of_failed_events = field(default=None)
+
+        processing_time_per_event: HistogramMetric = field(
+            factory=lambda: HistogramMetric(
+                description="Time in seconds that it took to process an event",
+                name="processing_time_per_event",
+            )
+        )
+        """Time in seconds that it took to process an event"""
+
         number_of_warnings: int = 0
         """Number of warnings that occurred while processing events"""
         number_of_errors: int = 0
@@ -151,7 +160,6 @@ def metric_labels(self) -> dict:
         """Return the metric labels for this component."""
         return super().metric_labels | {"component": "processor", "type": self._config.type}
 
-    @TimeMeasurement.measure_time()
     def process(self, event: dict):
         """Process a log event by calling the implemented `process` method of the
         strategy object set in `_strategy` attribute.
 
@@ -162,7 +170,6 @@ def process(self, event: dict):
         event : dict
            A dictionary representing a log event.
""" - self.metrics.number_of_processed_events += 1 self._logger.debug(f"{self.describe()} processing event {event}") self._process_rule_tree(event, self._specific_tree) self._process_rule_tree(event, self._generic_tree) @@ -172,7 +179,7 @@ def _process_rule_tree(self, event: dict, tree: "RuleTree"): applied_rules = set() @TimeMeasurement.measure_time("Rule processing") - def _process_rule(event, rule): + def _process_rule(_, event, rule): begin = time.time() self._apply_rules_wrapper(event, rule) processing_time = time.time() - begin @@ -184,7 +191,7 @@ def _process_rule(event, rule): if self._config.apply_multiple_times: matching_rules = tree.get_matching_rules(event) while matching_rules: - reduce(_process_rule, (event, *matching_rules)) + reduce(_process_rule, (None, event, *matching_rules)) matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules) else: reduce(_process_rule, (event, *tree.get_matching_rules(event))) @@ -262,6 +269,11 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: L rules = self.rule_class.create_rules_from_target(generic_rules_target) for rule in rules: self._generic_tree.add_rule(rule, self._logger) + if self._logger.isEnabledFor(DEBUG): # pragma: no cover + number_specific_rules = self._specific_tree.number_of_rules + self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules") + number_generic_rules = self._generic_tree.number_of_rules + self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules") @staticmethod def _field_exists(event: dict, dotted_field: str) -> bool: @@ -292,6 +304,14 @@ def _has_missing_values(self, event, rule, source_field_dict): dict(filter(lambda x: x[1] in [None, ""], source_field_dict.items())).keys() ) if missing_fields: + if rule.ignore_missing_fields: + return True + if rule.ignore_missing_fields: + return True + if rule.ignore_missing_fields: + return True + if rule.ignore_missing_fields: + return True if rule.ignore_missing_fields: return True error = BaseException(f"{self.name}: no value for fields: {missing_fields}") diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 1e75532fa..fd52710ab 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -77,7 +77,7 @@ class ConfluentKafkaInput(Input): """A kafka input connector.""" @define(kw_only=True, slots=False) - class ConnectorMetrics(Input.ConnectorMetrics): + class Metrics(Input.Metrics): """Metrics for ConfluentKafkaInput""" _prefix = "logprep_connector_input_kafka_" diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 5f8294305..34cfeb65e 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -52,7 +52,7 @@ class ConfluentKafkaOutput(Output): """A kafka connector that serves as output connector.""" @define(kw_only=True, slots=False) - class Metrics(Output.ConnectorMetrics): + class Metrics(Output.Metrics): """Metrics for ConfluentKafkaOutput""" _prefix = "logprep_connector_output_kafka_" diff --git a/logprep/framework/rule_tree/rule_tree.py b/logprep/framework/rule_tree/rule_tree.py index d2062efb0..6fa0fba8a 100644 --- a/logprep/framework/rule_tree/rule_tree.py +++ b/logprep/framework/rule_tree/rule_tree.py @@ -9,6 +9,7 @@ from logprep.abc.component import Component from logprep.framework.rule_tree.node import Node from logprep.framework.rule_tree.rule_parser 
diff --git a/logprep/framework/rule_tree/rule_tree.py b/logprep/framework/rule_tree/rule_tree.py
index d2062efb0..6fa0fba8a 100644
--- a/logprep/framework/rule_tree/rule_tree.py
+++ b/logprep/framework/rule_tree/rule_tree.py
@@ -9,6 +9,7 @@
 from logprep.abc.component import Component
 from logprep.framework.rule_tree.node import Node
 from logprep.framework.rule_tree.rule_parser import RuleParser
+from logprep.metrics.metrics import HistogramMetric
 from logprep.util import getter
 
 if TYPE_CHECKING:
@@ -34,6 +35,13 @@ class Metrics(Component.Metrics):
 
         number_of_processed_events = field(default=None)
         number_of_failed_events = field(default=None)
+        processing_time_per_event: HistogramMetric = field(
+            factory=lambda: HistogramMetric(
+                description="Time in seconds that it took to process an event",
+                name="processing_time_per_event",
+            )
+        )
+        """Time in seconds that it took to process an event"""
 
     __slots__ = (
         "rule_parser",
@@ -45,7 +53,6 @@ class Metrics(Component.Metrics):
         "_processor_type",
         "_processor_name",
         "_root",
-        "_number_of_rules"
     )
 
     rule_parser: Optional[RuleParser]
@@ -57,7 +64,6 @@ class Metrics(Component.Metrics):
     _processor_config: "Processor.Config"
     _processor_type: str
     _root: Node
-    _number_of_rules: int
 
     def __init__(
         self,
@@ -88,7 +94,6 @@ def __init__(
         self._processor_type = processor_config.type if processor_name is not None else ""
         self._setup()
         self.metrics = self.Metrics(labels=self.metric_labels)
-        self._number_of_rules = 0
 
         if root:
             self._root = root
@@ -105,6 +110,10 @@ def metric_labels(self) -> dict:
             "processor_type": self._processor_type,
         }
 
+    @property
+    def number_of_rules(self) -> int:
+        """Returns the number of rules in the tree."""
+        return len(self._rule_mapping)
+
     def _setup(self):
         """Basic setup of rule tree.
 
@@ -148,12 +157,11 @@ def add_rule(self, rule: "Rule", logger: Logger = None):
                 f"\nIgnore and continue with next rule."
             )
             return
-        self._number_of_rules += 1
        for rule_segment in parsed_rule:
             end_node = self._add_parsed_rule(rule_segment)
             if rule not in end_node.matching_rules:
                 end_node.matching_rules.append(rule)
-        self._rule_mapping[rule] = self._number_of_rules - 1
+        self._rule_mapping[rule] = self.number_of_rules
 
     def _add_parsed_rule(self, parsed_rule: list):
         """Add parsed rule to rule tree.
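The rule-index bookkeeping above shrinks to a property once the index is derived from the insertion order of `_rule_mapping`. A standalone sketch of why `len()` before insertion reproduces the old `self._number_of_rules - 1` arithmetic (rule names are hypothetical):

    rule_mapping = {}


    def add_rule(rule: str) -> None:
        # the right-hand side is evaluated before the key is inserted, so each
        # rule receives exactly the index the old counter arithmetic produced
        rule_mapping[rule] = len(rule_mapping)


    add_rule("rule_a")
    add_rule("rule_b")
    assert rule_mapping == {"rule_a": 0, "rule_b": 1}
    assert len(rule_mapping) == 2  # what the new number_of_rules property reports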
diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py
index 718fba45b..9129a1f3d 100644
--- a/logprep/metrics/metrics.py
+++ b/logprep/metrics/metrics.py
@@ -3,9 +3,7 @@
 from typing import Callable
 
 from attr import asdict, define, field, validators
-from prometheus_client import Counter, Histogram
-
-LOGPREP_REGISTRY = None  # to inject a custom registry
+from prometheus_client import CollectorRegistry, Counter, Histogram
 
 
 def is_public(attribute, _):
@@ -51,62 +49,57 @@ class Metric(ABC):
         ],
         default={},
     )
-    tracker: object = field(default=None)
+    trackers: dict = {}
     target: Callable = field(default=None)
+    _registry: CollectorRegistry = field(default=None)
     _prefix: str = "logprep_"
 
+    @property
+    def fullname(self):
+        """Returns the fully qualified metric name, prefix included."""
+        return f"{self._prefix}{self.name}"
+
     def init_tracker(self):
         tracker = None
-        if isinstance(self, CounterMetric):
-            tracker = Counter(
-                name=f"{self._prefix}{self.name}",
-                documentation=self.description,
-                labelnames=self.labels.keys(),
-                registry=LOGPREP_REGISTRY,
-            )
-        if isinstance(self, HistogramMetric):
-            tracker = Histogram(
-                name=f"{self._prefix}{self.name}",
-                documentation=self.description,
-                labelnames=self.labels.keys(),
-                buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
-                registry=LOGPREP_REGISTRY,
-            )
-        tracker.labels(**self.labels)
-        self.tracker = tracker
+        try:
+            if isinstance(self, CounterMetric):
+                tracker = Counter(
+                    name=self.fullname,
+                    documentation=self.description,
+                    labelnames=self.labels.keys(),
+                    registry=self._registry,
+                )
+            if isinstance(self, HistogramMetric):
+                tracker = Histogram(
+                    name=self.fullname,
+                    documentation=self.description,
+                    labelnames=self.labels.keys(),
+                    buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
+                    registry=self._registry,
+                )
+            tracker.labels(**self.labels)
+
+            self.trackers.update({self.fullname: tracker})
+        except ValueError:
+            self.trackers.get(self.fullname).labels(**self.labels)
 
     @abstractmethod
     def __add__(self, other):
         """Add"""
 
-    @abstractmethod
-    def __eq__(self, __value: object) -> bool:
-        """Equal"""
-
 
 @define(kw_only=True)
 class CounterMetric(Metric):
-    @property
-    def _value(self) -> int:
-        return int(self.tracker.collect()[-1].samples[0].value)
-
     def __add__(self, other):
-        self.tracker.labels(**self.labels).inc(other)
+        self.trackers.get(self.fullname).labels(**self.labels).inc(other)
         return self
 
-    def __eq__(self, __value: object) -> bool:
-        return self._value == int(__value)
-
 
 @define(kw_only=True)
 class HistogramMetric(Metric):
     def __add__(self, other):
-        self.tracker.labels(**self.labels).observe(other)
+        self.trackers.get(self.fullname).labels(**self.labels).observe(other)
         return self
 
-    def __eq__(self, __value: object) -> bool:
-        raise NotImplementedError
-
 
 def calculate_new_average(current_average, next_sample, sample_counter):
     """Calculate a new average by combining a new sample with a sample counter"""
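The `try`/`except ValueError` in `init_tracker` relies on prometheus_client refusing to register a second collector under an already-used name, while additional label values simply attach to the existing tracker. A minimal sketch of that underlying behavior with plain prometheus_client (the metric name is illustrative):

    from prometheus_client import CollectorRegistry, Counter, generate_latest

    registry = CollectorRegistry()
    counter = Counter(
        name="logprep_bla",
        documentation="demo counter",
        labelnames=["pipeline"],
        registry=registry,
    )
    counter.labels(pipeline="1").inc()
    counter.labels(pipeline="2")  # a second label set reuses the same collector

    try:
        Counter(name="logprep_bla", documentation="demo counter", registry=registry)
    except ValueError:
        # duplicate registration is refused, which is why init_tracker falls
        # back to the tracker already stored under the metric's fullname
        pass

    print(generate_latest(registry).decode("utf-8"))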
diff --git a/logprep/processor/base/rule.py b/logprep/processor/base/rule.py
index 7482e1488..3faa3f577 100644
--- a/logprep/processor/base/rule.py
+++ b/logprep/processor/base/rule.py
@@ -142,7 +142,7 @@
 from logprep.abc.processor import Processor
 from logprep.filter.expression.filter_expression import FilterExpression
 from logprep.filter.lucene_filter import LuceneFilter
-from logprep.metrics.metrics import calculate_new_average
+from logprep.metrics.metrics import CounterMetric, HistogramMetric
 from logprep.processor.base.exceptions import InvalidRuleDefinitionError
 from logprep.util.getter import GetterFactory
 from logprep.util.helper import camel_to_snake
@@ -204,19 +208,32 @@ class Config:
     """
 
     @define(kw_only=True)
-    class RuleMetrics(Component.Metrics):
+    class Metrics(Component.Metrics):
         """Tracks statistics about the current rule"""
 
-        _mean_processing_time: float = 0.0
-        _mean_processing_time_sample_counter: int = 0
-
-        def update_mean_processing_time(self, new_sample):
-            """Updates the mean processing time of this rule"""
-            new_avg, new_sample_counter = calculate_new_average(
-                self._mean_processing_time, new_sample, self._mean_processing_time_sample_counter
-            )
-            self._mean_processing_time = new_avg
-            self._mean_processing_time_sample_counter = new_sample_counter
+        number_of_processed_events: CounterMetric = field(
+            factory=lambda: CounterMetric(
+                description="Number of events that were processed",
+                name="number_of_processed_events",
+            )
+        )
+        """Number of events that were processed"""
+
+        number_of_failed_events: CounterMetric = field(
+            factory=lambda: CounterMetric(
+                description="Number of events that were sent to error output",
+                name="number_of_failed_events",
+            )
+        )
+        """Number of events that were sent to error output"""
+
+        processing_time_per_event: HistogramMetric = field(
+            factory=lambda: HistogramMetric(
+                description="Time in seconds that it took to process an event",
+                name="processing_time_per_event",
+            )
+        )
+        """Time in seconds that it took to process an event"""
 
     special_field_types = ["regex_fields", "sigma_fields", "ip_fields", "tests", "tag_on_failure"]
 
@@ -243,7 +260,7 @@ def __init__(self, filter_rule: FilterExpression, config: Config, processor_name
         self._special_fields = None
         self.file_name = None
         self._config = config
-        self.metrics = self.RuleMetrics(labels=self.metric_labels)
+        self.metrics = self.Metrics(labels=self.metric_labels)
 
     def __eq__(self, other: "Rule") -> bool:
         return all([other.filter == self._filter, other._config == self._config])
diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py
index 5a295c395..59fb7422c 100644
--- a/logprep/run_logprep.py
+++ b/logprep/run_logprep.py
@@ -170,9 +170,6 @@ def _load_configuration(args):
 
 def _setup_metrics_and_time_measurement(args, config, logger):
-    measure_time_config = config.get("metrics", {}).get("measure_time", {})
-    TimeMeasurement.TIME_MEASUREMENT_ENABLED = measure_time_config.get("enabled", False)
-    TimeMeasurement.APPEND_TO_EVENT = measure_time_config.get("append_to_event", False)
-
     logger.debug(f'Metric export enabled: {config.get("metrics", {}).get("enabled", False)}')
-    logger.debug(f"Time measurement enabled: {TimeMeasurement.TIME_MEASUREMENT_ENABLED}")
     logger.debug(f"Config path: {args.config}")
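One design note on the per-rule metrics above: the `factory=lambda: ...` indirection matters because attrs evaluates a factory once per instance, so every rule gets its own counter and histogram objects instead of all rules sharing one default created at class-definition time. A minimal sketch of that semantics (a plain list stands in for the metric object):

    from attrs import define, field


    @define(kw_only=True)
    class Metrics:
        # the factory runs per instance, not once at class definition
        number_of_processed_events: list = field(factory=list)


    first, second = Metrics(), Metrics()
    assert first.number_of_processed_events is not second.number_of_processed_events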
diff --git a/logprep/util/time_measurement.py b/logprep/util/time_measurement.py
index 92ba79a4f..b89028305 100644
--- a/logprep/util/time_measurement.py
+++ b/logprep/util/time_measurement.py
@@ -1,23 +1,11 @@
-"""This module is used to measure the execution time of functions and add the results to events."""
-import logging
-from socket import gethostname
+"""This module is used to measure the execution time of functions and add the results to metrics."""
 from time import time
-from typing import TYPE_CHECKING
-
-from logprep.util.helper import camel_to_snake
-
-if TYPE_CHECKING:
-    from logprep.processor.base.rule import Rule
 
 
 class TimeMeasurement:
-    """Measures the execution time of functions and adds the results to events via a decorator."""
-
-    TIME_MEASUREMENT_ENABLED = False
-    APPEND_TO_EVENT = False
-    HOSTNAME = gethostname()
+    """Measures the execution time of functions and adds the results to metrics via a decorator."""
 
     @staticmethod
     def measure_time(name: str = None):
-        """Decorate function to measure execution time for function and add results to event.
+        """Decorate function to measure execution time for function and add results to metrics.
@@ -26,41 +14,25 @@ def measure_time(name: str = None):
         ----------
         name : str
-            Name to write processing times to in event.
-
+            Name of the measurement; determines which call argument provides the metrics object.
         """
 
         def inner_decorator(func):
             def inner(*args, **kwargs):  # nosemgrep
-                if TimeMeasurement.TIME_MEASUREMENT_ENABLED:
-                    caller = args[0]
-                    first_argument = args[1]
-                    second_argument = args[2] if len(args) > 2 else None
-                    begin = time()
-                    result = func(*args, **kwargs)
-                    end = time()
-
-                    processing_time = end - begin
-                    if name in ("Rule processing",):
-                        caller = first_argument
-                    caller.metrics.processing_time_per_event += processing_time
-
-                    if TimeMeasurement.APPEND_TO_EVENT and isinstance(first_argument, dict):
-                        add_processing_times_to_event(first_argument, processing_time, caller, name)
-                    return result
-                return func(*args, **kwargs)
-
-            def add_processing_times_to_event(event, processing_time, caller, name):  # nosemgrep
-                if not event:
-                    return
-                if not event.get("processing_times"):
-                    event["processing_times"] = {}
-                if name is None:
-                    name = f"{camel_to_snake(caller.__class__.__name__)}"
-                event["processing_times"][name] = float(f"{processing_time:.10f}")
-                if "hostname" not in event["processing_times"].keys():
-                    event["processing_times"]["hostname"] = TimeMeasurement.HOSTNAME
+                caller = args[0]
+                first_argument = args[1]
+                second_argument = args[2] if len(args) > 2 else None
+                begin = time()
+                result = func(*args, **kwargs)
+                end = time()
+
+                processing_time = end - begin
+                if name in ("Rule processing",):
+                    caller = first_argument
+                if name in ("RuleTree processing",):
+                    caller = second_argument
+                caller.metrics.processing_time_per_event += processing_time
+                return result
 
             return inner
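With the module-level switches gone, the decorator is always active and only requires that the resolved caller expose a `metrics.processing_time_per_event` that supports `+=`. A usage sketch against the patched module; the dummy classes are hypothetical:

    from logprep.util.time_measurement import TimeMeasurement


    class DummyMetrics:
        processing_time_per_event = 0.0


    class DummyComponent:
        metrics = DummyMetrics()

        @TimeMeasurement.measure_time()
        def process(self, event):
            return event


    component = DummyComponent()
    component.process({"some": "event"})
    # without a name, the caller resolves to args[0], i.e. the component itself
    assert component.metrics.processing_time_per_event >= 0.0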
diff --git a/tests/unit/metrics/test_metrics.py b/tests/unit/metrics/test_metrics.py
index cf4169509..ede975d82 100644
--- a/tests/unit/metrics/test_metrics.py
+++ b/tests/unit/metrics/test_metrics.py
@@ -2,6 +2,7 @@
 # pylint: disable=protected-access
 # pylint: disable=attribute-defined-outside-init
 
+import re
 
 from prometheus_client import CollectorRegistry, Counter, generate_latest
 
@@ -12,32 +13,34 @@ class TestsMetrics:
     def setup_method(self):
         self.custom_registry = CollectorRegistry()
-        metrics.LOGPREP_REGISTRY = self.custom_registry
 
     def test_init_tracker_creates_metric(self):
         metric = CounterMetric(
             name="testmetric",
             description="empty description",
             labels={"A": "a"},
+            registry=self.custom_registry,
         )
         metric.init_tracker()
-        assert isinstance(metric.tracker, Counter)
+        assert isinstance(metric.trackers.get(metric.fullname), Counter)
 
     def test_counter_metric_sets_labels(self):
         metric = CounterMetric(
             name="bla",
             description="empty description",
             labels={"pipeline": "pipeline-1"},
+            registry=self.custom_registry,
         )
         metric.init_tracker()
-        assert metric.tracker._labelnames == ("pipeline",)
-        assert ("pipeline-1",) in metric.tracker._metrics
+        assert metric.trackers.get(metric.fullname)._labelnames == ("pipeline",)
+        assert ("pipeline-1",) in metric.trackers.get(metric.fullname)._metrics
 
     def test_counter_metric_increments_correctly(self):
         metric = CounterMetric(
             name="bla",
             description="empty description",
             labels={"pipeline": "1"},
+            registry=self.custom_registry,
         )
         metric.init_tracker()
         metric += 1
@@ -49,6 +52,7 @@ def test_counter_metric_increments_second(self):
             name="bla",
             description="empty description",
             labels={"pipeline": "1"},
+            registry=self.custom_registry,
         )
         metric.init_tracker()
         metric += 1
         metric += 1
         metric_output = generate_latest(self.custom_registry).decode("utf-8")
         assert 'logprep_bla_total{pipeline="1"} 2.0' in metric_output
 
-    def test_metric_check_equality(self):
-        metric = CounterMetric(
+    def test_no_duplicated_counter_is_created(self):
+        metric1 = CounterMetric(
             name="bla",
             description="empty description",
             labels={"pipeline": "1"},
+            registry=self.custom_registry,
         )
-        metric.init_tracker()
-        assert metric == 0
-        metric += 1
-        assert metric == 1.0
-        assert metric == 1
+        metric1.init_tracker()
+        metric2 = CounterMetric(
+            name="bla",
+            description="empty description",
+            labels={"pipeline": "1"},
+            registry=self.custom_registry,
+        )
+        metric2.init_tracker()
+
+        assert metric1.trackers == metric2.trackers
+        metric1 += 1
+        metric_output = generate_latest(self.custom_registry).decode("utf-8")
+        result = re.findall(r'.*logprep_bla_total\{pipeline="1"\} 1\.0.*', metric_output)
+        assert len(result) == 1
+
+    def test_no_duplicated_counter_is_created_for_different_label_values(self):
+        metric1 = CounterMetric(
+            name="bla",
+            description="empty description",
+            labels={"pipeline": "1"},
+            registry=self.custom_registry,
+        )
+        metric1.init_tracker()
+        metric2 = CounterMetric(
+            name="bla",
+            description="empty description",
+            labels={"pipeline": "2"},
+            registry=self.custom_registry,
+        )
+        metric2.init_tracker()
+
+        assert metric1.trackers == metric2.trackers
+        metric1 += 1
+        metric_output = generate_latest(self.custom_registry).decode("utf-8")
+        result = re.findall(r'.*logprep_bla_total\{pipeline="1"\} 1\.0.*', metric_output)
+        assert len(result) == 1
+        result = re.findall(r'.*logprep_bla_total\{pipeline="2"\} 0\.0.*', metric_output)
+        assert len(result) == 1
diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py
index 6e233d105..18c85e3b2 100644
--- a/tests/unit/processor/base.py
+++ b/tests/unit/processor/base.py
@@ -11,6 +11,7 @@
 import pytest
 import requests
 import responses
+from prometheus_client import CollectorRegistry
 from ruamel.yaml import YAML
 
 from logprep.abc.processor import Processor
@@ -87,15 +88,12 @@ def setup_method(self) -> None:
         """
         setUp class for the imported TestCase
         """
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = False
-        TimeMeasurement.APPEND_TO_EVENT = False
         self.patchers = []
         for name, kwargs in self.mocks.items():
             patcher = mock.patch(name, **kwargs)
             patcher.start()
             self.patchers.append(patcher)
         config = {"Test Instance Name": self.CONFIG}
-        metrics.LOGPREP_REGISTRY = None
         self.object = Factory.create(configuration=config, logger=self.logger)
         self.specific_rules = self.set_rules(self.specific_rules_dirs)
         self.generic_rules = self.set_rules(self.generic_rules_dirs)
@@ -109,15 +107,6 @@ def teardown_method(self) -> None:
     def test_is_a_processor_implementation(self):
         assert isinstance(self.object, Processor)
 
-    def test_process(self):
-        assert self.object.metrics.number_of_processed_events == 0
-        document = {
-            "event_id": "1234",
-            "message": "user root logged in",
-        }
-        self.object.process(document)
-        assert self.object.metrics.number_of_processed_events == 1
-
     def test_generic_specific_rule_trees(self):
         assert isinstance(self.object._generic_tree, RuleTree)
         assert isinstance(self.object._specific_tree, RuleTree)
@@ -126,23 +115,18 @@ def test_generic_specific_rule_trees_not_empty(self):
         assert self.object._generic_tree.get_size() > 0
         assert self.object._specific_tree.get_size() > 0
 
-    def test_event_processed_count(self):
-        assert isinstance(self.object.metrics.number_of_processed_events, CounterMetric)
-
-    def test_events_processed_count_counts(self):
-        assert self.object.metrics.number_of_processed_events == 0
-        document = {"foo": "bar"}
-        for i in range(1, 11):
-            try:
-                self.object.process(document)
-            except ProcessingWarning:
-                pass
-            assert self.object.metrics.number_of_processed_events == i
-
     def test_field_exists(self):
         event = {"a": {"b": "I do not matter"}}
         assert self.object._field_exists(event, "a.b")
 
+    @mock.patch("logging.Logger.debug")
+    def test_load_rules_with_debug(self, mock_debug):
+        self.object.load_rules(
+            specific_rules_targets=self.specific_rules_dirs,
+            generic_rules_targets=self.generic_rules_dirs,
+        )
+        mock_debug.assert_called()
+
     def test_load_rules(self):
         self.object._generic_tree = RuleTree()
         self.object._specific_tree = RuleTree()
@@ -220,24 +204,6 @@ def test_rules_returns_all_specific_and_generic_rules(self):
         object_rules_count = len(self.object.rules)
         assert all_rules_count == object_rules_count
 
-    def test_process_is_measured(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
-        event = {"some": "event"}
-        self.object.process(event)
-        processing_times = event.get("processing_times")
-        assert processing_times
-
-    def test_process_measurements_appended_under_processor_config_name(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
-        event = {"some": "event"}
-        self.object.process(event)
-        processing_times = event.get("processing_times")
-        config_name = camel_to_snake(self.object.__class__.__name__)
-        assert processing_times[config_name]
-        assert isinstance(processing_times[config_name], float)
-
     @mock.patch("logging.Logger.debug")
     def test_process_writes_debug_messages(self, mock_debug):
         event = {}
@@ -288,12 +254,10 @@ def test_processor_metrics_counts_processed_events(self):
 
-    def test_metrics_update_mean_processing_times_and_sample_counter(self, get_matching_rules_mock):
+    def test_metrics_updates_processing_time_per_event(self, get_matching_rules_mock):
         get_matching_rules_mock.return_value = [mock.MagicMock()]
         self.object._apply_rules = mock.MagicMock()
-        assert self.object.metrics.mean_processing_time_per_event == 0
-        assert self.object.metrics._mean_processing_time_sample_counter == 0
+        assert self.object.metrics.processing_time_per_event == 0
         event = {"test": "event"}
         self.object.process(event)
-        assert self.object.metrics.mean_processing_time_per_event > 0
-        assert self.object.metrics._mean_processing_time_sample_counter == 2
+        assert self.object.metrics.processing_time_per_event > 0
 
     @responses.activate
     def test_accepts_tree_config_from_http(self):
         config = copy.deepcopy(self.CONFIG)
         config.update({"tree_config": "http://does.not.matter.bla/tree_config.yml"})
         tree_config = Path("tests/testdata/unit/tree_config.json").read_text()
         responses.add(responses.GET, "http://does.not.matter.bla/tree_config.yml", tree_config)
         processor = Factory.create({"test instance": config}, self.logger)
-        assert processor._specific_tree._processor_config.tree_config == "http://does.not.matter.bla/tree_config.yml"
+        assert processor._specific_tree._config_path == "http://does.not.matter.bla/tree_config.yml"
         tree_config = json.loads(tree_config)
         assert processor._specific_tree.priority_dict == tree_config.get("priority_dict")
diff --git a/tests/unit/util/test_time_measurement.py b/tests/unit/util/test_time_measurement.py
index cb9aee1ae..8fe82378e 100644
--- a/tests/unit/util/test_time_measurement.py
+++ b/tests/unit/util/test_time_measurement.py
@@ -1,9 +1,8 @@
 # pylint: disable=protected-access
 # pylint: disable=missing-docstring
 # pylint: disable=attribute-defined-outside-init
-import logging
-
-from logprep.factory import Factory
+from unittest import mock
+
 from logprep.util.time_measurement import TimeMeasurement
 
 
@@ -12,95 +11,11 @@ class TestTimeMeasurement:
     def setup_method(self):
         self.event = {"test": "event"}
+        # the always-on decorator resolves the caller and expects a metrics object on it
+        self.metrics = mock.MagicMock()
 
     @TimeMeasurement.measure_time("test")
     def dummy_method(self, event):  # pylint: disable=unused-argument
         return True
 
-    @TimeMeasurement.measure_time("pipeline")
-    def dummy_method_pipeline(self, event):  # pylint: disable=unused-argument
-        return True
-
     def test_time_measurement_decorator_does_not_change_return(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
         assert self.dummy_method(self.event)
-
-    def test_time_measurement_decorator_appends_processing_times_to_event(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
-        self.dummy_method(self.event)
-        processing_times = self.event.get("processing_times")
-        assert processing_times
-        timestamp = processing_times.get("test")
-        assert timestamp is not None
-        assert isinstance(timestamp, float)
-
-    def test_time_measurement_decorator_only_writes_times_if_event_is_not_empty(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
-        event = {}
-        self.dummy_method(event)
-        assert not event
-
-    def test_deactivated_decorator_does_not_do_a_thing(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = False
-        TimeMeasurement.APPEND_TO_EVENT = False
-        assert self.dummy_method(self.event)
-        assert self.event.get("processing_times") is None
-
-    def test_time_measurement_decorator_does_not_append_processing_times_to_event_if_deactivated(
-        self,
-    ):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = False
-        result = self.dummy_method(self.event)
-        assert result is True
-        processing_times = self.event.get("processing_times")
-        assert processing_times is None
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = False
-        TimeMeasurement.APPEND_TO_EVENT = False
-
-    def test_time_measurement_decorator_is_parameterizable(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = True
-        self.dummy_method_pipeline(self.event)
-        assert self.event.get("processing_times").get("pipeline") is not None
-
-    def test_time_measurement_decorator_updates_processors_processing_time_statistic(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = False
-
-        dropper_config = {
-            "Dropper1": {
-                "type": "dropper",
-                "specific_rules": ["tests/testdata/unit/dropper/rules/specific/"],
-                "generic_rules": ["tests/testdata/unit/dropper/rules/generic/"],
-                "tree_config": "tests/testdata/unit/tree_config.json",
-            }
-        }
-
-        dropper = Factory.create(
-            dropper_config,
-            logging.getLogger("test-logger"),
-        )
-        assert dropper.metrics.mean_processing_time_per_event == 0
-        assert dropper.metrics._mean_processing_time_sample_counter == 0
-        event = {"test": "event"}
-        dropper.process(event)
-        assert dropper.metrics.mean_processing_time_per_event > 0
-        assert dropper.metrics._mean_processing_time_sample_counter == 1
-
-    def test_time_measurement_decorator_updates_connectors_processing_time_statistic(self):
-        TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
-        TimeMeasurement.APPEND_TO_EVENT = False
-
-        dummy_input_config = {"Dummy": {"type": "dummy_input", "documents": [{}, {}]}}
-
-        dummy_input = Factory.create(
-            dummy_input_config,
-            logging.getLogger("test-logger"),
-        )
-        assert dummy_input.metrics.mean_processing_time_per_event == 0
-        assert dummy_input.metrics._mean_processing_time_sample_counter == 0
-        dummy_input.get_next(0.001)
-        assert dummy_input.metrics.mean_processing_time_per_event > 0
-        assert dummy_input.metrics._mean_processing_time_sample_counter == 1
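Taken together, the histogram that replaces the removed mean/sample-counter pair still answers the old questions at query time: `_sum` divided by `_count` yields the mean processing time, and the buckets add a latency distribution on top. A closing sketch with plain prometheus_client (the observed values are made up):

    from prometheus_client import CollectorRegistry, Histogram, generate_latest

    registry = CollectorRegistry()
    histogram = Histogram(
        name="logprep_processing_time_per_event",
        documentation="Time in seconds that it took to process an event",
        buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
        registry=registry,
    )
    histogram.observe(0.0042)
    histogram.observe(0.0008)

    exposition = generate_latest(registry).decode("utf-8")
    assert "logprep_processing_time_per_event_count 2.0" in exposition
    # mean = _sum / _count, derived by the monitoring backend instead of
    # being maintained incrementally inside logprep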