From e3cb524df7487e54593d35e39bf77b75d03d1291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Fri, 9 Jun 2023 10:08:14 +0300 Subject: [PATCH 01/25] Karapace metrics --- README.rst | 9 ++ karapace.config.json | 5 +- karapace/config.py | 3 + karapace/karapacemetrics.py | 169 +++++++++++++++++++++++++++ karapace/rapu.py | 16 +++ karapace/schema_registry.py | 2 + karapace/statsd.py | 15 ++- requirements/requirements-dev.txt | 16 +-- requirements/requirements-typing.txt | 6 +- requirements/requirements.in | 2 + requirements/requirements.txt | 6 +- 11 files changed, 235 insertions(+), 14 deletions(-) create mode 100644 karapace/karapacemetrics.py diff --git a/README.rst b/README.rst index 20e9fb786..23a50f92c 100644 --- a/README.rst +++ b/README.rst @@ -458,6 +458,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) + * - ``metrics_mode`` + - ``statsd`` + - Statistics server mode. For karapace supports ststsd server + * - ``statsd_uri`` + - ``127.0.0.1:8125`` + - Host:Port of statsd server + * - ``metrics_extended`` + - ``true`` + - Enable extended metrics. Extended metrics: connections_active, request_size_avg, request_size_max, response_size_avg, response_size_max Authentication and authorization of Karapace Schema Registry REST API diff --git a/karapace.config.json b/karapace.config.json index 55303ff4d..bded35aa6 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -27,5 +27,8 @@ "registry_authfile": null, "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", - "session_timeout_ms": 10000 + "session_timeout_ms": 10000, + "metrics_mode": "statsd", + "statsd_uri": "127.0.0.1:8125", + "metrics_extended": true } diff --git a/karapace/config.py b/karapace/config.py index ac094f5dd..7dcebe2b1 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -142,6 +142,9 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "metrics_mode": "statsd", + "statsd_uri": "127.0.0.1:8125", + "metrics_extended": True, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py new file mode 100644 index 000000000..2f8acb850 --- /dev/null +++ b/karapace/karapacemetrics.py @@ -0,0 +1,169 @@ +""" +karapace - metrics +Supports collection of system metrics +list of supported metrics: +connections-active - The number of active HTTP(S) connections to server. + Data collected inside aiohttp request handler. + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from datetime import datetime +from kafka.metrics import MetricName, Metrics +from kafka.metrics.measurable_stat import AbstractMeasurableStat +from kafka.metrics.stats import Avg, Max, Rate, Total +from karapace.config import Config +from karapace.statsd import StatsClient +from typing import Dict, Optional + +import schedule +import threading +import time + + +class Value(AbstractMeasurableStat): + """ + An AbstractSampledStat that maintains a simple average over its samples. + """ + + def __init__(self) -> None: + super().__init__() + self.value = 0.0 + + # pylint: disable=unused-argument + def measure(self, config: object, now: int) -> float: + return self.value + + def record(self, config: object, value: float, time_ms: int) -> None: + self.value = value + + +class Singleton(type): + _instances: Dict["Singleton", "Singleton"] = {} + + def __call__(cls, *args: str, **kwargs: int) -> "Singleton": + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class KarapaceMetrics(metaclass=Singleton): + def __init__(self) -> None: + self.active: Optional[object] = None + self.stats_client: Optional[StatsClient] = None + self.is_ready = False + self.metrics = Metrics() + self.event = threading.Event() + self.worker_thread = threading.Thread(target=self.worker) + self.lock = threading.Lock() + + def setup(self, stats_client: StatsClient, config: Config) -> None: + self.active = config.get("metrics_extended") + if not self.active: + return + with self.lock: + if self.is_ready: + return + self.is_ready = True + + sensor = self.metrics.sensor("connections-active") + sensor.add(MetricName("connections-active", "kafka-metrics"), Total()) + + sensor = self.metrics.sensor("request-size") + sensor.add(MetricName("request-size-max", "kafka-metrics"), Max()) + sensor.add(MetricName("request-size-avg", "kafka-metrics"), Avg()) + + sensor = self.metrics.sensor("response-size") + sensor.add(MetricName("response-size-max", "kafka-metrics"), Max()) + sensor.add(MetricName("response-size-avg", "kafka-metrics"), Avg()) + + sensor = self.metrics.sensor("master-slave-role") + sensor.add(MetricName("master-slave-role", "kafka-metrics"), Value()) + + sensor = self.metrics.sensor("request-error-rate") + sensor.add(MetricName("request-error-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("request-rate") + sensor.add(MetricName("request-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("response-rate") + sensor.add(MetricName("response-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("response-byte-rate") + sensor.add(MetricName("response-byte-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("latency") + sensor.add(MetricName("latency-max", "kafka-metrics"), Max()) + sensor.add(MetricName("latency-avg", "kafka-metrics"), Avg()) + + self.stats_client = stats_client + + schedule.every(10).seconds.do(self.schedule) + + self.worker_thread.start() + + def connection(self) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("connections-active").record(1.0, timestamp) + + def request(self, size: int) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("request-size").record(size, timestamp) + self.metrics.get_sensor("request-rate").record(1, timestamp) + + def response(self, size: int) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("connections-active").record(-1.0, timestamp) + self.metrics.get_sensor("response-size").record(size, timestamp) + self.metrics.get_sensor("response-byte-rate").record(size, timestamp) + self.metrics.get_sensor("response-rate").record(1, timestamp) + + def are_we_master(self, is_master: bool) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("master-slave-role").record(int(is_master), timestamp) + + def latency(self, latency_ms: float) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("latency").record(latency_ms, timestamp) + + def error(self) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("request-error-rate").record(1, timestamp) + + def report(self) -> None: + if not self.active: + return + if isinstance(self.stats_client, StatsClient): + for metric_name in self.metrics.metrics: + value = self.metrics.metrics[metric_name].value() + self.stats_client.gauge(metric_name.name, value) + + def schedule(self) -> None: + self.report() + + def worker(self) -> None: + while True: + if self.event.is_set(): + break + schedule.run_pending() + time.sleep(1) + + def cleanup(self) -> None: + if not self.active: + return + self.report() + self.event.set() + self.worker_thread.join() diff --git a/karapace/rapu.py b/karapace/rapu.py index c5408ea7d..4a66e9d02 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,6 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context +from karapace.karapacemetrics import KarapaceMetrics from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ @@ -134,6 +135,8 @@ def __init__( if content_type: self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") + if not is_success(status): + KarapaceMetrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -169,6 +172,7 @@ def __init__( self.stats = StatsClient(config=config) self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler + KarapaceMetrics().setup(self.stats, config) async def close_by_app(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument await self.close() @@ -180,6 +184,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ + KarapaceMetrics().cleanup() self.stats.close() @staticmethod @@ -266,15 +271,23 @@ async def _handle_request( url=request.url, path_for_stats=path_for_stats, ) + + KarapaceMetrics().connection() try: if request.method == "OPTIONS": + # self.metrics.request(0) origin = request.headers.get("Origin") if not origin: raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST) headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin) + raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers) body = await request.read() + if body: + KarapaceMetrics().request(len(body)) + else: + KarapaceMetrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -382,6 +395,7 @@ async def _handle_request( ) headers = {"Content-Type": "application/json"} resp = aiohttp.web.Response(body=body, status=status.value, headers=headers) + except asyncio.CancelledError: self.log.debug("Client closed connection") raise @@ -390,6 +404,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: + KarapaceMetrics().response(resp.content_length) + KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index a745b7600..475faf9e0 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,6 +22,7 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase +from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer @@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: + KarapaceMetrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) diff --git a/karapace/statsd.py b/karapace/statsd.py index be37d92ef..1b4ab9bcd 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -19,6 +19,7 @@ import logging import socket import time +import urllib STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 @@ -32,7 +33,19 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - self._dest_addr: Final = (host, port) + _host = host + _port = port + + if config.get("metrics_mode") == "statsd": + statsd_uri = config.get("statsd_uri") + if statsd_uri: + srv = urllib.parse.urlsplit("//" + str(statsd_uri)) + if srv.hostname: + _host = str(srv.hostname) + if srv.port: + _port = int(srv.port) + + self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.get("tags", {}) self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index c0bdf48a9..54c85f3e8 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.txt aiohttp==3.8.4 # via -r requirements.txt -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.txt aiosignal==1.3.1 # via @@ -87,7 +87,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.75.7 +hypothesis==6.76.0 # via -r requirements-dev.in idna==3.4 # via @@ -113,7 +113,7 @@ kafka-python @ https://github.com/aiven/kafka-python/archive/1b95333c9628152066f # aiokafka locust==2.15.1 # via -r requirements-dev.in -markupsafe==2.1.2 +markupsafe==2.1.3 # via # jinja2 # werkzeug @@ -174,7 +174,9 @@ rich==12.5.1 # via -r requirements.txt roundrobin==0.0.4 # via locust -sentry-sdk==1.24.0 +schedule==1.2.0 + # via -r requirements.txt +sentry-sdk==1.25.1 # via -r requirements-dev.in six==1.16.0 # via @@ -194,20 +196,20 @@ tenacity==8.2.2 # via -r requirements.txt tomli==2.0.1 # via pytest -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -r requirements.txt # locust # rich ujson==5.7.0 # via -r requirements.txt -urllib3==1.26.16 +urllib3==2.0.3 # via # requests # sentry-sdk watchfiles==0.19.0 # via -r requirements.txt -werkzeug==2.3.4 +werkzeug==2.3.5 # via # flask # locust diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index ea4657ecb..32bc10e69 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -12,7 +12,7 @@ mypy==1.3.0 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy -sentry-sdk==1.24.0 +sentry-sdk==1.25.1 # via -r requirements-typing.in tomli==2.0.1 # via @@ -20,11 +20,11 @@ tomli==2.0.1 # mypy types-jsonschema==4.17.0.8 # via -r requirements-typing.in -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -c requirements-dev.txt # mypy -urllib3==1.26.16 +urllib3==2.0.3 # via # -c requirements-dev.txt # sentry-sdk diff --git a/requirements/requirements.in b/requirements/requirements.in index 59ae8b39a..13de9a0a0 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -11,9 +11,11 @@ tenacity<9 typing-extensions ujson<6 watchfiles<1 +schedule xxhash~=3.0 rich~=12.5.0 + # Patched dependencies # # Note: It is important to use commits to reference patched dependencies. This diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 9bc49e18d..1537c3051 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.in aiohttp==3.8.4 # via -r requirements.in -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp @@ -64,6 +64,8 @@ python-dateutil==2.8.2 # via -r requirements.in rich==12.5.1 # via -r requirements.in +schedule==1.2.0 + # via -r requirements.in six==1.16.0 # via # isodate @@ -73,7 +75,7 @@ sniffio==1.3.0 # via anyio tenacity==8.2.2 # via -r requirements.in -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -r requirements.in # rich From 8dab84dd341a0766c54f863a5ff1e3d491ced56d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Sat, 10 Jun 2023 11:14:36 +0300 Subject: [PATCH 02/25] fixup issues --- karapace/karapacemetrics.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 2f8acb850..8eca080ab 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -39,7 +39,7 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["Singleton", "Singleton"] = {} + _instances: Dict["type[Singleton]", "Singleton"] = {} def __call__(cls, *args: str, **kwargs: int) -> "Singleton": if cls not in cls._instances: @@ -54,7 +54,7 @@ def __init__(self) -> None: self.stats_client: Optional[StatsClient] = None self.is_ready = False self.metrics = Metrics() - self.event = threading.Event() + self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() @@ -99,7 +99,7 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: self.stats_client = stats_client - schedule.every(10).seconds.do(self.schedule) + schedule.every(10).seconds.do(self.report) self.worker_thread.start() @@ -150,13 +150,12 @@ def report(self) -> None: for metric_name in self.metrics.metrics: value = self.metrics.metrics[metric_name].value() self.stats_client.gauge(metric_name.name, value) - - def schedule(self) -> None: - self.report() + else: + raise RuntimeError def worker(self) -> None: while True: - if self.event.is_set(): + if self.stop_event.is_set(): break schedule.run_pending() time.sleep(1) @@ -165,5 +164,5 @@ def cleanup(self) -> None: if not self.active: return self.report() - self.event.set() + self.stop_event.set() self.worker_thread.join() From 2898e318ca9bb29ebdec03a9e5ffb2b9cdea5b6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Sat, 10 Jun 2023 11:23:45 +0300 Subject: [PATCH 03/25] fixup issues --- karapace/karapacemetrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 8eca080ab..86ec9b893 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -39,7 +39,7 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["type[Singleton]", "Singleton"] = {} + _instances: Dict["Singleton", "Singleton"] = {} def __call__(cls, *args: str, **kwargs: int) -> "Singleton": if cls not in cls._instances: From c974579cb3f40f78fee02012c3d443e0cd62584f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 12:59:59 +0300 Subject: [PATCH 04/25] fixup annotations issue --- karapace/karapacemetrics.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 86ec9b893..1c4042d35 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -8,13 +8,14 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from datetime import datetime from kafka.metrics import MetricName, Metrics from kafka.metrics.measurable_stat import AbstractMeasurableStat from kafka.metrics.stats import Avg, Max, Rate, Total from karapace.config import Config from karapace.statsd import StatsClient -from typing import Dict, Optional import schedule import threading @@ -39,9 +40,9 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["Singleton", "Singleton"] = {} + _instances: dict[type[object], Singleton] = {} - def __call__(cls, *args: str, **kwargs: int) -> "Singleton": + def __call__(cls, *args: str, **kwargs: int) -> Singleton: if cls not in cls._instances: instance = super().__call__(*args, **kwargs) cls._instances[cls] = instance @@ -50,8 +51,8 @@ def __call__(cls, *args: str, **kwargs: int) -> "Singleton": class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: Optional[object] = None - self.stats_client: Optional[StatsClient] = None + self.active: object | None = None + self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() self.stop_event = threading.Event() @@ -144,15 +145,13 @@ def error(self) -> None: self.metrics.get_sensor("request-error-rate").record(1, timestamp) def report(self) -> None: - if not self.active: - return - if isinstance(self.stats_client, StatsClient): - for metric_name in self.metrics.metrics: - value = self.metrics.metrics[metric_name].value() - self.stats_client.gauge(metric_name.name, value) - else: + if not self.active or not isinstance(self.stats_client, StatsClient): raise RuntimeError + for metric_name in self.metrics.metrics: + value = self.metrics.metrics[metric_name].value() + self.stats_client.gauge(metric_name.name, value) + def worker(self) -> None: while True: if self.stop_event.is_set(): From 7256f5d5d30404fcb71b83510c290b0ad3e475a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 13:05:28 +0300 Subject: [PATCH 05/25] fixup exception message --- karapace/karapacemetrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 1c4042d35..73c4ea402 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -146,7 +146,7 @@ def error(self) -> None: def report(self) -> None: if not self.active or not isinstance(self.stats_client, StatsClient): - raise RuntimeError + raise RuntimeError("no StatsClient available") for metric_name in self.metrics.metrics: value = self.metrics.metrics[metric_name].value() From ab6ae9682c4c9ad37e723937a84ca40aff732413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 14:47:59 +0300 Subject: [PATCH 06/25] get rid of multiple instances of class --- karapace/karapacemetrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 73c4ea402..09a8779ef 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -40,13 +40,13 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: dict[type[object], Singleton] = {} + _instance: Singleton def __call__(cls, *args: str, **kwargs: int) -> Singleton: - if cls not in cls._instances: + if cls != cls._instance: instance = super().__call__(*args, **kwargs) - cls._instances[cls] = instance - return cls._instances[cls] + cls._instance = instance + return cls._instance class KarapaceMetrics(metaclass=Singleton): From 733d1f2583ec59a10bb93b68b53c62dcfa6acab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 15:23:13 +0300 Subject: [PATCH 07/25] fixup issue --- karapace/karapacemetrics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 09a8779ef..3284d7b50 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -40,10 +40,10 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instance: Singleton + _instance: Singleton | None = None def __call__(cls, *args: str, **kwargs: int) -> Singleton: - if cls != cls._instance: + if cls._instance is None: instance = super().__call__(*args, **kwargs) cls._instance = instance return cls._instance From 8751eea67625a0ee4b5415844c6ca4a504247f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Fri, 16 Jun 2023 12:25:29 +0300 Subject: [PATCH 08/25] change code to send raw data only --- karapace/karapacemetrics.py | 123 +++++++++++++----------------------- 1 file changed, 43 insertions(+), 80 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 3284d7b50..74c72bb05 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -11,34 +11,17 @@ from __future__ import annotations from datetime import datetime -from kafka.metrics import MetricName, Metrics -from kafka.metrics.measurable_stat import AbstractMeasurableStat -from kafka.metrics.stats import Avg, Max, Rate, Total +from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient +import os +import psutil import schedule import threading import time -class Value(AbstractMeasurableStat): - """ - An AbstractSampledStat that maintains a simple average over its samples. - """ - - def __init__(self) -> None: - super().__init__() - self.value = 0.0 - - # pylint: disable=unused-argument - def measure(self, config: object, now: int) -> float: - return self.value - - def record(self, config: object, value: float, time_ms: int) -> None: - self.value = value - - class Singleton(type): _instance: Singleton | None = None @@ -58,6 +41,9 @@ def __init__(self) -> None: self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() + self.error_count = 0 + self.app_host = "" + self.app_port = 8081 def setup(self, stats_client: StatsClient, config: Config) -> None: self.active = config.get("metrics_extended") @@ -67,90 +53,68 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: if self.is_ready: return self.is_ready = True + if self.stats_client: + self.stats_client = stats_client + else: + self.active = False + return + app_host = config.get("host") + app_port = config.get("port") + if app_host and app_port: + self.app_host = app_host + self.app_port = app_port + else: + raise RuntimeError("No application host or port defined in application") - sensor = self.metrics.sensor("connections-active") - sensor.add(MetricName("connections-active", "kafka-metrics"), Total()) - - sensor = self.metrics.sensor("request-size") - sensor.add(MetricName("request-size-max", "kafka-metrics"), Max()) - sensor.add(MetricName("request-size-avg", "kafka-metrics"), Avg()) - - sensor = self.metrics.sensor("response-size") - sensor.add(MetricName("response-size-max", "kafka-metrics"), Max()) - sensor.add(MetricName("response-size-avg", "kafka-metrics"), Avg()) - - sensor = self.metrics.sensor("master-slave-role") - sensor.add(MetricName("master-slave-role", "kafka-metrics"), Value()) - - sensor = self.metrics.sensor("request-error-rate") - sensor.add(MetricName("request-error-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("request-rate") - sensor.add(MetricName("request-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("response-rate") - sensor.add(MetricName("response-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("response-byte-rate") - sensor.add(MetricName("response-byte-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("latency") - sensor.add(MetricName("latency-max", "kafka-metrics"), Max()) - sensor.add(MetricName("latency-avg", "kafka-metrics"), Avg()) - - self.stats_client = stats_client - - schedule.every(10).seconds.do(self.report) + schedule.every(10).seconds.do(self.connections) self.worker_thread.start() - def connection(self) -> None: - if not self.active: - return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("connections-active").record(1.0, timestamp) - def request(self, size: int) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("request-size").record(size, timestamp) - self.metrics.get_sensor("request-rate").record(1, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("request-size", size) def response(self, size: int) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("connections-active").record(-1.0, timestamp) - self.metrics.get_sensor("response-size").record(size, timestamp) - self.metrics.get_sensor("response-byte-rate").record(size, timestamp) - self.metrics.get_sensor("response-rate").record(1, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("response-size", size) def are_we_master(self, is_master: bool) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("master-slave-role").record(int(is_master), timestamp) + self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("latency").record(latency_ms, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("master-slave-role", latency_ms) def error(self) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("request-error-rate").record(1, timestamp) - - def report(self) -> None: - if not self.active or not isinstance(self.stats_client, StatsClient): + if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") + self.error_count += 1 + self.stats_client.gauge("error", self.error_count) - for metric_name in self.metrics.metrics: - value = self.metrics.metrics[metric_name].value() - self.stats_client.gauge(metric_name.name, value) + def connections(self) -> None: + if not self.active: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + psutil.Process(os.getpid()).connections() + connections = 0 + for conn in psutil.net_connections(kind="tcp"): + if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": + connections += 1 + self.stats_client.gauge("connections-active", connections) def worker(self) -> None: while True: @@ -162,6 +126,5 @@ def worker(self) -> None: def cleanup(self) -> None: if not self.active: return - self.report() self.stop_event.set() self.worker_thread.join() From fedff8f3d9b2b0f2845df28ef25b5b9567aa0d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 10:22:52 +0300 Subject: [PATCH 09/25] fixup --- karapace/karapacemetrics.py | 1 - 1 file changed, 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 74c72bb05..c769355ba 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,7 +10,6 @@ """ from __future__ import annotations -from datetime import datetime from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient From b70ae03a5efde731294e6eed746b61eac35eb548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 11:21:06 +0300 Subject: [PATCH 10/25] fixup code --- karapace/karapacemetrics.py | 4 ++++ requirements/requirements-dev.txt | 7 ++++--- requirements/requirements-typing.txt | 2 +- requirements/requirements.in | 1 + requirements/requirements.txt | 2 ++ 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index c769355ba..19ab05c09 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -86,6 +86,8 @@ def response(self, size: int) -> None: def are_we_master(self, is_master: bool) -> None: if not self.active: return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: @@ -111,6 +113,8 @@ def connections(self) -> None: psutil.Process(os.getpid()).connections() connections = 0 for conn in psutil.net_connections(kind="tcp"): + if not conn.laddr: + continue if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": connections += 1 self.stats_client.gauge("connections-active", connections) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 7919157c2..eb8948203 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -87,7 +87,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.78.3 +hypothesis==6.79.1 # via -r requirements-dev.in idna==3.4 # via @@ -95,7 +95,7 @@ idna==3.4 # anyio # requests # yarl -importlib-metadata==6.6.0 +importlib-metadata==6.7.0 # via flask importlib-resources==5.12.0 # via @@ -141,13 +141,14 @@ pkgutil-resolve-name==1.3.10 # via # -r requirements.txt # jsonschema -pluggy==1.0.0 +pluggy==1.2.0 # via pytest protobuf==3.20.3 # via -r requirements.txt psutil==5.9.5 # via # -r requirements-dev.in + # -r requirements.txt # locust # pytest-xdist pygments==2.15.1 diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 32bc10e69..3659aec84 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -8,7 +8,7 @@ certifi==2023.5.7 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.3.0 +mypy==1.4.0 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy diff --git a/requirements/requirements.in b/requirements/requirements.in index 806e20b6f..fbee224b5 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -14,6 +14,7 @@ watchfiles<1 schedule xxhash~=3.0 rich~=12.5.0 +psutil # Patched dependencies diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 671753e9a..ffbeb28a0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -60,6 +60,8 @@ pkgutil-resolve-name==1.3.10 # via jsonschema protobuf==3.20.3 # via -r requirements.in +psutil==5.9.5 + # via -r requirements.in pygments==2.15.1 # via rich pyrsistent==0.19.3 From a0387a35759b750b11c2476061c553f0bd55bf39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 23:28:19 +0300 Subject: [PATCH 11/25] fixup --- karapace/karapacemetrics.py | 9 +++++---- karapace/rapu.py | 1 - 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 19ab05c09..47af8971c 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -33,7 +33,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: object | None = None + self.active = False self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() @@ -52,7 +52,7 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: if self.is_ready: return self.is_ready = True - if self.stats_client: + if not self.stats_client: self.stats_client = stats_client else: self.active = False @@ -66,7 +66,6 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: raise RuntimeError("No application host or port defined in application") schedule.every(10).seconds.do(self.connections) - self.worker_thread.start() def request(self, size: int) -> None: @@ -130,4 +129,6 @@ def cleanup(self) -> None: if not self.active: return self.stop_event.set() - self.worker_thread.join() + if self.worker_thread.is_alive(): + self.worker_thread.join() + diff --git a/karapace/rapu.py b/karapace/rapu.py index fb9d1fe31..677918b97 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -275,7 +275,6 @@ async def _handle_request( path_for_stats=path_for_stats, ) - KarapaceMetrics().connection() try: if request.method == "OPTIONS": # self.metrics.request(0) From 358facc614ef67fa017676a9b515c281d7d03f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 23:48:53 +0300 Subject: [PATCH 12/25] fixup --- karapace/karapacemetrics.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 47af8971c..d73a136e2 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -33,7 +33,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active = False + self.active: object | None = None self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() @@ -131,4 +131,3 @@ def cleanup(self) -> None: self.stop_event.set() if self.worker_thread.is_alive(): self.worker_thread.join() - From 8533959346beed87644af522b53caf66a1fd7290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 16:27:16 +0300 Subject: [PATCH 13/25] improve code by request --- README.rst | 15 +++++++-------- karapace.config.json | 6 +++--- karapace/config.py | 7 +++++-- karapace/karapacemetrics.py | 30 ++++++++---------------------- karapace/rapu.py | 3 +-- karapace/statsd.py | 15 ++------------- 6 files changed, 26 insertions(+), 50 deletions(-) diff --git a/README.rst b/README.rst index ecfcd28a9..56c0ae89a 100644 --- a/README.rst +++ b/README.rst @@ -461,16 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) - * - ``metrics_mode`` - - ``statsd`` - - Statistics server mode. For karapace supports ststsd server - * - ``statsd_uri`` - - ``127.0.0.1:8125`` - - Host:Port of statsd server * - ``metrics_extended`` - ``true`` - - Enable extended metrics. Extended metrics: connections_active, request_size_avg, request_size_max, response_size_avg, response_size_max - + - Enable extended metrics. Extended metrics: connections_active, [request|response]_size + * - ``statsd_host`` + - ``127.0.0.1`` + - Host of statsd server + * - ``statsd_port`` + - ``8125`` + - Port of statsd server Authentication and authorization of Karapace Schema Registry REST API ===================================================================== diff --git a/karapace.config.json b/karapace.config.json index bded35aa6..3a4fe0a43 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -28,7 +28,7 @@ "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000, - "metrics_mode": "statsd", - "statsd_uri": "127.0.0.1:8125", - "metrics_extended": true + "metrics_extended": true, + "statsd_host": "127.0.0.1", + "statsd_port": 8125 } diff --git a/karapace/config.py b/karapace/config.py index e56217f1e..4725c3a76 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,6 +75,9 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + metrics_extended: bool + statsd_host: str + statsd_port: int sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -143,9 +146,9 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", - "metrics_mode": "statsd", - "statsd_uri": "127.0.0.1:8125", "metrics_extended": True, + "statsd_host": "127.0.0.1", + "statsd_port": 8125, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index d73a136e2..8f90436f9 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,7 +10,6 @@ """ from __future__ import annotations -from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient @@ -33,19 +32,15 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: object | None = None + self.active = False self.stats_client: StatsClient | None = None self.is_ready = False - self.metrics = Metrics() self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() - self.error_count = 0 - self.app_host = "" - self.app_port = 8081 def setup(self, stats_client: StatsClient, config: Config) -> None: - self.active = config.get("metrics_extended") + self.active = config.get("metrics_extended") or False if not self.active: return with self.lock: @@ -57,13 +52,6 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: else: self.active = False return - app_host = config.get("host") - app_port = config.get("port") - if app_host and app_port: - self.app_host = app_host - self.app_port = app_port - else: - raise RuntimeError("No application host or port defined in application") schedule.every(10).seconds.do(self.connections) self.worker_thread.start() @@ -94,27 +82,25 @@ def latency(self, latency_ms: float) -> None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - self.stats_client.gauge("master-slave-role", latency_ms) + self.stats_client.timing("latency_ms", latency_ms) def error(self) -> None: if not self.active: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - self.error_count += 1 - self.stats_client.gauge("error", self.error_count) + self.stats_client.increase("error_total", 1) def connections(self) -> None: if not self.active: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - psutil.Process(os.getpid()).connections() connections = 0 - for conn in psutil.net_connections(kind="tcp"): - if not conn.laddr: - continue - if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": + karapace_proc = psutil.Process(os.getpid()) + + for conn in karapace_proc.connections(kind="tcp"): + if conn.laddr and conn.status == "ESTABLISHED": connections += 1 self.stats_client.gauge("connections-active", connections) diff --git a/karapace/rapu.py b/karapace/rapu.py index 677918b97..ecf53963d 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi import hashlib import logging import re @@ -277,7 +277,6 @@ async def _handle_request( try: if request.method == "OPTIONS": - # self.metrics.request(0) origin = request.headers.get("Origin") if not origin: raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST) diff --git a/karapace/statsd.py b/karapace/statsd.py index 1b4ab9bcd..8115f76f9 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -19,7 +19,6 @@ import logging import socket import time -import urllib STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 @@ -33,18 +32,8 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - _host = host - _port = port - - if config.get("metrics_mode") == "statsd": - statsd_uri = config.get("statsd_uri") - if statsd_uri: - srv = urllib.parse.urlsplit("//" + str(statsd_uri)) - if srv.hostname: - _host = str(srv.hostname) - if srv.port: - _port = int(srv.port) - + _host = config.get("statsd_host") if "statsd_host" in config else host + _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.get("tags", {}) From 90e221cde5f49ec4edd51e6308afd09fbc9668f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 16:43:22 +0300 Subject: [PATCH 14/25] add psutil typing support --- requirements/requirements-typing.in | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements/requirements-typing.in b/requirements/requirements-typing.in index c55f35548..1d11d3198 100644 --- a/requirements/requirements-typing.in +++ b/requirements/requirements-typing.in @@ -5,3 +5,4 @@ mypy types-jsonschema sentry-sdk types-cachetools +types-psutil From 4c485766b978ffce6f3f99d0ec01f1fa93721d92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 20:33:27 +0300 Subject: [PATCH 15/25] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index ecf53963d..d5c827b3a 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi +import cgi # pylint: disable=deprecated-module import hashlib import logging import re From f9cb6d8d47f02179678c6e5638293fdfc079f86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 20:39:26 +0300 Subject: [PATCH 16/25] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index d5c827b3a..c506445a4 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi # pylint: disable=deprecated-module import hashlib import logging import re From 9cdcba75ebd12ea20d3a549dd80a8855dd6bd04a Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 18:56:27 +0300 Subject: [PATCH 17/25] add prometheus support --- karapace.config.json | 7 +++- karapace/base_stats.py | 58 ++++++++++++++++++++++++++ karapace/config.py | 11 ++++- karapace/karapacemetrics.py | 22 ++++++++-- karapace/prometheus.py | 69 +++++++++++++++++++++++++++++++ karapace/rapu.py | 8 ++-- karapace/schema_reader.py | 4 +- karapace/statsd.py | 28 ++----------- requirements/requirements-dev.txt | 2 + requirements/requirements.in | 1 + requirements/requirements.txt | 2 + 11 files changed, 175 insertions(+), 37 deletions(-) create mode 100644 karapace/base_stats.py create mode 100644 karapace/prometheus.py diff --git a/karapace.config.json b/karapace.config.json index 3a4fe0a43..798ab2ffe 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -28,7 +28,12 @@ "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000, + "stats_service": "statsd", "metrics_extended": true, "statsd_host": "127.0.0.1", - "statsd_port": 8125 + "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005 + + } diff --git a/karapace/base_stats.py b/karapace/base_stats.py new file mode 100644 index 000000000..0c467abaf --- /dev/null +++ b/karapace/base_stats.py @@ -0,0 +1,58 @@ +""" +karapace - basestats + +Supports base class for statsd and prometheus protocols: + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import contextmanager +from karapace.config import Config +from karapace.sentry import get_sentry_client +from typing import Final, Iterator + +import time + + +class StatsClient(ABC): + @abstractmethod + def __init__( + self, + config: Config, + ) -> None: + self._tags: Final = config.get("tags", {}) + self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) + + @contextmanager + def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: + start_time = time.monotonic() + yield + self.timing(metric, time.monotonic() - start_time, tags) + + @abstractmethod + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + @abstractmethod + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + pass + + @abstractmethod + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + } + all_tags.update(tags or {}) + self.increase("exception", tags=all_tags) + scope_args = {**(tags or {}), "where": where} + self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) + + def close(self) -> None: + self.sentry_client.close() diff --git a/karapace/config.py b/karapace/config.py index 4c93a2b0c..1fc7f1a59 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,12 +75,16 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + stats_service: str metrics_extended: bool statsd_host: str statsd_port: int + prometheus_host: str | None + prometheus_port: int | None - sentry: NotRequired[Mapping[str, object]] - tags: NotRequired[Mapping[str, object]] + +sentry: NotRequired[Mapping[str, object]] +tags: NotRequired[Mapping[str, object]] class ConfigDefaults(Config, total=False): @@ -147,9 +151,12 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "stats_service": "statsd", "metrics_extended": True, "statsd_host": "127.0.0.1", "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 8f90436f9..5e3acaa89 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,8 +10,10 @@ """ from __future__ import annotations +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.statsd import StatsClient +from karapace.prometheus import PrometheusClient +from karapace.statsd import StatsdClient import os import psutil @@ -20,6 +22,10 @@ import time +class MetricsException(Exception): + pass + + class Singleton(type): _instance: Singleton | None = None @@ -33,14 +39,22 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: self.active = False - self.stats_client: StatsClient | None = None + self.stats_client: StatsClient | None self.is_ready = False self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() def setup(self, stats_client: StatsClient, config: Config) -> None: - self.active = config.get("metrics_extended") or False + stats_service = config.get("stats_service") + if stats_service == "statsd": + self.stats_client = StatsdClient(config=config) + elif stats_service == "prometheus": + self.stats_client = PrometheusClient(config=config) + else: + raise MetricsException('Config variable "stats_service" is not defined') + + self.active = config.get("metrics_extended") if not self.active: return with self.lock: @@ -112,6 +126,8 @@ def worker(self) -> None: time.sleep(1) def cleanup(self) -> None: + if self.stats_client: + self.stats_client.close() if not self.active: return self.stop_event.set() diff --git a/karapace/prometheus.py b/karapace/prometheus.py new file mode 100644 index 000000000..2ecd78c37 --- /dev/null +++ b/karapace/prometheus.py @@ -0,0 +1,69 @@ +""" +karapace - prometheus + +Supports telegraf's statsd protocol extension for 'key=value' tags: + + https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.base_stats import StatsClient +from karapace.config import Config +from prometheus_client import Counter, Gauge, start_http_server, Summary +from typing import Final + +import logging + +LOG = logging.getLogger(__name__) +HOST: Final = "127.0.0.1" +PORT: Final = 8005 + + +class PrometheusException(Exception): + pass + + +class PrometheusClient(StatsClient): + server_is_active = False + + def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None: + super().__init__(config) + + _host = config.get("prometheus_host") if "prometheus_host" in config else host + _port = config.get("prometheus_port") if "prometheus_port" in config else port + if _host is None: + raise PrometheusException("prometheus_host host is undefined") + if _port is None: + raise PrometheusException("prometheus_host port is undefined") + if not self.server_is_active: + start_http_server(_port, _host) + self.server_is_active = True + else: + raise PrometheusException("Double instance of Prometheus interface") + self._gauge: dict[str, Gauge] = dict() + self._summary: dict[str, Summary] = dict() + self._counter: dict[str, Counter] = dict() + + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._gauge.get(metric) + if m is None: + m = Gauge(metric, metric) + self._gauge[metric] = m + m.set(value) + + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + m = self._counter.get(metric) + if m is None: + m = Counter(metric, metric) + self._counter[metric] = m + m.inc(inc_value) + + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._summary.get(metric) + if m is None: + m = Summary(metric, metric) + self._summary[metric] = m + m.observe(value) diff --git a/karapace/rapu.py b/karapace/rapu.py index c506445a4..8157f22cb 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -10,7 +10,6 @@ from http import HTTPStatus from karapace.config import Config, create_server_ssl_context from karapace.karapacemetrics import KarapaceMetrics -from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Callable, Dict, NoReturn, Optional, overload, Union @@ -19,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi import hashlib import logging import re @@ -169,10 +168,10 @@ def __init__( self.app_request_metric = f"{app_name}_request" self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) - self.stats = StatsClient(config=config) + KarapaceMetrics().setup(config) + self.stats = KarapaceMetrics().stats_client self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler - KarapaceMetrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -188,7 +187,6 @@ async def close(self) -> None: created by the aiohttp library. """ KarapaceMetrics().cleanup() - self.stats.close() @staticmethod def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 3dec4a887..0bc8175be 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -24,13 +24,13 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase +from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents -from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient from threading import Event, Thread @@ -127,7 +127,7 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = StatsClient(config=config) + self.stats = KarapaceMetrics().stats_client # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/karapace/statsd.py b/karapace/statsd.py index 8115f76f9..b54bcd5bd 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -10,22 +10,20 @@ """ from __future__ import annotations -from contextlib import contextmanager +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.sentry import get_sentry_client -from typing import Any, Final, Iterator +from typing import Any, Final import datetime import logging import socket -import time STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 LOG = logging.getLogger(__name__) -class StatsClient: +class StatsdClient(StatsClient): def __init__( self, config: Config, @@ -36,14 +34,6 @@ def __init__( _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._tags: Final = config.get("tags", {}) - self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) - - @contextmanager - def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: - start_time = time.monotonic() - yield - self.timing(metric, time.monotonic() - start_time, tags) def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"g", value, tags) @@ -54,16 +44,6 @@ def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> def timing(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"ms", value, tags) - def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: - all_tags = { - "exception": ex.__class__.__name__, - "where": where, - } - all_tags.update(tags or {}) - self.increase("exception", tags=all_tags) - scope_args = {**(tags or {}), "where": where} - self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) - def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) -> None: if None in self._dest_addr: # stats sending is disabled @@ -95,4 +75,4 @@ def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) def close(self) -> None: self._socket.close() - self.sentry_client.close() + super().close() diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 1d3a947d2..864dc0f3b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -151,6 +151,8 @@ pkgutil-resolve-name==1.3.10 # jsonschema pluggy==1.2.0 # via pytest +prometheus-client==0.17.0 + # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt psutil==5.9.5 diff --git a/requirements/requirements.in b/requirements/requirements.in index 8e6893e2b..f4e95c895 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -13,6 +13,7 @@ ujson<6 watchfiles<1 schedule psutil +prometheus-client xxhash~=3.3 rich~=12.6.0 cachetools==5.3.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 912af3587..857ffb3d0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -65,6 +65,8 @@ packaging==23.1 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema +prometheus-client==0.17.0 + # via -r requirements.in protobuf==3.20.3 # via -r requirements.in psutil==5.9.5 From 30a30ad53990e1689a989e41ce14a6383940a056 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:41:28 +0300 Subject: [PATCH 18/25] refactoring --- karapace/{karapacemetrics.py => metrics.py} | 2 +- karapace/rapu.py | 18 +++++++++--------- karapace/schema_reader.py | 4 ++-- karapace/schema_registry.py | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) rename karapace/{karapacemetrics.py => metrics.py} (98%) diff --git a/karapace/karapacemetrics.py b/karapace/metrics.py similarity index 98% rename from karapace/karapacemetrics.py rename to karapace/metrics.py index 5e3acaa89..c05465d9f 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/metrics.py @@ -36,7 +36,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: return cls._instance -class KarapaceMetrics(metaclass=Singleton): +class Metrics(metaclass=Singleton): def __init__(self) -> None: self.active = False self.stats_client: StatsClient | None diff --git a/karapace/rapu.py b/karapace/rapu.py index 8157f22cb..cecd19381 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,7 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Callable, Dict, NoReturn, Optional, overload, Union @@ -135,7 +135,7 @@ def __init__( self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") if not is_success(status): - KarapaceMetrics().error() + Metrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -168,8 +168,8 @@ def __init__( self.app_request_metric = f"{app_name}_request" self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) - KarapaceMetrics().setup(config) - self.stats = KarapaceMetrics().stats_client + Metrics().setup(config) + self.stats = Metrics().stats_client self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler @@ -186,7 +186,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ - KarapaceMetrics().cleanup() + Metrics().cleanup() @staticmethod def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument @@ -284,9 +284,9 @@ async def _handle_request( body = await request.read() if body: - KarapaceMetrics().request(len(body)) + Metrics().request(len(body)) else: - KarapaceMetrics().request(0) + Metrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -403,8 +403,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: - KarapaceMetrics().response(resp.content_length) - KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) + Metrics().response(resp.content_length) + Metrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 0bc8175be..8b060a37b 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -24,9 +24,9 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema @@ -127,7 +127,7 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = KarapaceMetrics().stats_client + self.stats = Metrics().stats_client # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 8f251d3d8..e29e167c7 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,10 +22,10 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader @@ -124,7 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: - KarapaceMetrics().are_we_master(are_we_master) + Metrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) From 0c73a1a5e3c15fda0d4895d01f2fbfca84e302e9 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:44:23 +0300 Subject: [PATCH 19/25] refactor --- karapace/{karapacemetrics.py => metrics.py} | 2 +- karapace/rapu.py | 16 ++++++++-------- karapace/schema_registry.py | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) rename karapace/{karapacemetrics.py => metrics.py} (98%) diff --git a/karapace/karapacemetrics.py b/karapace/metrics.py similarity index 98% rename from karapace/karapacemetrics.py rename to karapace/metrics.py index 8f90436f9..bf45bec3a 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/metrics.py @@ -30,7 +30,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: return cls._instance -class KarapaceMetrics(metaclass=Singleton): +class Metrics(metaclass=Singleton): def __init__(self) -> None: self.active = False self.stats_client: StatsClient | None = None diff --git a/karapace/rapu.py b/karapace/rapu.py index c506445a4..5978d3c30 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,7 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ @@ -136,7 +136,7 @@ def __init__( self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") if not is_success(status): - KarapaceMetrics().error() + Metrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -172,7 +172,7 @@ def __init__( self.stats = StatsClient(config=config) self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler - KarapaceMetrics().setup(self.stats, config) + Metrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -187,7 +187,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ - KarapaceMetrics().cleanup() + Metrics().cleanup() self.stats.close() @staticmethod @@ -286,9 +286,9 @@ async def _handle_request( body = await request.read() if body: - KarapaceMetrics().request(len(body)) + Metrics().request(len(body)) else: - KarapaceMetrics().request(0) + Metrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -405,8 +405,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: - KarapaceMetrics().response(resp.content_length) - KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) + Metrics().response(resp.content_length) + Metrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 8f251d3d8..804d7495e 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,7 +22,7 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer @@ -124,7 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: - KarapaceMetrics().are_we_master(are_we_master) + Metrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) From c495c506dcdec57e8b3e4371ec4bd86b926450da Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:46:19 +0300 Subject: [PATCH 20/25] fixup --- karapace/schema_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 804d7495e..e29e167c7 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,10 +22,10 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.metrics import Metrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader From 33f6cebe84badfe185ad6ebea372b49d924e240b Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 15:22:08 +0300 Subject: [PATCH 21/25] fixup issues --- karapace/metrics.py | 8 ++------ karapace/statsd.py | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index 5bbcb6b29..a0aff7e0d 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -52,8 +52,9 @@ def setup(self, config: Config) -> None: elif stats_service == "prometheus": self.stats_client = PrometheusClient(config=config) else: + self.active = False raise MetricsException('Config variable "stats_service" is not defined') - if config.get("metrics_extended"): # for mypy check pass + if config.get("metrics_extended"): # for mypy check pass self.active = True if not self.active: return @@ -61,11 +62,6 @@ def setup(self, config: Config) -> None: if self.is_ready: return self.is_ready = True - if not self.stats_client: - self.stats_client = stats_client - else: - self.active = False - return schedule.every(10).seconds.do(self.connections) self.worker_thread.start() diff --git a/karapace/statsd.py b/karapace/statsd.py index 4c1cefe50..fcd3c8572 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -30,7 +30,6 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - super().__init__(config) self._tags: Final[dict] = config.get("tags", {}) _host = config.get("statsd_host") if "statsd_host" in config else host From 34fa7dcef5a363f21d50e12432fb8430488b7637 Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 19:58:37 +0300 Subject: [PATCH 22/25] fixup --- karapace/metrics.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index a0aff7e0d..03ee39718 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -38,7 +38,6 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class Metrics(metaclass=Singleton): def __init__(self) -> None: - self.active = False self.stats_client: StatsClient self.is_ready = False self.stop_event = threading.Event() @@ -47,17 +46,14 @@ def __init__(self) -> None: def setup(self, config: Config) -> None: stats_service = config.get("stats_service") + if not config.get("metrics_extended"): + return if stats_service == "statsd": self.stats_client = StatsdClient(config=config) elif stats_service == "prometheus": self.stats_client = PrometheusClient(config=config) else: - self.active = False raise MetricsException('Config variable "stats_service" is not defined') - if config.get("metrics_extended"): # for mypy check pass - self.active = True - if not self.active: - return with self.lock: if self.is_ready: return @@ -67,42 +63,42 @@ def setup(self, config: Config) -> None: self.worker_thread.start() def request(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("request-size", size) def response(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("response-size", size) def are_we_master(self, is_master: bool) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.timing("latency_ms", latency_ms) def error(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.increase("error_total", 1) def connections(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") @@ -124,7 +120,7 @@ def worker(self) -> None: def cleanup(self) -> None: if self.stats_client: self.stats_client.close() - if not self.active: + if not self.is_ready: return self.stop_event.set() if self.worker_thread.is_alive(): From 46605f4a24d09825b6a6cfb35ef05041ad10bccf Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 20:50:08 +0300 Subject: [PATCH 23/25] fixup --- karapace.config.json | 3 +-- karapace/config.py | 5 ++--- karapace/statsd.py | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/karapace.config.json b/karapace.config.json index 798ab2ffe..f07af6710 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -33,7 +33,6 @@ "statsd_host": "127.0.0.1", "statsd_port": 8125, "prometheus_host": "127.0.0.1", - "prometheus_port": 8005 - + "prometheus_port": 8005, } diff --git a/karapace/config.py b/karapace/config.py index 1fc7f1a59..b3016eb07 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -82,9 +82,8 @@ class Config(TypedDict): prometheus_host: str | None prometheus_port: int | None - -sentry: NotRequired[Mapping[str, object]] -tags: NotRequired[Mapping[str, object]] + sentry: NotRequired[Mapping[str, object]] + tags: NotRequired[Mapping[str, object]] class ConfigDefaults(Config, total=False): diff --git a/karapace/statsd.py b/karapace/statsd.py index fcd3c8572..36fdc24a6 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -31,7 +31,7 @@ def __init__( port: int = STATSD_PORT, ) -> None: super().__init__(config) - self._tags: Final[dict] = config.get("tags", {}) + self._tags: Final = config.get("tags", {}) _host = config.get("statsd_host") if "statsd_host" in config else host _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) From f670782e4c50f360dfd64e0737dda7cce78d1c94 Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 23:39:15 +0300 Subject: [PATCH 24/25] fixup --- karapace/rapu.py | 2 +- requirements/requirements-dev.txt | 33 ++++++++++++++-------------- requirements/requirements-typing.txt | 4 +++- requirements/requirements.txt | 12 +++++----- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index cecd19381..ed8bb43c8 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -18,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi +import cgi # pylint: disable=deprecated-module import hashlib import logging import re diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 1461090b1..f023fcc5b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -14,11 +14,11 @@ aiosignal==1.3.1 # via # -r requirements.txt # aiohttp -anyio==3.7.1 +anyio==4.0.0 # via # -r requirements.txt # watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # -r requirements.txt # aiohttp @@ -48,7 +48,7 @@ charset-normalizer==3.2.0 # -r requirements.txt # aiohttp # requests -click==8.1.6 +click==8.1.7 # via flask commonmark==0.9.1 # via @@ -56,7 +56,7 @@ commonmark==0.9.1 # rich configargparse==1.7 # via locust -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements.txt # anyio @@ -66,9 +66,9 @@ execnet==2.0.2 # via pytest-xdist fancycompleter==0.9.1 # via pdbpp -filelock==3.12.2 +filelock==3.12.3 # via -r requirements-dev.in -flask==2.3.2 +flask==2.3.3 # via # flask-basicauth # flask-cors @@ -82,15 +82,15 @@ frozenlist==1.4.0 # -r requirements.txt # aiohttp # aiosignal -gevent==23.7.0 +gevent==23.9.0.post1 # via # geventhttpclient # locust -geventhttpclient==2.0.9 +geventhttpclient==2.0.10 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.82.7 +hypothesis==6.84.1 # via -r requirements-dev.in idna==3.4 # via @@ -149,9 +149,9 @@ pkgutil-resolve-name==1.3.10 # via # -r requirements.txt # jsonschema -pluggy==1.2.0 +pluggy==1.3.0 # via pytest -prometheus-client==0.17.0 +prometheus-client==0.17.1 # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt @@ -168,7 +168,7 @@ pygments==2.16.1 # rich pyrepl==0.9.0 # via fancycompleter -pytest==7.4.0 +pytest==7.4.1 # via # -r requirements-dev.in # pytest-timeout @@ -179,7 +179,7 @@ pytest-xdist[psutil]==3.3.1 # via -r requirements-dev.in python-dateutil==2.8.2 # via -r requirements.txt -pyzmq==25.1.0 +pyzmq==25.1.1 # via locust referencing==0.30.2 # via @@ -194,7 +194,7 @@ rich==12.6.0 # via -r requirements.txt roundrobin==0.0.4 # via locust -rpds-py==0.9.2 +rpds-py==0.10.2 # via # -r requirements.txt # jsonschema @@ -215,13 +215,14 @@ sniffio==1.3.0 # anyio sortedcontainers==2.4.0 # via hypothesis -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.txt tomli==2.0.1 # via pytest typing-extensions==4.7.1 # via # -r requirements.txt + # filelock # locust # rich ujson==5.8.0 @@ -232,7 +233,7 @@ urllib3==2.0.4 # sentry-sdk watchfiles==0.20.0 # via -r requirements.txt -werkzeug==2.3.6 +werkzeug==2.3.7 # via # flask # locust diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 8e58e4e53..f035ad784 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -8,7 +8,7 @@ certifi==2023.7.22 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.4.1 +mypy==1.5.1 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy @@ -24,6 +24,8 @@ types-cachetools==5.3.0.6 # via -r requirements-typing.in types-jsonschema==4.17.0.10 # via -r requirements-typing.in +types-psutil==5.9.5.16 + # via -r requirements-typing.in typing-extensions==4.7.1 # via # -c requirements-dev.txt diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 34a4b11d0..3651ae066 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,9 +12,9 @@ aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp -anyio==3.7.1 +anyio==4.0.0 # via watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # aiohttp # aiokafka @@ -31,7 +31,7 @@ charset-normalizer==3.2.0 # via aiohttp commonmark==0.9.1 # via rich -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via anyio frozenlist==1.4.0 # via @@ -65,7 +65,7 @@ packaging==23.1 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema -prometheus-client==0.17.0 +prometheus-client==0.17.1 # via -r requirements.in protobuf==3.20.3 # via -r requirements.in @@ -81,7 +81,7 @@ referencing==0.30.2 # jsonschema-specifications rich==12.6.0 # via -r requirements.in -rpds-py==0.9.2 +rpds-py==0.10.2 # via # jsonschema # referencing @@ -93,7 +93,7 @@ six==1.16.0 # python-dateutil sniffio==1.3.0 # via anyio -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.in typing-extensions==4.7.1 # via From ff8bf58d42b3d8e27c9e73a5d3f3615ba4e23a6e Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 6 Sep 2023 13:52:12 +0300 Subject: [PATCH 25/25] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index ed8bb43c8..6ab3b2419 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -18,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi # pylint: disable=deprecated-module import hashlib import logging import re