From 7cfcb5d62ff6a650967a86d7f815b755fc826772 Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Mon, 16 Oct 2023 23:43:22 +0200 Subject: [PATCH 01/20] Add Prometheus instrumentation Closes #3214 --- Dockerfile | 4 +- aleph/core.py | 3 + aleph/metrics/__init__.py | 96 +++++++++++++++++++ aleph/metrics/collectors.py | 181 ++++++++++++++++++++++++++++++++++++ aleph/settings.py | 8 +- aleph/tests/test_metrics.py | 177 +++++++++++++++++++++++++++++++++++ aleph/tests/util.py | 21 +++-- docker-compose.yml | 1 - gunicorn.conf.py | 34 +++++++ requirements.txt | 1 + 10 files changed, 514 insertions(+), 12 deletions(-) create mode 100644 aleph/metrics/__init__.py create mode 100644 aleph/metrics/collectors.py create mode 100644 aleph/tests/test_metrics.py create mode 100644 gunicorn.conf.py diff --git a/Dockerfile b/Dockerfile index 05ebeaa78f..fbb40cb8c6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,5 +46,7 @@ ENV ALEPH_ELASTICSEARCH_URI=http://elasticsearch:9200/ \ FTM_COMPARE_FREQUENCIES_DIR=/opt/ftm-compare/word-frequencies/ \ FTM_COMPARE_MODEL=/opt/ftm-compare/model.pkl +RUN mkdir /run/prometheus + # Run the green unicorn -CMD gunicorn -w 5 -b 0.0.0.0:8000 --log-level info --log-file - aleph.manage:app +CMD gunicorn --config /aleph/gunicorn.conf.py --workers 5 --log-level info --log-file - aleph.manage:app diff --git a/aleph/core.py b/aleph/core.py index e94fb9e91c..8446227dab 100644 --- a/aleph/core.py +++ b/aleph/core.py @@ -26,6 +26,7 @@ from aleph.cache import Cache from aleph.oauth import configure_oauth from aleph.util import LoggingTransport +from aleph.metrics import PrometheusExtension import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration @@ -39,6 +40,7 @@ mail = Mail() babel = Babel() talisman = Talisman() +prometheus = PrometheusExtension() def determine_locale(): @@ -94,6 +96,7 @@ def create_app(config=None): mail.init_app(app) db.init_app(app) babel.init_app(app, locale_selector=determine_locale) + prometheus.init_app(app) CORS( app, resources=r"/api/*", diff --git a/aleph/metrics/__init__.py b/aleph/metrics/__init__.py new file mode 100644 index 0000000000..5412e325c0 --- /dev/null +++ b/aleph/metrics/__init__.py @@ -0,0 +1,96 @@ +from urllib.parse import urlparse +from flask import request +from werkzeug.exceptions import NotFound +from timeit import default_timer +from prometheus_client import ( + generate_latest, + CollectorRegistry, + Counter, + Histogram, + CONTENT_TYPE_LATEST, +) +from prometheus_client.multiprocess import MultiProcessCollector +from aleph.settings import SETTINGS + +REQUEST_DURATION = Histogram( + "http_request_duration_seconds", + "Duration of requests to the Aleph API in seconds", + ["method", "status", "endpoint"], +) + +REQUEST = Counter( + "http_request_total", + "Total number of Aleph API requests", + ["method", "status", "endpoint", "logged_in"], +) + + +def before_request(): + request.prometheus_start_time = default_timer() + + +def after_request(response): + endpoint = request.endpoint + + # Do not track request duration for the metrics endpoint + if endpoint == "metrics": + return response + + method = request.method + status = response.status_code + logged_in = request.authz.logged_in + duration = max(0, default_timer() - request.prometheus_start_time) + + REQUEST.labels(method, status, endpoint, logged_in).inc() + REQUEST_DURATION.labels(method, status, endpoint).observe(duration) + + return response + + +def create_metrics_endpoint(): + # Hacky workaround to prevent circular imports on app startup + from .collectors 
import InfoCollector, DatabaseCollector, QueuesCollector + + def metrics(): + # Make metrics available on internal port only + url = urlparse(request.url) + + if url.port != SETTINGS.PROMETHEUS_PORT: + raise NotFound() + + # The Prometheus client does not support using custom collectors in multi-process + # mode. We work around that by setting up two registries. The default registry uses + # a multi-process collector to collect metrics such as request durations etc. across + # all application processes. The second is a single-process registry for use with + # our custom collectors. + custom_collectors_registry = CollectorRegistry() + custom_collectors_registry.register(InfoCollector()) + custom_collectors_registry.register(DatabaseCollector()) + custom_collectors_registry.register(QueuesCollector()) + + default_registry = CollectorRegistry() + MultiProcessCollector(default_registry) + + registries = [custom_collectors_registry, default_registry] + body = "\n".join(generate_latest(r).decode("utf-8") for r in registries) + headers = {"Content-Type": CONTENT_TYPE_LATEST} + + return body, 200, headers + + return metrics + + +class PrometheusExtension: + def __init__(self, app=None): + if app is not None: + self.init_app(app) + + def init_app(self, app): + if not SETTINGS.PROMETHEUS_ENABLED: + return + + app.before_request(before_request) + app.after_request(after_request) + + metrics_endpoint = create_metrics_endpoint() + app.add_url_rule("/metrics", view_func=metrics_endpoint) diff --git a/aleph/metrics/collectors.py b/aleph/metrics/collectors.py new file mode 100644 index 0000000000..f3ead27e1c --- /dev/null +++ b/aleph/metrics/collectors.py @@ -0,0 +1,181 @@ +from sqlalchemy import func +from prometheus_client.core import GaugeMetricFamily, InfoMetricFamily +from followthemoney import __version__ as ftm_version + +from aleph import __version__ as aleph_version +from aleph.core import create_app +from aleph.queues import get_active_dataset_status +from aleph.model import Role, Collection, EntitySet, Bookmark + + +class InfoCollector(object): + def collect(self): + yield InfoMetricFamily( + "aleph_system", + "Aleph system information", + value={ + "aleph_version": aleph_version, + "ftm_version": ftm_version, + }, + ) + + +class DatabaseCollector(object): + PREFIX = "aleph_" + + def __init__(self): + self._flask_app = create_app() + + def collect(self): + with self._flask_app.app_context(): + yield self._users() + yield self._collections() + yield self._collection_users() + yield self._entitysets() + yield self._entityset_users() + yield self._bookmarks() + yield self._bookmark_users() + + def _users(self): + return GaugeMetricFamily( + self.PREFIX + "users", + "Total number of users", + value=Role.all_users().count(), + ) + + def _collections(self): + gauge = GaugeMetricFamily( + self.PREFIX + "collections", + "Total number of collections by category", + labels=["category"], + ) + + query = ( + Collection.all() + .with_entities(Collection.category, func.count()) + .group_by(Collection.category) + ) + + for category, count in query: + gauge.add_metric([category], count) + + return gauge + + def _collection_users(self): + gauge = GaugeMetricFamily( + self.PREFIX + "collection_users", + "Total number of users that have created at least one collection", + labels=["category"], + ) + + query = ( + Collection.all() + .with_entities( + Collection.category, + func.count(func.distinct(Collection.creator_id)), + ) + .group_by(Collection.category) + ) + + for category, count in query: + 
gauge.add_metric([category], count)
+
+        return gauge
+
+    def _entitysets(self):
+        gauge = GaugeMetricFamily(
+            self.PREFIX + "entitysets",
+            "Total number of entity sets by type",
+            labels=["type"],
+        )
+
+        query = (
+            EntitySet.all()
+            .with_entities(EntitySet.type, func.count())
+            .group_by(EntitySet.type)
+        )
+
+        for entityset_type, count in query:
+            gauge.add_metric([entityset_type], count)
+
+        return gauge
+
+    def _entityset_users(self):
+        gauge = GaugeMetricFamily(
+            self.PREFIX + "entityset_users",
+            "Number of users that have created at least one entity set of the given type",
+            labels=["type"],
+        )
+
+        query = (
+            EntitySet.all()
+            .with_entities(
+                EntitySet.type,
+                func.count(func.distinct(EntitySet.role_id)),
+            )
+            .group_by(EntitySet.type)
+        )
+
+        for entityset_type, count in query:
+            gauge.add_metric([entityset_type], count)
+
+        return gauge
+
+    def _bookmarks(self):
+        return GaugeMetricFamily(
+            self.PREFIX + "bookmarks",
+            "Total number of bookmarks",
+            value=Bookmark.query.count(),
+        )
+
+    def _bookmark_users(self):
+        return GaugeMetricFamily(
+            self.PREFIX + "bookmark_users",
+            "Number of users that have created at least one bookmark",
+            value=Bookmark.query.distinct(Bookmark.role_id).count(),
+        )
+
+
+class QueuesCollector(object):
+    PREFIX = "queues_"
+
+    def collect(self):
+        status = get_active_dataset_status()
+
+        yield GaugeMetricFamily(
+            self.PREFIX + "active_datasets",
+            "Total number of active datasets",
+            value=status["total"],
+        )
+
+        stages = {}
+
+        for collection_status in status["datasets"].values():
+            for job_status in collection_status["jobs"]:
+                for stage_status in job_status["stages"]:
+                    stage = stage_status["stage"]
+                    pending = stage_status["pending"]
+                    running = stage_status["running"]
+
+                    if stage not in stages:
+                        stages[stage] = {
+                            "pending": 0,
+                            "running": 0,
+                        }
+
+                    stages[stage] = {
+                        "pending": stages[stage].get("pending") + pending,
+                        "running": stages[stage].get("running") + running,
+                    }
+
+        tasks_gauge = GaugeMetricFamily(
+            self.PREFIX + "tasks",
+            "Total number of pending or running tasks in a given stage",
+            labels=["stage", "status"],
+        )
+
+        for stage, tasks in stages.items():
+            tasks_gauge.add_metric([stage, "pending"], tasks["pending"])
+            tasks_gauge.add_metric([stage, "running"], tasks["running"])
+
+        yield tasks_gauge
diff --git a/aleph/settings.py b/aleph/settings.py
index f80e272530..203e84972d 100644
--- a/aleph/settings.py
+++ b/aleph/settings.py
@@ -221,9 +221,15 @@ def __init__(self) -> None:
         self.FEEDBACK_URL_TIMELINES = env.get("ALEPH_FEEDBACK_URL_TIMELINES", None)
 
         ###############################################################################
-        # Additional configurations
+        # Instrumentation and observability
         self.SENTRY_DSN = env.get("SENTRY_DSN", None)
         self.SENTRY_ENVIRONMENT = env.get("SENTRY_ENVIRONMENT", "")
+
+        self.PROMETHEUS_ENABLED = env.to_bool("PROMETHEUS_ENABLED", False)
+        self.PROMETHEUS_PORT = env.to_int("PROMETHEUS_PORT", 9100)
+
+        ###############################################################################
+        # Additional configuration
         string_prefix = env.get("ALEPH_STRING_CONFIG_PREFIX")
         json_prefix = env.get("ALEPH_JSON_CONFIG_PREFIX")
         if string_prefix or json_prefix:
diff --git a/aleph/tests/test_metrics.py b/aleph/tests/test_metrics.py
new file mode 100644
index 0000000000..e21cb2ac92
--- /dev/null
+++ b/aleph/tests/test_metrics.py
@@ -0,0 +1,177 @@
+import os
+
+from prometheus_client import CollectorRegistry
+
+from aleph.tests.util import TestCase
+from aleph.settings import SETTINGS
+from 
aleph.metrics.collectors import DatabaseCollector, QueuesCollector
+from aleph.model import Role, Bookmark, EntitySet
+from aleph.core import db
+from aleph.queues import get_stage
+
+
+class MetricsTestCase(TestCase):
+    def test_metrics_endpoint_enabled(self):
+        # Depending on the value of `PROMETHEUS_ENABLED`, the metrics endpoint
+        # is mounted when the Flask app is initialized, so we need to recreate
+        # the app after changing the value of the setting.
+        SETTINGS.PROMETHEUS_ENABLED = False
+        self.init_app()
+
+        res = self.client.get("/metrics", base_url="http://localhost:9100")
+        assert res.status_code == 404
+
+        res = self.client.get("/metrics", base_url="http://localhost:5000")
+        assert res.status_code == 404
+
+        SETTINGS.PROMETHEUS_ENABLED = True
+        os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/var/run"
+        self.init_app()
+
+        res = self.client.get("/metrics", base_url="http://localhost:9100")
+        assert res.status_code == 200
+
+        res = self.client.get("/metrics", base_url="http://localhost:5000")
+        assert res.status_code == 404
+
+    def test_users(self):
+        reg = CollectorRegistry()
+        reg.register(DatabaseCollector())
+
+        # Sanity check: Aleph creates a superuser by default
+        users = list(Role.all_users())
+        assert users[0].foreign_id == "system:aleph"
+
+        reg.collect()
+        assert reg.get_sample_value("aleph_users") == 1
+
+        self.create_user()
+
+        reg.collect()
+        assert reg.get_sample_value("aleph_users") == 2
+
+    def test_collections(self):
+        reg = CollectorRegistry()
+        reg.register(DatabaseCollector())
+        labels = {"category": "casefile"}
+
+        reg.collect()
+        counter = reg.get_sample_value("aleph_collections", labels)
+        users = reg.get_sample_value("aleph_collection_users", labels)
+        assert counter is None, counter
+        assert users is None, users
+
+        user = self.create_user()
+        self.create_collection(creator=user)
+        self.create_collection(creator=user)
+
+        reg.collect()
+        counter = reg.get_sample_value("aleph_collections", labels)
+        users = reg.get_sample_value("aleph_collection_users", labels)
+        assert counter == 2, counter
+        assert users == 1, users
+
+    def test_entitysets(self):
+        reg = CollectorRegistry()
+        reg.register(DatabaseCollector())
+
+        reg.collect()
+        count = reg.get_sample_value("aleph_entitysets", {"type": "diagram"})
+        users = reg.get_sample_value("aleph_entityset_users", {"type": "diagram"})
+        assert count is None, count
+        assert users is None, users
+
+        user = self.create_user()
+        col = self.create_collection(creator=user)
+
+        entityset = EntitySet(
+            id="1",
+            role_id=user.id,
+            collection_id=col.id,
+            type="diagram",
+            label="Test Diagram 1",
+        )
+        db.session.add(entityset)
+        entityset = EntitySet(
+            id="2",
+            collection_id=col.id,
+            role_id=user.id,
+            type="diagram",
+            label="Test Diagram 2",
+        )
+        db.session.add(entityset)
+        db.session.commit()
+
+        reg.collect()
+        count = reg.get_sample_value("aleph_entitysets", {"type": "diagram"})
+        users = reg.get_sample_value("aleph_entityset_users", {"type": "diagram"})
+        assert count == 2, count
+        assert users == 1, users
+
+    def test_bookmarks(self):
+        reg = CollectorRegistry()
+        reg.register(DatabaseCollector())
+
+        reg.collect()
+        count = reg.get_sample_value("aleph_bookmarks")
+        users = reg.get_sample_value("aleph_bookmark_users")
+        assert count == 0, count
+        assert users == 0, users
+
+        user = self.create_user()
+        col = self.create_collection(user, label="Test Collection")
+
+        company = self.create_entity(data={"schema": "Company"}, collection=col)
+        person = self.create_entity(data={"schema": "Person"}, collection=col)
+
+        
bookmark = Bookmark(entity_id=company.id, collection_id=col.id, role_id=user.id) + db.session.add(bookmark) + bookmark = Bookmark(entity_id=person.id, collection_id=col.id, role_id=user.id) + db.session.add(bookmark) + db.session.commit() + + reg.collect() + count = reg.get_sample_value("aleph_bookmarks") + users = reg.get_sample_value("aleph_bookmark_users") + assert count == 2, count + assert users == 1, users + + def test_queue_tasks(self): + reg = CollectorRegistry() + reg.register(QueuesCollector()) + + reg.collect() + count = reg.get_sample_value( + "queue_tasks", {"stage": "index", "status": "pending"} + ) + assert count is None, count + count = reg.get_sample_value( + "queue_tasks", {"stage": "index", "status": "pending"} + ) + assert count is None, count + + col = self.create_collection() + entity = self.create_entity(data={"schema": "Company"}, collection=col) + + stage = get_stage(collection=col, stage="index") + stage.queue({"entity_id": entity.id}) + + reg.collect() + count = reg.get_sample_value( + "queues_tasks", {"stage": "index", "status": "pending"} + ) + assert count == 1, count + + # Fetch tasks from queue and mark them as running + tasks = stage.get_tasks(limit=1) + assert len(tasks) == 1, tasks + + reg.collect() + count = reg.get_sample_value( + "queues_tasks", {"stage": "index", "status": "pending"} + ) + assert count == 0, count + count = reg.get_sample_value( + "queues_tasks", {"stage": "index", "status": "running"} + ) + assert count == 1, count diff --git a/aleph/tests/util.py b/aleph/tests/util.py index 17516fc4e5..64539c4bf0 100644 --- a/aleph/tests/util.py +++ b/aleph/tests/util.py @@ -220,6 +220,17 @@ def load_fixtures(self): aggregator.put(sample, fragment="sample") reindex_collection(self.private_coll, sync=True) + def init_app(self): + self.app = self.create_app() + + self._orig_response_class = self.app.response_class + self.app.response_class = _make_test_response(self.app.response_class) + + self.client = self.app.test_client() + + self._ctx = self.app.test_request_context() + self._ctx.push() + def setUp(self): if not hasattr(SETTINGS, "_global_test_state"): SETTINGS._global_test_state = True @@ -274,15 +285,7 @@ def debug(self): self._post_teardown() def _pre_setup(self): - self.app = self.create_app() - - self._orig_response_class = self.app.response_class - self.app.response_class = _make_test_response(self.app.response_class) - - self.client = self.app.test_client() - - self._ctx = self.app.test_request_context() - self._ctx.push() + self.init_app() def _post_teardown(self): if getattr(self, "_ctx", None) is not None: diff --git a/docker-compose.yml b/docker-compose.yml index e571239a1f..e8f448d735 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,7 +73,6 @@ services: api: image: ghcr.io/alephdata/aleph:${ALEPH_TAG:-3.15.4} - command: gunicorn -w 6 -b 0.0.0.0:8000 --timeout 3600 --log-level debug --log-file - aleph.wsgi:app expose: - 8000 depends_on: diff --git a/gunicorn.conf.py b/gunicorn.conf.py new file mode 100644 index 0000000000..b43e4ac0fe --- /dev/null +++ b/gunicorn.conf.py @@ -0,0 +1,34 @@ +import os +from prometheus_client import multiprocess + + +_PROM_ENABLED = os.environ.get("PROMETHEUS_ENABLED", False) +_PROM_MULTIPROC_DIR = os.environ.get("PROMETHEUS_MULTIPROC_DIR", None) + +wsgi_app = "aleph.wsgi:app" +bind = "0.0.0.0:8000" +timeout = 3600 + +if _PROM_ENABLED: + # Gunicorn will bind to port 8000 (the default, publicly exposed port) + # and port 9100 (which is accessible internally only). 
The metrics endpoint
+    # is only available on the internal port.
+    bind = ["0.0.0.0:8000", "0.0.0.0:9100"]
+
+    # In multiprocess mode, the Prometheus client writes metrics to the filesystem
+    # to aggregate them across processes. We need to ensure the directory used by the
+    # client exists and is empty on application startup. We also need to notify the
+    # client if a Gunicorn worker process exits so that the client can clean up data
+    # related to that process.
+    #
+    # For more information see:
+    # https://github.com/prometheus/client_python#multiprocess-mode-eg-gunicorn
+    def on_starting(_):
+        if _PROM_ENABLED and _PROM_MULTIPROC_DIR:
+            for file in os.scandir(_PROM_MULTIPROC_DIR):
+                os.unlink(file.path)
+
+
+    def child_exit(_, worker):
+        if _PROM_ENABLED and _PROM_MULTIPROC_DIR:
+            multiprocess.mark_process_dead(worker.pid)
diff --git a/requirements.txt b/requirements.txt
index d5b3142cb8..69d22e7749 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -36,6 +36,7 @@ requests[security] >= 2.25.1, < 3.0.0
 tabulate==0.9.0
 zipstream-new==1.1.8
 sentry-sdk[flask]==1.31.0
+prometheus-client==0.17.1
 # Testing dependencies
 factory-boy >=3.2.0, < 4.0.0

From 35ec9cad6eaa37872e3a9090e6a733d39ff55981 Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Thu, 21 Sep 2023 19:19:45 +0200
Subject: [PATCH 02/20] Fix missing bind argument

---
 aleph/migration.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aleph/migration.py b/aleph/migration.py
index 55aca6e230..da94980159 100644
--- a/aleph/migration.py
+++ b/aleph/migration.py
@@ -34,7 +34,7 @@ def destroy_db():
     while len(tables):
         for table in tables:
             try:
-                table.drop(checkfirst=True)
+                table.drop(bind=db.engine, checkfirst=True)
                 tables.remove(table)
             except InternalError:
                 pass

From 58d4ee7580dbab84801322bcafc5e9049bd69984 Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Mon, 16 Oct 2023 23:38:53 +0200
Subject: [PATCH 03/20] Run Prometheus exporter as a separate service

---
 aleph/core.py               |  2 +-
 aleph/metrics/__init__.py   | 96 ------------------------------
 aleph/metrics/collectors.py |  4 +-
 aleph/metrics/exporter.py   | 16 +++++++
 aleph/metrics/flask.py      | 76 +++++++++++++++++++++++++++
 5 files changed, 95 insertions(+), 99 deletions(-)
 create mode 100644 aleph/metrics/exporter.py
 create mode 100644 aleph/metrics/flask.py

diff --git a/aleph/core.py b/aleph/core.py
index 8446227dab..e8ec8af566 100644
--- a/aleph/core.py
+++ b/aleph/core.py
@@ -26,7 +26,7 @@
 from aleph.cache import Cache
 from aleph.oauth import configure_oauth
 from aleph.util import LoggingTransport
-from aleph.metrics import PrometheusExtension
 
 import sentry_sdk
 from sentry_sdk.integrations.flask import FlaskIntegration
+from aleph.metrics.flask import PrometheusExtension
diff --git a/aleph/metrics/__init__.py b/aleph/metrics/__init__.py
index 5412e325c0..e69de29bb2 100644
--- a/aleph/metrics/__init__.py
+++ b/aleph/metrics/__init__.py
@@ -1,96 +0,0 @@
-from urllib.parse import urlparse
-from flask import request
-from werkzeug.exceptions import NotFound
-from timeit import default_timer
-from prometheus_client import (
-    generate_latest,
-    CollectorRegistry,
-    Counter,
-    Histogram,
-    CONTENT_TYPE_LATEST,
-)
-from prometheus_client.multiprocess import MultiProcessCollector
-from aleph.settings import SETTINGS
-
-REQUEST_DURATION = Histogram(
-    "http_request_duration_seconds",
-    "Duration of requests to the Aleph API in seconds",
-    ["method", "status", "endpoint"],
-)
-
-REQUEST = Counter(
-    "http_request_total",
-    "Total number of Aleph API requests",
-    
["method", "status", "endpoint", "logged_in"], -) - - -def before_request(): - request.prometheus_start_time = default_timer() - - -def after_request(response): - endpoint = request.endpoint - - # Do not track request duration for the metrics endpoint - if endpoint == "metrics": - return response - - method = request.method - status = response.status_code - logged_in = request.authz.logged_in - duration = max(0, default_timer() - request.prometheus_start_time) - - REQUEST.labels(method, status, endpoint, logged_in).inc() - REQUEST_DURATION.labels(method, status, endpoint).observe(duration) - - return response - - -def create_metrics_endpoint(): - # Hacky workaround to prevent circular imports on app startup - from .collectors import InfoCollector, DatabaseCollector, QueuesCollector - - def metrics(): - # Make metrics available on internal port only - url = urlparse(request.url) - - if url.port != SETTINGS.PROMETHEUS_PORT: - raise NotFound() - - # The Prometheus client does not support using custom collectors in multi-process - # mode. We work around that by setting up two registries. The default registry uses - # a multi-process collector to collect metrics such as request durations etc. across - # all application processes. The second is a single-process registry for use with - # our custom collectors. - custom_collectors_registry = CollectorRegistry() - custom_collectors_registry.register(InfoCollector()) - custom_collectors_registry.register(DatabaseCollector()) - custom_collectors_registry.register(QueuesCollector()) - - default_registry = CollectorRegistry() - MultiProcessCollector(default_registry) - - registries = [custom_collectors_registry, default_registry] - body = "\n".join(generate_latest(r).decode("utf-8") for r in registries) - headers = {"Content-Type": CONTENT_TYPE_LATEST} - - return body, 200, headers - - return metrics - - -class PrometheusExtension: - def __init__(self, app=None): - if app is not None: - self.init_app(app) - - def init_app(self, app): - if not SETTINGS.PROMETHEUS_ENABLED: - return - - app.before_request(before_request) - app.after_request(after_request) - - metrics_endpoint = create_metrics_endpoint() - app.add_url_rule("/metrics", view_func=metrics_endpoint) diff --git a/aleph/metrics/collectors.py b/aleph/metrics/collectors.py index f3ead27e1c..f31a062173 100644 --- a/aleph/metrics/collectors.py +++ b/aleph/metrics/collectors.py @@ -3,7 +3,7 @@ from followthemoney import __version__ as ftm_version from aleph import __version__ as aleph_version -from aleph.core import create_app +from aleph.core import create_app as create_flask_app from aleph.queues import get_active_dataset_status from aleph.model import Role, Collection, EntitySet, Bookmark @@ -24,7 +24,7 @@ class DatabaseCollector(object): PREFIX = "aleph_" def __init__(self): - self._flask_app = create_app() + self._flask_app = create_flask_app() def collect(self): with self._flask_app.app_context(): diff --git a/aleph/metrics/exporter.py b/aleph/metrics/exporter.py new file mode 100644 index 0000000000..7ba729ef8d --- /dev/null +++ b/aleph/metrics/exporter.py @@ -0,0 +1,16 @@ +from prometheus_client import make_wsgi_app +from prometheus_client.core import CollectorRegistry + +from aleph.metrics.collectors import InfoCollector, DatabaseCollector, QueuesCollector + + +def create_app(): + registry = CollectorRegistry() + registry.register(InfoCollector()) + registry.register(DatabaseCollector()) + registry.register(QueuesCollector()) + + return make_wsgi_app(registry=registry) + + +app = 
create_app() diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py new file mode 100644 index 0000000000..6a4d4511a6 --- /dev/null +++ b/aleph/metrics/flask.py @@ -0,0 +1,76 @@ +from urllib.parse import urlparse +from flask import request +from werkzeug.exceptions import NotFound +from timeit import default_timer +from prometheus_client import ( + generate_latest, + CollectorRegistry, + Counter, + Histogram, + CONTENT_TYPE_LATEST, +) +from prometheus_client.multiprocess import MultiProcessCollector +from aleph.settings import SETTINGS + +METRICS_ENDPOINT_NAME = "metrics" + +REQUEST_DURATION = Histogram( + "http_request_duration_seconds", + "Duration of requests to the Aleph API in seconds", + ["method", "status", "endpoint"], +) + +REQUEST = Counter( + "http_request_total", + "Total number of Aleph API requests", + ["method", "status", "endpoint", "logged_in"], +) + + +def before_request(): + request.prometheus_start_time = default_timer() + + +def after_request(response): + endpoint = request.endpoint + + # Do not track request duration for the metrics endpoint + if endpoint == METRICS_ENDPOINT_NAME: + return response + + method = request.method + status = response.status_code + logged_in = request.authz.logged_in + duration = max(0, default_timer() - request.prometheus_start_time) + + REQUEST.labels(method, status, endpoint, logged_in).inc() + REQUEST_DURATION.labels(method, status, endpoint).observe(duration) + + return response + + +def metrics(): + # Make metrics available on internal port only + url = urlparse(request.url) + + if url.port != SETTINGS.PROMETHEUS_PORT: + raise NotFound() + + registry = CollectorRegistry() + MultiProcessCollector(registry) + + body = generate_latest(registry).decode("utf-8") + headers = {"Content-Type": CONTENT_TYPE_LATEST} + + return body, 200, headers + + +class PrometheusExtension: + def init_app(self, app): + if not SETTINGS.PROMETHEUS_ENABLED: + return + + app.before_request(before_request) + app.after_request(after_request) + + app.add_url_rule("/metrics", endpoint=METRICS_ENDPOINT_NAME, view_func=metrics) From 21202a8bad469b4ab97eb041746258f51e89047b Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Tue, 17 Oct 2023 00:03:03 +0200 Subject: [PATCH 04/20] Expose number of streaming requests and number of streamed entities as metrics --- aleph/views/stream_api.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/aleph/views/stream_api.py b/aleph/views/stream_api.py index ebc88d6dda..e2bd2c4151 100644 --- a/aleph/views/stream_api.py +++ b/aleph/views/stream_api.py @@ -1,6 +1,7 @@ import logging from banal import ensure_list from flask import Blueprint, request +from prometheus_client import Counter from aleph.index.entities import iter_entities, PROXY_INCLUDES from aleph.views.util import require, stream_ijson @@ -8,6 +9,16 @@ log = logging.getLogger(__name__) blueprint = Blueprint("bulk_api", __name__) +STREAMED_ENTITIES = Counter( + "aleph_streamed_entities_total", + "Total number of streamed entities", +) + +STREAMS = Counter( + "aleph_streams_total", + "Total number of entity streaming requests", +) + @blueprint.route("/api/2/entities/_stream") @blueprint.route("/api/2/collections//_stream") @@ -53,4 +64,21 @@ def entities(collection_id=None): schemata=schemata, includes=includes, ) - return stream_ijson(entities) + + STREAMS.inc() + + def generator(entities): + count = 0 + + for entity in entities: + count += 1 + + if count == 1000: + STREAMED_ENTITIES.inc(count) + count = 0 + + yield 
entity
+
+        STREAMED_ENTITIES.inc(count)
+
+    return stream_ijson(generator(entities))

From 74e9eb75c4389f8a2c5855abd6fdc3b4478d9d40 Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Tue, 17 Oct 2023 00:25:26 +0200
Subject: [PATCH 05/20] Expose number of auth attempts as Prometheus metrics

---
 aleph/views/sessions_api.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/aleph/views/sessions_api.py b/aleph/views/sessions_api.py
index 758a24df44..28a8b44ad4 100644
--- a/aleph/views/sessions_api.py
+++ b/aleph/views/sessions_api.py
@@ -4,6 +4,7 @@
 from flask import Blueprint, redirect, request, session
 from authlib.common.errors import AuthlibBaseError
 from werkzeug.exceptions import Unauthorized, BadRequest
+from prometheus_client import Counter
 
 from aleph.settings import SETTINGS
 from aleph.core import db, url_for, cache
@@ -18,6 +19,12 @@
 log = logging.getLogger(__name__)
 blueprint = Blueprint("sessions_api", __name__)
 
+AUTH_ATTEMPTS = Counter(
+    "aleph_auth_attempts_total",
+    "Total number of successful/failed authentication attempts",
+    ["method", "result"],
+)
+
 
 def _oauth_session(token):
     return cache.key("oauth-sess", token)
@@ -58,10 +65,12 @@ def password_login():
     data = parse_request("Login")
     role = Role.login(data.get("email"), data.get("password"))
     if role is None:
+        AUTH_ATTEMPTS.labels("password", "failed").inc()
         raise BadRequest(gettext("Invalid user or password."))
 
     role.touch()
     db.session.commit()
+    AUTH_ATTEMPTS.labels("password", "success").inc()
     update_role(role)
     authz = Authz.from_role(role)
     return jsonify({"status": "ok", "token": authz.to_token()})
@@ -95,6 +104,7 @@ def oauth_callback():
     err = Unauthorized(gettext("Authentication has failed."))
     state = cache.get_complex(_oauth_session(request.args.get("state")))
     if state is None:
+        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
         raise err
 
     try:
@@ -102,18 +112,22 @@
         uri = state.get("redirect_uri")
         oauth_token = oauth.provider.authorize_access_token(redirect_uri=uri)
     except AuthlibBaseError as err:
+        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
         log.warning("Failed OAuth: %r", err)
         raise err
 
     if oauth_token is None or isinstance(oauth_token, AuthlibBaseError):
+        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
         log.warning("Failed OAuth: %r", oauth_token)
         raise err
 
     role = handle_oauth(oauth.provider, oauth_token)
     if role is None:
+        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
         raise err
 
     db.session.commit()
     update_role(role)
+    AUTH_ATTEMPTS.labels("oauth", "success").inc()
     log.debug("Logged in: %r", role)
     request.authz = Authz.from_role(role)
     token = request.authz.to_token()

From e0769a778fa050999838134acfdbe3f83f35a75b Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Sun, 19 Nov 2023 12:06:20 +0100
Subject: [PATCH 06/20] Update Helm chart to expose metrics endpoints, setup ServiceMonitors

---
 helm/charts/aleph/templates/api.yaml         |  41 ++++++-
 helm/charts/aleph/templates/exporter.yaml    | 110 +++++++++++++++++++
 helm/charts/aleph/templates/ingest-file.yaml |  37 +++++++
 helm/charts/aleph/templates/worker.yaml      |  39 ++++++-
 helm/charts/aleph/values.yaml                |  33 ++++++
 5 files changed, 256 insertions(+), 4 deletions(-)
 create mode 100644 helm/charts/aleph/templates/exporter.yaml

diff --git a/helm/charts/aleph/templates/api.yaml b/helm/charts/aleph/templates/api.yaml
index 608ec34c7a..d0a3574c19 100644
--- a/helm/charts/aleph/templates/api.yaml
+++ b/helm/charts/aleph/templates/api.yaml
@@ -24,27 +24,30 @@ spec:
           imagePullPolicy: {{ .Values.api.image.pullPolicy }}
           command:
             - gunicorn
+            - "--config"
+            - 
"/aleph/gunicorn.conf.py" - "--timeout" - "3600" - "--keep-alive" - "60" - "--threads" - "10" - - "--bind" - - 0.0.0.0:8000 - --log-level - warning - --log-file - "-" - --access-logfile - "-" - - aleph.wsgi:app volumeMounts: {{ if .Values.global.google }} - mountPath: /var/secrets/google name: service-account-aleph readOnly: true {{ end }} + {{ if .Values.global.prometheus.enabled }} + - mountPath: /run/prometheus + name: prometheus-multiproc-dir + {{ end }} - mountPath: /tmp name: tmp-volume - mountPath: /home/app @@ -108,6 +111,12 @@ spec: name: aleph-secrets key: AWS_SECRET_ACCESS_KEY {{ end }} + {{ if .Values.global.prometheus.enabled }} + - name: PROMETHEUS_ENABLED + value: "true" + - name: PROMETHEUS_MULTIPROC_DIR + value: "/run/prometheus" + {{ end }} readinessProbe: httpGet: path: /healthz?ready @@ -129,6 +138,10 @@ spec: secret: secretName: service-account-aleph {{ end }} + {{ if .Values.global.prometheus.enabled }} + - name: prometheus-multiproc-dir + emptyDir: {} + {{ end }} - name: tmp-volume emptyDir: {} - name: home-volume @@ -161,3 +174,25 @@ spec: targetPort: 8000 protocol: "TCP" name: api + {{ if .Values.global.prometheus.enabled }} + - port: 9100 + targetPort: 9100 + protocol: "TCP" + name: metrics + {{ end }} +--- +{{ if .Values.global.serviceMonitor.enabled }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ .Values.global.namingPrefix }}-api + labels: + app: {{ .Values.global.namingPrefix }}-api +spec: + selector: + matchLabels: + app: {{ .Values.global.namingPrefix }}-api + endpoints: + - interval: "30s" + port: metrics +{{ end }} diff --git a/helm/charts/aleph/templates/exporter.yaml b/helm/charts/aleph/templates/exporter.yaml new file mode 100644 index 0000000000..6f13872c43 --- /dev/null +++ b/helm/charts/aleph/templates/exporter.yaml @@ -0,0 +1,110 @@ +{{ if .Values.exporter.enabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.global.namingPrefix }}-exporter + labels: + app: {{ .Values.global.namingPrefix }}-exporter +spec: + replicas: 1 + selector: + matchLabels: + app: {{ .Values.global.namingPrefix }}-exporter + template: + metadata: + labels: + app: {{ .Values.global.namingPrefix }}-exporter + annotations: {{- toYaml .Values.exporter.podAnnotations | nindent 8 }} + spec: + nodeSelector: {{- toYaml .Values.exporter.nodeSelector | nindent 8 }} + restartPolicy: Always + securityContext: {{- toYaml .Values.exporter.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}" + imagePullPolicy: {{ .Values.global.image.pullPolicy }} + command: + - "gunicorn" + - "--bind" + - "0.0.0.0:9100" + - "--log-level" + - "warn" + - "--log-file" + - "-" + - "aleph.metrics.exporter:app" + volumeMounts: + - mountPath: /tmp + name: tmp-volume + securityContext: + {{- toYaml .Values.exporter.containerSecurityContext | nindent 12 }} + resources: + {{- toYaml .Values.exporter.containerResources | nindent 12 }} + env: + {{- range $key, $value := .Values.global.commonEnv }} + - name: {{ $key }} + value: {{ $value | quote }} + {{- end }} + {{- range $key, $value := .Values.global.env }} + - name: {{ $key }} + value: {{ $value | quote }} + {{- end }} + - name: ALEPH_DATABASE_URI + valueFrom: + secretKeyRef: + name: aleph-secrets + key: ALEPH_DATABASE_URI + - name: ALEPH_SECRET_KEY + valueFrom: + secretKeyRef: + name: aleph-secrets + key: ALEPH_SECRET_KEY + - name: SENTRY_DSN + valueFrom: + secretKeyRef: + name: aleph-secrets + key: 
SENTRY_DSN + readinessProbe: + httpGet: + path: /metrics + port: 9100 + initialDelaySeconds: 5 + livenessProbe: + httpGet: + path: /metrics + port: 9100 + initialDelaySeconds: 5 + volumes: + - name: tmp-volume + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.global.namingPrefix }}-exporter + labels: + app: {{ .Values.global.namingPrefix }}-exporter +spec: + selector: + app: {{ .Values.global.namingPrefix }}-exporter + ports: + - port: 9100 + targetPort: 9100 + protocol: "TCP" + name: metrics +{{ end }} +--- +{{ if and .Values.exporter.enabled .Values.global.serviceMonitor.enabled }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ .Values.global.namingPrefix }}-exporter + labels: + app: {{ .Values.global.namingPrefix }}-exporter +spec: + selector: + matchLabels: + app: {{ .Values.global.namingPrefix }}-exporter + endpoints: + - interval: "120s" + port: metrics +{{ end }} diff --git a/helm/charts/aleph/templates/ingest-file.yaml b/helm/charts/aleph/templates/ingest-file.yaml index 82edaef3b8..3aa0144767 100644 --- a/helm/charts/aleph/templates/ingest-file.yaml +++ b/helm/charts/aleph/templates/ingest-file.yaml @@ -79,6 +79,10 @@ spec: name: aleph-secrets key: AWS_SECRET_ACCESS_KEY {{ end }} + {{ if .Values.global.prometheus.enabled }} + - name: PROMETHEUS_ENABLED + value: "true" + {{ end }} volumes: {{ if .Values.global.google }} - name: service-account-aleph @@ -114,3 +118,36 @@ spec: app: {{ .Values.global.namingPrefix }}-ingest-file policyTypes: - Ingress +--- +{{ if .Values.global.prometheus.enabled }} +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.global.namingPrefix }}-ingest-file + labels: + app: {{ .Values.global.namingPrefix }}-ingest-file +spec: + selector: + app: {{ .Values.global.namingPrefix }}-ingest-file + ports: + - port: 9090 + targetPort: 9090 # TODO: Set to 9100 once servicelayer fix is released + protocol: "TCP" + name: metrics +{{ end }} +--- +{{ if .Values.global.serviceMonitor.enabled }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ .Values.global.namingPrefix }}-ingest-file + labels: + app: {{ .Values.global.namingPrefix }}-ingest-file +spec: + selector: + matchLabels: + app: {{ .Values.global.namingPrefix }}-ingest-file + endpoints: + - interval: "30s" + port: metrics +{{ end }} diff --git a/helm/charts/aleph/templates/worker.yaml b/helm/charts/aleph/templates/worker.yaml index a286cf21c3..3eb230d305 100644 --- a/helm/charts/aleph/templates/worker.yaml +++ b/helm/charts/aleph/templates/worker.yaml @@ -22,7 +22,7 @@ spec: containers: - name: {{ .Chart.Name }} image: "{{ .Values.global.image.repository }}:{{ .Values.global.image.tag }}" - imagePullPolicy: Always + imagePullPolicy: {{ .Values.global.image.pullPolicy }} command: ["aleph", "worker"] volumeMounts: {{ if .Values.global.google }} @@ -93,6 +93,10 @@ spec: name: aleph-secrets key: AWS_SECRET_ACCESS_KEY {{ end }} + {{ if .Values.global.prometheus.enabled }} + - name: PROMETHEUS_ENABLED + value: "true" + {{ end }} volumes: {{ if .Values.global.google }} - name: service-account-aleph @@ -104,3 +108,36 @@ spec: medium: Memory - name: tmp-volume emptyDir: {} +--- +{{ if .Values.global.prometheus.enabled }} +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.global.namingPrefix }}-worker + labels: + app: {{ .Values.global.namingPrefix }}-worker +spec: + selector: + app: {{ .Values.global.namingPrefix }}-worker + ports: + - port: 9100 + targetPort: 9090 # TODO: Set to 9100 once servicelayer fix 
is released + protocol: "TCP" + name: metrics +{{ end }} +--- +{{ if .Values.global.serviceMonitor.enabled }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ .Values.global.namingPrefix }}-worker + labels: + app: {{ .Values.global.namingPrefix }}-worker +spec: + selector: + matchLabels: + app: {{ .Values.global.namingPrefix }}-worker + endpoints: + - interval: "30s" + port: metrics +{{ end }} diff --git a/helm/charts/aleph/values.yaml b/helm/charts/aleph/values.yaml index 5965aab223..7bbea90fd5 100644 --- a/helm/charts/aleph/values.yaml +++ b/helm/charts/aleph/values.yaml @@ -13,6 +13,12 @@ global: LOG_FORMAT: JSON REDIS_URL: redis://aleph-redis-master.default.svc.cluster.local:6379/0 + prometheus: + enabled: false + + serviceMonitor: + enabled: false + env: ALEPH_DEBUG: "false" ALEPH_CACHE: "true" @@ -261,3 +267,30 @@ worker: env: WORKER_THREADS: 0 + +# Aleph Prometheus Exporter - templates/exporter.yaml +exporter: + enabled: false + + podAnnotations: {} + + nodeSelector: {} + + podSecurityContext: + runAsUser: 1000 + runAsGroup: 1000 + fsGroup: 1000 + + image: + pullPolicy: Always + + containerSecurityContext: + readOnlyRootFilesystem: true + allowPrivilegeEscalation: false + + containerResources: + requests: + memory: 250Mi + cpu: 10m + limits: + memory: 500Mi From bfd7a60e6244e15f6c0e63d0f5e167ed59bd563a Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Wed, 8 Nov 2023 18:20:42 +0100 Subject: [PATCH 07/20] Handle requests without Authz object gracefully --- aleph/metrics/flask.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py index 6a4d4511a6..c6a6b73aa8 100644 --- a/aleph/metrics/flask.py +++ b/aleph/metrics/flask.py @@ -40,7 +40,14 @@ def after_request(response): method = request.method status = response.status_code - logged_in = request.authz.logged_in + + logged_in = False + + # In theory, there should always be an Authz object. However in practice, + # this isn’t always the case, but I haven’t been able to reliably reproduce that. + if hasattr(request, "authz"): + logged_in = request.authz.logged_in + duration = max(0, default_timer() - request.prometheus_start_time) REQUEST.labels(method, status, endpoint, logged_in).inc() From 7bd852d8f90fe2711f69dc8604d66b377cfd3a17 Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Wed, 8 Nov 2023 18:28:18 +0100 Subject: [PATCH 08/20] Rename Prometheus label to "api_endpoint" to prevent naming clashes Prometheus Operator also uses the "endpoint" label and automatically renames "endpoint" labels exposed by the metrics endpoint to "exported_endpoints" which is ugly. 
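
As a rough illustration (not part of this change), a minimal, self-contained
sketch of how the renamed label shows up in the exposition format, assuming
prometheus_client 0.17 is installed; the endpoint name used below is made up:

    from prometheus_client import CollectorRegistry, Counter, generate_latest

    registry = CollectorRegistry()

    # Same label set as the REQUEST counter in aleph/metrics/flask.py
    # after this change.
    requests = Counter(
        "http_request_total",
        "Total number of Aleph API requests",
        ["method", "status", "api_endpoint", "logged_in"],
        registry=registry,
    )

    # "entities_api.view" is a hypothetical endpoint name for demonstration.
    requests.labels(
        method="GET", status="200", api_endpoint="entities_api.view", logged_in="True"
    ).inc()

    # Samples now carry api_endpoint="...", leaving the "endpoint" label free
    # for the Prometheus Operator to attach at scrape time.
    print(generate_latest(registry).decode("utf-8"))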
---
 aleph/metrics/flask.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py
index c6a6b73aa8..d5506020c3 100644
--- a/aleph/metrics/flask.py
+++ b/aleph/metrics/flask.py
@@ -17,13 +17,13 @@
 REQUEST_DURATION = Histogram(
     "http_request_duration_seconds",
     "Duration of requests to the Aleph API in seconds",
-    ["method", "status", "endpoint"],
+    ["method", "status", "api_endpoint"],
 )
 
 REQUEST = Counter(
     "http_request_total",
     "Total number of Aleph API requests",
-    ["method", "status", "endpoint", "logged_in"],
+    ["method", "status", "api_endpoint", "logged_in"],
 )
 
 
@@ -32,10 +32,10 @@ def before_request():
 
 
 def after_request(response):
-    endpoint = request.endpoint
+    api_endpoint = request.endpoint
 
-    # Do not track request duration for the metrics endpoint
-    if endpoint == METRICS_ENDPOINT_NAME:
+    # Do not track request duration for the metrics and healthz endpoints
+    if api_endpoint == METRICS_ENDPOINT_NAME or api_endpoint == "base_api.healthz":
         return response
 
     method = request.method
@@ -50,8 +50,8 @@ def after_request(response):
 
     duration = max(0, default_timer() - request.prometheus_start_time)
 
-    REQUEST.labels(method, status, endpoint, logged_in).inc()
-    REQUEST_DURATION.labels(method, status, endpoint).observe(duration)
+    REQUEST.labels(method, status, api_endpoint, logged_in).inc()
+    REQUEST_DURATION.labels(method, status, api_endpoint).observe(duration)
 
     return response

From de5538fa229a3f821b53166a8b1ee99ae04d71f8 Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Fri, 17 Nov 2023 20:01:10 +0100
Subject: [PATCH 09/20] Add xref metrics

---
 aleph/logic/xref.py      | 53 ++++++++++++++++++++++++++++++++++++++++
 aleph/search/__init__.py |  3 ++-
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/aleph/logic/xref.py b/aleph/logic/xref.py
index 125900b527..43371fe3b1 100644
--- a/aleph/logic/xref.py
+++ b/aleph/logic/xref.py
@@ -4,6 +4,7 @@
 from pprint import pformat, pprint # noqa
 from tempfile import mkdtemp
 from dataclasses import dataclass
+from timeit import default_timer
 
 import followthemoney
 from followthemoney import model
@@ -15,6 +16,7 @@
 from followthemoney_compare.models import GLMBernoulli2EEvaluator
 from followthemoney.proxy import EntityProxy
 from servicelayer.archive.util import ensure_path
+from prometheus_client import Counter, Histogram
 
 from aleph.core import es, db
 from aleph.settings import SETTINGS
@@ -40,6 +42,36 @@
 ORIGIN = "xref"
 MODEL = None
 FTM_VERSION_STR = f"ftm-{followthemoney.__version__}"
+SCORE_CUTOFF = 0.5
+
+XREF_ENTITIES = Counter(
+    "aleph_xref_entities_total",
+    "Total number of entities and mentions that have been xref'ed",
+)
+
+XREF_MATCHES = Histogram(
+    "aleph_xref_matches",
+    "Number of matches per xref'ed entity or mention",
+    buckets=[
+        # Listing 0 as a separate bucket size because it's interesting to know
+        # what percentage of entities result in no matches at all
+        0,
+        5,
+        10,
+        25,
+        50,
+    ],
+)
+
+XREF_CANDIDATES_QUERY_DURATION = Histogram(
+    "aleph_xref_candidates_query_duration_seconds",
+    "Processing duration of the candidates query (excl. network, serialization etc.)",
+)
+
+XREF_CANDIDATES_QUERY_ROUNDTRIP_DURATION = Histogram(
+    "aleph_xref_candidates_query_roundtrip_duration_seconds",
+    "Roundtrip duration of the candidates query (incl. 
network, serialization etc.)",
+)
 
 
 @dataclass
@@ -102,7 +134,15 @@ def _query_item(entity, entitysets=True):
     query = {"query": query, "size": 50, "_source": ENTITY_SOURCE}
     schemata = list(entity.schema.matchable_schemata)
     index = entities_read_index(schema=schemata, expand=False)
+
+    start_time = default_timer()
     result = es.search(index=index, body=query)
+    roundtrip_duration = max(0, default_timer() - start_time)
+    query_duration = result.get("took")
+    if query_duration is not None:
+        # ES returns milliseconds, but we track query time in seconds
+        query_duration = result.get("took") / 1000
+
     candidates = []
     for result in result.get("hits").get("hits"):
         result = unpack_result(result)
@@ -116,7 +156,9 @@ def _query_item(entity, entitysets=True):
         entity.caption,
         len(candidates),
     )
+
     results = _bulk_compare([(entity, c) for c in candidates])
+    match_count = 0
     for match, (score, doubt, method) in zip(candidates, results):
         log.debug(
             "Match: %s: %s <[%.2f]@%0.2f> %s",
@@ -136,6 +178,17 @@ def _query_item(entity, entitysets=True):
             match=match,
             entityset_ids=entityset_ids,
         )
+        if score > SCORE_CUTOFF:
+            # While we store all xref matches with a score > 0, we only count matches
+            # with a score above a threshold. This is in line with the user-facing behavior
+            # which also only shows matches above the threshold.
+            match_count += 1
+
+    XREF_ENTITIES.inc()
+    XREF_MATCHES.observe(match_count)
+    XREF_CANDIDATES_QUERY_ROUNDTRIP_DURATION.observe(roundtrip_duration)
+    if query_duration:
+        XREF_CANDIDATES_QUERY_DURATION.observe(query_duration)
 
 
 def _iter_mentions(collection):
diff --git a/aleph/search/__init__.py b/aleph/search/__init__.py
index 0f5c789f7c..dde387bfa7 100644
--- a/aleph/search/__init__.py
+++ b/aleph/search/__init__.py
@@ -10,6 +10,7 @@
 from aleph.index.entities import ENTITY_SOURCE
 from aleph.logic.matching import match_query
 from aleph.logic.notifications import get_role_channels
+from aleph.logic.xref import SCORE_CUTOFF
 from aleph.search.parser import QueryParser, SearchQueryParser # noqa
 from aleph.search.result import QueryResult, DatabaseQueryResult # noqa
 from aleph.search.query import Query
@@ -116,7 +117,7 @@ class XrefQuery(Query):
         "score": "_score",
     }
     AUTHZ_FIELD = "match_collection_id"
-    SCORE_CUTOFF = 0.5
+    SCORE_CUTOFF = SCORE_CUTOFF
     SOURCE = XREF_SOURCE
 
     def __init__(self, parser, collection_id=None):

From 155ec42a037647e1d9652a5bc0a973cae0e1bada Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Sun, 19 Nov 2023 11:48:26 +0100
Subject: [PATCH 10/20] Use common prefix for all metric names
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Even though it is considered an anti-pattern to add a prefix with the name
of the software or component to metrics (according to the official
Prometheus documentation), I have decided to add a prefix. I’ve found that
this makes it much easier to find relevant metrics.

The main disadvantage of per-component prefixes is that queries become
slightly more complex if you want to query the same metric (e.g. HTTP
request duration) across multiple components. This isn’t super important in
our case though, so I think the trade-off is acceptable.
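
For illustration, here is a minimal, self-contained sketch (assuming
prometheus_client 0.17 is installed) of the prefixed naming convention used
by the custom collectors; the constant value 42 is arbitrary and stands in
for the actual database query:

    from prometheus_client import CollectorRegistry
    from prometheus_client.core import GaugeMetricFamily

    class ExampleCollector:
        def collect(self):
            # In DatabaseCollector, Role.all_users().count() produces this value.
            yield GaugeMetricFamily("aleph_users", "Total number of users", value=42)

    registry = CollectorRegistry()
    registry.register(ExampleCollector())

    # get_sample_value is also how aleph/tests/test_metrics.py reads metrics back.
    assert registry.get_sample_value("aleph_users") == 42.0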
---
 aleph/metrics/collectors.py | 22 +++++++++-------------
 aleph/tests/test_metrics.py | 12 ++++++------
 2 files changed, 15 insertions(+), 19 deletions(-)

diff --git a/aleph/metrics/collectors.py b/aleph/metrics/collectors.py
index f31a062173..4863976d32 100644
--- a/aleph/metrics/collectors.py
+++ b/aleph/metrics/collectors.py
@@ -21,8 +21,6 @@ def collect(self):
 
 
 class DatabaseCollector(object):
-    PREFIX = "aleph_"
-
     def __init__(self):
         self._flask_app = create_flask_app()
 
@@ -38,14 +36,14 @@ def collect(self):
 
     def _users(self):
         return GaugeMetricFamily(
-            self.PREFIX + "users",
+            "aleph_users",
             "Total number of users",
             value=Role.all_users().count(),
         )
 
     def _collections(self):
         gauge = GaugeMetricFamily(
-            self.PREFIX + "collections",
+            "aleph_collections",
             "Total number of collections by category",
             labels=["category"],
         )
@@ -63,7 +61,7 @@ def _collections(self):
 
     def _collection_users(self):
         gauge = GaugeMetricFamily(
-            self.PREFIX + "collection_users",
+            "aleph_collection_users",
             "Total number of users that have created at least one collection",
             labels=["category"],
         )
@@ -84,7 +82,7 @@ def _collection_users(self):
 
     def _entitysets(self):
         gauge = GaugeMetricFamily(
-            self.PREFIX + "entitysets",
+            "aleph_entitysets",
             "Total number of entity sets by type",
             labels=["type"],
         )
@@ -102,7 +100,7 @@ def _entitysets(self):
 
     def _entityset_users(self):
         gauge = GaugeMetricFamily(
-            self.PREFIX + "entityset_users",
+            "aleph_entityset_users",
             "Number of users that have created at least one entity set of the given type",
             labels=["type"],
         )
@@ -123,27 +121,25 @@ def _entityset_users(self):
 
     def _bookmarks(self):
         return GaugeMetricFamily(
-            self.PREFIX + "bookmarks",
+            "aleph_bookmarks",
             "Total number of bookmarks",
             value=Bookmark.query.count(),
         )
 
     def _bookmark_users(self):
         return GaugeMetricFamily(
-            self.PREFIX + "bookmark_users",
+            "aleph_bookmark_users",
             "Number of users that have created at least one bookmark",
             value=Bookmark.query.distinct(Bookmark.role_id).count(),
         )
 
 
 class QueuesCollector(object):
-    PREFIX = "queues_"
-
     def collect(self):
         status = get_active_dataset_status()
 
         yield GaugeMetricFamily(
-            self.PREFIX + "active_datasets",
+            "aleph_active_datasets",
             "Total number of active datasets",
             value=status["total"],
         )
@@ -169,7 +165,7 @@ def collect(self):
                 }
 
         tasks_gauge = GaugeMetricFamily(
-            self.PREFIX + "tasks",
+            "aleph_tasks",
             "Total number of pending or running tasks in a given stage",
             labels=["stage", "status"],
         )
diff --git a/aleph/tests/test_metrics.py b/aleph/tests/test_metrics.py
index e21cb2ac92..58e0a501a9 100644
--- a/aleph/tests/test_metrics.py
+++ b/aleph/tests/test_metrics.py
@@ -136,17 +136,17 @@ def test_bookmarks(self):
         assert count == 2, count
         assert users == 1, users
 
-    def test_queue_tasks(self):
+    def test_tasks(self):
         reg = CollectorRegistry()
         reg.register(QueuesCollector())
 
         reg.collect()
         count = reg.get_sample_value(
-            "queue_tasks", {"stage": "index", "status": "pending"}
+            "aleph_tasks", {"stage": "index", "status": "pending"}
         )
         assert count is None, count
         count = reg.get_sample_value(
-            "queue_tasks", {"stage": "index", "status": "pending"}
+            "aleph_tasks", {"stage": "index", "status": "pending"}
         )
         assert count is None, count
 
@@ -158,7 +158,7 @@ def test_tasks(self):
         reg.collect()
         count = reg.get_sample_value(
-            "queues_tasks", 
{"stage": "index", "status": "pending"} + "aleph_tasks", {"stage": "index", "status": "pending"} ) assert count == 0, count count = reg.get_sample_value( - "queues_tasks", {"stage": "index", "status": "running"} + "aleph_tasks", {"stage": "index", "status": "running"} ) assert count == 1, count From 4c238d25a1e5db0a7db55cf1c73ccae0a0410453 Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Sun, 19 Nov 2023 11:48:42 +0100 Subject: [PATCH 11/20] Expose Python platform information as Prometheus metrics --- aleph/metrics/exporter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aleph/metrics/exporter.py b/aleph/metrics/exporter.py index 7ba729ef8d..eb7622671b 100644 --- a/aleph/metrics/exporter.py +++ b/aleph/metrics/exporter.py @@ -1,4 +1,4 @@ -from prometheus_client import make_wsgi_app +from prometheus_client import make_wsgi_app, PLATFORM_COLLECTOR from prometheus_client.core import CollectorRegistry from aleph.metrics.collectors import InfoCollector, DatabaseCollector, QueuesCollector @@ -6,6 +6,7 @@ def create_app(): registry = CollectorRegistry() + registry.register(PLATFORM_COLLECTOR) registry.register(InfoCollector()) registry.register(DatabaseCollector()) registry.register(QueuesCollector()) From b27bae4e07ba6325ea1497de158af3f1842b58a1 Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Sun, 19 Nov 2023 12:03:15 +0100 Subject: [PATCH 12/20] Remove unused port, network policy from K8s specs Although I'm not 100% sure, the exposed port 3000 probably is a left-over from the past, possibly when convert-document was still part of ingest-file. The network policy prevented Prometheus from scraping ingest-file metrics (and as the metrics port is now the only port exposed by ingest-file, should be otherwise unnecessary). --- helm/charts/aleph/templates/ingest-file.yaml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/helm/charts/aleph/templates/ingest-file.yaml b/helm/charts/aleph/templates/ingest-file.yaml index 3aa0144767..4f97d29200 100644 --- a/helm/charts/aleph/templates/ingest-file.yaml +++ b/helm/charts/aleph/templates/ingest-file.yaml @@ -27,9 +27,6 @@ spec: command: - ingestors - process - ports: - - containerPort: 3000 - name: http volumeMounts: {{ if .Values.global.google }} - mountPath: /var/secrets/google @@ -108,17 +105,6 @@ spec: maxReplicas: {{.Values.ingestfile.hpa.maxReplicas}} metrics: {{- toYaml .Values.ingestfile.hpa.scalingMetrics | nindent 4}} --- -apiVersion: networking.k8s.io/v1 -kind: NetworkPolicy -metadata: - name: {{ .Values.global.namingPrefix }}-ingest-file-policy -spec: - podSelector: - matchLabels: - app: {{ .Values.global.namingPrefix }}-ingest-file - policyTypes: - - Ingress ---- {{ if .Values.global.prometheus.enabled }} apiVersion: v1 kind: Service From d512e00d621063288628ce003719494f2870a835 Mon Sep 17 00:00:00 2001 From: Till Prochaska Date: Mon, 20 Nov 2023 11:33:12 +0100 Subject: [PATCH 13/20] Use keyword args to set Prometheus metric labels As suggested by @stchris --- aleph/metrics/flask.py | 14 ++++++++++++-- aleph/views/sessions_api.py | 14 +++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py index d5506020c3..35d28883b0 100644 --- a/aleph/metrics/flask.py +++ b/aleph/metrics/flask.py @@ -50,8 +50,18 @@ def after_request(response): duration = max(0, default_timer() - request.prometheus_start_time) - REQUEST.labels(method, status, api_endpoint, logged_in).inc() - REQUEST_DURATION.labels(method, status, api_endpoint).observe(duration) + 
REQUEST.labels(
+        method=method,
+        status=status,
+        api_endpoint=api_endpoint,
+        logged_in=logged_in,
+    ).inc()
+
+    REQUEST_DURATION.labels(
+        method=method,
+        status=status,
+        api_endpoint=api_endpoint,
+    ).observe(duration)
 
     return response
diff --git a/aleph/views/sessions_api.py b/aleph/views/sessions_api.py
index 28a8b44ad4..0e76fdf4c1 100644
--- a/aleph/views/sessions_api.py
+++ b/aleph/views/sessions_api.py
@@ -65,12 +65,12 @@ def password_login():
     data = parse_request("Login")
     role = Role.login(data.get("email"), data.get("password"))
     if role is None:
-        AUTH_ATTEMPTS.labels("password", "failed").inc()
+        AUTH_ATTEMPTS.labels(method="password", result="failed").inc()
         raise BadRequest(gettext("Invalid user or password."))
 
     role.touch()
     db.session.commit()
-    AUTH_ATTEMPTS.labels("password", "success").inc()
+    AUTH_ATTEMPTS.labels(method="password", result="success").inc()
     update_role(role)
     authz = Authz.from_role(role)
     return jsonify({"status": "ok", "token": authz.to_token()})
@@ -104,7 +104,7 @@ def oauth_callback():
     err = Unauthorized(gettext("Authentication has failed."))
     state = cache.get_complex(_oauth_session(request.args.get("state")))
     if state is None:
-        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
+        AUTH_ATTEMPTS.labels(method="oauth", result="failed").inc()
         raise err
 
     try:
@@ -112,22 +112,22 @@
         uri = state.get("redirect_uri")
         oauth_token = oauth.provider.authorize_access_token(redirect_uri=uri)
     except AuthlibBaseError as err:
-        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
+        AUTH_ATTEMPTS.labels(method="oauth", result="failed").inc()
         log.warning("Failed OAuth: %r", err)
         raise err
 
     if oauth_token is None or isinstance(oauth_token, AuthlibBaseError):
-        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
+        AUTH_ATTEMPTS.labels(method="oauth", result="failed").inc()
         log.warning("Failed OAuth: %r", oauth_token)
         raise err
 
     role = handle_oauth(oauth.provider, oauth_token)
     if role is None:
-        AUTH_ATTEMPTS.labels("oauth", "failed").inc()
+        AUTH_ATTEMPTS.labels(method="oauth", result="failed").inc()
         raise err
 
     db.session.commit()
     update_role(role)
-    AUTH_ATTEMPTS.labels("oauth", "success").inc()
+    AUTH_ATTEMPTS.labels(method="oauth", result="success").inc()
     log.debug("Logged in: %r", role)
     request.authz = Authz.from_role(role)
     token = request.authz.to_token()

From 8137882bce8c75fda217002a63ada3e80165c70d Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Tue, 21 Nov 2023 14:40:42 +0100
Subject: [PATCH 14/20] Bump servicelayer from 1.22.0 to 1.22.1

---
 helm/charts/aleph/templates/ingest-file.yaml | 4 ++--
 helm/charts/aleph/templates/worker.yaml      | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/helm/charts/aleph/templates/ingest-file.yaml b/helm/charts/aleph/templates/ingest-file.yaml
index 4f97d29200..5d87737372 100644
--- a/helm/charts/aleph/templates/ingest-file.yaml
+++ b/helm/charts/aleph/templates/ingest-file.yaml
@@ -116,8 +116,8 @@ spec:
   selector:
     app: {{ .Values.global.namingPrefix }}-ingest-file
   ports:
-    - port: 9090
-      targetPort: 9090 # TODO: Set to 9100 once servicelayer fix is released
+    - port: 9100
+      targetPort: 9100
       protocol: "TCP"
       name: metrics
 {{ end }}
diff --git a/helm/charts/aleph/templates/worker.yaml b/helm/charts/aleph/templates/worker.yaml
index 3eb230d305..2546cff956 100644
--- a/helm/charts/aleph/templates/worker.yaml
+++ b/helm/charts/aleph/templates/worker.yaml
@@ -121,7 +121,7 @@ spec:
     app: {{ .Values.global.namingPrefix }}-worker
   ports:
     - port: 9100
-      targetPort: 9090 # TODO: Set to 9100 once servicelayer fix is released
+      targetPort: 9100
      
From 8137882bce8c75fda217002a63ada3e80165c70d Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Tue, 21 Nov 2023 14:40:42 +0100
Subject: [PATCH 14/20] Bump servicelayer from 1.22.0 to 1.22.1

---
 helm/charts/aleph/templates/ingest-file.yaml | 4 ++--
 helm/charts/aleph/templates/worker.yaml      | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/helm/charts/aleph/templates/ingest-file.yaml b/helm/charts/aleph/templates/ingest-file.yaml
index 4f97d29200..5d87737372 100644
--- a/helm/charts/aleph/templates/ingest-file.yaml
+++ b/helm/charts/aleph/templates/ingest-file.yaml
@@ -116,8 +116,8 @@ spec:
   selector:
     app: {{ .Values.global.namingPrefix }}-ingest-file
   ports:
-    - port: 9090
-      targetPort: 9090 # TODO: Set to 9100 once servicelayer fix is released
+    - port: 9100
+      targetPort: 9100
       protocol: "TCP"
       name: metrics
 {{ end }}
diff --git a/helm/charts/aleph/templates/worker.yaml b/helm/charts/aleph/templates/worker.yaml
index 3eb230d305..2546cff956 100644
--- a/helm/charts/aleph/templates/worker.yaml
+++ b/helm/charts/aleph/templates/worker.yaml
@@ -121,7 +121,7 @@ spec:
     app: {{ .Values.global.namingPrefix }}-worker
   ports:
     - port: 9100
-      targetPort: 9090 # TODO: Set to 9100 once servicelayer fix is released
+      targetPort: 9100
       protocol: "TCP"
       name: metrics
 {{ end }}

From 841f4b64bd82e5662bfa2afcc7c24f792bc21a7b Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Thu, 23 Nov 2023 12:53:19 +0100
Subject: [PATCH 15/20] Simplify entity streaming metrics code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

There’s no need to do batched metric increments until this becomes a
performance bottleneck.
---
 aleph/views/stream_api.py | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/aleph/views/stream_api.py b/aleph/views/stream_api.py
index e2bd2c4151..960606e25b 100644
--- a/aleph/views/stream_api.py
+++ b/aleph/views/stream_api.py
@@ -68,17 +68,8 @@ def entities(collection_id=None):
     STREAMS.inc()
 
     def generator(entities):
-        count = 0
-
         for entity in entities:
-            count += 1
-
-            if count == 1000:
-                STREAMED_ENTITIES.inc(count)
-                count = 0
-
+            STREAMED_ENTITIES.inc()
             yield entity
 
-        STREAMED_ENTITIES.inc(count)
-
     return stream_ijson(generator(entities))

From 1288d9463f448a44dbdbc400c1e4d21e3de67c9f Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Thu, 23 Nov 2023 13:35:13 +0100
Subject: [PATCH 16/20] Limit maximum size of Prometheus multiprocessing
 directory

---
 helm/charts/aleph/templates/api.yaml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/helm/charts/aleph/templates/api.yaml b/helm/charts/aleph/templates/api.yaml
index d0a3574c19..451bcf837f 100644
--- a/helm/charts/aleph/templates/api.yaml
+++ b/helm/charts/aleph/templates/api.yaml
@@ -140,7 +140,8 @@ spec:
 {{ end }}
 {{ if .Values.global.prometheus.enabled }}
       - name: prometheus-multiproc-dir
-        emptyDir: {}
+        emptyDir:
+          sizeLimit: 1Gi
 {{ end }}
       - name: tmp-volume
         emptyDir: {}

From 33d9416da51388f101d4918e496280f56310f7a0 Mon Sep 17 00:00:00 2001
From: Till Prochaska
Date: Thu, 23 Nov 2023 13:45:47 +0100
Subject: [PATCH 17/20] Do not let collector classes inherit from `object`
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

I copied the boilerplate for custom collectors from the docs without
thinking about it too much, but inheriting from `object` really isn’t
necessary anymore in Python 3. The Prometheus client also exports an
abstract `Collector` class -- it doesn’t do anything except provide
type hints for the `collect` method, which is nice.
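A minimal collector following this pattern might look like the sketch
below (illustrative only; the metric name and value are made up):

```python
from prometheus_client.core import GaugeMetricFamily
from prometheus_client.registry import Collector


class ExampleCollector(Collector):
    # `Collector` is an abstract base class: subclasses only need to
    # implement `collect()` and yield metric families from it.
    def collect(self):
        # A made-up gauge, purely to illustrate the shape of the API.
        yield GaugeMetricFamily("example_gauge", "An example gauge", value=42)
```

Registering an instance with a `CollectorRegistry` is all that is
needed for the yielded metrics to be included in a scrape.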
---
 aleph/metrics/collectors.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/aleph/metrics/collectors.py b/aleph/metrics/collectors.py
index 4863976d32..f68a186f33 100644
--- a/aleph/metrics/collectors.py
+++ b/aleph/metrics/collectors.py
@@ -1,5 +1,6 @@
 from sqlalchemy import func
 from prometheus_client.core import GaugeMetricFamily, InfoMetricFamily
+from prometheus_client.registry import Collector
 from followthemoney import __version__ as ftm_version
 
 from aleph import __version__ as aleph_version
@@ -8,7 +9,7 @@
 from aleph.model import Role, Collection, EntitySet, Bookmark
 
 
-class InfoCollector(object):
+class InfoCollector(Collector):
     def collect(self):
         yield InfoMetricFamily(
             "aleph_system",
@@ -20,7 +21,7 @@
         )
 
 
-class DatabaseCollector(object):
+class DatabaseCollector(Collector):
     def __init__(self):
         self._flask_app = create_flask_app()
 
@@ -134,7 +135,7 @@ def _bookmark_users(self):
         )
 
 
-class QueuesCollector(object):
+class QueuesCollector(Collector):
     def collect(self):
         status = get_active_dataset_status()

From 63fb216c164c6115c010fe8986905ddb1f115956 Mon Sep 17 00:00:00 2001
From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com>
Date: Tue, 9 Jan 2024 17:41:19 +0100
Subject: [PATCH 18/20] Add `aleph_` prefix to Prometheus API metrics

---
 aleph/metrics/flask.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py
index 35d28883b0..919fd9e23f 100644
--- a/aleph/metrics/flask.py
+++ b/aleph/metrics/flask.py
@@ -15,13 +15,13 @@
 METRICS_ENDPOINT_NAME = "metrics"
 
 REQUEST_DURATION = Histogram(
-    "http_request_duration_seconds",
+    "aleph_http_request_duration_seconds",
     "Duration of requests to the Aleph API in seconds",
     ["method", "status", "api_endpoint"],
 )
 
 REQUEST = Counter(
-    "http_request_total",
+    "aleph_http_request_total",
     "Total number of Aleph API requests",
     ["method", "status", "api_endpoint", "logged_in"],
 )

From ff937cdd1ab4f61d57160413128a52c92dd95319 Mon Sep 17 00:00:00 2001
From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com>
Date: Tue, 9 Jan 2024 17:43:54 +0100
Subject: [PATCH 19/20] Fix metrics name (singular -> plural)

---
 aleph/metrics/flask.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/aleph/metrics/flask.py b/aleph/metrics/flask.py
index 919fd9e23f..42dfb46c38 100644
--- a/aleph/metrics/flask.py
+++ b/aleph/metrics/flask.py
@@ -20,8 +20,8 @@
     ["method", "status", "api_endpoint"],
 )
 
-REQUEST = Counter(
-    "aleph_http_request_total",
+REQUESTS = Counter(
+    "aleph_http_requests_total",
     "Total number of Aleph API requests",
     ["method", "status", "api_endpoint", "logged_in"],
 )
@@ -50,7 +50,7 @@ def after_request(response):
 
     duration = max(0, default_timer() - request.prometheus_start_time)
 
-    REQUEST.labels(
+    REQUESTS.labels(
         method=method,
         status=status,
         api_endpoint=api_endpoint,

From 6197c308f6bfe82262b1d0b51a33208fed42cd24 Mon Sep 17 00:00:00 2001
From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com>
Date: Mon, 15 Jan 2024 17:46:45 +0100
Subject: [PATCH 20/20] Add documentation on how to test Prometheus
 instrumentation in local Kubernetes cluster

---
 helm/examples/dev/README.md             | 24 ++++++++++++++++++++++++
 helm/examples/dev/prometheusValues.yaml | 15 +++++++++++++++
 2 files changed, 39 insertions(+)
 create mode 100644 helm/examples/dev/prometheusValues.yaml

diff --git a/helm/examples/dev/README.md b/helm/examples/dev/README.md
index 17bf1ad071..d0bca9bbc8 100644
--- a/helm/examples/dev/README.md
+++ b/helm/examples/dev/README.md
@@ -93,3 +93,27 @@ aleph createuser --name "Test User" --password "12345678" --admin mail@example.org
 ## Viewing and downloading files
 
 Because the MinIO endpoint is only accessible from within the cluster network, you won’t be able to preview or download files from the Aleph UI.
+
+## Prometheus
+
+In order to collect Prometheus metrics, install the Kubernetes Prometheus Stack in your cluster:
+
+```
+helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
+helm repo update
+helm install -f prometheusValues.yaml kube-prometheus-stack prometheus-community/kube-prometheus-stack
+```
+
+Now enable Aleph’s Prometheus integration:
+
+```
+helm upgrade --set "aleph.enabled=true" --set "global.prometheus.enabled=true" --set "global.serviceMonitor.enabled=true" aleph .
+```
+
+In order to access the Prometheus dashboard, forward the respective port…
+
+```
+kubectl port-forward svc/aleph-api 9100
+```
+
+… and open `http://kubernetes.docker.internal:9100` in your browser.
diff --git a/helm/examples/dev/prometheusValues.yaml b/helm/examples/dev/prometheusValues.yaml
new file mode 100644
index 0000000000..3b43bc6f7e
--- /dev/null
+++ b/helm/examples/dev/prometheusValues.yaml
@@ -0,0 +1,15 @@
+prometheus:
+  prometheusSpec:
+    serviceMonitorSelectorNilUsesHelmValues: false
+
+grafana:
+  enabled: false
+
+nodeExporter:
+  enabled: false
+
+alertmanager:
+  enabled: false
+
+kubeStateMetrics:
+  enabled: false
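To sanity-check the setup described in the README above, one can scrape the forwarded metrics endpoint directly. A rough sketch in Python (assuming the `kubectl port-forward svc/aleph-api 9100` command is running and the `requests` package is installed; the metric name checked for is the one introduced in patch 19):

```python
import requests

# Scrape the port-forwarded metrics endpoint.
response = requests.get("http://localhost:9100/metrics", timeout=5)
response.raise_for_status()

# The counter renamed in patch 19 should appear in the exposition output.
assert "aleph_http_requests_total" in response.text
print("Aleph metrics endpoint is up and exposing instrumented metrics.")
```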