Skip to content

Commit

Permalink
add tests and implementation for statistics callback
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 23, 2023
1 parent 1c0f046 commit 488e825
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 123 deletions.
29 changes: 20 additions & 9 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
OFFSET_STORED,
}

DEFAULT_RETURN = 0


class ConfluentKafkaInput(Input):
"""A kafka input connector."""
Expand Down Expand Up @@ -291,16 +293,25 @@ def _stats_callback(self, stats: str) -> None:
"""

stats = self._decoder.decode(stats)
self.metrics.librdkafka_age += stats.get("age", 0)
self.metrics.librdkafka_rx += stats.get("rx", 0)
self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", 0)
self.metrics.librdkafka_rxmsgs += stats.get("rxmsgs", 0)
self.metrics.librdkafka_rxmsg_bytes += stats.get("rxmsg_bytes", 0)
self.metrics.librdkafka_cgrp_stateage += stats.get("cgrp", {}).get("stateage", 0)
self.metrics.librdkafka_cgrp_rebalance_age += stats.get("cgrp", {}).get("rebalance_age", 0)
self.metrics.librdkafka_cgrp_rebalance_cnt += stats.get("cgrp", {}).get("rebalance_cnt", 0)
self.metrics.librdkafka_age += stats.get("age", DEFAULT_RETURN)
self.metrics.librdkafka_rx += stats.get("rx", DEFAULT_RETURN)
self.metrics.librdkafka_tx += stats.get("tx", DEFAULT_RETURN)
self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", DEFAULT_RETURN)
self.metrics.librdkafka_tx_bytes += stats.get("tx_bytes", DEFAULT_RETURN)
self.metrics.librdkafka_rxmsgs += stats.get("rxmsgs", DEFAULT_RETURN)
self.metrics.librdkafka_rxmsg_bytes += stats.get("rxmsg_bytes", DEFAULT_RETURN)

self.metrics.librdkafka_cgrp_stateage += stats.get("cgrp", {}).get(
"stateage", DEFAULT_RETURN
)
self.metrics.librdkafka_cgrp_rebalance_age += stats.get("cgrp", {}).get(
"rebalance_age", DEFAULT_RETURN
)
self.metrics.librdkafka_cgrp_rebalance_cnt += stats.get("cgrp", {}).get(
"rebalance_cnt", DEFAULT_RETURN
)
self.metrics.librdkafka_cgrp_assignment_size += stats.get("cgrp", {}).get(
"assignment_size", 0
"assignment_size", DEFAULT_RETURN
)

def _commit_callback(
Expand Down
10 changes: 10 additions & 0 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ def _stats_callback(self, stats: str) -> None:

stats = self._decoder.decode(stats)
self.metrics.librdkafka_age += stats.get("age", 0)
self.metrics.librdkafka_msg_cnt += stats.get("msg_cnt", 0)
self.metrics.librdkafka_msg_size += stats.get("msg_size", 0)
self.metrics.librdkafka_msg_max += stats.get("msg_max", 0)
self.metrics.librdkafka_msg_size_max += stats.get("msg_size_max", 0)
self.metrics.librdkafka_tx += stats.get("tx", 0)
self.metrics.librdkafka_tx_bytes += stats.get("tx_bytes", 0)
self.metrics.librdkafka_rx += stats.get("rx", 0)
self.metrics.librdkafka_rx_bytes += stats.get("rx_bytes", 0)
self.metrics.librdkafka_txmsgs += stats.get("txmsgs", 0)
self.metrics.librdkafka_txmsg_bytes += stats.get("txmsg_bytes", 0)

def describe(self) -> str:
"""Get name of Kafka endpoint with the bootstrap server.
Expand Down
54 changes: 37 additions & 17 deletions tests/unit/component/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from typing import Callable, Iterable
from unittest import mock

import pytest
from attrs import asdict
from prometheus_client import Counter, Gauge, Histogram

