From f84f31c1b7455f39917bb68dedc8fcc144af46a5 Mon Sep 17 00:00:00 2001 From: qdelamea Date: Wed, 13 Mar 2024 17:56:01 +0100 Subject: [PATCH 1/4] fix: metric naming --- examples/basic.py | 11 +++++++++++ src/armonik_analytics/metrics/base.py | 4 ++++ src/armonik_analytics/metrics/time_series.py | 6 +++++- src/armonik_analytics/metrics/transitions.py | 5 ++++- src/armonik_analytics/stats.py | 2 +- 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/examples/basic.py b/examples/basic.py index 26ec77d..0c6db14 100644 --- a/examples/basic.py +++ b/examples/basic.py @@ -18,6 +18,10 @@ def plot_metrics(stats): plt.figure() + plt.xlabel("Time (s)") + plt.ylabel("Number of tasks") + plt.legend(loc="upper right") + plt.title("Tasks in status over time") for metric_name in stats.values.keys(): if metric_name.endswith("OverTime"): values = stats.values[metric_name] @@ -58,6 +62,13 @@ def print_metrics(stats): TimestampsTransition(TaskTimestamps.FETCHED, TaskTimestamps.STARTED), TimestampsTransition(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED), TimestampsTransition(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED), + TasksInStatusOverTime(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED), + TasksInStatusOverTime(TaskTimestamps.SUBMITTED, TaskTimestamps.RECEIVED), + TasksInStatusOverTime(TaskTimestamps.RECEIVED, TaskTimestamps.ACQUIRED), + TasksInStatusOverTime(TaskTimestamps.ACQUIRED, TaskTimestamps.FETCHED), + TasksInStatusOverTime(TaskTimestamps.FETCHED, TaskTimestamps.STARTED), + TasksInStatusOverTime(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED), + TasksInStatusOverTime(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED), TasksInStatusOverTime(TaskTimestamps.ENDED), ], ) diff --git a/src/armonik_analytics/metrics/base.py b/src/armonik_analytics/metrics/base.py index 15405ce..00b4da3 100644 --- a/src/armonik_analytics/metrics/base.py +++ b/src/armonik_analytics/metrics/base.py @@ -30,6 +30,10 @@ def complete(self, start: datetime, end: datetime) -> None: """ pass + @property + def name(self) -> str: + return self.__class__.__name__ + @abstractproperty def values(self) -> any: """ diff --git a/src/armonik_analytics/metrics/time_series.py b/src/armonik_analytics/metrics/time_series.py index 4ca8506..88b8d9e 100644 --- a/src/armonik_analytics/metrics/time_series.py +++ b/src/armonik_analytics/metrics/time_series.py @@ -27,6 +27,10 @@ def __init__( self.timestamps = None self.index = 0 + @property + def name(self) -> str: + return f"{self.timestamp.name.capitalize()}TasksOverTime" + @property def timestamp(self) -> TaskTimestamps: return self.__timestamp @@ -62,7 +66,7 @@ def update(self, total: int, tasks: list[Task]) -> None: n_tasks = len(tasks) if self.timestamps is None: n = (2 * total) + 1 if self.next_timestamp else total + 1 - self.timestamps = np.memmap("timestamps.dat", dtype=object, mode="w+", shape=(2, n)) + self.timestamps = np.memmap(f"{self.name}.dat", dtype=object, mode="w+", shape=(2, n)) self.index = 1 self.timestamps[:, self.index : self.index + n_tasks] = [ [getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks], diff --git a/src/armonik_analytics/metrics/transitions.py b/src/armonik_analytics/metrics/transitions.py index 6561347..eabe150 100644 --- a/src/armonik_analytics/metrics/transitions.py +++ b/src/armonik_analytics/metrics/transitions.py @@ -22,7 +22,10 @@ def __init__(self, timestamp_1: str, timestamp_2: str) -> None: self.avg = 0 self.min = None self.max = None - self.__class__.__qualname__ = 
f"{self.timestamps[0].name.lower().capitalize()}To{self.timestamps[1].name.capitalize()}" + + @property + def name(self) -> str: + return f"{self.timestamps[0].name.capitalize()}To{self.timestamps[1].name.capitalize()}" @property def timestamps(self) -> tuple[TaskTimestamps, TaskTimestamps]: diff --git a/src/armonik_analytics/stats.py b/src/armonik_analytics/stats.py index 9c429dd..ceb1bf3 100644 --- a/src/armonik_analytics/stats.py +++ b/src/armonik_analytics/stats.py @@ -59,4 +59,4 @@ def compute(self) -> None: @property def values(self): """Dict[str, Union[float, dict]]: A dictionary containing computed statistics.""" - return {metric.__class__.__qualname__: metric.values for metric in self.metrics} + return {metric.name: metric.values for metric in self.metrics} From bd8d3ff41e516d6dee649eb4c84657f09b33a8d5 Mon Sep 17 00:00:00 2001 From: qdelamea Date: Wed, 13 Mar 2024 20:02:28 +0100 Subject: [PATCH 2/4] feat: add support for tasks with incomplete timestamps --- src/armonik_analytics/metrics/time_series.py | 34 +++++++++++++------- src/armonik_analytics/metrics/transitions.py | 17 +++++----- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/armonik_analytics/metrics/time_series.py b/src/armonik_analytics/metrics/time_series.py index 88b8d9e..92d4f02 100644 --- a/src/armonik_analytics/metrics/time_series.py +++ b/src/armonik_analytics/metrics/time_series.py @@ -25,6 +25,7 @@ def __init__( self.timestamp = timestamp self.next_timestamp = next_timestamp self.timestamps = None + self.steps = None self.index = 0 @property @@ -66,18 +67,19 @@ def update(self, total: int, tasks: list[Task]) -> None: n_tasks = len(tasks) if self.timestamps is None: n = (2 * total) + 1 if self.next_timestamp else total + 1 - self.timestamps = np.memmap(f"{self.name}.dat", dtype=object, mode="w+", shape=(2, n)) + self.timestamps = np.memmap(f"{self.name}_timestamps.dat", dtype=datetime, mode="w+", shape=(n,)) + self.steps = np.memmap(f"{self.name}_steps.dat", dtype=np.int8, mode="w+", shape=(n,)) self.index = 1 - self.timestamps[:, self.index : self.index + n_tasks] = [ - [getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks], - n_tasks * [1], + self.timestamps[self.index : self.index + n_tasks] = [ + getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks ] + self.steps[self.index : self.index + n_tasks] = n_tasks * [1] self.index += n_tasks if self.next_timestamp: - self.timestamps[:, self.index : self.index + n_tasks] = [ - [getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks], - n_tasks * [-1], + self.timestamps[self.index : self.index + n_tasks] = [ + getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks ] + self.steps[self.index : self.index + n_tasks] = n_tasks * [-1] self.index += n_tasks def complete(self, start: datetime, end: datetime) -> None: @@ -88,13 +90,23 @@ def complete(self, start: datetime, end: datetime) -> None: start (datetime): The start time. end (datetime): The end time. 
""" - self.timestamps[:, 0] = (start, 0) - self.timestamps = self.timestamps[:, self.timestamps[0, :].argsort()] - self.timestamps[1, :] = np.cumsum(self.timestamps[1, :]) + # Add start date with no step + self.timestamps[0] = start + self.steps[0] = 0 + # Remove inconsistent data (due to missing timestamps in task metadata) + inconsistent_values = np.atleast_1d(self.timestamps is None).nonzero()[0] + self.timestamps = np.delete(self.timestamps, inconsistent_values) + self.steps = np.delete(self.steps, inconsistent_values) + # Sort the arrays by timestamp dates + sort_indices = self.timestamps.argsort() + self.timestamps = self.timestamps[sort_indices] + self.steps = self.steps[sort_indices] + # Compute the number of task in the given timestamp over time + self.steps = np.cumsum(self.steps) @property def values(self): """ Return the timestamps as the metric values. """ - return self.timestamps + return np.vstack((self.timestamps, self.steps)) diff --git a/src/armonik_analytics/metrics/transitions.py b/src/armonik_analytics/metrics/transitions.py index eabe150..5665ee2 100644 --- a/src/armonik_analytics/metrics/transitions.py +++ b/src/armonik_analytics/metrics/transitions.py @@ -70,15 +70,16 @@ def update(self, total: int, tasks: list[Task]) -> None: getattr(t, f"{self.timestamps[1].name.lower()}_at") - getattr(t, f"{self.timestamps[0].name.lower()}_at") ).total_seconds() - for t in tasks + for t in tasks if (getattr(t, f"{self.timestamps[1].name.lower()}_at") is not None and getattr(t, f"{self.timestamps[0].name.lower()}_at") is not None) ] - self.avg += np.sum(deltas) / total - min = np.min(deltas) - max = np.max(deltas) - if self.max is None or self.max < max: - self.max = max - if self.min is None or self.min > min: - self.min = min + if deltas: + self.avg += np.sum(deltas) / total + min = np.min(deltas) + max = np.max(deltas) + if self.max is None or self.max < max: + self.max = max + if self.min is None or self.min > min: + self.min = min @property def values(self) -> dict[str, float]: From cf03415ef324e3541d3523e7a9ace175f9065656 Mon Sep 17 00:00:00 2001 From: qdelamea Date: Mon, 18 Mar 2024 14:28:51 +0100 Subject: [PATCH 3/4] feat: unit test rewriting --- src/armonik_analytics/metrics/common.py | 23 +- src/armonik_analytics/metrics/time_series.py | 19 +- src/armonik_analytics/metrics/transitions.py | 16 +- src/armonik_analytics/stats.py | 16 +- tests/conftest.py | 184 +++++++++++++++ tests/contest.py | 0 tests/unit/test_common.py | 37 +++ tests/unit/test_stats.py | 234 +++++-------------- tests/unit/test_time_series.py | 57 +++++ tests/unit/test_transitions.py | 41 ++++ 10 files changed, 435 insertions(+), 192 deletions(-) create mode 100644 tests/conftest.py delete mode 100644 tests/contest.py create mode 100644 tests/unit/test_common.py create mode 100644 tests/unit/test_time_series.py create mode 100644 tests/unit/test_transitions.py diff --git a/src/armonik_analytics/metrics/common.py b/src/armonik_analytics/metrics/common.py index 107239e..e67f98b 100644 --- a/src/armonik_analytics/metrics/common.py +++ b/src/armonik_analytics/metrics/common.py @@ -12,7 +12,7 @@ class TotalElapsedTime(ArmoniKMetric): def __init__(self) -> None: self.elapsed = None - def complete(self, start: datetime, end: datetime) -> None: + def complete(self, start: datetime | None, end: datetime | None) -> None: """ Calculate the total elapsed time. @@ -20,7 +20,10 @@ def complete(self, start: datetime, end: datetime) -> None: start (datetime): The start time. end (datetime): The end time. 
""" - self.elapsed = (end - start).total_seconds() + if isinstance(start, datetime) and isinstance(end, datetime): + self.elapsed = (end - start).total_seconds() + else: + self.elapsed = None @property def values(self) -> float: @@ -40,7 +43,7 @@ class AvgThroughput(ArmoniKMetric): def __init__(self) -> None: self.throughput = None - self.total = None + self.ended = None def update(self, total: int, tasks: list[Task]) -> None: """ @@ -50,9 +53,10 @@ def update(self, total: int, tasks: list[Task]) -> None: total (int): Total number of tasks. tasks (list[Task]): A task batch. """ - self.total = total + n_ended = len([t for t in tasks if t.ended_at]) + self.ended = self.ended + n_ended if self.ended else n_ended - def complete(self, start: datetime, end: datetime) -> None: + def complete(self, start: datetime | None, end: datetime | None) -> None: """ Calculate the average throughput. @@ -60,7 +64,14 @@ def complete(self, start: datetime, end: datetime) -> None: start (datetime): The start time. end (datetime): The end time. """ - self.throughput = self.total / (end - start).total_seconds() + if ( + isinstance(self.ended, int) + and isinstance(start, datetime) + and isinstance(end, datetime) + ): + self.throughput = self.ended / (end - start).total_seconds() + else: + self.throughput = None @property def values(self) -> float: diff --git a/src/armonik_analytics/metrics/time_series.py b/src/armonik_analytics/metrics/time_series.py index 92d4f02..132bd4c 100644 --- a/src/armonik_analytics/metrics/time_series.py +++ b/src/armonik_analytics/metrics/time_series.py @@ -38,8 +38,7 @@ def timestamp(self) -> TaskTimestamps: @timestamp.setter def timestamp(self, __value: TaskTimestamps) -> None: - if __value not in TaskTimestamps: - raise ValueError(f"{__value} is not a valid timestamp.") + __value = TaskTimestamps(__value) self.__timestamp = __value @property @@ -49,7 +48,7 @@ def next_timestamp(self) -> TaskTimestamps: @next_timestamp.setter def next_timestamp(self, __value: TaskTimestamps) -> None: if __value is not None: - assert __value in TaskTimestamps + __value = TaskTimestamps(__value) if __value < self.timestamp: raise ValueError( f"Inconsistent timestamp order '{self.timestamp.name}' is not prior to '{__value.name}'." @@ -67,7 +66,9 @@ def update(self, total: int, tasks: list[Task]) -> None: n_tasks = len(tasks) if self.timestamps is None: n = (2 * total) + 1 if self.next_timestamp else total + 1 - self.timestamps = np.memmap(f"{self.name}_timestamps.dat", dtype=datetime, mode="w+", shape=(n,)) + self.timestamps = np.memmap( + f"{self.name}_timestamps.dat", dtype=datetime, mode="w+", shape=(n,) + ) self.steps = np.memmap(f"{self.name}_steps.dat", dtype=np.int8, mode="w+", shape=(n,)) self.index = 1 self.timestamps[self.index : self.index + n_tasks] = [ @@ -82,7 +83,7 @@ def update(self, total: int, tasks: list[Task]) -> None: self.steps[self.index : self.index + n_tasks] = n_tasks * [-1] self.index += n_tasks - def complete(self, start: datetime, end: datetime) -> None: + def complete(self, start: datetime | None, end: datetime | None) -> None: """ Complete the metric calculation. @@ -90,11 +91,15 @@ def complete(self, start: datetime, end: datetime) -> None: start (datetime): The start time. end (datetime): The end time. 
""" + if start is None or self.timestamps.shape[0] == 1: + self.timestamps = None + self.steps = None + return # Add start date with no step self.timestamps[0] = start self.steps[0] = 0 # Remove inconsistent data (due to missing timestamps in task metadata) - inconsistent_values = np.atleast_1d(self.timestamps is None).nonzero()[0] + inconsistent_values = np.atleast_1d(self.timestamps == np.array(None)).nonzero()[0] self.timestamps = np.delete(self.timestamps, inconsistent_values) self.steps = np.delete(self.steps, inconsistent_values) # Sort the arrays by timestamp dates @@ -109,4 +114,6 @@ def values(self): """ Return the timestamps as the metric values. """ + if self.timestamps is None: + return None return np.vstack((self.timestamps, self.steps)) diff --git a/src/armonik_analytics/metrics/transitions.py b/src/armonik_analytics/metrics/transitions.py index 5665ee2..2b558e4 100644 --- a/src/armonik_analytics/metrics/transitions.py +++ b/src/armonik_analytics/metrics/transitions.py @@ -19,7 +19,7 @@ def __init__(self, timestamp_1: str, timestamp_2: str) -> None: timestamp_2 (str): The second timestamp. """ self.timestamps = (timestamp_1, timestamp_2) - self.avg = 0 + self.avg = None self.min = None self.max = None @@ -49,7 +49,7 @@ def timestamps(self, __value: tuple[TaskTimestamps, TaskTimestamps]) -> None: ValueError: If the timestamps are not valid or in inconsistent order. """ for timestamp in __value: - assert timestamp in TaskTimestamps + TaskTimestamps(timestamp) if __value[0] > __value[1]: raise ValueError( f"Inconsistent timestamp order '{__value[0].name}' is not prior to '{__value[1].name}'." @@ -70,10 +70,18 @@ def update(self, total: int, tasks: list[Task]) -> None: getattr(t, f"{self.timestamps[1].name.lower()}_at") - getattr(t, f"{self.timestamps[0].name.lower()}_at") ).total_seconds() - for t in tasks if (getattr(t, f"{self.timestamps[1].name.lower()}_at") is not None and getattr(t, f"{self.timestamps[0].name.lower()}_at") is not None) + for t in tasks + if ( + getattr(t, f"{self.timestamps[1].name.lower()}_at") is not None + and getattr(t, f"{self.timestamps[0].name.lower()}_at") is not None + ) ] if deltas: - self.avg += np.sum(deltas) / total + self.avg = ( + self.avg + np.sum(deltas) / len(deltas) + if self.avg + else np.sum(deltas) / len(deltas) + ) min = np.min(deltas) max = np.max(deltas) if self.max is None or self.max < max: diff --git a/src/armonik_analytics/stats.py b/src/armonik_analytics/stats.py index ceb1bf3..b94513a 100644 --- a/src/armonik_analytics/stats.py +++ b/src/armonik_analytics/stats.py @@ -44,12 +44,16 @@ def compute(self) -> None: while tasks: for metric in self.metrics: metric.update(total, tasks) - min_start = np.min([t.created_at for t in tasks]) - max_end = np.max([t.ended_at for t in tasks]) - if start is None or min_start < start: - start = min_start - if end is None or max_end > end: - end = max_end + starts = [t.created_at for t in tasks if t.created_at] + ends = [t.ended_at for t in tasks if t.ended_at] + if starts: + min_start = np.min(starts) + if start is None or min_start < start: + start = min_start + if ends: + max_end = np.max(ends) + if end is None or max_end > end: + end = max_end page += 1 _, tasks = self.client.list_tasks(task_filter=self.filter, page=page) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c657bd5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,184 @@ +import os + +from datetime import datetime, timezone + +import numpy as np +import pytest + +from armonik.common import 
Task + + +test_cases = [ + { + "tasks": [], + "start": None, + "end": None, + "avg_throughput": None, + "total_elapsed_time": None, + "created_to_submitted": {"avg": None, "min": None, "max": None}, + "ended_over_time": None, + "created_over_time": None, + }, + { + "tasks": [ + Task( + id=0, + created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + ) + ], + "start": datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + "end": datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + "avg_throughput": 0.5, + "total_elapsed_time": 2.0, + "created_to_submitted": {"avg": 1.0, "min": 1.0, "max": 1.0}, + "ended_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + ], + [0, 1], + ] + ), + "created_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + ], + [0, 1, 0], + ] + ), + }, + { + "tasks": [ + Task( + id=0, + created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + ), + Task( + id=1, + created_at=datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ), + Task( + id=2, + created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + ), + ], + "start": datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + "end": datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + "avg_throughput": 3 / 5.0, + "total_elapsed_time": 5.0, + "created_to_submitted": {"avg": 8 / 3, "min": 2.0, "max": 4.0}, + "ended_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + ], + [0, 1, 2, 3], + ] + ), + "created_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ], + [0, 1, 2, 3, 2, 1, 0], + ] + ), + }, + { + "tasks": [ + Task( + id=0, + created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + ), + Task(id=1), + Task( + id=2, + created_at=datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ), + Task(id=3), + Task( + id=4, + created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + submitted_at=datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ended_at=datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + ), + ], + "start": datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + "end": datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + "avg_throughput": 3 / 5.0, + 
"total_elapsed_time": 5.0, + "created_to_submitted": {"avg": 8 / 3, "min": 2.0, "max": 4.0}, + "ended_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 6, tzinfo=timezone.utc), + ], + [0, 1, 2, 3], + ] + ), + "created_over_time": np.array( + [ + [ + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 4, tzinfo=timezone.utc), + datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc), + ], + [0, 1, 2, 3, 2, 1, 0], + ] + ), + }, + { + "tasks": [ + Task(id=0), + Task(id=1), + ], + "start": None, + "end": None, + "avg_throughput": None, + "total_elapsed_time": None, + "created_to_submitted": {"avg": None, "min": None, "max": None}, + "ended_over_time": None, + "created_over_time": None, + }, +] + + +@pytest.fixture(scope="session", autouse=True) +def clean_up(): + yield + + for file in os.listdir(os.getcwd()): + if file.endswith(".dat"): + os.remove(os.path.join(os.getcwd(), file)) diff --git a/tests/contest.py b/tests/contest.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/test_common.py b/tests/unit/test_common.py new file mode 100644 index 0000000..bba33fd --- /dev/null +++ b/tests/unit/test_common.py @@ -0,0 +1,37 @@ +from datetime import datetime + +import pytest + +from armonik.common import Task + +from armonik_analytics.metrics import AvgThroughput, TotalElapsedTime + +from conftest import test_cases + + +@pytest.mark.parametrize( + ["tasks", "start", "end", "value"], + [ + (test_case["tasks"], test_case["start"], test_case["end"], test_case["avg_throughput"]) + for test_case in test_cases + ], +) +def test_avg_throughput(tasks: list[Task], start: datetime, end: datetime, value: float | None): + th = AvgThroughput() + th.update(len(tasks), tasks) + th.complete(start, end) + assert th.values == value + + +@pytest.mark.parametrize( + ["tasks", "start", "end", "value"], + [ + (test_case["tasks"], test_case["start"], test_case["end"], test_case["total_elapsed_time"]) + for test_case in test_cases + ], +) +def test_total_elapsed_time(tasks: list[Task], start: datetime, end: datetime, value: float | None): + tet = TotalElapsedTime() + tet.update(len(tasks), tasks) + tet.complete(start, end) + assert tet.values == value diff --git a/tests/unit/test_stats.py b/tests/unit/test_stats.py index b61ccea..736ed96 100644 --- a/tests/unit/test_stats.py +++ b/tests/unit/test_stats.py @@ -1,23 +1,16 @@ -from datetime import datetime, timezone +from datetime import datetime import grpc -import numpy as np import pytest from armonik.client import ArmoniKTasks, TaskFieldFilter -from armonik.common import Direction, Task -from armonik.common.filter import Filter +from armonik.common import Direction, Filter, Task from armonik.protogen.common.sort_direction_pb2 import SortDirection from armonik_analytics.stats import ArmoniKStatistics -from armonik_analytics.metrics import ( - ArmoniKMetric, - TotalElapsedTime, - AvgThroughput, - TimestampsTransition, - TasksInStatusOverTime, -) -from armonik_analytics.utils import TaskTimestamps +from armonik_analytics.metrics.base import ArmoniKMetric + +from conftest import test_cases class DummyMetric(ArmoniKMetric): @@ -29,12 +22,12 @@ def __init__(self): self.start = None 
self.end = None - def update(self, total: int, tasks: list[Task]): + def update(self, total: int, tasks: list[Task]) -> None: self.updates += 1 self.total = total self.tasks.append(tasks) - def complete(self, start: datetime, end: datetime): + def complete(self, start: datetime, end: datetime) -> None: self.start = start self.end = end self.completes += 1 @@ -44,31 +37,14 @@ def values(self): return "value" -task_batch_1 = [ - Task( - id=i, - created_at=datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), - submitted_at=datetime(1, 1, 1, 1, 1, 1, 1 + i, tzinfo=timezone.utc), - ended_at=datetime(1, 1, 1, 1, 1, 1 + i, tzinfo=timezone.utc), - ) - for i in range(3) -] -task_batch_2 = [ - Task( - id=i, - created_at=datetime(1, 1, 1, 1, 1, 0, tzinfo=timezone.utc), - submitted_at=datetime(1, 1, 1, 1, 1, 0, 1 + i, tzinfo=timezone.utc), - ended_at=datetime(1, 1, 1, 1, 1, 1 + i, tzinfo=timezone.utc), - ) - for i in range(3, 5) -] -start = task_batch_2[0].created_at -end = task_batch_2[-1].ended_at - - class DummyArmoniKTasks(ArmoniKTasks): __call_count = 0 + def __init__(self, grpc_channel: grpc.Channel, tasks: list[Task]): + super().__init__(grpc_channel) + self.tasks = tasks + self.total = 2 * len(tasks) + def list_tasks( self, task_filter: Filter | None = None, @@ -80,152 +56,70 @@ def list_tasks( detailed: bool = True, ) -> tuple[int, list[Task]]: self.__call_count += 1 - if self.__call_count == 1: - return 5, task_batch_1 - elif self.__call_count == 2: - return 5, task_batch_2 - return 5, [] + if self.__call_count <= 2: + return self.total, self.tasks + return self.total, [] + +def test_constructor(): + with grpc.insecure_channel("url") as channel: + ArmoniKStatistics( + channel=channel, + task_filter=TaskFieldFilter.SESSION_ID == "session-id", + metrics=[DummyMetric()], + ) -class TestArmoniKStatistics: - def test_constructor(self): - with grpc.insecure_channel("url") as channel: + with pytest.raises(TypeError): ArmoniKStatistics( channel=channel, task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics=[TotalElapsedTime()], + metrics="a", ) - with pytest.raises(TypeError): - ArmoniKStatistics( - channel=channel, - task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics="a", - ) - - with pytest.raises(TypeError): - ArmoniKStatistics( - channel=channel, - task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics=[], - ) - - with pytest.raises(TypeError): - ArmoniKStatistics( - channel=channel, - task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics=["a", TotalElapsedTime()], - ) - - with pytest.raises(TypeError): - ArmoniKStatistics( - channel=channel, - task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics=[TotalElapsedTime], - ) - - def test_compute(self): - with grpc.insecure_channel("url") as channel: - dummy = DummyMetric() - stats = ArmoniKStatistics( + with pytest.raises(TypeError): + ArmoniKStatistics( channel=channel, task_filter=TaskFieldFilter.SESSION_ID == "session-id", - metrics=[dummy], + metrics=[], ) - stats.client = DummyArmoniKTasks(channel) - stats.compute() - - assert dummy.updates == 2 - assert dummy.completes == 1 - assert stats.values == {"DummyMetric": "value"} - assert dummy.total == 5 - assert dummy.tasks[0] == task_batch_1 and dummy.tasks[1] == task_batch_2 - assert dummy.start == datetime(1, 1, 1, 1, 1, 0, tzinfo=timezone.utc) - assert dummy.end == datetime(1, 1, 1, 1, 1, 5, tzinfo=timezone.utc) - - -class TestAvgThroughput: - def test_avg_throughput(self): - th = AvgThroughput() - th.update(2, task_batch_2) - 
th.complete(start, end) - assert th.values == 2.0 / 5.0 - - -class TestTotalElapsedTime: - def test_total_elapsed_time(self): - tet = TotalElapsedTime() - tet.update(5, task_batch_1) - tet.complete(start, end) - assert tet.values == 5.0 - -class TestTimestampsTransition: - def test_constructor(self): - TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED) + with pytest.raises(TypeError): + ArmoniKStatistics( + channel=channel, + task_filter=TaskFieldFilter.SESSION_ID == "session-id", + metrics=["a", DummyMetric()], + ) with pytest.raises(TypeError): - TimestampsTransition(TaskTimestamps.CREATED, "wrong") - - with pytest.raises(ValueError): - TimestampsTransition(TaskTimestamps.SUBMITTED, TaskTimestamps.CREATED) - - def test_timestamps_transition(self): - st = TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.ENDED) - st.update(5, task_batch_1) - st.update(5, task_batch_2) - st.complete(start, end) - assert st.values == {"avg": 12.0 / 5.0, "min": 0.0, "max": 5.0} - - -class TestTasksInStatusOverTime: - def test_task_in_status_over_time_no_next_status(self): - tisot = TasksInStatusOverTime(timestamp=TaskTimestamps.ENDED) - tisot.update(5, task_batch_1) - tisot.update(5, task_batch_2) - tisot.complete(start, end) - assert np.array_equal( - tisot.values, - np.array( - [ - [ - start, - task_batch_1[0].ended_at, - task_batch_1[1].ended_at, - task_batch_1[2].ended_at, - task_batch_2[0].ended_at, - task_batch_2[1].ended_at, - ], - [0, 1, 2, 3, 4, 5], - ] - ), - ) + ArmoniKStatistics( + channel=channel, + task_filter=TaskFieldFilter.SESSION_ID == "session-id", + metrics=[DummyMetric], + ) - def test_task_in_status_over_time_with_next_status(self): - tisot = TasksInStatusOverTime( - timestamp=TaskTimestamps.CREATED, next_timestamp=TaskTimestamps.SUBMITTED - ) - tisot.update(5, task_batch_1) - tisot.update(5, task_batch_2) - tisot.complete(start, end) - assert np.array_equal( - tisot.values, - np.array( - [ - [ - datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 0, 4, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 0, 5, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, 1, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, 2, tzinfo=timezone.utc), - datetime(1, 1, 1, 1, 1, 1, 3, tzinfo=timezone.utc), - ], - [0, 1, 2, 1, 0, 1, 2, 3, 2, 1, 0], - ] - ), + +@pytest.mark.parametrize( + ["tasks", "start", "end"], + [ + (test_case["tasks"], test_case["start"], test_case["end"]) + for test_case in test_cases + if len(test_case["tasks"]) > 0 + ], +) +def test_compute(tasks: list[Task], start: datetime, end: datetime): + with grpc.insecure_channel("url") as channel: + dummy = DummyMetric() + stats = ArmoniKStatistics( + channel=channel, + task_filter=TaskFieldFilter.SESSION_ID == "session-id", + metrics=[dummy], ) + stats.client = DummyArmoniKTasks(channel, tasks=tasks) + stats.compute() + + assert dummy.updates == 2 + assert dummy.completes == 1 + assert stats.values == {"DummyMetric": "value"} + assert dummy.total == 2 * len(tasks) + assert dummy.start == start + assert dummy.end == end diff --git a/tests/unit/test_time_series.py b/tests/unit/test_time_series.py new file mode 100644 index 0000000..f9a9bfa --- /dev/null +++ b/tests/unit/test_time_series.py @@ -0,0 +1,57 @@ +from datetime import datetime + +import 
numpy as np +import pytest + +from armonik.common import Task + +from armonik_analytics.metrics import TasksInStatusOverTime +from armonik_analytics.utils import TaskTimestamps + +from conftest import test_cases + + +def test_constructor(): + TasksInStatusOverTime(TaskTimestamps.ENDED) + TasksInStatusOverTime(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED) + + with pytest.raises(ValueError): + TasksInStatusOverTime("wrong") + + with pytest.raises(ValueError): + TasksInStatusOverTime(TaskTimestamps.CREATED, "wrong") + + with pytest.raises(ValueError): + TasksInStatusOverTime(TaskTimestamps.SUBMITTED, TaskTimestamps.CREATED) + + +@pytest.mark.parametrize( + ["tasks", "start", "end", "value"], + [ + (test_case["tasks"], test_case["start"], test_case["end"], test_case["ended_over_time"]) + for test_case in test_cases + ], +) +def test_task_in_status_over_time_no_next_status( + tasks: list[Task], start: datetime, end: datetime, value: np.ndarray +): + tisot = TasksInStatusOverTime(TaskTimestamps.ENDED) + tisot.update(len(tasks), tasks) + tisot.complete(start, end) + assert np.array_equal(tisot.values, value) + + +@pytest.mark.parametrize( + ["tasks", "start", "end", "value"], + [ + (test_case["tasks"], test_case["start"], test_case["end"], test_case["created_over_time"]) + for test_case in test_cases + ], +) +def test_task_in_status_over_time_with_next_status( + tasks: list[Task], start: datetime, end: datetime, value: np.ndarray +): + tisot = TasksInStatusOverTime(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED) + tisot.update(len(tasks), tasks) + tisot.complete(start, end) + assert np.array_equal(tisot.values, value) diff --git a/tests/unit/test_transitions.py b/tests/unit/test_transitions.py new file mode 100644 index 0000000..da9a2b5 --- /dev/null +++ b/tests/unit/test_transitions.py @@ -0,0 +1,41 @@ +from datetime import datetime + +import pytest + +from armonik.common import Task + +from armonik_analytics.metrics import TimestampsTransition +from armonik_analytics.utils import TaskTimestamps + +from conftest import test_cases + + +def test_constructor(): + TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED) + + with pytest.raises(ValueError): + TimestampsTransition(TaskTimestamps.CREATED, "wrong") + + with pytest.raises(ValueError): + TimestampsTransition(TaskTimestamps.SUBMITTED, TaskTimestamps.CREATED) + + +@pytest.mark.parametrize( + ["tasks", "start", "end", "value"], + [ + ( + test_case["tasks"], + test_case["start"], + test_case["end"], + test_case["created_to_submitted"], + ) + for test_case in test_cases + ], +) +def test_timestamps_transition( + tasks: list[Task], start: datetime, end: datetime, value: dict[str, float | None] +): + st = TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED) + st.update(len(tasks), tasks) + st.complete(start, end) + assert st.values == value From b40850bc56979ffa7ac81ea7a8c2aa3defe61dca Mon Sep 17 00:00:00 2001 From: qdelamea Date: Mon, 18 Mar 2024 14:30:22 +0100 Subject: [PATCH 4/4] chore: remove deprecated config item --- ruff.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/ruff.toml b/ruff.toml index 229b585..14fefa6 100644 --- a/ruff.toml +++ b/ruff.toml @@ -10,9 +10,6 @@ fix = false # Enable application of unsafe fixes. unsafe-fixes = false -# Whether to show source code snippets when reporting lint violation. -show-source = false - # Enumerate all fixed violations. show-fixes = true
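Taken together, the four patches above key ArmoniKStatistics.values on each metric's new "name" property and make the computation tolerant of tasks whose timestamps are only partially filled in. The following sketch shows how the reworked API is consumed end to end; it sticks to calls that appear in the patches, while the control-plane endpoint and the session ID are placeholders rather than values taken from them.

    import grpc

    from armonik.client import TaskFieldFilter

    from armonik_analytics.stats import ArmoniKStatistics
    from armonik_analytics.metrics import (
        AvgThroughput,
        TotalElapsedTime,
        TimestampsTransition,
        TasksInStatusOverTime,
    )
    from armonik_analytics.utils import TaskTimestamps

    # Placeholder endpoint and session ID: replace with a reachable ArmoniK
    # control plane and an existing session.
    with grpc.insecure_channel("<control-plane-endpoint>") as channel:
        stats = ArmoniKStatistics(
            channel=channel,
            task_filter=TaskFieldFilter.SESSION_ID == "<session-id>",
            metrics=[
                AvgThroughput(),
                TotalElapsedTime(),
                TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED),
                TasksInStatusOverTime(TaskTimestamps.ENDED),
            ],
        )
        stats.compute()

        # Keys now come from each metric's "name" property instead of
        # __class__.__qualname__: "AvgThroughput", "TotalElapsedTime",
        # "CreatedToSubmitted", "EndedTasksOverTime".
        for metric_name, value in stats.values.items():
            # Any value may be None when the listed tasks carry incomplete timestamps.
            print(metric_name, value)

Note that TasksInStatusOverTime backs its working arrays with memory-mapped files named "<metric name>_timestamps.dat" and "<metric name>_steps.dat" in the current directory; the clean_up fixture in tests/conftest.py removes leftover .dat files after the test session, and callers may want to do the same.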
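Because the metrics now skip tasks whose timestamps are missing instead of failing on them, they can also be driven directly on armonik.common.Task objects, which is how the new unit tests exercise them. Below is a minimal sketch along the lines of tests/conftest.py and tests/unit/test_transitions.py; the dates and task ids are invented:

    from datetime import datetime, timezone

    from armonik.common import Task

    from armonik_analytics.metrics import TimestampsTransition
    from armonik_analytics.utils import TaskTimestamps

    tasks = [
        Task(
            id=0,
            created_at=datetime(2024, 3, 13, 12, 0, 0, tzinfo=timezone.utc),
            submitted_at=datetime(2024, 3, 13, 12, 0, 2, tzinfo=timezone.utc),
        ),
        # Task with no timestamps at all, as in the new test cases.
        Task(id=1),
    ]

    metric = TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED)
    # The task without timestamps is ignored when computing the deltas.
    metric.update(len(tasks), tasks)
    # complete() is a no-op for this metric; it is called here to mirror the tests.
    metric.complete(tasks[0].created_at, tasks[0].submitted_at)

    print(metric.name)    # "CreatedToSubmitted"
    print(metric.values)  # avg, min and max are all 2.0 seconds here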