diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 5519c7849..a595d98cb 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -53,6 +53,7 @@ Input, WarningInputError, ) +from logprep.metrics.metrics import CounterMetric, GaugeMetric from logprep.util.validators import keys_in_validator DEFAULTS = { @@ -80,87 +81,99 @@ class ConfluentKafkaInput(Input): class Metrics(Input.Metrics): """Metrics for ConfluentKafkaInput""" - _prefix = "logprep_connector_input_kafka_" - - _stats: dict = field(factory=dict) - """statistcs form librdkafka. Is filled by `_stats_callback`""" - _commit_failures: int = 0 + # _stats: dict = field(factory=dict) + # """statistcs form librdkafka. Is filled by `_stats_callback`""" + commit_failures: CounterMetric = field( + factory=lambda: CounterMetric( + description="count of failed commits. Is filled by `_commit_callback", + name="confluent_kafka_input_commit_failures", + ) + ) """count of failed commits. Is filled by `_commit_callback`""" - _commit_success: int = 0 + commit_success: CounterMetric = field( + factory=lambda: CounterMetric( + description="count of successful commits. Is filled by `_commit_callback`", + name="confluent_kafka_input_commit_success", + ) + ) """count of successful commits. Is filled by `_commit_callback`""" - _current_offsets: dict = field(factory=dict) + current_offsets: GaugeMetric = field( + factory=lambda: GaugeMetric( + description="current offsets of the consumer. Is filled by `_get_raw_event`", + name="confluent_kafka_input_current_offsets", + ) + ) """current offsets of the consumer. Is filled by `_get_raw_event`""" - _committed_offsets: dict = field(factory=dict) - """committed offsets of the consumer. Is filled by `_commit_callback`""" - _consumer_group_id: str = "" - """group id of the consumer. Is filled during initialization""" - _consumer_client_id: str = "" - """client id of the consumer. Is filled during initialization""" - _consumer_topic: str = "" - """topic of the consumer. Is filled during initialization""" - - @cached_property - def _rdkafka_labels(self) -> str: - client_id = self._consumer_client_id - group_id = self._consumer_group_id - topic = self._consumer_topic - labels = {"client_id": client_id, "group_id": group_id, "topic": topic} - labels = self._labels | labels - labels = [":".join(item) for item in labels.items()] - labels = ",".join(labels) - return labels - - def _get_kafka_input_metrics(self) -> dict: - exp = { - f"{self._prefix}kafka_consumer_current_offset;" # nosemgrep - f"{self._rdkafka_labels},partition:{partition}": offset - for partition, offset in self._current_offsets.items() - } - exp |= { - f"{self._prefix}kafka_consumer_committed_offset;" # nosemgrep - f"{self._rdkafka_labels},partition:{partition}": offset - for partition, offset in self._committed_offsets.items() - } - exp.update( - { - f"{self._prefix}kafka_consumer_commit_failures;" - f"{self._rdkafka_labels}": self._commit_failures, - f"{self._prefix}kafka_consumer_commit_success;" - f"{self._rdkafka_labels}": self._commit_success, - } + committed_offsets: GaugeMetric = field( + factory=lambda: GaugeMetric( + description="committed offsets of the consumer. Is filled by `_commit_callback`", + name="confluent_kafka_input_committed_offsets", ) - return exp - - def _get_top_level_metrics(self) -> dict: - return { - f"{self._prefix}librdkafka_consumer_{stat};{self._rdkafka_labels}": value - for stat, value in self._stats.items() - if isinstance(value, (int, float)) - } + ) + """committed offsets of the consumer. Is filled by `_commit_callback`""" - def _get_cgrp_metrics(self) -> dict: - exp = {} - cgrp = self._stats.get("cgrp", {}) - for stat, value in cgrp.items(): - if isinstance(value, (int, float)): - exp[f"{self._prefix}librdkafka_cgrp_{stat};{self._rdkafka_labels}"] = value - return exp - - def expose(self) -> dict: - """overload of `expose` to add kafka specific metrics - - Returns - ------- - dict - metrics dictionary - """ - exp = super().expose() - labels = [":".join(item) for item in self._labels.items()] - labels = ",".join(labels) - exp |= self._get_top_level_metrics() - exp |= self._get_cgrp_metrics() - exp |= self._get_kafka_input_metrics() - return exp + # @cached_property + # def _rdkafka_labels(self) -> str: + # client_id = self._consumer_client_id + # group_id = self._consumer_group_id + # topic = self._consumer_topic + # labels = {"client_id": client_id, "group_id": group_id, "topic": topic} + # labels = self._labels | labels + # labels = [":".join(item) for item in labels.items()] + # labels = ",".join(labels) + # return labels + + # def _get_kafka_input_metrics(self) -> dict: + # exp = { + # f"{self._prefix}kafka_consumer_current_offset;" # nosemgrep + # f"{self._rdkafka_labels},partition:{partition}": offset + # for partition, offset in self._current_offsets.items() + # } + # exp |= { + # f"{self._prefix}kafka_consumer_committed_offset;" # nosemgrep + # f"{self._rdkafka_labels},partition:{partition}": offset + # for partition, offset in self._committed_offsets.items() + # } + # exp.update( + # { + # f"{self._prefix}kafka_consumer_commit_failures;" + # f"{self._rdkafka_labels}": self._commit_failures, + # f"{self._prefix}kafka_consumer_commit_success;" + # f"{self._rdkafka_labels}": self._commit_success, + # } + # ) + # return exp + + # def _get_top_level_metrics(self) -> dict: + # return { + # f"{self._prefix}librdkafka_consumer_{stat};{self._rdkafka_labels}": value + # for stat, value in self._stats.items() + # if isinstance(value, (int, float)) + # } + + # def _get_cgrp_metrics(self) -> dict: + # exp = {} + # cgrp = self._stats.get("cgrp", {}) + # for stat, value in cgrp.items(): + # if isinstance(value, (int, float)): + # exp[f"{self._prefix}librdkafka_cgrp_{stat};{self._rdkafka_labels}"] = value + # return exp + + # def expose(self) -> dict: + # """overload of `expose` to add kafka specific metrics + + # Returns + # ------- + # dict + # metrics dictionary + # """ + # exp = super().expose() + # labels = [":".join(item) for item in self._labels.items()] + # labels = ",".join(labels) + # exp |= self._get_top_level_metrics() + # exp |= self._get_cgrp_metrics() + # exp |= self._get_kafka_input_metrics() + # return exp @define(kw_only=True, slots=False) class Config(Input.Config): @@ -200,9 +213,6 @@ class Config(Input.Config): def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None: super().__init__(name, configuration, logger) self._last_valid_records = {} - self.metrics._consumer_group_id = self._config.kafka_config["group.id"] - self.metrics._consumer_client_id = self._config.kafka_config.get("client.id", getfqdn()) - self.metrics._consumer_topic = self._config.topic @cached_property def _consumer(self) -> Consumer: @@ -252,7 +262,7 @@ def _stats_callback(self, stats: str) -> None: details about the data can be found here: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md """ - self.metrics._stats = self._decoder.decode(stats) # pylint: disable=protected-access + # self.metrics._stats = self._decoder.decode(stats) # pylint: disable=protected-access def _commit_callback( self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition] @@ -273,16 +283,19 @@ def _commit_callback( if `error` is not None """ if error is not None: - self.metrics._commit_failures += 1 + self.metrics.commit_failures += 1 raise WarningInputError( self, f"Could not commit offsets for {topic_partitions}: {error}" ) - self.metrics._commit_success += 1 - self.metrics._committed_offsets |= { - topic_partition.partition: topic_partition.offset - for topic_partition in topic_partitions - if topic_partition.offset not in SPECIAL_OFFSETS - } + self.metrics.commit_success += 1 + for topic_partition in topic_partitions: + offset = topic_partition.offset + if offset in SPECIAL_OFFSETS: + offset = 0 + labels = { + "description": f"topic: {self._config.topic} - partition: {topic_partition.partition}" + } + self.metrics.committed_offsets.add_with_labels(offset, labels) def describe(self) -> str: """Get name of Kafka endpoint and bootstrap servers. @@ -325,8 +338,8 @@ def _get_raw_event(self, timeout: float) -> bytearray: self, "A confluent-kafka record contains an error code", kafka_error ) self._last_valid_records[message.partition()] = message - offset = {message.partition(): message.offset() + 1} - self.metrics._current_offsets |= offset # pylint: disable=protected-access + labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"} + self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels) return message.value() def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dict]]: @@ -395,17 +408,18 @@ def _handle_offsets(self, offset_handler: Callable) -> None: def _assign_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: + offset, partition = topic_partition.offset, topic_partition.partition self._logger.info( f"{consumer.memberid()} was assigned to " f"topic: {topic_partition.topic} | " - f"partition {topic_partition.partition}" + f"partition {partition}" ) - if topic_partition.offset in SPECIAL_OFFSETS: - continue - partition_offset = { - topic_partition.partition: topic_partition.offset, - } - self.metrics._current_offsets |= partition_offset + if offset in SPECIAL_OFFSETS: + offset = 0 + + labels = {"description": f"topic: {self._config.topic} - partition: {partition}"} + self.metrics.committed_offsets.add_with_labels(offset, labels) + self.metrics.current_offsets.add_with_labels(offset, labels) def _revoke_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 003197d26..3e6913cff 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -8,6 +8,7 @@ import logging import logging.handlers import multiprocessing + # pylint: disable=logging-fstring-interpolation import queue import warnings @@ -348,19 +349,16 @@ def process_event(self, event: dict): """ ToDos: - - TimeTracking - Processor Specific Metrics (Pseudonymizer, Amides, DomainResolver) - Fix Pseudonymizer str has no match - - count number warnings/errors separatley or delete them from all metrics? - Tests - - delete metric exposer - delete SharedCounter (Events in last 5 min: n) - create Grafana Dashboards - add pipelinemanager metrics (pipeline restarts) - clean up PrometheusExporter ("remove stale metric files" stil needed?) - add Kafka librdkafka metrics - - count warnings - add version info to metrics + - enforce rule_id should be unique """ event_received = self._encoder.encode(event) diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py index 84723c190..b554328e0 100644 --- a/logprep/metrics/metrics.py +++ b/logprep/metrics/metrics.py @@ -55,6 +55,7 @@ def init_tracker(self): documentation=self.description, labelnames=self.labels.keys(), registry=self._registry, + multiprocess_mode="liveall", ) tracker.labels(**self.labels) @@ -74,7 +75,12 @@ class CounterMetric(Metric): trackers: dict = {} def __add__(self, other): - self.trackers.get(self.fullname).labels(**self.labels).inc(other) + return self.add_with_labels(other, self.labels) + + def add_with_labels(self, other, labels): + """Add with labels""" + labels = self.labels | labels + self.trackers.get(self.fullname).labels(**labels).inc(other) return self @@ -96,5 +102,10 @@ class GaugeMetric(Metric): trackers: dict = {} def __add__(self, other): - self.trackers.get(self.fullname).labels(**self.labels).set(other) + return self.add_with_labels(other, self.labels) + + def add_with_labels(self, other, labels): + """Add with labels""" + labels = self.labels | labels + self.trackers.get(self.fullname).labels(**labels).set(other) return self