from logprep.abc.component import Component
from logprep.abc.connector import Connector
Expand All @@ -20,10 +22,26 @@ class BaseComponentTestCase(ABC):
CONFIG: dict = {}
object: Connector = None
logger = getLogger()
expected_metrics: list

# attribute names excluded when `setup_method` snapshots `metric_attributes`:
# attrs internals (_labels, _prefix) plus metrics every component shares
# (presumably defined on a common metrics base — confirm against that class)
block_list = [
    "_labels",
    "_prefix",
    "processing_time_per_event",
    "number_of_processed_events",
    "number_of_failed_events",
    "number_of_warnings",
    "number_of_errors",
]

def setup_method(self) -> None:
    """Create the component under test and snapshot its metric attributes.

    The snapshot excludes everything in `block_list` so that the generic
    metric tests only see the component-specific metrics.
    """
    instance_config = {"Test Instance Name": self.CONFIG}
    self.object = Factory.create(configuration=instance_config, logger=self.logger)
    metric_filter = partial(self.asdict_filter, block_list=self.block_list)
    self.metric_attributes = asdict(
        self.object.metrics, filter=metric_filter, recurse=False
    )

def test_uses_python_slots(self):
assert isinstance(self.object.__slots__, Iterable)
Expand Down Expand Up @@ -64,27 +82,29 @@ def test_custom_metrics_are_metric_objects(self):
), "one of the metrics instance attributes is not an instance of type Metric"

def test_no_metrics_with_same_name(self):
    """Assert that no two metric attributes share the same prometheus name.

    Uses the `metric_attributes` snapshot taken in `setup_method`; the
    stale pre-change lines (a local `asdict` snapshot and an old `pairs`
    assignment) left interleaved by the diff are removed.
    """
    pairs = itertools.combinations(self.metric_attributes.values(), 2)
    for metric1, metric2 in pairs:
        assert metric1.name != metric2.name, f"{metric1.name} == {metric2.name}"

def test_custom_metrics_adds_custom_prefix_to_metrics_name(self):
    """Assert every metric fullname carries the component's snake_case prefix.

    Uses the `metric_attributes` snapshot taken in `setup_method`; the stale
    pre-change lines (inline `block_list` and `asdict` call) left interleaved
    by the diff are removed, and the loop-invariant prefix is computed once.
    """
    expected_prefix = f"logprep_{camel_to_snake(self.object.__class__.__name__)}"
    for attribute in self.metric_attributes.values():
        assert attribute.fullname.startswith(
            expected_prefix
        ), f"{attribute.fullname}, {expected_prefix}"

def test_expected_metrics_attributes(self):
    """Assert every name in `expected_metrics` exists and is a `Metric`."""
    metrics_object = self.object.metrics
    for metric_name in self.expected_metrics:
        metric_attribute = getattr(metrics_object, metric_name)
        assert metric_attribute is not None
        assert isinstance(metric_attribute, Metric)

def test_expected_metrics_attributes_are_initialized(self):
    """Assert every expected metric has an initialized prometheus tracker."""
    # fixed typo ("possibile") and hoisted the tuple out of the loop —
    # it is invariant across iterations
    possible_tracker_types = (Counter, Gauge, Histogram)
    for expected_metric in self.expected_metrics:
        metric_attribute = getattr(self.object.metrics, expected_metric)
        assert metric_attribute.tracker is not None
        assert isinstance(metric_attribute.tracker, possible_tracker_types)

def test_all_metric_attributes_are_tested(self):
    """Assert `expected_metrics` covers every snapshotted metric attribute."""
    difference = set(self.metric_attributes) - set(self.expected_metrics)
    assert not difference, f"{difference} are not defined in `expected_metrics`"
9 changes: 0 additions & 9 deletions tests/unit/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,19 @@
from logging import getLogger
from unittest import mock

from prometheus_client import CollectorRegistry

from logprep.abc.connector import Connector
from logprep.abc.input import Input
from logprep.abc.output import Output
from logprep.factory import Factory
from logprep.util.time import TimeParser
from tests.unit.component.base import BaseComponentTestCase

