Skip to content

Commit

Permalink
implement output metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 23, 2023
1 parent 2e19562 commit 1c0f046
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 72 deletions.
22 changes: 22 additions & 0 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ class Metrics(Input.Metrics):
)
"""Time since this client instance was created (microseconds)"""

# Gauges mirroring librdkafka's top-level statistics fields for the consumer;
# each is published under the "confluent_kafka_input_" name prefix.
librdkafka_replyq: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        name="confluent_kafka_input_librdkafka_replyq",
        description="Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()",
    )
)
"""Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()"""

librdkafka_tx: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        name="confluent_kafka_input_librdkafka_tx",
        description="Total number of requests sent to Kafka brokers",
    )
)
"""Total number of requests sent to Kafka brokers"""

librdkafka_tx_bytes: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        name="confluent_kafka_input_librdkafka_tx_bytes",
        description="Total number of bytes transmitted to Kafka brokers",
    )
)
"""Total number of bytes transmitted to Kafka brokers"""

librdkafka_rx: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Total number of responses received from Kafka brokers",
Expand Down
70 changes: 70 additions & 0 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,76 @@ class Metrics(Output.Metrics):
)
)
"""Time since this client instance was created (microseconds)"""
# Gauges mirroring librdkafka's top-level producer statistics fields; each is
# published under the "confluent_kafka_output_" name prefix.
# Fix: removed the duplicated word "allowed allowed" in the msg_max
# description and docstring.
librdkafka_msg_cnt: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Current number of messages in producer queues",
        name="confluent_kafka_output_librdkafka_msg_cnt",
    )
)
"""Current number of messages in producer queues"""
librdkafka_msg_size: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Current total size of messages in producer queues",
        name="confluent_kafka_output_librdkafka_msg_size",
    )
)
"""Current total size of messages in producer queues"""
librdkafka_msg_max: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Threshold: maximum number of messages allowed on the producer queues",
        name="confluent_kafka_output_librdkafka_msg_max",
    )
)
"""Threshold: maximum number of messages allowed on the producer queues"""
librdkafka_msg_size_max: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Threshold: maximum total size of messages allowed on the producer queues",
        name="confluent_kafka_output_librdkafka_msg_size_max",
    )
)
"""Threshold: maximum total size of messages allowed on the producer queues"""
librdkafka_tx: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of requests sent to Kafka brokers",
        name="confluent_kafka_output_librdkafka_tx",
    )
)
"""Total number of requests sent to Kafka brokers"""
librdkafka_tx_bytes: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of bytes transmitted to Kafka brokers",
        name="confluent_kafka_output_librdkafka_tx_bytes",
    )
)
"""Total number of bytes transmitted to Kafka brokers"""
librdkafka_rx: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of responses received from Kafka brokers",
        name="confluent_kafka_output_librdkafka_rx",
    )
)
"""Total number of responses received from Kafka brokers"""
librdkafka_rx_bytes: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of bytes received from Kafka brokers",
        name="confluent_kafka_output_librdkafka_rx_bytes",
    )
)
"""Total number of bytes received from Kafka brokers"""
librdkafka_txmsgs: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of messages transmitted (produced) to Kafka brokers",
        name="confluent_kafka_output_librdkafka_txmsgs",
    )
)
"""Total number of messages transmitted (produced) to Kafka brokers"""
librdkafka_txmsg_bytes: GaugeMetric = field(
    factory=lambda: GaugeMetric(
        description="Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers",
        name="confluent_kafka_output_librdkafka_txmsg_bytes",
    )
)
"""Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers"""

@define(kw_only=True, slots=False)
class Config(Output.Config):
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
from logging import getLogger
from unittest import mock

from prometheus_client import CollectorRegistry

from logprep.abc.connector import Connector
from logprep.abc.input import Input
from logprep.abc.output import Output
from logprep.factory import Factory
from logprep.util.time import TimeParser
from tests.unit.component.base import BaseComponentTestCase

CUSTOM_REGISTRY = CollectorRegistry(auto_describe=True)


class BaseConnectorTestCase(BaseComponentTestCase):
CONFIG: dict = {}
Expand All @@ -24,6 +28,7 @@ class BaseConnectorTestCase(BaseComponentTestCase):

def setup_method(self) -> None:
    """Build a fresh connector instance (and metrics registry handle) per test."""
    self.custom_registry = CUSTOM_REGISTRY
    connector_config = {"Test Instance Name": self.CONFIG}
    self.object = Factory.create(configuration=connector_config, logger=self.logger)

def test_is_a_connector_implementation(self):
Expand Down
52 changes: 11 additions & 41 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytest
from attrs import asdict
from confluent_kafka import KafkaException
from prometheus_client import CollectorRegistry

