Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus client metrics support #710

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e3cb524
Karapace metrics
libretto Jun 9, 2023
32ce060
Merge branch 'master' into karapace-metrics
libretto Jun 9, 2023
8dab84d
fixup issues
libretto Jun 10, 2023
2898e31
fixup issues
libretto Jun 10, 2023
c974579
fixup annotations issue
libretto Jun 12, 2023
7256f5d
fixup exception message
libretto Jun 12, 2023
ab6ae96
get rid of multiple instances of class
libretto Jun 12, 2023
733d1f2
fixup issue
libretto Jun 12, 2023
8751eea
change code to send raw data only
libretto Jun 16, 2023
53d3e4b
merge with master
libretto Jun 16, 2023
fedff8f
fixup
libretto Jun 22, 2023
31d16d4
Merge branch 'master' into karapace-metrics
libretto Jun 22, 2023
b70ae03
fixup code
libretto Jun 22, 2023
a0387a3
fixup
libretto Jun 22, 2023
358facc
fixup
libretto Jun 22, 2023
a064624
merge
libretto Jul 3, 2023
8533959
improve code by request
libretto Aug 8, 2023
ac48829
merge with main
libretto Aug 8, 2023
90e221c
add psutil typing support
libretto Aug 8, 2023
4c48576
fixup
libretto Aug 8, 2023
f9cb6d8
fixup
libretto Aug 8, 2023
765864b
Merge branch 'main' into karapace-metrics
libretto Aug 30, 2023
9cdcba7
add prometheus support
libretto Sep 2, 2023
073aa16
merge with master
libretto Sep 2, 2023
75b2913
Merge branch 'karapace-metrics' into prometheus
libretto Sep 2, 2023
30a30ad
refactoring
libretto Sep 2, 2023
0c73a1a
refactor
libretto Sep 2, 2023
c495c50
fixup
libretto Sep 2, 2023
61e659d
fixup
libretto Sep 2, 2023
33f6ceb
fixup issues
libretto Sep 5, 2023
34fa7dc
fixup
libretto Sep 5, 2023
46605f4
fixup
libretto Sep 5, 2023
f670782
fixup
libretto Sep 5, 2023
ff8bf58
fixup
libretto Sep 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)

* - ``metrics_extended``
- ``true``
- Enable extended metrics. Extended metrics: connections_active, [request|response]_size
* - ``statsd_host``
- ``127.0.0.1``
- Host of statsd server
* - ``statsd_port``
- ``8125``
- Port of statsd server

Authentication and authorization of Karapace Schema Registry REST API
=====================================================================
Expand Down
9 changes: 8 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats

Provides the abstract base class shared by the statsd and prometheus stats clients.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
    """Common interface of the stats backends, plus Sentry error reporting.

    Concrete backends implement ``gauge``, ``increase`` and ``timing``. This
    base class supplies the timing context manager, the reporting of
    unexpected exceptions and the shutdown of the Sentry client.
    """

    @abstractmethod
    def __init__(
        self,
        config: Config,
    ) -> None:
        """Create the Sentry client from the optional ``sentry`` config section.

        Declared abstract so the base class cannot be instantiated directly.
        Subclasses must still call it, because ``unexpected_exception`` and
        ``close`` read ``self.sentry_client``.
        """
        self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))

    @contextmanager
    def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
        """Time the body of a ``with`` block and report it through ``timing``.

        The reported value is a ``time.monotonic()`` difference. It is sent
        even when the body raises, and the exception then propagates unchanged.
        """
        start_time = time.monotonic()
        try:
            yield
        finally:
            # Without the finally clause a failing body would silently lose its sample.
            self.timing(metric, time.monotonic() - start_time, tags)

    @abstractmethod
    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report ``value`` as the current reading of gauge ``metric``."""

    @abstractmethod
    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Add ``inc_value`` to counter ``metric``."""

    @abstractmethod
    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Report one timing observation ``value`` for ``metric``."""

    def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
        """Count an unexpected exception and forward it to Sentry.

        Increments the ``exception`` counter tagged with the exception class
        name and ``where``. Caller-supplied ``tags`` are merged last for the
        counter, so they can override those two keys. For Sentry, ``where`` is
        applied last and always wins.
        """
        all_tags = {
            "exception": ex.__class__.__name__,
            "where": where,
        }
        all_tags.update(tags or {})
        self.increase("exception", tags=all_tags)
        scope_args = {**(tags or {}), "where": where}
        self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args)

    def close(self) -> None:
        """Close the Sentry client."""
        self.sentry_client.close()
12 changes: 12 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class Config(TypedDict):
karapace_registry: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -144,6 +150,12 @@ class ConfigDefaults(Config, total=False):
"karapace_registry": False,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
127 changes: 127 additions & 0 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
karapace - metrics
Supports collection of system metrics
list of supported metrics:
connections-active - The number of active HTTP(S) connections to server.
Data collected inside aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import os
import psutil
import schedule
import threading
import time


class MetricsException(Exception):
    """Raised when the metrics subsystem cannot be configured."""


class Singleton(type):
    """Metaclass that makes every class using it a per-class singleton.

    The first call to the class constructs the instance. Later calls return
    that same object and ignore their arguments.
    """

    # Default seen by classes that have not been instantiated yet.
    _instance: Singleton | None = None

    def __call__(cls, *args: object, **kwargs: object) -> Singleton:
        """Return the cached instance of ``cls``, creating it on first use."""
        # Look only at the class's own namespace. Plain attribute access would
        # also find an instance cached on a parent class, so a subclass would
        # hand back its parent's object instead of one of its own.
        if cls.__dict__.get("_instance") is None:
            cls._instance = super().__call__(*args, **kwargs)
        return cls._instance


class Metrics(metaclass=Singleton):
    """Process-wide entry point for reporting Karapace metrics.

    ``setup`` selects the stats backend from the configuration. When
    ``metrics_extended`` is enabled it also starts a worker thread that
    reports the number of established TCP connections every 10 seconds.
    The reporting methods are no-ops until extended metrics are active.
    """

    def __init__(self) -> None:
        # Assigned, not merely annotated, so that reading the attribute before
        # setup() yields None instead of raising AttributeError.
        self.stats_client: StatsClient | None = None
        self.is_ready = False
        self.stop_event = threading.Event()
        self.worker_thread = threading.Thread(target=self.worker)
        self.lock = threading.Lock()

    @staticmethod
    def _create_stats_client(config: Config) -> StatsClient:
        """Instantiate the backend named by the ``stats_service`` config key."""
        stats_service = config.get("stats_service")
        if stats_service == "statsd":
            return StatsdClient(config=config)
        if stats_service == "prometheus":
            return PrometheusClient(config=config)
        raise MetricsException('Config variable "stats_service" is not defined')

    def setup(self, config: Config) -> None:
        """Create the stats client and, if enabled, start extended metrics.

        Safe to call repeatedly: the backend is created once and the worker
        thread is started once. The backend is created even when
        ``metrics_extended`` is false, so ``stats_client`` is usable either way.

        Raises:
            MetricsException: if ``stats_service`` names no known backend.
        """
        with self.lock:
            if self.stats_client is None:
                self.stats_client = self._create_stats_client(config)
            if self.is_ready or not config.get("metrics_extended"):
                return
            self.is_ready = True
            schedule.every(10).seconds.do(self.connections)
            self.worker_thread.start()

    def _active_client(self) -> StatsClient | None:
        """Return the stats client if extended metrics are active, else None.

        Raises:
            RuntimeError: if ``stats_client`` holds something that is not a StatsClient.
        """
        if not self.is_ready or self.stats_client is None:
            return None
        if not isinstance(self.stats_client, StatsClient):
            raise RuntimeError("no StatsClient available")
        return self.stats_client

    def request(self, size: int) -> None:
        """Report ``size`` as the ``request-size`` gauge."""
        client = self._active_client()
        if client is not None:
            client.gauge("request-size", size)

    def response(self, size: int) -> None:
        """Report ``size`` as the ``response-size`` gauge."""
        client = self._active_client()
        if client is not None:
            client.gauge("response-size", size)

    def are_we_master(self, is_master: bool) -> None:
        """Report ``is_master`` as 1 or 0 in the ``master-slave-role`` gauge."""
        client = self._active_client()
        if client is not None:
            client.gauge("master-slave-role", int(is_master))

    def latency(self, latency_ms: float) -> None:
        """Report one ``latency_ms`` timing observation."""
        client = self._active_client()
        if client is not None:
            client.timing("latency_ms", latency_ms)

    def error(self) -> None:
        """Increment the ``error_total`` counter by one."""
        client = self._active_client()
        if client is not None:
            client.increase("error_total", 1)

    def connections(self) -> None:
        """Report this process's established TCP connections as ``connections-active``."""
        client = self._active_client()
        if client is None:
            return
        karapace_proc = psutil.Process(os.getpid())
        established = sum(
            1 for conn in karapace_proc.connections(kind="tcp") if conn.laddr and conn.status == "ESTABLISHED"
        )
        client.gauge("connections-active", established)

    def worker(self) -> None:
        """Run due ``schedule`` jobs about once per second until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            schedule.run_pending()
            time.sleep(1)

    def cleanup(self) -> None:
        """Stop the worker thread, if it was started, then close the stats client."""
        # Stop the worker first so it cannot report through a client that is already closed.
        if self.is_ready:
            self.stop_event.set()
            if self.worker_thread.is_alive():
                self.worker_thread.join()
        if self.stats_client is not None:
            self.stats_client.close()
69 changes: 69 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
karapace - prometheus

Exposes metrics over HTTP using the Prometheus Python client (prometheus_client):

https://github.com/prometheus/client_python

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, start_http_server, Summary
from typing import Final

import logging

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# Fallback bind address and port: the defaults of the host/port parameters of
# PrometheusClient.__init__, used when the config has no prometheus_host/prometheus_port key.
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
    """Raised when the Prometheus client cannot be set up."""


class PrometheusClient(StatsClient):
    """Stats backend that publishes metrics through ``prometheus_client``.

    Creating an instance starts the ``prometheus_client`` HTTP server.
    Metric objects are created lazily on first use and cached by name.
    The ``tags`` arguments are accepted for interface compatibility but are
    not forwarded to Prometheus.
    """

    # Class-wide flag: set once the HTTP server has been started by any instance.
    server_is_active = False

    def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None:
        """Start the metrics HTTP server.

        ``prometheus_host`` / ``prometheus_port`` from ``config`` take
        precedence; ``host`` / ``port`` are used only when the key is absent.

        Raises:
            PrometheusException: if the host or port resolves to None, or if
                the server was already started by another instance.
        """
        super().__init__(config)

        _host = config.get("prometheus_host", host)
        _port = config.get("prometheus_port", port)
        if _host is None:
            raise PrometheusException("prometheus_host is undefined")
        if _port is None:
            raise PrometheusException("prometheus_port is undefined")
        if PrometheusClient.server_is_active:
            raise PrometheusException("Double instance of Prometheus interface")
        start_http_server(_port, _host)
        # Set on the class, not the instance, so the guard above sees it on the next construction.
        PrometheusClient.server_is_active = True
        self._gauge: dict[str, Gauge] = {}
        self._summary: dict[str, Summary] = {}
        self._counter: dict[str, Counter] = {}

    @staticmethod
    def _metric_name(metric: str) -> str:
        """Map ``metric`` onto the Prometheus metric-name alphabet.

        Any character outside ASCII letters, digits, ``_`` and ``:`` becomes
        ``_``, and a leading digit (or an empty name) gets a ``_`` prefix.
        Names such as ``request-size`` would otherwise be rejected.
        """
        name = "".join(ch if (ch.isascii() and ch.isalnum()) or ch in "_:" else "_" for ch in metric)
        if not name or name[0].isdigit():
            name = "_" + name
        return name

    def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Set gauge ``metric`` to ``value``; ``tags`` is ignored."""
        name = self._metric_name(metric)
        m = self._gauge.get(name)
        if m is None:
            m = Gauge(name, metric)
            self._gauge[name] = m
        m.set(value)

    def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
        """Increment counter ``metric`` by ``inc_value``; ``tags`` is ignored."""
        name = self._metric_name(metric)
        m = self._counter.get(name)
        if m is None:
            m = Counter(name, metric)
            self._counter[name] = m
        m.inc(inc_value)

    def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
        """Record ``value`` as an observation of summary ``metric``; ``tags`` is ignored."""
        name = self._metric_name(metric)
        m = self._summary.get(name)
        if m is None:
            m = Summary(name, metric)
            self._summary[name] = m
        m.observe(value)