CUSTOM_REGISTRY = CollectorRegistry(auto_describe=True)


class BaseConnectorTestCase(BaseComponentTestCase):
CONFIG: dict = {}
object: Connector = None
logger = getLogger()

def setup_method(self) -> None:
config = {"Test Instance Name": self.CONFIG}
self.custom_registry = CUSTOM_REGISTRY
self.object = Factory.create(configuration=config, logger=self.logger)

def test_is_a_connector_implementation(self):
    # the factory-created object under test must be a Connector subclass
    assert isinstance(self.object, Connector)

Expand Down
17 changes: 17 additions & 0 deletions tests/unit/connector/test_confluent_kafka_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import pytest

from logprep.factory import Factory
from logprep.util.helper import get_dotted_field_value

KAFKA_STATS_JSON_PATH = "tests/testdata/kafka_stats_return_value.json"


class CommonConfluentKafkaTestCase:
expected_metrics = []

def test_client_id_is_set_to_hostname(self):
self.object.setup()
assert self.object._config.kafka_config.get("client.id") == getfqdn()
Expand All @@ -33,6 +36,20 @@ def test_error_callback_logs_warnings(self):
mock_warning.assert_called_with(f"{self.object.describe()}: {test_error}")

def test_stats_callback_sets_metric_objetc_attributes(self):
    """Assert `_stats_callback` transfers every librdkafka value from the
    recorded stats JSON into the matching metric attribute."""
    librdkafka_metrics = tuple(
        metric for metric in self.expected_metrics if metric.startswith("librdkafka")
    )
    # zero the metrics first so the callback's contribution is observable
    for metric in librdkafka_metrics:
        setattr(self.object.metrics, metric, 0)
    json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8")
    self.object._stats_callback(json_string)
    stats_dict = json.loads(json_string)
    for metric in librdkafka_metrics:
        # map attribute name back to its dotted path in the stats document
        dotted_name = metric.replace("librdkafka_", "").replace("cgrp_", "cgrp.")
        expected_value = get_dotted_field_value(stats_dict, dotted_name)
        assert getattr(self.object.metrics, metric) == expected_value, metric

def test_stats_set_age_metric_explicitly(self):
self.object.metrics.librdkafka_age = 0
json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8")
self.object._stats_callback(json_string)
Expand Down
70 changes: 20 additions & 50 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
# pylint: disable=attribute-defined-outside-init
import socket
from copy import deepcopy
from pathlib import Path
from unittest import mock

import pytest
from attrs import asdict
from confluent_kafka import KafkaException
from prometheus_client import CollectorRegistry

from logprep.abc.input import (
CriticalInputError,
Expand All @@ -21,8 +18,7 @@
)
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from logprep.metrics.metrics import Metric
from tests.unit.connector.base import CUSTOM_REGISTRY, BaseInputTestCase
from tests.unit.connector.base import BaseInputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
)
Expand All @@ -37,6 +33,25 @@ class TestConfluentKafkaInput(BaseInputTestCase, CommonConfluentKafkaTestCase):
"topic": "test_input_raw",
}

# metric attribute names the kafka input's metrics object must expose;
# iterated by the shared `test_expected_metrics_attributes*` tests and the
# librdkafka stats-callback test
expected_metrics = {
    "commit_failures",
    "commit_success",
    "current_offsets",
    "committed_offsets",
    "librdkafka_age",
    "librdkafka_rx",
    "librdkafka_rx_bytes",
    "librdkafka_rxmsgs",
    "librdkafka_rxmsg_bytes",
    "librdkafka_cgrp_stateage",
    "librdkafka_cgrp_rebalance_age",
    "librdkafka_cgrp_rebalance_cnt",
    "librdkafka_cgrp_assignment_size",
    "librdkafka_replyq",
    "librdkafka_tx",
    "librdkafka_tx_bytes",
}

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_next_returns_none_if_no_records(self, _):
self.object._consumer.poll = mock.MagicMock(return_value=None)
Expand Down Expand Up @@ -263,51 +278,6 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self):
with pytest.raises(InvalidConfigurationError, match=expected_error_message):
Factory.create({"test": config}, logger=self.logger)

