Skip to content

Commit

Permalink
ensure no current offset and committed offset is not added with defau…
Browse files Browse the repository at this point in the history
…lt labels
  • Loading branch information
ekneg54 committed Oct 20, 2023
1 parent 8365faa commit dcfc83c
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 28 deletions.
9 changes: 4 additions & 5 deletions logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from attrs import asdict
from schedule import Scheduler

from logprep.metrics.metrics import Metric
from logprep.metrics.metrics import Metric, get_default_labels
from logprep.util.helper import camel_to_snake


Expand All @@ -26,15 +26,14 @@ class Config:
class Metrics:
"""Base Metric class to track and expose statistics about logprep"""

_labels: dict = field(
factory=lambda: {"component": None, "name": None, "type": None, "description": None}
)
_labels: dict = field(factory=get_default_labels)

def __attrs_post_init__(self):
for attribute in asdict(self):
attribute = getattr(self, attribute)
if isinstance(attribute, Metric):
attribute.labels = self._labels
if attribute.inject_label_values:
attribute.labels = self._labels
attribute.init_tracker()

# __dict__ is added to support functools.cached_property
Expand Down
2 changes: 2 additions & 0 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ class Metrics(Input.Metrics):
factory=lambda: GaugeMetric(
description="current offsets of the consumer. Is filled by `_get_raw_event`",
name="confluent_kafka_input_current_offsets",
inject_label_values=False,
)
)
"""current offsets of the consumer. Is filled by `_get_raw_event`"""
committed_offsets: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="committed offsets of the consumer. Is filled by `_commit_callback`",
name="confluent_kafka_input_committed_offsets",
inject_label_values=False,
)
)
"""committed offsets of the consumer. Is filled by `_commit_callback`"""
Expand Down
16 changes: 13 additions & 3 deletions logprep/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram


def get_default_labels():
"""returns the default labels"""
return {"component": None, "name": None, "type": None, "description": None}


@define(kw_only=True)
class Metric(ABC):
"""Metric base class"""
Expand All @@ -17,15 +22,19 @@ class Metric(ABC):
validators.instance_of(dict),
validators.deep_mapping(
key_validator=validators.instance_of(str),
value_validator=validators.instance_of(str),
value_validator=validators.instance_of((str, type(None))),
),
],
default={},
factory=get_default_labels,
)
_registry: CollectorRegistry = field(default=None)
_prefix: str = field(default="logprep_")
inject_label_values: bool = field(default=True)
tracker: Union[Counter, Histogram, Gauge] = field(init=False, default=None)

def __attrs_post_init__(self):
self.init_tracker()

@property
def fullname(self):
"""returns the fullname"""
Expand Down Expand Up @@ -65,7 +74,8 @@ def init_tracker(self) -> None:
raise ValueError(
f"Metric {self.fullname} already exists with different type"
) from error
self.tracker.labels(**self.labels)
if self.inject_label_values:
self.tracker.labels(**self.labels)

@abstractmethod
def __add__(self, other):
Expand Down
61 changes: 57 additions & 4 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from unittest import mock

import pytest
from attrs import asdict
from confluent_kafka import KafkaException

from logprep.abc.input import (
Expand All @@ -19,6 +20,7 @@
)
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from logprep.metrics.metrics import Metric
from tests.unit.connector.base import BaseInputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
Expand Down Expand Up @@ -243,10 +245,6 @@ def test_statistics_interval_can_be_overwritten(self, mock_consumer):
mock_consumer.assert_called()
assert mock_consumer.call_args[0][0].get("statistics.interval.ms") == "999999999"

def test_init_sets_metrics_properties(self):
assert self.object.metrics._consumer_group_id == "testgroup"
assert self.object.metrics._consumer_client_id == socket.getfqdn()

def test_metrics_expose_returns_data(self):
json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8")
self.object._stats_callback(json_string)
Expand Down Expand Up @@ -297,3 +295,58 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self):
expected_error_message = r"keys are missing: {'(bootstrap.servers|group.id)', '(bootstrap.servers|group.id)'}" # pylint: disable=line-too-long
with pytest.raises(InvalidConfigurationError, match=expected_error_message):
Factory.create({"test": config}, logger=self.logger)

def test_expected_metrics_attributes(self):
expected_metrics = {
"commit_failures",
"commit_success",
"current_offsets",
"committed_offsets",
"librdkafka_age",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_rxmsgs",
"librdkafka_rxmsg_bytes",
"librdkafka_cgrp_stateage",
"librdkafka_cgrp_rebalance_age",
"librdkafka_cgrp_rebalance_cnt",
"librdkafka_cgrp_assignment_size",
}
metric_attributes = set(asdict(self.object.metrics).keys())
diffrences = expected_metrics.difference(metric_attributes)
assert not diffrences, str(diffrences)

def test_expected_metrics_attributes_are_initialized(self):
expected_metrics = {
"commit_failures",
"commit_success",
"current_offsets",
"committed_offsets",
"librdkafka_age",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_rxmsgs",
"librdkafka_rxmsg_bytes",
"librdkafka_cgrp_stateage",
"librdkafka_cgrp_rebalance_age",
"librdkafka_cgrp_rebalance_cnt",
"librdkafka_cgrp_assignment_size",
}
metric_attributes = asdict(self.object.metrics, recurse=False)
for metric_name in expected_metrics:
assert metric_attributes.get(metric_name) is not None
assert isinstance(metric_attributes.get(metric_name), Metric)
assert metric_attributes.get(metric_name).tracker is not None

