
Commit: add offset metrics
ekneg54 committed Oct 20, 2023
1 parent 978f086 commit 415a0b1
Showing 3 changed files with 125 additions and 102 deletions.
206 changes: 110 additions & 96 deletions logprep/connector/confluent_kafka/input.py
@@ -53,6 +53,7 @@
Input,
WarningInputError,
)
from logprep.metrics.metrics import CounterMetric, GaugeMetric
from logprep.util.validators import keys_in_validator

DEFAULTS = {
@@ -80,87 +81,99 @@ class ConfluentKafkaInput(Input):
class Metrics(Input.Metrics):
"""Metrics for ConfluentKafkaInput"""

_prefix = "logprep_connector_input_kafka_"

_stats: dict = field(factory=dict)
"""statistcs form librdkafka. Is filled by `_stats_callback`"""
_commit_failures: int = 0
# _stats: dict = field(factory=dict)
# """statistcs form librdkafka. Is filled by `_stats_callback`"""
commit_failures: CounterMetric = field(
factory=lambda: CounterMetric(
description="count of failed commits. Is filled by `_commit_callback",
name="confluent_kafka_input_commit_failures",
)
)
"""count of failed commits. Is filled by `_commit_callback`"""
_commit_success: int = 0
commit_success: CounterMetric = field(
factory=lambda: CounterMetric(
description="count of successful commits. Is filled by `_commit_callback`",
name="confluent_kafka_input_commit_success",
)
)
"""count of successful commits. Is filled by `_commit_callback`"""
_current_offsets: dict = field(factory=dict)
current_offsets: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="current offsets of the consumer. Is filled by `_get_raw_event`",
name="confluent_kafka_input_current_offsets",
)
)
"""current offsets of the consumer. Is filled by `_get_raw_event`"""
_committed_offsets: dict = field(factory=dict)
"""committed offsets of the consumer. Is filled by `_commit_callback`"""
_consumer_group_id: str = ""
"""group id of the consumer. Is filled during initialization"""
_consumer_client_id: str = ""
"""client id of the consumer. Is filled during initialization"""
_consumer_topic: str = ""
"""topic of the consumer. Is filled during initialization"""

@cached_property
def _rdkafka_labels(self) -> str:
client_id = self._consumer_client_id
group_id = self._consumer_group_id
topic = self._consumer_topic
labels = {"client_id": client_id, "group_id": group_id, "topic": topic}
labels = self._labels | labels
labels = [":".join(item) for item in labels.items()]
labels = ",".join(labels)
return labels

def _get_kafka_input_metrics(self) -> dict:
exp = {
f"{self._prefix}kafka_consumer_current_offset;" # nosemgrep
f"{self._rdkafka_labels},partition:{partition}": offset
for partition, offset in self._current_offsets.items()
}
exp |= {
f"{self._prefix}kafka_consumer_committed_offset;" # nosemgrep
f"{self._rdkafka_labels},partition:{partition}": offset
for partition, offset in self._committed_offsets.items()
}
exp.update(
{
f"{self._prefix}kafka_consumer_commit_failures;"
f"{self._rdkafka_labels}": self._commit_failures,
f"{self._prefix}kafka_consumer_commit_success;"
f"{self._rdkafka_labels}": self._commit_success,
}
)
return exp
committed_offsets: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="committed offsets of the consumer. Is filled by `_commit_callback`",
name="confluent_kafka_input_committed_offsets",
)
)
"""committed offsets of the consumer. Is filled by `_commit_callback`"""

def _get_top_level_metrics(self) -> dict:
return {
f"{self._prefix}librdkafka_consumer_{stat};{self._rdkafka_labels}": value
for stat, value in self._stats.items()
if isinstance(value, (int, float))
}

def _get_cgrp_metrics(self) -> dict:
exp = {}
cgrp = self._stats.get("cgrp", {})
for stat, value in cgrp.items():
if isinstance(value, (int, float)):
exp[f"{self._prefix}librdkafka_cgrp_{stat};{self._rdkafka_labels}"] = value
return exp

def expose(self) -> dict:
"""overload of `expose` to add kafka specific metrics
Returns
-------
dict
metrics dictionary
"""
exp = super().expose()
labels = [":".join(item) for item in self._labels.items()]
labels = ",".join(labels)
exp |= self._get_top_level_metrics()
exp |= self._get_cgrp_metrics()
exp |= self._get_kafka_input_metrics()
return exp
# @cached_property
# def _rdkafka_labels(self) -> str:
# client_id = self._consumer_client_id
# group_id = self._consumer_group_id
# topic = self._consumer_topic
# labels = {"client_id": client_id, "group_id": group_id, "topic": topic}
# labels = self._labels | labels
# labels = [":".join(item) for item in labels.items()]
# labels = ",".join(labels)
# return labels

# def _get_kafka_input_metrics(self) -> dict:
# exp = {
# f"{self._prefix}kafka_consumer_current_offset;" # nosemgrep
# f"{self._rdkafka_labels},partition:{partition}": offset
# for partition, offset in self._current_offsets.items()
# }
# exp |= {
# f"{self._prefix}kafka_consumer_committed_offset;" # nosemgrep
# f"{self._rdkafka_labels},partition:{partition}": offset
# for partition, offset in self._committed_offsets.items()
# }
# exp.update(
# {
# f"{self._prefix}kafka_consumer_commit_failures;"
# f"{self._rdkafka_labels}": self._commit_failures,
# f"{self._prefix}kafka_consumer_commit_success;"
# f"{self._rdkafka_labels}": self._commit_success,
# }
# )
# return exp

# def _get_top_level_metrics(self) -> dict:
# return {
# f"{self._prefix}librdkafka_consumer_{stat};{self._rdkafka_labels}": value
# for stat, value in self._stats.items()
# if isinstance(value, (int, float))
# }

# def _get_cgrp_metrics(self) -> dict:
# exp = {}
# cgrp = self._stats.get("cgrp", {})
# for stat, value in cgrp.items():
# if isinstance(value, (int, float)):
# exp[f"{self._prefix}librdkafka_cgrp_{stat};{self._rdkafka_labels}"] = value
# return exp

# def expose(self) -> dict:
# """overload of `expose` to add kafka specific metrics

# Returns
# -------
# dict
# metrics dictionary
# """
# exp = super().expose()
# labels = [":".join(item) for item in self._labels.items()]
# labels = ",".join(labels)
# exp |= self._get_top_level_metrics()
# exp |= self._get_cgrp_metrics()
# exp |= self._get_kafka_input_metrics()
# return exp

@define(kw_only=True, slots=False)
class Config(Input.Config):
@@ -200,9 +213,6 @@ class Config(Input.Config):
def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None:
super().__init__(name, configuration, logger)
self._last_valid_records = {}
self.metrics._consumer_group_id = self._config.kafka_config["group.id"]
self.metrics._consumer_client_id = self._config.kafka_config.get("client.id", getfqdn())
self.metrics._consumer_topic = self._config.topic

@cached_property
def _consumer(self) -> Consumer:
@@ -252,7 +262,7 @@ def _stats_callback(self, stats: str) -> None:
details about the data can be found here:
https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
"""
self.metrics._stats = self._decoder.decode(stats) # pylint: disable=protected-access
# self.metrics._stats = self._decoder.decode(stats) # pylint: disable=protected-access

def _commit_callback(
self, error: Union[KafkaException, None], topic_partitions: list[TopicPartition]
@@ -273,16 +283,19 @@
if `error` is not None
"""
if error is not None:
self.metrics._commit_failures += 1
self.metrics.commit_failures += 1
raise WarningInputError(
self, f"Could not commit offsets for {topic_partitions}: {error}"
)
self.metrics._commit_success += 1
self.metrics._committed_offsets |= {
topic_partition.partition: topic_partition.offset
for topic_partition in topic_partitions
if topic_partition.offset not in SPECIAL_OFFSETS
}
self.metrics.commit_success += 1
for topic_partition in topic_partitions:
offset = topic_partition.offset
if offset in SPECIAL_OFFSETS:
offset = 0
labels = {
"description": f"topic: {self._config.topic} - partition: {topic_partition.partition}"
}
self.metrics.committed_offsets.add_with_labels(offset, labels)

def describe(self) -> str:
"""Get name of Kafka endpoint and bootstrap servers.
@@ -325,8 +338,8 @@ def _get_raw_event(self, timeout: float) -> bytearray:
self, "A confluent-kafka record contains an error code", kafka_error
)
self._last_valid_records[message.partition()] = message
offset = {message.partition(): message.offset() + 1}
self.metrics._current_offsets |= offset # pylint: disable=protected-access
labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"}
self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels)
return message.value()

def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dict]]:
@@ -395,17 +408,18 @@ def _handle_offsets(self, offset_handler: Callable) -> None:

def _assign_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
offset, partition = topic_partition.offset, topic_partition.partition
self._logger.info(
f"{consumer.memberid()} was assigned to "
f"topic: {topic_partition.topic} | "
f"partition {topic_partition.partition}"
f"partition {partition}"
)
if topic_partition.offset in SPECIAL_OFFSETS:
continue
partition_offset = {
topic_partition.partition: topic_partition.offset,
}
self.metrics._current_offsets |= partition_offset
if offset in SPECIAL_OFFSETS:
offset = 0

labels = {"description": f"topic: {self._config.topic} - partition: {partition}"}
self.metrics.committed_offsets.add_with_labels(offset, labels)
self.metrics.current_offsets.add_with_labels(offset, labels)

def _revoke_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
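The net effect of this file's changes: instead of collecting offsets into private dicts and formatting its own export strings (the commented-out `_get_kafka_input_metrics` and friends), the connector now reports per-partition offsets through labeled gauge metrics. Below is a minimal standalone sketch of that pattern, using prometheus_client directly rather than logprep's wrappers; the metric name and the SPECIAL_OFFSETS values (assumed to be librdkafka's OFFSET_STORED / OFFSET_INVALID sentinels) are illustrative, not taken from the commit.

    # Sketch: per-partition committed offsets as one labeled Prometheus gauge.
    from prometheus_client import CollectorRegistry, Gauge, generate_latest

    SPECIAL_OFFSETS = {-1000, -1001}  # assumed: librdkafka OFFSET_STORED / OFFSET_INVALID

    registry = CollectorRegistry()
    committed_offsets = Gauge(
        "confluent_kafka_input_committed_offsets",
        "committed offsets of the consumer",
        labelnames=["description"],
        registry=registry,
    )

    def record_commit(topic: str, partition: int, offset: int) -> None:
        # Sentinel offsets carry no position, so they are reported as 0,
        # mirroring the `if offset in SPECIAL_OFFSETS: offset = 0` branch above.
        if offset in SPECIAL_OFFSETS:
            offset = 0
        committed_offsets.labels(
            description=f"topic: {topic} - partition: {partition}"
        ).set(offset)

    record_commit("example_topic", 0, 42)
    print(generate_latest(registry).decode())

One gauge with a label per partition replaces the old scheme of one hand-built key string per partition, so Prometheus can aggregate or filter on the label instead of parsing metric names.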
6 changes: 2 additions & 4 deletions logprep/framework/pipeline.py
@@ -8,6 +8,7 @@
import logging
import logging.handlers
import multiprocessing

# pylint: disable=logging-fstring-interpolation
import queue
import warnings
@@ -348,19 +349,16 @@ def process_event(self, event: dict):

"""
ToDos:
- TimeTracking
- Processor Specific Metrics (Pseudonymizer, Amides, DomainResolver)
- Fix Pseudonymizer str has no match
- count number of warnings/errors separately or delete them from all metrics?
- Tests
- delete metric exposer
- delete SharedCounter (Events in last 5 min: n)
- create Grafana Dashboards
- add pipelinemanager metrics (pipeline restarts)
- clean up PrometheusExporter ("remove stale metric files" still needed?)
- add Kafka librdkafka metrics
- count warnings
- add version info to metrics
- enforce that rule_id is unique
"""

event_received = self._encoder.encode(event)
15 changes: 13 additions & 2 deletions logprep/metrics/metrics.py
@@ -55,6 +55,7 @@ def init_tracker(self):
documentation=self.description,
labelnames=self.labels.keys(),
registry=self._registry,
multiprocess_mode="liveall",
)
tracker.labels(**self.labels)
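The new `multiprocess_mode="liveall"` only matters when prometheus_client runs in multiprocess mode, where each worker process writes its samples to an mmap file and "liveall" exposes one sample per live process instead of aggregating them. A rough sketch of the setup this assumes (the directory path is illustrative; the env var must be set before prometheus_client is imported, since the value backend is chosen at import time):

    import os

    os.makedirs("/tmp/prometheus_multiproc", exist_ok=True)  # assumed scratch dir
    os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc"

    from prometheus_client import CollectorRegistry, Gauge, multiprocess

    gauge = Gauge(
        "example_offsets",
        "per-process example gauge",
        multiprocess_mode="liveall",
    )
    gauge.set(7)

    # Exposition side: aggregate the per-process files into one registry.
    # (In real deployments this usually happens in a separate process.)
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)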

@@ -74,7 +75,12 @@ class CounterMetric(Metric):
trackers: dict = {}

def __add__(self, other):
self.trackers.get(self.fullname).labels(**self.labels).inc(other)
return self.add_with_labels(other, self.labels)

def add_with_labels(self, other, labels):
"""Add with labels"""
labels = self.labels | labels
self.trackers.get(self.fullname).labels(**labels).inc(other)
return self


@@ -96,5 +102,10 @@ class GaugeMetric(Metric):
trackers: dict = {}

def __add__(self, other):
self.trackers.get(self.fullname).labels(**self.labels).set(other)
return self.add_with_labels(other, self.labels)

def add_with_labels(self, other, labels):
"""Add with labels"""
labels = self.labels | labels
self.trackers.get(self.fullname).labels(**labels).set(other)
return self
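The `__add__`/`add_with_labels` pair is what allows both calling styles used in the connector above: `self.metrics.commit_success += 1` for a plain increment, and `add_with_labels(offset, labels)` when per-partition labels are needed. A minimal sketch of the convention, with the prometheus_client tracker replaced by a plain dict; `SketchCounter` is a hypothetical stand-in for illustration, not logprep code:

    from attrs import define, field

    @define(kw_only=True)
    class SketchCounter:
        name: str
        description: str
        labels: dict = field(factory=dict)
        values: dict = field(factory=dict)

        def __add__(self, other):
            # `metric += 1` is sugar for add_with_labels with the metric's own labels.
            return self.add_with_labels(other, self.labels)

        def add_with_labels(self, other, labels):
            # Merge instance labels with call-site labels, then increment.
            labels = self.labels | labels
            key = tuple(sorted(labels.items()))
            self.values[key] = self.values.get(key, 0) + other
            return self  # returning self keeps `+=` rebinding the same object

    commit_success = SketchCounter(name="commit_success", description="successful commits")
    commit_success += 1                                    # plain increment
    commit_success.add_with_labels(1, {"partition": "0"})  # labeled increment

Returning `self` from `add_with_labels` is what makes the augmented-assignment form work: `metric += 1` rebinds the attribute to the same metric object after the tracker update.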
