From 488e8251667db75bc5389bb521f22b4040fc56d1 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Mon, 23 Oct 2023 10:16:01 +0000 Subject: [PATCH] add tests and implementation for statistics callback --- logprep/connector/confluent_kafka/input.py | 29 +++++--- logprep/connector/confluent_kafka/output.py | 10 +++ tests/unit/component/base.py | 54 +++++++++----- tests/unit/connector/base.py | 9 --- .../connector/test_confluent_kafka_common.py | 17 +++++ .../connector/test_confluent_kafka_input.py | 70 ++++++------------- .../connector/test_confluent_kafka_output.py | 52 ++++---------- 7 files changed, 118 insertions(+), 123 deletions(-) diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 7c48ab194..7ebbcb8a5 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -73,6 +73,8 @@ OFFSET_STORED, } +DEFAULT_RETURN = 0 + class ConfluentKafkaInput(Input): """A kafka input connector.""" @@ -291,16 +293,25 @@ def _stats_callback(self, stats: str) -> None: """ stats = self._decoder.decode(stats) - self.metrics.librdkafka_age += stats.get("age", 0) - self.metrics.librdkafka_rx += stats.get("rx", 0) - self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", 0) - self.metrics.librdkafka_rxmsgs += stats.get("rxmsgs", 0) - self.metrics.librdkafka_rxmsg_bytes += stats.get("rxmsg_bytes", 0) - self.metrics.librdkafka_cgrp_stateage += stats.get("cgrp", {}).get("stateage", 0) - self.metrics.librdkafka_cgrp_rebalance_age += stats.get("cgrp", {}).get("rebalance_age", 0) - self.metrics.librdkafka_cgrp_rebalance_cnt += stats.get("cgrp", {}).get("rebalance_cnt", 0) + self.metrics.librdkafka_age += stats.get("age", DEFAULT_RETURN) + self.metrics.librdkafka_rx += stats.get("rx", DEFAULT_RETURN) + self.metrics.librdkafka_tx += stats.get("tx", DEFAULT_RETURN) + self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", DEFAULT_RETURN) + self.metrics.librdkafka_tx_bytes += stats.get("tx_bytes", DEFAULT_RETURN) + self.metrics.librdkafka_rxmsgs += stats.get("rxmsgs", DEFAULT_RETURN) + self.metrics.librdkafka_rxmsg_bytes += stats.get("rxmsg_bytes", DEFAULT_RETURN) + + self.metrics.librdkafka_cgrp_stateage += stats.get("cgrp", {}).get( + "stateage", DEFAULT_RETURN + ) + self.metrics.librdkafka_cgrp_rebalance_age += stats.get("cgrp", {}).get( + "rebalance_age", DEFAULT_RETURN + ) + self.metrics.librdkafka_cgrp_rebalance_cnt += stats.get("cgrp", {}).get( + "rebalance_cnt", DEFAULT_RETURN + ) self.metrics.librdkafka_cgrp_assignment_size += stats.get("cgrp", {}).get( - "assignment_size", 0 + "assignment_size", DEFAULT_RETURN ) def _commit_callback( diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 876a389a2..1438304b3 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -207,6 +207,16 @@ def _stats_callback(self, stats: str) -> None: stats = self._decoder.decode(stats) self.metrics.librdkafka_age += stats.get("age", 0) + self.metrics.librdkafka_msg_cnt += stats.get("msg_cnt", 0) + self.metrics.librdkafka_msg_size += stats.get("msg_size", 0) + self.metrics.librdkafka_msg_max += stats.get("msg_max", 0) + self.metrics.librdkafka_msg_size_max += stats.get("msg_size_max", 0) + self.metrics.librdkafka_tx += stats.get("tx", 0) + self.metrics.librdkafka_tx_bytes += stats.get("tx_bytes", 0) + self.metrics.librdkafka_rx += stats.get("rx", 0) + self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", 0) + self.metrics.librdkafka_txmsgs += stats.get("txmsgs", 0) + self.metrics.librdkafka_txmsg_bytes += stats.get("txmsg_bytes", 0) def describe(self) -> str: """Get name of Kafka endpoint with the bootstrap server. diff --git a/tests/unit/component/base.py b/tests/unit/component/base.py index 8cba0375a..a60960c9d 100644 --- a/tests/unit/component/base.py +++ b/tests/unit/component/base.py @@ -7,7 +7,9 @@ from typing import Callable, Iterable from unittest import mock +import pytest from attrs import asdict +from prometheus_client import Counter, Gauge, Histogram from logprep.abc.component import Component from logprep.abc.connector import Connector @@ -20,10 +22,26 @@ class BaseComponentTestCase(ABC): CONFIG: dict = {} object: Connector = None logger = getLogger() + expected_metrics: list + + block_list = [ + "_labels", + "_prefix", + "processing_time_per_event", + "number_of_processed_events", + "number_of_failed_events", + "number_of_warnings", + "number_of_errors", + ] def setup_method(self) -> None: config = {"Test Instance Name": self.CONFIG} self.object = Factory.create(configuration=config, logger=self.logger) + self.metric_attributes = asdict( + self.object.metrics, + filter=partial(self.asdict_filter, block_list=self.block_list), + recurse=False, + ) def test_uses_python_slots(self): assert isinstance(self.object.__slots__, Iterable) @@ -64,27 +82,29 @@ def test_custom_metrics_are_metric_objects(self): ), "one of the metrics instance attributes is not an instance of type Metric" def test_no_metrics_with_same_name(self): - metric_attributes = asdict(self.object.metrics, filter=self.asdict_filter, recurse=False) - pairs = itertools.combinations(metric_attributes.values(), 2) + pairs = itertools.combinations(self.metric_attributes.values(), 2) for metric1, metric2 in pairs: assert metric1.name != metric2.name, f"{metric1.name} == {metric2.name}" def test_custom_metrics_adds_custom_prefix_to_metrics_name(self): - block_list = [ - "_labels", - "_prefix", - "processing_time_per_event", - "number_of_processed_events", - "number_of_failed_events", - "number_of_warnings", - "number_of_errors", - ] - metric_attributes = asdict( - self.object.metrics, - filter=partial(self.asdict_filter, block_list=block_list), - recurse=False, - ) - for attribute in metric_attributes.values(): + for attribute in self.metric_attributes.values(): assert attribute.fullname.startswith( f"logprep_{camel_to_snake(self.object.__class__.__name__)}" ), f"{attribute.fullname}, logprep_{camel_to_snake(self.object.__class__.__name__)}" + + def test_expected_metrics_attributes(self): + for expected_metric in self.expected_metrics: + metric_attribute = getattr(self.object.metrics, expected_metric) + assert metric_attribute is not None + assert isinstance(metric_attribute, Metric) + + def test_expected_metrics_attributes_are_initialized(self): + for expected_metric in self.expected_metrics: + metric_attribute = getattr(self.object.metrics, expected_metric) + assert metric_attribute.tracker is not None + possibile_tracker_types = (Counter, Gauge, Histogram) + assert isinstance(metric_attribute.tracker, possibile_tracker_types) + + def test_all_metric_attributes_are_tested(self): + difference = set(self.metric_attributes).difference(set(self.expected_metrics)) + assert not difference, f"{difference} are not defined in `expected_metrics`" diff --git a/tests/unit/connector/base.py b/tests/unit/connector/base.py index 4123b8656..e3ab1a7c2 100644 --- a/tests/unit/connector/base.py +++ b/tests/unit/connector/base.py @@ -9,8 +9,6 @@ from logging import getLogger from unittest import mock -from prometheus_client import CollectorRegistry - from logprep.abc.connector import Connector from logprep.abc.input import Input from logprep.abc.output import Output @@ -18,19 +16,12 @@ from logprep.util.time import TimeParser from tests.unit.component.base import BaseComponentTestCase -CUSTOM_REGISTRY = CollectorRegistry(auto_describe=True) - class BaseConnectorTestCase(BaseComponentTestCase): CONFIG: dict = {} object: Connector = None logger = getLogger() - def setup_method(self) -> None: - config = {"Test Instance Name": self.CONFIG} - self.custom_registry = CUSTOM_REGISTRY - self.object = Factory.create(configuration=config, logger=self.logger) - def test_is_a_connector_implementation(self): assert isinstance(self.object, Connector) diff --git a/tests/unit/connector/test_confluent_kafka_common.py b/tests/unit/connector/test_confluent_kafka_common.py index 812046d9f..b77d31ad5 100644 --- a/tests/unit/connector/test_confluent_kafka_common.py +++ b/tests/unit/connector/test_confluent_kafka_common.py @@ -10,11 +10,14 @@ import pytest from logprep.factory import Factory +from logprep.util.helper import get_dotted_field_value KAFKA_STATS_JSON_PATH = "tests/testdata/kafka_stats_return_value.json" class CommonConfluentKafkaTestCase: + expected_metrics = [] + def test_client_id_is_set_to_hostname(self): self.object.setup() assert self.object._config.kafka_config.get("client.id") == getfqdn() @@ -33,6 +36,20 @@ def test_error_callback_logs_warnings(self): mock_warning.assert_called_with(f"{self.object.describe()}: {test_error}") def test_stats_callback_sets_metric_objetc_attributes(self): + librdkafka_metrics = tuple( + filter(lambda x: x.startswith("librdkafka"), self.expected_metrics) + ) + for metric in librdkafka_metrics: + setattr(self.object.metrics, metric, 0) + json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8") + self.object._stats_callback(json_string) + stats_dict = json.loads(json_string) + for metric in librdkafka_metrics: + metric_name = metric.replace("librdkafka_", "").replace("cgrp_", "cgrp.") + metric_value = get_dotted_field_value(stats_dict, metric_name) + assert getattr(self.object.metrics, metric) == metric_value, metric + + def test_stats_set_age_metric_explicitly(self): self.object.metrics.librdkafka_age = 0 json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8") self.object._stats_callback(json_string) diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index d9cc421b6..9f9ae1735 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -5,13 +5,10 @@ # pylint: disable=attribute-defined-outside-init import socket from copy import deepcopy -from pathlib import Path from unittest import mock import pytest -from attrs import asdict from confluent_kafka import KafkaException -from prometheus_client import CollectorRegistry from logprep.abc.input import ( CriticalInputError, @@ -21,8 +18,7 @@ ) from logprep.factory import Factory from logprep.factory_error import InvalidConfigurationError -from logprep.metrics.metrics import Metric -from tests.unit.connector.base import CUSTOM_REGISTRY, BaseInputTestCase +from tests.unit.connector.base import BaseInputTestCase from tests.unit.connector.test_confluent_kafka_common import ( CommonConfluentKafkaTestCase, ) @@ -37,6 +33,25 @@ class TestConfluentKafkaInput(BaseInputTestCase, CommonConfluentKafkaTestCase): "topic": "test_input_raw", } + expected_metrics = { + "commit_failures", + "commit_success", + "current_offsets", + "committed_offsets", + "librdkafka_age", + "librdkafka_rx", + "librdkafka_rx_bytes", + "librdkafka_rxmsgs", + "librdkafka_rxmsg_bytes", + "librdkafka_cgrp_stateage", + "librdkafka_cgrp_rebalance_age", + "librdkafka_cgrp_rebalance_cnt", + "librdkafka_cgrp_assignment_size", + "librdkafka_replyq", + "librdkafka_tx", + "librdkafka_tx_bytes", + } + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_get_next_returns_none_if_no_records(self, _): self.object._consumer.poll = mock.MagicMock(return_value=None) @@ -263,51 +278,6 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}, logger=self.logger) - def test_expected_metrics_attributes(self): - expected_metrics = { - "commit_failures", - "commit_success", - "current_offsets", - "committed_offsets", - "librdkafka_age", - "librdkafka_rx", - "librdkafka_rx_bytes", - "librdkafka_rxmsgs", - "librdkafka_rxmsg_bytes", - "librdkafka_cgrp_stateage", - "librdkafka_cgrp_rebalance_age", - "librdkafka_cgrp_rebalance_cnt", - "librdkafka_cgrp_assignment_size", - } - metric_attributes = set(asdict(self.object.metrics).keys()) - diffrences = expected_metrics.difference(metric_attributes) - assert not diffrences, str(diffrences) - - def test_expected_metrics_attributes_are_initialized(self): - expected_metrics = { - "commit_failures", - "commit_success", - "current_offsets", - "committed_offsets", - "librdkafka_age", - "librdkafka_replyq", - "librdkafka_tx", - "librdkafka_tx_bytes", - "librdkafka_rx", - "librdkafka_rx_bytes", - "librdkafka_rxmsgs", - "librdkafka_rxmsg_bytes", - "librdkafka_cgrp_stateage", - "librdkafka_cgrp_rebalance_age", - "librdkafka_cgrp_rebalance_cnt", - "librdkafka_cgrp_assignment_size", - } - metric_attributes = asdict(self.object.metrics, recurse=False) - for metric_name in expected_metrics: - assert metric_attributes.get(metric_name) is not None - assert isinstance(metric_attributes.get(metric_name), Metric) - assert metric_attributes.get(metric_name).tracker is not None - @pytest.mark.parametrize( "metric_name", [ diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index 7003f7dc7..744047c4e 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -37,6 +37,20 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase) }, } + expected_metrics = { + "librdkafka_age", + "librdkafka_msg_cnt", + "librdkafka_msg_size", + "librdkafka_msg_max", + "librdkafka_msg_size_max", + "librdkafka_tx", + "librdkafka_tx_bytes", + "librdkafka_rx", + "librdkafka_rx_bytes", + "librdkafka_txmsgs", + "librdkafka_txmsg_bytes", + } + @mock.patch("logprep.connector.confluent_kafka.output.Producer", return_value="The Producer") def test_producer_property_instanciates_kafka_producer(self, _): kafka_output = Factory.create({"test connector": self.CONFIG}, logger=self.logger) @@ -144,41 +158,3 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): expected_error_message = r"keys are missing: {'bootstrap.servers'}" with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}, logger=self.logger) - - def test_expected_metrics_attributes(self): - expected_metrics = { - "librdkafka_age", - "librdkafka_msg_cnt", - "librdkafka_msg_size", - "librdkafka_msg_max", - "librdkafka_msg_size_max", - "librdkafka_tx", - "librdkafka_tx_bytes", - "librdkafka_rx", - "librdkafka_rx_bytes", - "librdkafka_txmsgs", - "librdkafka_txmsg_bytes", - } - metric_attributes = set(asdict(self.object.metrics).keys()) - diffrences = expected_metrics.difference(metric_attributes) - assert not diffrences, str(diffrences) - - def test_expected_metrics_attributes_are_initialized(self): - expected_metrics = { - "librdkafka_age", - "librdkafka_msg_cnt", - "librdkafka_msg_size", - "librdkafka_msg_max", - "librdkafka_msg_size_max", - "librdkafka_tx", - "librdkafka_tx_bytes", - "librdkafka_rx", - "librdkafka_rx_bytes", - "librdkafka_txmsgs", - "librdkafka_txmsg_bytes", - } - metric_attributes = asdict(self.object.metrics, recurse=False) - for metric_name in expected_metrics: - assert metric_attributes.get(metric_name) is not None - assert isinstance(metric_attributes.get(metric_name), Metric) - assert metric_attributes.get(metric_name).tracker is not None