Skip to content

Commit

Permalink
feat: improve statistics implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
qdelamea-aneo committed Mar 6, 2024
1 parent d601e05 commit aebdd66
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 55 deletions.
63 changes: 40 additions & 23 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,35 @@
import matplotlib.pyplot as plt

from armonik.client import TaskFieldFilter
from armonik.stats import ArmoniKStatistics
from armonik.stats.metrics import AvgThroughput, TotalElapsedTime, TimestampsTransition, TasksInStatusOverTime
from armonik_analytics import ArmoniKStatistics
from armonik_analytics.metrics import (
AvgThroughput,
TotalElapsedTime,
TimestampsTransition,
TasksInStatusOverTime,
)

from armonik_analytics.utils import TaskTimestamps


def plot_metrics(stats):
plt.figure()
for metric_name in stats.values.keys():
if metric_name.endswith("OverTime"):
values = stats.values[metric_name]
X = values[0, :]
Y = values[1, :]
X = [(x - X[0]).total_seconds() for x in X]
plt.plot(X, Y, label=metric_name)
plt.savefig("metrics.png")


def print_metrics(stats):
print(
json.dumps(
{name: value for name, value in stats.values.items() if not name.endswith("OverTime")}
)
)


if __name__ == "__main__":
Expand All @@ -25,27 +52,17 @@
metrics=[
AvgThroughput(),
TotalElapsedTime(),
TimestampsTransition("created", "submitted"),
TimestampsTransition("submitted", "received"),
TimestampsTransition("received", "acquired"),
TimestampsTransition("acquired", "fetched"),
TimestampsTransition("fetched", "started"),
TimestampsTransition("started", "processed"),
TimestampsTransition("processed", "ended"),
TasksInStatusOverTime("processed", "ended"),
TasksInStatusOverTime("ended"),
]
TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED),
TimestampsTransition(TaskTimestamps.SUBMITTED, TaskTimestamps.RECEIVED),
TimestampsTransition(TaskTimestamps.RECEIVED, TaskTimestamps.ACQUIRED),
TimestampsTransition(TaskTimestamps.ACQUIRED, TaskTimestamps.FETCHED),
TimestampsTransition(TaskTimestamps.FETCHED, TaskTimestamps.STARTED),
TimestampsTransition(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED),
TimestampsTransition(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED),
TasksInStatusOverTime(TaskTimestamps.ENDED),
],
)
stats.compute()

plt.figure()
for metric_name in stats.values.keys():
if metric_name.endswith("OverTime"):
values = stats.values[metric_name]
X = values[0,:]
Y = values[1,:]
X = [(x - X[0]).total_seconds() for x in X]
plt.plot(X, Y, label=metric_name)
plt.savefig(f"metrics.png")

print(json.dumps({name: value for name, value in stats.values.items() if not name.endswith("OverTime")}))
plot_metrics(stats)
print_metrics(stats)
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = [
'armonik',
'armonik>=3.16.1',
'numpy',
]

Expand Down Expand Up @@ -46,3 +46,6 @@ tests = [
'pytest-cov',
'pytest-benchmark[histogram]',
]
samples = [
'matplotlib'
]
8 changes: 7 additions & 1 deletion src/armonik_analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
__version__ = "0.1.0"
from .stats import ArmoniKStatistics


__version__ = "0.1.0"


__all__ = ["__version__", "ArmoniKStatistics"]
7 changes: 3 additions & 4 deletions src/armonik_analytics/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import abc

from abc import ABC, abstractproperty
from datetime import datetime

from armonik.common import Task


class ArmoniKMetric(abc.ABC):
class ArmoniKMetric(ABC):
"""
Abstract base class for ArmoniK metrics.
"""
Expand All @@ -31,7 +30,7 @@ def complete(self, start: datetime, end: datetime) -> None:
"""
pass

@abc.abstractproperty
@abstractproperty
def values(self) -> any:
"""
Abstract method to be override.
Expand Down
2 changes: 1 addition & 1 deletion src/armonik_analytics/metrics/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def complete(self, start: datetime, end: datetime) -> None:
self.throughput = self.total / (end - start).total_seconds()

@property
def values(self) -> int:
def values(self) -> float:
"""
Return the average throughput as the metric value.
Expand Down
35 changes: 30 additions & 5 deletions src/armonik_analytics/metrics/time_series.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from datetime import datetime

import numpy as np
from armonik.common import Task

from .base import ArmoniKMetric
from armonik.common import Task
from ..utils import TaskTimestamps


class TasksInStatusOverTime(ArmoniKMetric):
"""
A metric to track tasks in a particular status over time.
"""

def __init__(self, timestamp, next_timestamp=None) -> None:
def __init__(self, timestamp: TaskTimestamps, next_timestamp: TaskTimestamps | None = None) -> None:
"""
Initialize the metric.
Expand All @@ -24,6 +25,30 @@ def __init__(self, timestamp, next_timestamp=None) -> None:
self.timestamps = None
self.index = 0

@property
def timestamp(self) -> TaskTimestamps:
return self.__timestamp

@timestamp.setter
def timestamp(self, __value: TaskTimestamps) -> None:
if __value not in TaskTimestamps:
raise ValueError(f"{__value} is not a valid timestamp.")
self.__timestamp = __value

@property
def next_timestamp(self) -> TaskTimestamps:
return self.__next_timestamp