from logprep.abc.input import (
CriticalInputError,
Expand All @@ -21,7 +22,7 @@
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from logprep.metrics.metrics import Metric
from tests.unit.connector.base import BaseInputTestCase
from tests.unit.connector.base import CUSTOM_REGISTRY, BaseInputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
)
Expand Down Expand Up @@ -197,16 +198,19 @@ def test_commit_callback_raises_warning_error_and_counts_failures(self):
assert self.object._commit_failures == 1

def test_commit_callback_counts_commit_success(self):
assert self.object.metrics._commit_success == 0
self.object.metrics.commit_success = 0
self.object._commit_callback(None, [mock.MagicMock()])
assert self.object.metrics._commit_success == 1
assert self.object.metrics.commit_success == 1

def test_commit_callback_sets_committed_offsets(self):
mock_add = mock.MagicMock()
self.object.metrics.committed_offsets.add_with_labels = mock_add
topic_partion = mock.MagicMock()
topic_partion.partition = 99
topic_partion.offset = 666
self.object._commit_callback(None, [topic_partion])
assert self.object.metrics._committed_offsets == {99: 666}
call_args = 666, {"description": "topic: test_input_raw - partition: 99"}
mock_add.assert_called_with(*call_args)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_default_config_is_injected(self, mock_consumer):
Expand Down Expand Up @@ -245,43 +249,6 @@ def test_statistics_interval_can_be_overwritten(self, mock_consumer):
mock_consumer.assert_called()
assert mock_consumer.call_args[0][0].get("statistics.interval.ms") == "999999999"

def test_metrics_expose_returns_data(self):
    """Feed a canned librdkafka statistics JSON blob into the stats callback
    and check that ``metrics.expose()`` returns exactly the expected
    metric-name/label -> value mapping for the consumer."""
    json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8")
    self.object._stats_callback(json_string)
    # expected label values embed the local host's FQDN as client_id
    client_id = socket.getfqdn()
    # pylint: disable=line-too-long
    expected = {
        "logprep_connector_number_of_processed_events;direction:input,name:Test Instance Name,type:confluentkafka_input": 0.0,
        "logprep_connector_mean_processing_time_per_event;direction:input,name:Test Instance Name,type:confluentkafka_input": 0.0,
        "logprep_connector_number_of_warnings;direction:input,name:Test Instance Name,type:confluentkafka_input": 0.0,
        "logprep_connector_number_of_errors;direction:input,name:Test Instance Name,type:confluentkafka_input": 0.0,
        f"logprep_connector_librdkafka_consumer_ts;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 5016483227792,
        f"logprep_connector_librdkafka_consumer_time;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 1527060869,
        f"logprep_connector_librdkafka_consumer_replyq;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_consumer_msg_cnt;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 22710,
        f"logprep_connector_librdkafka_consumer_msg_size;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 704010,
        f"logprep_connector_librdkafka_consumer_msg_max;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 500000,
        f"logprep_connector_librdkafka_consumer_msg_size_max;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 1073741824,
        f"logprep_connector_librdkafka_consumer_simple_cnt;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_consumer_metadata_cache_cnt;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 1,
        f"logprep_connector_librdkafka_consumer_tx;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 631,
        f"logprep_connector_librdkafka_consumer_tx_bytes;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 168584479,
        f"logprep_connector_librdkafka_consumer_rx;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 631,
        f"logprep_connector_librdkafka_consumer_rx_bytes;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 31084,
        f"logprep_connector_librdkafka_consumer_txmsgs;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 4300753,
        f"logprep_connector_librdkafka_consumer_txmsg_bytes;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 133323343,
        f"logprep_connector_librdkafka_consumer_rxmsgs;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_consumer_rxmsg_bytes;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_cgrp_stateage;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 996,
        f"logprep_connector_librdkafka_cgrp_rebalance_age;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_cgrp_rebalance_cnt;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_librdkafka_cgrp_assignment_size;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_kafka_consumer_commit_failures;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
        f"logprep_connector_kafka_consumer_commit_success;direction:input,name:Test Instance Name,type:confluentkafka_input,client_id:{client_id},group_id:testgroup,topic:test_input_raw": 0,
    }
    # pylint: enable=line-too-long
    assert self.object.metrics.expose() == expected

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_raises_fatal_input_error_if_poll_raises_runtime_error(self, _):
self.object._consumer.poll.side_effect = RuntimeError("test error")
Expand Down Expand Up @@ -323,6 +290,9 @@ def test_expected_metrics_attributes_are_initialized(self):
"current_offsets",
"committed_offsets",
"librdkafka_age",
"librdkafka_replyq",
"librdkafka_tx",
"librdkafka_tx_bytes",
"librdkafka_rx",
"librdkafka_rx_bytes",
"librdkafka_rxmsgs",
Expand Down
71 changes: 40 additions & 31 deletions tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from unittest import mock

import pytest
from attrs import asdict

from logprep.abc.output import CriticalOutputError, FatalOutputError
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from logprep.metrics.metrics import Metric
from tests.unit.connector.base import BaseOutputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
Expand Down Expand Up @@ -136,40 +138,47 @@ def test_setup_raises_fatal_output_error_on_invalid_config(self):
with pytest.raises(FatalOutputError, match="No such configuration property"):
self.object.setup()