18 changes: 15 additions & 3 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from accept_types import get_best_match
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.statsd import StatsClient
from karapace.metrics import Metrics
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Callable, Dict, NoReturn, Optional, overload, Union
Expand Down Expand Up @@ -134,6 +134,8 @@ def __init__(
if content_type:
self.headers["Content-Type"] = content_type
super().__init__(f"HTTPResponse {status.value}")
if not is_success(status):
Metrics().error()

def ok(self) -> bool:
"""True if resposne has a 2xx status_code"""
Expand Down Expand Up @@ -166,7 +168,8 @@ def __init__(
self.app_request_metric = f"{app_name}_request"
self.app = self._create_aiohttp_application(config=config)
self.log = logging.getLogger(self.app_name)
self.stats = StatsClient(config=config)
Metrics().setup(config)
self.stats = Metrics().stats_client
self.app.on_cleanup.append(self.close_by_app)
self.not_ready_handler = not_ready_handler

Expand All @@ -183,7 +186,7 @@ async def close(self) -> None:
set as hook because the awaitables have to run inside the event loop
created by the aiohttp library.
"""
self.stats.close()
Metrics().cleanup()

@staticmethod
def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument
Expand Down Expand Up @@ -269,15 +272,21 @@ async def _handle_request(
url=request.url,
path_for_stats=path_for_stats,
)

try:
if request.method == "OPTIONS":
origin = request.headers.get("Origin")
if not origin:
raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST)
headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin)

raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers)

body = await request.read()
if body:
Metrics().request(len(body))
else:
Metrics().request(0)
if json_request:
if not body:
raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST)
Expand Down Expand Up @@ -385,6 +394,7 @@ async def _handle_request(
)
headers = {"Content-Type": "application/json"}
resp = aiohttp.web.Response(body=body, status=status.value, headers=headers)

except asyncio.CancelledError:
self.log.debug("Client closed connection")
raise
Expand All @@ -393,6 +403,8 @@ async def _handle_request(
self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url)
resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value)
finally:
Metrics().response(resp.content_length)
Metrics().latency((time.monotonic() - start_time) * 1000)
self.stats.timing(
self.app_request_metric,
time.monotonic() - start_time,
Expand Down
Loading
Loading