@next_timestamp.setter
def next_timestamp(self, __value: TaskTimestamps) -> None:
if __value is not None:
assert __value in TaskTimestamps
if __value < self.timestamp:
raise ValueError(
f"Inconsistent timestamp order '{self.timestamp.name}' is not prior to '{__value.name}'."
)
self.__next_timestamp = __value

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the metric.
Expand All @@ -34,17 +59,17 @@ def update(self, total: int, tasks: list[Task]) -> None:
"""
n_tasks = len(tasks)
if self.timestamps is None:
n = total * 2 + 1 if self.next_timestamp else total + 1
n = (2 * total) + 1 if self.next_timestamp else total + 1
self.timestamps = np.memmap("timestamps.dat", dtype=object, mode="w+", shape=(2, n))
self.index = 1
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.timestamp}_at") for t in tasks],
[getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks],
n_tasks * [1],
]
self.index += n_tasks
if self.next_timestamp:
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.next_timestamp}_at") for t in tasks],
[getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks],
n_tasks * [-1],
]
self.index += n_tasks
Expand Down
23 changes: 10 additions & 13 deletions src/armonik_analytics/metrics/transitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import numpy as np
from armonik.common import Task

from .base import ArmoniKMetric
from armonik.common import Task, TaskTimestamps
from ..utils import TaskTimestamps


class TimestampsTransition(ArmoniKMetric):
Expand All @@ -21,12 +22,10 @@ def __init__(self, timestamp_1: str, timestamp_2: str) -> None:
self.avg = 0
self.min = None
self.max = None
self.__class__.__qualname__ = (
f"{self.timestamps[0].capitalize()}To{self.timestamps[1].capitalize()}"
)
self.__class__.__qualname__ = f"{self.timestamps[0].name.lower().capitalize()}To{self.timestamps[1].name.capitalize()}"

@property
def timestamps(self) -> tuple[str, str]:
def timestamps(self) -> tuple[TaskTimestamps, TaskTimestamps]:
"""
Get the timestamps.
Expand All @@ -36,7 +35,7 @@ def timestamps(self) -> tuple[str, str]:
return self.__timestamps

@timestamps.setter
def timestamps(self, __value: tuple[str, str]) -> None:
def timestamps(self, __value: tuple[TaskTimestamps, TaskTimestamps]) -> None:
"""
Set the timestamps.
Expand All @@ -47,13 +46,10 @@ def timestamps(self, __value: tuple[str, str]) -> None:
ValueError: If the timestamps are not valid or in inconsistent order.
"""
for timestamp in __value:
if not TaskTimestamps.has_value(timestamp):
raise ValueError(f"{timestamp} is not a valid timestamp.")
if getattr(TaskTimestamps, __value[0].upper()) > getattr(
TaskTimestamps, __value[1].upper()
):
assert timestamp in TaskTimestamps
if __value[0] > __value[1]:
raise ValueError(
f"Inconsistent timestamp order '{__value[0]}' is not prior to '{__value[1]}'."
f"Inconsistent timestamp order '{__value[0].name}' is not prior to '{__value[1].name}'."
)
self.__timestamps = __value

Expand All @@ -68,7 +64,8 @@ def update(self, total: int, tasks: list[Task]) -> None:
"""
deltas = [
(
getattr(t, f"{self.timestamps[1]}_at") - getattr(t, f"{self.timestamps[0]}_at")
getattr(t, f"{self.timestamps[1].name.lower()}_at")
- getattr(t, f"{self.timestamps[0].name.lower()}_at")
).total_seconds()
for t in tasks
]
Expand Down
12 changes: 12 additions & 0 deletions src/armonik_analytics/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from enum import IntEnum


class TaskTimestamps(IntEnum):
CREATED = 0
SUBMITTED = 1
RECEIVED = 2
ACQUIRED = 3
FETCHED = 4
STARTED = 5
PROCESSED = 6
ENDED = 7
15 changes: 8 additions & 7 deletions tests/unit/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
TimestampsTransition,
TasksInStatusOverTime,
)
from armonik_analytics.utils import TaskTimestamps


class DummyMetric(ArmoniKMetric):
Expand Down Expand Up @@ -161,16 +162,16 @@ def test_total_elapsed_time(self):

class TestTimestampsTransition:
def test_constructor(self):
TimestampsTransition("created", "submitted")
TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED)

with pytest.raises(ValueError):
TimestampsTransition("created", "wrong")
with pytest.raises(TypeError):
TimestampsTransition(TaskTimestamps.CREATED, "wrong")

with pytest.raises(ValueError):
TimestampsTransition("submitted", "created")
TimestampsTransition(TaskTimestamps.SUBMITTED, TaskTimestamps.CREATED)

def test_timestamps_transition(self):
st = TimestampsTransition("created", "ended")
st = TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.ENDED)
st.update(5, task_batch_1)
st.update(5, task_batch_2)
st.complete(start, end)
Expand All @@ -179,7 +180,7 @@ def test_timestamps_transition(self):

class TestTasksInStatusOverTime:
def test_task_in_status_over_time_no_next_status(self):
tisot = TasksInStatusOverTime(timestamp="ended")
tisot = TasksInStatusOverTime(timestamp=TaskTimestamps.ENDED)
tisot.update(5, task_batch_1)
tisot.update(5, task_batch_2)
tisot.complete(start, end)
Expand All @@ -201,7 +202,7 @@ def test_task_in_status_over_time_no_next_status(self):
)

def test_task_in_status_over_time_with_next_status(self):
tisot = TasksInStatusOverTime(timestamp="created", next_timestamp="submitted")
tisot = TasksInStatusOverTime(timestamp=TaskTimestamps.CREATED, next_timestamp=TaskTimestamps.SUBMITTED)
tisot.update(5, task_batch_1)
tisot.update(5, task_batch_2)
tisot.complete(start, end)
Expand Down

0 comments on commit aebdd66

Please sign in to comment.