From dcfc83c97a83cfd19f3e60b3aafb4c5a30d62969 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Fri, 20 Oct 2023 20:18:28 +0000 Subject: [PATCH] ensure no current offset and committed offset is not added with default labels --- logprep/abc/component.py | 9 +- logprep/connector/confluent_kafka/input.py | 2 + logprep/metrics/metrics.py | 16 +++- .../connector/test_confluent_kafka_input.py | 61 +++++++++++- tests/unit/metrics/test_metrics.py | 95 +++++++++++++++---- 5 files changed, 155 insertions(+), 28 deletions(-) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 2d0f775c0..c43ec8e01 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -8,7 +8,7 @@ from attrs import asdict from schedule import Scheduler -from logprep.metrics.metrics import Metric +from logprep.metrics.metrics import Metric, get_default_labels from logprep.util.helper import camel_to_snake @@ -26,15 +26,14 @@ class Config: class Metrics: """Base Metric class to track and expose statistics about logprep""" - _labels: dict = field( - factory=lambda: {"component": None, "name": None, "type": None, "description": None} - ) + _labels: dict = field(factory=get_default_labels) def __attrs_post_init__(self): for attribute in asdict(self): attribute = getattr(self, attribute) if isinstance(attribute, Metric): - attribute.labels = self._labels + if attribute.inject_label_values: + attribute.labels = self._labels attribute.init_tracker() # __dict__ is added to support functools.cached_property diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index b90697191..f0a6aef5f 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -101,6 +101,7 @@ class Metrics(Input.Metrics): factory=lambda: GaugeMetric( description="current offsets of the consumer. Is filled by `_get_raw_event`", name="confluent_kafka_input_current_offsets", + inject_label_values=False, ) ) """current offsets of the consumer. Is filled by `_get_raw_event`""" @@ -108,6 +109,7 @@ class Metrics(Input.Metrics): factory=lambda: GaugeMetric( description="committed offsets of the consumer. Is filled by `_commit_callback`", name="confluent_kafka_input_committed_offsets", + inject_label_values=False, ) ) """committed offsets of the consumer. Is filled by `_commit_callback`""" diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py index 8c9eddda8..1a9ab054e 100644 --- a/logprep/metrics/metrics.py +++ b/logprep/metrics/metrics.py @@ -6,6 +6,11 @@ from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +def get_default_labels(): + """returns the default labels""" + return {"component": None, "name": None, "type": None, "description": None} + + @define(kw_only=True) class Metric(ABC): """Metric base class""" @@ -17,15 +22,19 @@ class Metric(ABC): validators.instance_of(dict), validators.deep_mapping( key_validator=validators.instance_of(str), - value_validator=validators.instance_of(str), + value_validator=validators.instance_of((str, type(None))), ), ], - default={}, + factory=get_default_labels, ) _registry: CollectorRegistry = field(default=None) _prefix: str = field(default="logprep_") + inject_label_values: bool = field(default=True) tracker: Union[Counter, Histogram, Gauge] = field(init=False, default=None) + def __attrs_post_init__(self): + self.init_tracker() + @property def fullname(self): """returns the fullname""" @@ -65,7 +74,8 @@ def init_tracker(self) -> None: raise ValueError( f"Metric {self.fullname} already exists with different type" ) from error - self.tracker.labels(**self.labels) + if self.inject_label_values: + self.tracker.labels(**self.labels) @abstractmethod def __add__(self, other): diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index ee63a38ae..653f4378c 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -9,6 +9,7 @@ from unittest import mock import pytest +from attrs import asdict from confluent_kafka import KafkaException from logprep.abc.input import ( @@ -19,6 +20,7 @@ ) from logprep.factory import Factory from logprep.factory_error import InvalidConfigurationError +from logprep.metrics.metrics import Metric from tests.unit.connector.base import BaseInputTestCase from tests.unit.connector.test_confluent_kafka_common import ( CommonConfluentKafkaTestCase, @@ -243,10 +245,6 @@ def test_statistics_interval_can_be_overwritten(self, mock_consumer): mock_consumer.assert_called() assert mock_consumer.call_args[0][0].get("statistics.interval.ms") == "999999999" - def test_init_sets_metrics_properties(self): - assert self.object.metrics._consumer_group_id == "testgroup" - assert self.object.metrics._consumer_client_id == socket.getfqdn() - def test_metrics_expose_returns_data(self): json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8") self.object._stats_callback(json_string) @@ -297,3 +295,58 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): expected_error_message = r"keys are missing: {'(bootstrap.servers|group.id)', '(bootstrap.servers|group.id)'}" # pylint: disable=line-too-long with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}, logger=self.logger) + + def test_expected_metrics_attributes(self): + expected_metrics = { + "commit_failures", + "commit_success", + "current_offsets", + "committed_offsets", + "librdkafka_age", + "librdkafka_rx", + "librdkafka_rx_bytes", + "librdkafka_rxmsgs", + "librdkafka_rxmsg_bytes", + "librdkafka_cgrp_stateage", + "librdkafka_cgrp_rebalance_age", + "librdkafka_cgrp_rebalance_cnt", + "librdkafka_cgrp_assignment_size", + } + metric_attributes = set(asdict(self.object.metrics).keys()) + diffrences = expected_metrics.difference(metric_attributes) + assert not diffrences, str(diffrences) + + def test_expected_metrics_attributes_are_initialized(self): + expected_metrics = { + "commit_failures", + "commit_success", + "current_offsets", + "committed_offsets", + "librdkafka_age", + "librdkafka_rx", + "librdkafka_rx_bytes", + "librdkafka_rxmsgs", + "librdkafka_rxmsg_bytes", + "librdkafka_cgrp_stateage", + "librdkafka_cgrp_rebalance_age", + "librdkafka_cgrp_rebalance_cnt", + "librdkafka_cgrp_assignment_size", + } + metric_attributes = asdict(self.object.metrics, recurse=False) + for metric_name in expected_metrics: + assert metric_attributes.get(metric_name) is not None + assert isinstance(metric_attributes.get(metric_name), Metric) + assert metric_attributes.get(metric_name).tracker is not None + + @pytest.mark.parametrize( + "metric_name", + [ + "current_offsets", + "committed_offsets", + ], + ) + def test_current_offset_not_initialized_with_default_labels(self, metric_name): + metric = getattr(self.object.metrics, metric_name) + assert not metric.tracker._labelnames + metric_object = metric.tracker.collect()[0] + assert len(metric_object.samples) == 0 diff --git a/tests/unit/metrics/test_metrics.py b/tests/unit/metrics/test_metrics.py index 023bc52e0..5c00d4baa 100644 --- a/tests/unit/metrics/test_metrics.py +++ b/tests/unit/metrics/test_metrics.py @@ -7,10 +7,11 @@ import pytest from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest +from logprep.abc.component import Component from logprep.metrics.metrics import CounterMetric, GaugeMetric, HistogramMetric -class TestsMetrics: +class TestsMetric: def setup_method(self): self.custom_registry = CollectorRegistry() @@ -44,6 +45,23 @@ def test_counter_metric_sets_labels(self): metric.init_tracker() assert metric.tracker._labelnames == ("pipeline",) + def test_initialize_without_labels_initializes_defaults(self): + metric = CounterMetric( + name="bla", + description="empty description", + registry=self.custom_registry, + ) + assert len(metric.tracker._labelnames) == 4 + + def test_initialize_with_empty_labels_raises(self): + with pytest.raises(ValueError): + _ = CounterMetric( + name="bla", + description="empty description", + registry=self.custom_registry, + labels={}, + ) + def test_counter_metric_increments_correctly(self): metric = CounterMetric( name="bla", @@ -51,19 +69,17 @@ def test_counter_metric_increments_correctly(self): labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric.init_tracker() metric += 1 metric_output = generate_latest(self.custom_registry).decode("utf-8") assert 'logprep_bla_total{pipeline="1"} 1.0' in metric_output - def test_counter_metric_increments_second(self): + def test_counter_metric_increments_twice(self): metric = CounterMetric( name="bla", description="empty description", labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric.init_tracker() metric += 1 metric += 1 metric_output = generate_latest(self.custom_registry).decode("utf-8") @@ -76,14 +92,12 @@ def test_no_duplicated_counter_is_created(self): labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric1.init_tracker() metric2 = CounterMetric( name="bla", description="empty description", labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric2.init_tracker() assert metric1.tracker._labelnames == metric2.tracker._labelnames metric1 += 1 @@ -99,14 +113,12 @@ def test_no_duplicated_counter_is_created_2(self): labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric1.init_tracker() metric2 = CounterMetric( name="bla", description="empty description", labels={"pipeline": "2"}, registry=self.custom_registry, ) - metric2.init_tracker() assert metric1.tracker == metric2.tracker metric1 += 1 @@ -117,18 +129,69 @@ def test_no_duplicated_counter_is_created_2(self): assert len(result) == 1 def test_init_tracker_raises_on_try_to_overwrite_tracker_with_different_type(self): - metric1 = CounterMetric( + _ = CounterMetric( name="bla", description="empty description", labels={"pipeline": "1"}, registry=self.custom_registry, ) - metric1.init_tracker() - metric2 = HistogramMetric( - name="bla", + with pytest.raises(ValueError, match="already exists with different type"): + _ = HistogramMetric( + name="bla", + description="empty description", + labels={"pipeline": "2"}, + registry=self.custom_registry, + ) + + +class TestComponentMetric: + class Metrics(Component.Metrics): + """test class""" + + test_metric_number_1: CounterMetric = CounterMetric( + name="test_metric_number_1", description="empty description", - labels={"pipeline": "2"}, - registry=self.custom_registry, ) - with pytest.raises(ValueError, match="already exists with different type"): - metric2.init_tracker() + + test_metric_without_label_values: CounterMetric = CounterMetric( + name="test_metric_number_1", + description="empty description", + inject_label_values=False, + ) + + def setup_method(self): + self.metrics = self.Metrics(labels={"label1": "value1", "label2": "value2"}) + + def test_init(self): + assert self.metrics.test_metric_number_1 is not None + assert isinstance(self.metrics.test_metric_number_1, CounterMetric) + assert self.metrics.test_metric_number_1.tracker is not None + assert isinstance(self.metrics.test_metric_number_1.tracker, Counter) + + def test_label_values_injection(self): + assert self.metrics.test_metric_number_1.tracker._labelnames == ( + "component", + "name", + "type", + "description", + ) + metric = self.metrics.test_metric_number_1 + metric_object = metric.tracker.collect()[0] + assert len(metric_object.samples) == 2 + assert metric_object.samples[0][1] == { + "component": "None", + "name": "None", + "type": "None", + "description": "None", + } + + def test_no_label_values_injection(self): + assert self.metrics.test_metric_without_label_values.tracker._labelnames == ( + "component", + "name", + "type", + "description", + ) + metric = self.metrics.test_metric_without_label_values + metric_object = metric.tracker.collect()[0] + assert len(metric_object.samples) == 0