[SARC-329] Implement alerts: proportion of CPU jobs with Prometheus stats on a given node lower than a threshold X (#132)

* [SARC-329] Implement alerts: proportion of CPU jobs with Prometheus stats on a given node lower than a threshold X

* Use file_regression for tests.

* (cleanup) Remove unused code in tests for SARC-328
notoraptor authored Sep 20, 2024
1 parent 0e6ba5c commit bab50a5
Showing 14 changed files with 371 additions and 13 deletions.
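
For orientation, here is a minimal, hypothetical sketch of how the new check could be invoked. The function and parameter names come from the module added below; the call site itself is not part of this commit, and the values shown are simply the module's defaults.

from datetime import timedelta

from sarc.alerts.usage_alerts.prometheus_stats_occurrences import (
    check_prometheus_stats_occurrences,
)

# Look at the last 7 days, day by day; group per node on "mila" and per cluster
# elsewhere; warn when a group's ratio of CPU jobs with Prometheus stats falls
# more than 2 standard deviations below the average ratio across all groups.
check_prometheus_stats_occurrences(
    time_interval=timedelta(days=7),
    time_unit=timedelta(days=1),
    group_by_node=("mila",),
    nb_stddev=2,
)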
177 changes: 177 additions & 0 deletions sarc/alerts/usage_alerts/prometheus_stats_occurrences.py
@@ -0,0 +1,177 @@
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Union

from sarc.config import MTL
from sarc.jobs.series import compute_time_frames, load_job_series

logger = logging.getLogger(__name__)


class PrometheusStatInfo:
"""Prometheus stat context, used in checking below."""

def __init__(self, name):
self.name = name
self.col_has = f"has_{name}"
self.col_ratio = f"ratio_{name}"
self.avg = None
self.stddev = None
self.threshold = None


def check_prometheus_stats_occurrences(
    time_interval: Optional[timedelta] = timedelta(days=7),
    time_unit=timedelta(days=1),
    minimum_runtime: Optional[timedelta] = timedelta(minutes=5),
    cluster_names: Optional[List[str]] = None,
    group_by_node: Optional[Sequence[str]] = ("mila",),
    min_jobs_per_group: Optional[Union[int, Dict[str, int]]] = None,
    nb_stddev=2,
):
"""
Check if we have scrapped Prometheus stats for enough jobs per node per cluster per time unit.
Log a warning for each node / cluster where ratio of jobs with Prometheus stats is lower than
a threshold computed using mean and standard deviation statistics from all clusters.
Parameters
----------
time_interval: timedelta
If given, only jobs which ran in [now - time_interval, time_interval] will be used for checking.
Default is last 7 days.
If None, all jobs are used.
time_unit: timedelta
Time unit in which we must check cluster usage through time_interval. Default is 1 day.
minimum_runtime: timedelta
If given, only jobs which ran at least for this minimum runtime will be used for checking.
Default is 5 minutes.
If None, set to 0.
cluster_names: list
Optional list of clusters to check.
There may have clusters we don't want to check among retrieved jobs (eg. clusters in maintenance).
On the opposite, we may expect to see jobs in a cluster, but there are actually no jobs in this cluster.
To cover such cases, one can specify the complete list of expected clusters with `cluster_names`.
Jobs from clusters not in this list will be ignored both to compute statistics and in checking phase.
If a cluster in this list does not appear in jobs, a warning will be logged.
If empty (or not specified), use all clusters available among jobs retrieved with time_interval.
group_by_node: Sequence
Optional sequence of clusters to group by node.
For clusters in this list, we will check each node separately (ie. a "group" is a cluster node).
By default, we check the entire cluster (i.e. the "group" is the cluster itself).
min_jobs_per_group: int | dict
Minimum number of jobs required for checking in each group.
Either an integer, as minimum number for any group,
or a dictionary mapping a cluster name to minimum number in each group of this cluster
A group is either a cluster node, if cluster name is in `group_by_node`,
or the entire cluster otherwise.
Default is 1 job per group.
nb_stddev: float
Amount of standard deviation to remove from average statistics to compute checking threshold.
Threshold is computed as:
max(0, average - nb_stddev * stddev)
"""

    # Parse time_interval and get data frame
    start, end, clip_time = None, None, False
    if time_interval is not None:
        end = datetime.now(tz=MTL)
        start = end - time_interval
        clip_time = True
    df = load_job_series(start=start, end=end, clip_time=clip_time)

    # Parse minimum_runtime, and select only jobs where
    # elapsed time >= minimum runtime and allocated.gres_gpu == 0
    if minimum_runtime is None:
        minimum_runtime = timedelta(seconds=0)
    df = df[
        (df["elapsed_time"] >= minimum_runtime.total_seconds())
        & (df["allocated.gres_gpu"] == 0)
    ]

    # List clusters
    cluster_names = cluster_names or sorted(df["cluster_name"].unique())

    # Split data frame into time frames using `time_unit`
    df = compute_time_frames(df, frame_size=time_unit)

    # Duplicate rows per node so that each job is counted once for every node it ran on
    df = df.explode("nodes")

    # If a cluster is not in group_by_node,
    # then we must count jobs for the entire cluster, not per node.
    # To simplify the code, define one common node name for all jobs of such clusters.
    cluster_node_name = "(all)"
    df.loc[~df["cluster_name"].isin(group_by_node), "nodes"] = cluster_node_name

    # Add a column to ease job counting
    df.loc[:, "task_"] = 1

    # Generate a Prometheus context for each Prometheus stat we want to check.
    prom_contexts = [
        PrometheusStatInfo(name=prom_col)
        for prom_col in ["cpu_utilization", "system_memory"]
    ]

    # Add columns to check if a job has Prometheus stats
    for prom in prom_contexts:
        df.loc[:, prom.col_has] = ~df[prom.name].isnull()

    # Group per timestamp per cluster per node, and count jobs and Prometheus stats.
    # If `cluster_names` is given, use only jobs in these clusters.
    f_stats = (
        df[df["cluster_name"].isin(cluster_names)]
        .groupby(["timestamp", "cluster_name", "nodes"])[
            [prom_info.col_has for prom_info in prom_contexts] + ["task_"]
        ]
        .sum()
    )

    # Compute the ratio of jobs with Prometheus stats for each group,
    # then compute the threshold for each Prometheus stat.
    for prom in prom_contexts:
        f_stats[prom.col_ratio] = f_stats[prom.col_has] / f_stats["task_"]
        prom.avg = f_stats[prom.col_ratio].mean()
        prom.stddev = f_stats[prom.col_ratio].std()
        prom.threshold = max(0, prom.avg - nb_stddev * prom.stddev)

    # Parse min_jobs_per_group
    default_min_jobs = 1
    if min_jobs_per_group is None:
        min_jobs_per_group = {}
    elif isinstance(min_jobs_per_group, int):
        default_min_jobs = min_jobs_per_group
        min_jobs_per_group = {}
    assert isinstance(min_jobs_per_group, dict)

    # Now we can check
    clusters_seen = set()
    for row in f_stats.itertuples():
        timestamp, cluster_name, node = row.Index
        clusters_seen.add(cluster_name)
        nb_jobs = row.task_
        if nb_jobs >= min_jobs_per_group.get(cluster_name, default_min_jobs):
            grouping_type = "cluster" if node == cluster_node_name else "node / cluster"
            grouping_name = (
                f"[{cluster_name}]"
                if node == cluster_node_name
                else f"[{cluster_name}][{node}]"
            )
            for prom in prom_contexts:
                local_stat = getattr(row, prom.col_has) / nb_jobs
                if local_stat < prom.threshold:
                    logger.warning(
                        f"[{timestamp}]{grouping_name} insufficient Prometheus data for {prom.name}: "
                        f"{round(local_stat * 100, 2)} % of CPU jobs / {grouping_type} / time unit; "
                        f"minimum required: {prom.threshold} ({prom.avg} - {nb_stddev} * {prom.stddev}); "
                        f"time unit: {time_unit}"
                    )

    # Check clusters listed in `cluster_names` but not found in jobs.
    for cluster_name in cluster_names:
        if cluster_name not in clusters_seen:
            # No stats found for this cluster; log a warning.
            logger.warning(
                f"[{cluster_name}] no Prometheus data available: no job found"
            )
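
To make the threshold formula concrete, here is a small worked example (not part of the commit); the numbers are taken from the regression output for the default parameters shown further below.

# threshold = max(0, avg - nb_stddev * stddev), with the default nb_stddev = 2.
avg = 0.9230769230769231     # mean ratio of jobs with cpu_utilization stats across groups
stddev = 0.2773500981126146  # standard deviation of that ratio
threshold = max(0, avg - 2 * stddev)
print(threshold)  # 0.368376726851694; a group at 0.0 % falls below it and is reported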
27 changes: 23 additions & 4 deletions tests/functional/conftest.py
@@ -41,9 +41,9 @@ def clear_db(db):
     db.clusters.drop()


-def fill_db(db, with_users=False, with_clusters=False):
+def fill_db(db, with_users=False, with_clusters=False, job_patch=None):
     db.allocations.insert_many(create_allocations())
-    db.jobs.insert_many(create_jobs())
+    db.jobs.insert_many(create_jobs(job_patch=job_patch))
     db.diskusage.insert_many(create_diskusages())
     if with_users:
         db.users.insert_many(create_users())
@@ -58,15 +58,15 @@ def fill_db(db, with_users=False, with_clusters=False):


 def create_db_configuration_fixture(
-    db_name, empty=False, with_users=False, scope="function"
+    db_name, empty=False, with_users=False, job_patch=None, scope="function"
 ):
     @pytest.fixture(scope=scope)
     def fixture(standard_config_object):
         cfg = custom_db_config(standard_config_object, db_name)
         db = cfg.mongo.database_instance
         clear_db(db)
         if not empty:
-            fill_db(db, with_users=with_users)
+            fill_db(db, with_users=with_users, job_patch=job_patch)
         yield

     return fixture
@@ -106,6 +106,16 @@ def fixture(client_config_object):
 )


+read_only_db_with_many_cpu_jobs_config_object = create_db_configuration_fixture(
+    db_name="sarc-read-only-with-many-cpu-jobs-test",
+    scope="session",
+    job_patch={
+        "allocated": {"billing": 0, "cpu": 0, "gres_gpu": 0, "mem": 0, "node": 0},
+        "requested": {"billing": 0, "cpu": 0, "gres_gpu": 0, "mem": 0, "node": 0},
+    },
+)
+
+
 read_only_db_with_users_config_object = create_db_configuration_fixture(
     db_name="sarc-read-only-with-users-test",
     with_users=True,
@@ -141,6 +151,15 @@ def read_only_db(standard_config, read_only_db_config_object):
         yield cfg.mongo.database_instance


+@pytest.fixture
+def read_only_db_with_many_cpu_jobs(
+    standard_config, read_only_db_with_many_cpu_jobs_config_object
+):
+    cfg = custom_db_config(standard_config, "sarc-read-only-with-many-cpu-jobs-test")
+    with using_config(cfg) as cfg:
+        yield cfg.mongo.database_instance
+
+
 @pytest.fixture
 def read_only_db_with_users(standard_config, read_only_db_with_users_config_object):
     cfg = custom_db_config(standard_config, "sarc-read-only-with-users-test")
12 changes: 9 additions & 3 deletions tests/functional/jobs/factory.py
@@ -45,13 +45,17 @@

 class JobFactory:
     def __init__(
-        self, first_submit_time: None | datetime = None, first_job_id: int = 1
+        self,
+        first_submit_time: None | datetime = None,
+        first_job_id: int = 1,
+        job_patch: dict | None = None,
     ):
         self.jobs = []
         self._first_submit_time = first_submit_time or datetime(
             2023, 2, 14, tzinfo=MTL
         ).astimezone(UTC)
         self._first_job_id = first_job_id
+        self.job_patch = job_patch or {}

     @property
     def next_job_id(self):
@@ -96,6 +100,8 @@ def format_kwargs(self, kwargs):

     def create_job(self, **kwargs):
         job = copy.deepcopy(base_job)
+        if self.job_patch:
+            job.update(self.job_patch)
         job.update(self.format_kwargs(kwargs))

         return job
@@ -189,9 +195,9 @@ def _create_user(username: str, with_drac=True):
 }


-def create_jobs(job_factory: JobFactory | None = None):
+def create_jobs(job_factory: JobFactory | None = None, job_patch: dict | None = None):
     if job_factory is None:
-        job_factory = JobFactory()
+        job_factory = JobFactory(job_patch=job_patch)

     for status in [
         "CANCELLED",
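
As a rough illustration of the new job_patch plumbing (a sketch, not part of the commit; the base_job values below are illustrative, not the factory's real defaults):

import copy

# JobFactory.create_job applies the patch with dict.update, which is shallow:
# the patch replaces the whole "allocated" / "requested" sub-dicts of the base job.
base_job = {"allocated": {"billing": 1, "cpu": 4, "gres_gpu": 1, "mem": 8, "node": 1}}
cpu_only_patch = {"allocated": {"billing": 0, "cpu": 0, "gres_gpu": 0, "mem": 0, "node": 0}}

job = copy.deepcopy(base_job)
job.update(cpu_only_patch)
assert job["allocated"]["gres_gpu"] == 0  # every generated job becomes a CPU-only job,
                                          # which is what read_only_db_with_many_cpu_jobs relies on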
6 changes: 0 additions & 6 deletions tests/functional/usage_alerts/test_alert_cluster_scraping.py
@@ -6,12 +6,6 @@
 from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

 from ..jobs.test_func_load_job_series import MOCK_TIME
-from .common import _get_warnings
-
-get_warnings = functools.partial(
-    _get_warnings,
-    module="sarc.alerts.usage_alerts.cluster_scraping:cluster_scraping.py",
-)


 @pytest.mark.freeze_time(MOCK_TIME)
82 changes: 82 additions & 0 deletions tests/functional/usage_alerts/test_prometheus_scraping_stats.py
@@ -0,0 +1,82 @@
import functools
import re

import pytest

from sarc.alerts.usage_alerts.prometheus_stats_occurrences import (
    check_prometheus_stats_occurrences,
)
from sarc.client import get_jobs
from tests.functional.jobs.test_func_load_job_series import MOCK_TIME

from ..jobs.test_func_job_statistics import generate_fake_timeseries


@pytest.mark.freeze_time(MOCK_TIME)
@pytest.mark.usefixtures("read_only_db_with_many_cpu_jobs", "tzlocal_is_mtl")
@pytest.mark.parametrize(
    "params",
    [
        # Check with default params. In the last 7 days from now (mock time: 2023-11-22),
        # there are only 2 jobs from 1 cluster in 1 timestamp, both with no cpu_utilization
        # and no system_memory, so the threshold will be 0 everywhere and no warning will be printed.
        dict(),
        # Check with no time_interval.
        dict(time_interval=None),
        # Check with no time_interval and a lower nb_stddev (0.25), to get more warnings.
        dict(time_interval=None, nb_stddev=0.25),
        # Check with no time_interval, 0.25 stddev, and 1 extra cluster.
        # Expect 1 more warning, no other changes.
        dict(
            time_interval=None,
            nb_stddev=0.25,
            cluster_names=[
                "raisin",
                "patate",
                "fromage",
                "mila",
                "invisible-cluster",
            ],
        ),
        # Check with no time_interval, 0.25 stddev, with only 2 clusters. Thresholds will change.
        dict(time_interval=None, nb_stddev=0.25, cluster_names=["raisin", "mila"]),
        # Check with no time_interval, 0.25 stddev, and no group_by_node.
        dict(time_interval=None, nb_stddev=0.25, group_by_node=()),
        # Check with no time_interval, 0.25 stddev, and group_by_node for all clusters.
        # Many changes.
        dict(
            time_interval=None,
            nb_stddev=0.25,
            group_by_node=["raisin", "patate", "fromage", "mila"],
        ),
        # Check with no time_interval, 0.25 stddev, group_by_node for all clusters, and min jobs per group set to 2.
        dict(
            time_interval=None,
            nb_stddev=0.25,
            group_by_node=["raisin", "patate", "fromage", "mila"],
            min_jobs_per_group=2,
        ),
        # Check with no time_interval, 0.25 stddev, group_by_node for all clusters, and min jobs set for one cluster.
        dict(
            time_interval=None,
            nb_stddev=0.25,
            group_by_node=["raisin", "patate", "fromage", "mila"],
            min_jobs_per_group={"raisin": 3},
        ),
    ],
)
def test_check_prometheus_scraping_stats(params, monkeypatch, caplog, file_regression):
    monkeypatch.setattr(
        "sarc.jobs.series.get_job_time_series", generate_fake_timeseries
    )

    for job in get_jobs():
        job.statistics(save=True)
    check_prometheus_stats_occurrences(**params)
    file_regression.check(
        re.sub(
            r"WARNING +sarc\.alerts\.usage_alerts\.prometheus_stats_occurrences:prometheus_stats_occurrences.py:[0-9]+ +",
            "",
            caplog.text,
        )
    )
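
For reference, the normalization applied to caplog before the regression comparison can be read as a small standalone helper (a sketch, not part of the commit; the regex is the one used in the test above). Its output is what ends up in the stored regression files shown below.

import re


def strip_logger_prefix(caplog_text: str) -> str:
    # Remove the "WARNING sarc.alerts...:prometheus_stats_occurrences.py:<line>" prefix
    # so the regression files contain only the warning messages themselves.
    return re.sub(
        r"WARNING +sarc\.alerts\.usage_alerts\.prometheus_stats_occurrences:prometheus_stats_occurrences.py:[0-9]+ +",
        "",
        caplog_text,
    )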
@@ -0,0 +1 @@
[2023-11-21 00:01:00-05:00][raisin] insufficient Prometheus data for cpu_utilization: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.368376726851694 (0.9230769230769231 - 2 * 0.2773500981126146); time unit: 1 day, 0:00:00
@@ -0,0 +1,12 @@
[2023-02-14 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-15 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-16 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-17 00:01:00-05:00][fromage] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-17 00:01:00-05:00][patate] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-17 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-18 00:01:00-05:00][mila][cn-c021] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / node / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-18 00:01:00-05:00][patate] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-18 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-02-19 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00
[2023-11-21 00:01:00-05:00][raisin] insufficient Prometheus data for cpu_utilization: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.8537393985487695 (0.9230769230769231 - 0.25 * 0.2773500981126146); time unit: 1 day, 0:00:00
[2023-11-21 00:01:00-05:00][raisin] insufficient Prometheus data for system_memory: 0.0 % of CPU jobs / cluster / time unit; minimum required: 0.059962701821302505 (0.15384615384615385 - 0.25 * 0.3755338080994054); time unit: 1 day, 0:00:00