@pytest.mark.parametrize(
"metric_name",
[
"current_offsets",
"committed_offsets",
],
)
def test_current_offset_not_initialized_with_default_labels(self, metric_name):
metric = getattr(self.object.metrics, metric_name)
assert not metric.tracker._labelnames
metric_object = metric.tracker.collect()[0]
assert len(metric_object.samples) == 0
95 changes: 79 additions & 16 deletions tests/unit/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import pytest
from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest

from logprep.abc.component import Component
from logprep.metrics.metrics import CounterMetric, GaugeMetric, HistogramMetric


class TestsMetrics:
class TestsMetric:
def setup_method(self):
self.custom_registry = CollectorRegistry()

Expand Down Expand Up @@ -44,26 +45,41 @@ def test_counter_metric_sets_labels(self):
metric.init_tracker()
assert metric.tracker._labelnames == ("pipeline",)

def test_initialize_without_labels_initializes_defaults(self):
metric = CounterMetric(
name="bla",
description="empty description",
registry=self.custom_registry,
)
assert len(metric.tracker._labelnames) == 4

def test_initialize_with_empty_labels_raises(self):
with pytest.raises(ValueError):
_ = CounterMetric(
name="bla",
description="empty description",
registry=self.custom_registry,
labels={},
)

def test_counter_metric_increments_correctly(self):
metric = CounterMetric(
name="bla",
description="empty description",
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric.init_tracker()
metric += 1
metric_output = generate_latest(self.custom_registry).decode("utf-8")
assert 'logprep_bla_total{pipeline="1"} 1.0' in metric_output

def test_counter_metric_increments_second(self):
def test_counter_metric_increments_twice(self):
metric = CounterMetric(
name="bla",
description="empty description",
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric.init_tracker()
metric += 1
metric += 1
metric_output = generate_latest(self.custom_registry).decode("utf-8")
Expand All @@ -76,14 +92,12 @@ def test_no_duplicated_counter_is_created(self):
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric1.init_tracker()
metric2 = CounterMetric(
name="bla",
description="empty description",
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric2.init_tracker()

assert metric1.tracker._labelnames == metric2.tracker._labelnames
metric1 += 1
Expand All @@ -99,14 +113,12 @@ def test_no_duplicated_counter_is_created_2(self):
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric1.init_tracker()
metric2 = CounterMetric(
name="bla",
description="empty description",
labels={"pipeline": "2"},
registry=self.custom_registry,
)
metric2.init_tracker()

assert metric1.tracker == metric2.tracker
metric1 += 1
Expand All @@ -117,18 +129,69 @@ def test_no_duplicated_counter_is_created_2(self):
assert len(result) == 1

def test_init_tracker_raises_on_try_to_overwrite_tracker_with_different_type(self):
metric1 = CounterMetric(
_ = CounterMetric(
name="bla",
description="empty description",
labels={"pipeline": "1"},
registry=self.custom_registry,
)
metric1.init_tracker()
metric2 = HistogramMetric(
name="bla",
with pytest.raises(ValueError, match="already exists with different type"):
_ = HistogramMetric(
name="bla",
description="empty description",
labels={"pipeline": "2"},
registry=self.custom_registry,
)


class TestComponentMetric:
class Metrics(Component.Metrics):
"""test class"""

test_metric_number_1: CounterMetric = CounterMetric(
name="test_metric_number_1",
description="empty description",
labels={"pipeline": "2"},
registry=self.custom_registry,
)
with pytest.raises(ValueError, match="already exists with different type"):
metric2.init_tracker()

test_metric_without_label_values: CounterMetric = CounterMetric(
name="test_metric_number_1",
description="empty description",
inject_label_values=False,
)

def setup_method(self):
self.metrics = self.Metrics(labels={"label1": "value1", "label2": "value2"})

def test_init(self):
assert self.metrics.test_metric_number_1 is not None
assert isinstance(self.metrics.test_metric_number_1, CounterMetric)
assert self.metrics.test_metric_number_1.tracker is not None
assert isinstance(self.metrics.test_metric_number_1.tracker, Counter)

def test_label_values_injection(self):
assert self.metrics.test_metric_number_1.tracker._labelnames == (
"component",
"name",
"type",
"description",
)
metric = self.metrics.test_metric_number_1
metric_object = metric.tracker.collect()[0]
assert len(metric_object.samples) == 2
assert metric_object.samples[0][1] == {
"component": "None",
"name": "None",
"type": "None",
"description": "None",
}

def test_no_label_values_injection(self):
assert self.metrics.test_metric_without_label_values.tracker._labelnames == (
"component",
"name",
"type",
"description",
)
metric = self.metrics.test_metric_without_label_values
metric_object = metric.tracker.collect()[0]
assert len(metric_object.samples) == 0

0 comments on commit dcfc83c

Please sign in to comment.