From 02bb455315a17a39cb0d936a12a13fbbdb887bb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20=22decko=22=20de=20Brito?= Date: Thu, 6 Jun 2024 13:09:46 -0300 Subject: [PATCH] Add a lock to avoid multiple workers to emit at the same time. Closes #5442 --- CHANGES/5442.bugfix | 1 + pulpcore/app/util.py | 72 +++++++++++++++++++++++++++++--------- pulpcore/constants.py | 1 + pulpcore/tasking/_util.py | 29 --------------- pulpcore/tasking/worker.py | 4 +-- 5 files changed, 59 insertions(+), 48 deletions(-) create mode 100644 CHANGES/5442.bugfix diff --git a/CHANGES/5442.bugfix b/CHANGES/5442.bugfix new file mode 100644 index 0000000000..94bc422368 --- /dev/null +++ b/CHANGES/5442.bugfix @@ -0,0 +1 @@ +Added a lock to avoid multiple workers sending metrics at the same time. diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index c533e267c1..2843d2a204 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -1,20 +1,21 @@ import hashlib import zlib -from functools import lru_cache -from gettext import gettext as _ import os import tempfile +import gnupg +from functools import lru_cache +from gettext import gettext as _ from urllib.parse import urlparse - from contextlib import ExitStack from contextvars import ContextVar from datetime import timedelta -import gnupg from django.conf import settings +from django.db import connection from django.db.models import Model, Sum from django.urls import Resolver404, resolve, reverse + from opentelemetry import metrics from rest_framework.serializers import ValidationError @@ -22,6 +23,8 @@ from pulpcore.app.loggers import deprecation_logger from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app import models +from pulpcore.constants import STORAGE_METRICS_LOCK +from pulpcore.exceptions import AdvisoryLockError from pulpcore.exceptions.validation import InvalidSignatureError @@ -539,19 +542,26 @@ def _init_emitting_total_size(self): ) def _disk_usage_callback(self): - from pulpcore.app.models import Artifact - - options = yield # noqa - - while True: - artifacts = Artifact.objects.filter(pulp_domain=self.domain).distinct() - total_size = artifacts.aggregate(size=Sum("size", default=0))["size"] - options = yield [ # noqa - metrics.Observation( - total_size, - {"pulp_href": get_url(self.domain), "domain_name": self.domain.name}, - ) - ] + try: + with PGAdvisoryLock(STORAGE_METRICS_LOCK): + from pulpcore.app.models import Artifact + + options = yield # noqa + + while True: + artifacts = Artifact.objects.filter(pulp_domain=self.domain).distinct() + total_size = artifacts.aggregate(size=Sum("size", default=0))["size"] + options = yield [ # noqa + metrics.Observation( + total_size, + { + "pulp_href": get_url(self.domain), + "domain_name": self.domain.name, + }, + ) + ] + except AdvisoryLockError: + yield class _NoopEmitter: def __call__(self, *args, **kwargs): @@ -574,3 +584,31 @@ def init_domain_metrics_exporter(): for domain in Domain.objects.all(): DomainMetricsEmitterBuilder.build(domain) + + +class PGAdvisoryLock: + """ + A context manager that will hold a postgres advisory lock non-blocking. + + The locks can be chosen from a lock group to avoid collisions. They will never collide with the + locks used for tasks. + """ + + def __init__(self, lock, lock_group=0): + self.lock_group = lock_group + self.lock = lock + + def __enter__(self): + with connection.cursor() as cursor: + cursor.execute("SELECT pg_try_advisory_lock(%s, %s)", [self.lock_group, self.lock]) + acquired = cursor.fetchone()[0] + if not acquired: + raise AdvisoryLockError("Could not acquire lock.") + return self + + def __exit__(self, exc_type, exc_value, traceback): + with connection.cursor() as cursor: + cursor.execute("SELECT pg_advisory_unlock(%s, %s)", [self.lock_group, self.lock]) + released = cursor.fetchone()[0] + if not released: + raise RuntimeError("Lock not held.") diff --git a/pulpcore/constants.py b/pulpcore/constants.py index a6852a3373..edd270bf24 100644 --- a/pulpcore/constants.py +++ b/pulpcore/constants.py @@ -13,6 +13,7 @@ TASK_SCHEDULING_LOCK = 42 TASK_UNBLOCKING_LOCK = 84 TASK_METRICS_HEARTBEAT_LOCK = 74 +STORAGE_METRICS_LOCK = 72 #: All valid task states. diff --git a/pulpcore/tasking/_util.py b/pulpcore/tasking/_util.py index 6677c0268a..b691017bda 100644 --- a/pulpcore/tasking/_util.py +++ b/pulpcore/tasking/_util.py @@ -24,40 +24,11 @@ configure_cleanup, ) from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES, VAR_TMP_PULP -from pulpcore.exceptions import AdvisoryLockError from pulpcore.tasking.tasks import dispatch, execute_task _logger = logging.getLogger(__name__) -class PGAdvisoryLock: - """ - A context manager that will hold a postgres advisory lock non-blocking. - - The locks can be chosen from a lock group to avoid collisions. They will never collide with the - locks used for tasks. - """ - - def __init__(self, lock, lock_group=0): - self.lock_group = lock_group - self.lock = lock - - def __enter__(self): - with connection.cursor() as cursor: - cursor.execute("SELECT pg_try_advisory_lock(%s, %s)", [self.lock_group, self.lock]) - acquired = cursor.fetchone()[0] - if not acquired: - raise AdvisoryLockError("Could not acquire lock.") - return self - - def __exit__(self, exc_type, exc_value, traceback): - with connection.cursor() as cursor: - cursor.execute("SELECT pg_advisory_unlock(%s, %s)", [self.lock_group, self.lock]) - released = cursor.fetchone()[0] - if not released: - raise RuntimeError("Lock not held.") - - def startup_hook(): configure_analytics() configure_cleanup() diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 1af7e90083..933e8b5acb 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -25,9 +25,10 @@ TASK_UNBLOCKING_LOCK, TASK_METRICS_HEARTBEAT_LOCK, ) -from pulpcore.exceptions import AdvisoryLockError from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus +from pulpcore.app.util import PGAdvisoryLock +from pulpcore.exceptions import AdvisoryLockError from pulpcore.tasking.storage import WorkerDirectory from pulpcore.tasking._util import ( @@ -35,7 +36,6 @@ dispatch_scheduled_tasks, perform_task, startup_hook, - PGAdvisoryLock, ) _logger = logging.getLogger(__name__)