def test_expected_metrics_attributes(self):
expected_metrics = {
"commit_failures",
"commit_success",
"current_offsets",
"committed_offsets",
"librdkafka_age",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_rxmsgs",
"librdkafka_rxmsg_bytes",
"librdkafka_cgrp_stateage",
"librdkafka_cgrp_rebalance_age",
"librdkafka_cgrp_rebalance_cnt",
"librdkafka_cgrp_assignment_size",
}
metric_attributes = set(asdict(self.object.metrics).keys())
diffrences = expected_metrics.difference(metric_attributes)
assert not diffrences, str(diffrences)

def test_expected_metrics_attributes_are_initialized(self):
expected_metrics = {
"commit_failures",
"commit_success",
"current_offsets",
"committed_offsets",
"librdkafka_age",
"librdkafka_replyq",
"librdkafka_tx",
"librdkafka_tx_bytes",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_rxmsgs",
"librdkafka_rxmsg_bytes",
"librdkafka_cgrp_stateage",
"librdkafka_cgrp_rebalance_age",
"librdkafka_cgrp_rebalance_cnt",
"librdkafka_cgrp_assignment_size",
}
metric_attributes = asdict(self.object.metrics, recurse=False)
for metric_name in expected_metrics:
assert metric_attributes.get(metric_name) is not None
assert isinstance(metric_attributes.get(metric_name), Metric)
assert metric_attributes.get(metric_name).tracker is not None

@pytest.mark.parametrize(
"metric_name",
[
Expand Down
52 changes: 14 additions & 38 deletions tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase)
},
}

# metric attribute names the kafka output's metrics object must expose;
# iterated by the shared `test_expected_metrics_attributes*` tests and the
# librdkafka stats-callback test
expected_metrics = {
    "librdkafka_age",
    "librdkafka_msg_cnt",
    "librdkafka_msg_size",
    "librdkafka_msg_max",
    "librdkafka_msg_size_max",
    "librdkafka_tx",
    "librdkafka_tx_bytes",
    "librdkafka_rx",
    "librdkafka_rx_bytes",
    "librdkafka_txmsgs",
    "librdkafka_txmsg_bytes",
}

@mock.patch("logprep.connector.confluent_kafka.output.Producer", return_value="The Producer")
def test_producer_property_instanciates_kafka_producer(self, _):
kafka_output = Factory.create({"test connector": self.CONFIG}, logger=self.logger)
Expand Down Expand Up @@ -144,41 +158,3 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self):
expected_error_message = r"keys are missing: {'bootstrap.servers'}"
with pytest.raises(InvalidConfigurationError, match=expected_error_message):
Factory.create({"test": config}, logger=self.logger)

def test_expected_metrics_attributes(self):
expected_metrics = {
"librdkafka_age",
"librdkafka_msg_cnt",
"librdkafka_msg_size",
"librdkafka_msg_max",
"librdkafka_msg_size_max",
"librdkafka_tx",
"librdkafka_tx_bytes",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_txmsgs",
"librdkafka_txmsg_bytes",
}
metric_attributes = set(asdict(self.object.metrics).keys())
diffrences = expected_metrics.difference(metric_attributes)
assert not diffrences, str(diffrences)

def test_expected_metrics_attributes_are_initialized(self):
expected_metrics = {
"librdkafka_age",
"librdkafka_msg_cnt",
"librdkafka_msg_size",
"librdkafka_msg_max",
"librdkafka_msg_size_max",
"librdkafka_tx",
"librdkafka_tx_bytes",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_txmsgs",
"librdkafka_txmsg_bytes",
}
metric_attributes = asdict(self.object.metrics, recurse=False)
for metric_name in expected_metrics:
assert metric_attributes.get(metric_name) is not None
assert isinstance(metric_attributes.get(metric_name), Metric)
assert metric_attributes.get(metric_name).tracker is not None

0 comments on commit 488e825

Please sign in to comment.