def test_metrics_expose_returns_data(self):
    """Feed a canned librdkafka statistics JSON blob into the stats callback
    and check that ``metrics.expose()`` returns exactly the expected
    metric-name/label -> value mapping for the producer."""
    json_string = Path(KAFKA_STATS_JSON_PATH).read_text("utf8")
    self.object._stats_callback(json_string)
    # expected label values embed the local host's FQDN as client_id
    client_id = socket.getfqdn()
    # pylint: disable=line-too-long
    expected = {
        "logprep_connector_number_of_processed_events;direction:output,name:Test Instance Name,type:confluentkafka_output": 0.0,
        "logprep_connector_mean_processing_time_per_event;direction:output,name:Test Instance Name,type:confluentkafka_output": 0.0,
        "logprep_connector_number_of_warnings;direction:output,name:Test Instance Name,type:confluentkafka_output": 0.0,
        "logprep_connector_number_of_errors;direction:output,name:Test Instance Name,type:confluentkafka_output": 0.0,
        f"logprep_connector_librdkafka_producer_ts;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 5016483227792,
        f"logprep_connector_librdkafka_producer_time;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 1527060869,
        f"logprep_connector_librdkafka_producer_replyq;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 0,
        f"logprep_connector_librdkafka_producer_msg_cnt;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 22710,
        f"logprep_connector_librdkafka_producer_msg_size;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 704010,
        f"logprep_connector_librdkafka_producer_msg_max;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 500000,
        f"logprep_connector_librdkafka_producer_msg_size_max;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 1073741824,
        f"logprep_connector_librdkafka_producer_simple_cnt;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 0,
        f"logprep_connector_librdkafka_producer_metadata_cache_cnt;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 1,
        f"logprep_connector_librdkafka_producer_tx;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 631,
        f"logprep_connector_librdkafka_producer_tx_bytes;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 168584479,
        f"logprep_connector_librdkafka_producer_rx;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 631,
        f"logprep_connector_librdkafka_producer_rx_bytes;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 31084,
        f"logprep_connector_librdkafka_producer_txmsgs;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 4300753,
        f"logprep_connector_librdkafka_producer_txmsg_bytes;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 133323343,
        f"logprep_connector_librdkafka_producer_rxmsgs;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 0,
        f"logprep_connector_librdkafka_producer_rxmsg_bytes;direction:output,name:Test Instance Name,type:confluentkafka_output,client_id:{client_id}": 0,
    }
    # pylint: enable=line-too-long
    assert self.object.metrics.expose() == expected

def test_raises_value_error_if_mandatory_parameters_not_set(self):
    """Factory must reject a kafka_config that lacks ``bootstrap.servers``."""
    incomplete_config = deepcopy(self.CONFIG)
    incomplete_config.get("kafka_config").pop("bootstrap.servers")
    with pytest.raises(
        InvalidConfigurationError, match=r"keys are missing: {'bootstrap.servers'}"
    ):
        Factory.create({"test": incomplete_config}, logger=self.logger)

def test_expected_metrics_attributes(self):
    """All librdkafka gauges expected for the producer are declared as
    attributes on the metrics object.

    Fix: local variable was misspelled ``diffrences`` -> ``differences``.
    """
    expected_metrics = {
        "librdkafka_age",
        "librdkafka_msg_cnt",
        "librdkafka_msg_size",
        "librdkafka_msg_max",
        "librdkafka_msg_size_max",
        "librdkafka_tx",
        "librdkafka_tx_bytes",
        "librdkafka_rx",
        "librdkafka_rx_bytes",
        "librdkafka_txmsgs",
        "librdkafka_txmsg_bytes",
    }
    metric_attributes = set(asdict(self.object.metrics).keys())
    # any expected name missing from the actual attributes fails the test
    differences = expected_metrics.difference(metric_attributes)
    assert not differences, str(differences)

def test_expected_metrics_attributes_are_initialized(self):
    """Each expected gauge exists, is a ``Metric`` instance and carries a tracker."""
    expected_metrics = {
        "librdkafka_age",
        "librdkafka_msg_cnt",
        "librdkafka_msg_size",
        "librdkafka_msg_max",
        "librdkafka_msg_size_max",
        "librdkafka_tx",
        "librdkafka_tx_bytes",
        "librdkafka_rx",
        "librdkafka_rx_bytes",
        "librdkafka_txmsgs",
        "librdkafka_txmsg_bytes",
    }
    # recurse=False keeps the Metric objects intact instead of dict-ifying them
    attribute_map = asdict(self.object.metrics, recurse=False)
    for name in expected_metrics:
        metric = attribute_map.get(name)
        assert metric is not None
        assert isinstance(metric, Metric)
        assert metric.tracker is not None

0 comments on commit 1c0f046

Please sign in to comment.