Skip to content

Commit

Permalink
feat: first implementation of statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
qdelamea-aneo committed Mar 1, 2024
1 parent 5e9444e commit 0088dc9
Show file tree
Hide file tree
Showing 13 changed files with 636 additions and 16 deletions.
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@






# armonik_analytics
# ArmoniK Analytics

Set of tools for analyzing workload execution on ArmoniK.

Expand Down
51 changes: 51 additions & 0 deletions examples/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import argparse
import json

import grpc
import matplotlib.pyplot as plt

from armonik.client import TaskFieldFilter
from armonik.stats import ArmoniKStatistics
from armonik.stats.metrics import AvgThroughput, TotalElapsedTime, TimestampsTransition, TasksInStatusOverTime


if __name__ == "__main__":
parser = argparse.ArgumentParser("Compute statistics for tasks of a given session.")
parser.add_argument("--endpoint", "-e", type=str, help="ArmoniK controle plane endpoint")
parser.add_argument("--session-id", "-s", type=str, help="ID of the session")
args = parser.parse_args()

if args.endpoint.startswith("http://"):
args.endpoint = args.endpoint.removeprefix("http://")

with grpc.insecure_channel(args.endpoint) as channel:
stats = ArmoniKStatistics(
channel=channel,
task_filter=TaskFieldFilter.SESSION_ID == args.session_id,
metrics=[
AvgThroughput(),
TotalElapsedTime(),
TimestampsTransition("created", "submitted"),
TimestampsTransition("submitted", "received"),
TimestampsTransition("received", "acquired"),
TimestampsTransition("acquired", "fetched"),
TimestampsTransition("fetched", "started"),
TimestampsTransition("started", "processed"),
TimestampsTransition("processed", "ended"),
TasksInStatusOverTime("processed", "ended"),
TasksInStatusOverTime("ended"),
]
)
stats.compute()

plt.figure()
for metric_name in stats.values.keys():
if metric_name.endswith("OverTime"):
values = stats.values[metric_name]
X = values[0,:]
Y = values[1,:]
X = [(x - X[0]).total_seconds() for x in X]
plt.plot(X, Y, label=metric_name)
plt.savefig(f"metrics.png")

print(json.dumps({name: value for name, value in stats.values.items() if not name.endswith("OverTime")}))
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ license = {text = "Apache Software License 2.0"}
classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = []
dependencies = [
'armonik',
'numpy',
]

[project.urls]
Homepage = "https://github.com/aneoconsulting/ArmoniK.Analytics"
Expand Down
2 changes: 1 addition & 1 deletion src/armonik_analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.0"
__version__ = "0.1.0"
2 changes: 0 additions & 2 deletions src/armonik_analytics/hello.py

This file was deleted.

13 changes: 13 additions & 0 deletions src/armonik_analytics/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .base import ArmoniKMetric
from .common import AvgThroughput, TotalElapsedTime
from .time_series import TasksInStatusOverTime
from .transitions import TimestampsTransition


__all__ = [
"ArmoniKMetric",
"AvgThroughput",
"TotalElapsedTime",
"TasksInStatusOverTime",
"TimestampsTransition",
]
43 changes: 43 additions & 0 deletions src/armonik_analytics/metrics/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import abc

from datetime import datetime

from armonik.common import Task


class ArmoniKMetric(abc.ABC):
"""
Abstract base class for ArmoniK metrics.
"""

def update(self, total: int, tasks: list[Task]) -> None:
"""
Abstract method to be override.
Update the metric with a given task batch.
Args:
total (int): Total number of task on which the metric is computed.
tasks (list[Task]): A task batch.
"""
pass

def complete(self, start: datetime, end: datetime) -> None:
"""
Complete the metric computation.
Args:
start (datetime): The start datetime.
end (datetime): The end datetime.
"""
pass

@abc.abstractproperty
def values(self) -> any:
"""
Abstract method to be override.
Property to access the values of the metric.
Return:
any: The values of the metric.
"""
pass
73 changes: 73 additions & 0 deletions src/armonik_analytics/metrics/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from datetime import datetime

from .base import ArmoniKMetric
from armonik.common import Task


class TotalElapsedTime(ArmoniKMetric):
"""
A metric to compute the total elapsed time between the first task and the last task.
"""

def __init__(self) -> None:
self.elapsed = None

def complete(self, start: datetime, end: datetime) -> None:
"""
Calculate the total elapsed time.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.elapsed = (end - start).total_seconds()

@property
def values(self) -> float:
"""
Return the total elapsed time as the metric value.
Return:
int: The total elasped time.
"""
return self.elapsed


class AvgThroughput(ArmoniKMetric):
"""
A metric to compute the average throughput.
"""

def __init__(self) -> None:
self.throughput = None
self.total = None

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the total number of tasks.
Args:
total (int): Total number of tasks.
tasks (list[Task]): A task batch.
"""
self.total = total

def complete(self, start: datetime, end: datetime) -> None:
"""
Calculate the average throughput.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.throughput = self.total / (end - start).total_seconds()

@property
def values(self) -> int:
"""
Return the average throughput as the metric value.
Return:
int: The average throughput.
"""
return self.throughput
69 changes: 69 additions & 0 deletions src/armonik_analytics/metrics/time_series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import datetime

import numpy as np

from .base import ArmoniKMetric
from armonik.common import Task


class TasksInStatusOverTime(ArmoniKMetric):
"""
A metric to track tasks in a particular status over time.
"""

def __init__(self, timestamp, next_timestamp=None) -> None:
"""
Initialize the metric.
Args:
timestamp (str): The current timestamp of the tasks.
next_timestamp (str, optional): The next timestamp of the tasks. Defaults to None.
"""
self.timestamp = timestamp
self.next_timestamp = next_timestamp
self.timestamps = None
self.index = 0

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the metric.
Args:
total (int): Total number of tasks.
tasks (list[Task]): A task batch.
"""
n_tasks = len(tasks)
if self.timestamps is None:
n = total * 2 + 1 if self.next_timestamp else total + 1
self.timestamps = np.memmap("timestamps.dat", dtype=object, mode="w+", shape=(2, n))
self.index = 1
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.timestamp}_at") for t in tasks],
n_tasks * [1],
]
self.index += n_tasks
if self.next_timestamp:
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.next_timestamp}_at") for t in tasks],
n_tasks * [-1],
]
self.index += n_tasks

def complete(self, start: datetime, end: datetime) -> None:
"""
Complete the metric calculation.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.timestamps[:, 0] = (start, 0)
self.timestamps = self.timestamps[:, self.timestamps[0, :].argsort()]
self.timestamps[1, :] = np.cumsum(self.timestamps[1, :])

@property
def values(self):
"""
Return the timestamps as the metric values.
"""
return self.timestamps
91 changes: 91 additions & 0 deletions src/armonik_analytics/metrics/transitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import numpy as np

from .base import ArmoniKMetric
from armonik.common import Task, TaskTimestamps


class TimestampsTransition(ArmoniKMetric):
"""
Metric to compute statistics on transitions between two timestamps in tasks.
"""

def __init__(self, timestamp_1: str, timestamp_2: str) -> None:
"""
Initialize the metric.
Args:
timestamp_1 (str): The first timestamp.
timestamp_2 (str): The second timestamp.
"""
self.timestamps = (timestamp_1, timestamp_2)
self.avg = 0
self.min = None
self.max = None
self.__class__.__qualname__ = (
f"{self.timestamps[0].capitalize()}To{self.timestamps[1].capitalize()}"
)

@property
def timestamps(self) -> tuple[str, str]:
"""
Get the timestamps.
Returns:
tuple[str, str]: A tuple containing two timestamps.
"""
return self.__timestamps

@timestamps.setter
def timestamps(self, __value: tuple[str, str]) -> None:
"""
Set the timestamps.
Args:
__value (tuple[str, str]): A tuple containing two timestamps.
Raises:
ValueError: If the timestamps are not valid or in inconsistent order.
"""
for timestamp in __value:
if not TaskTimestamps.has_value(timestamp):
raise ValueError(f"{timestamp} is not a valid timestamp.")
if getattr(TaskTimestamps, __value[0].upper()) > getattr(
TaskTimestamps, __value[1].upper()
):
raise ValueError(
f"Inconsistent timestamp order '{__value[0]}' is not prior to '{__value[1]}'."
)
self.__timestamps = __value

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the metric with new data.
Update the average, minimum, and maximum transition times between the two timestamps.
Args:
total (int): Total number of tasks.
tasks (list[Task]): List of tasks.
"""
deltas = [
(
getattr(t, f"{self.timestamps[1]}_at") - getattr(t, f"{self.timestamps[0]}_at")
).total_seconds()
for t in tasks
]
self.avg += np.sum(deltas) / total
min = np.min(deltas)
max = np.max(deltas)
if self.max is None or self.max < max:
self.max = max
if self.min is None or self.min > min:
self.min = min

@property
def values(self) -> dict[str, float]:
"""
Get the computed values.
Returns:
dict[str, float]: A dictionary containing the average, minimum, and maximum transition times.
"""
return {"avg": self.avg, "min": self.min, "max": self.max}
Loading

0 comments on commit 0088dc9

Please sign in to comment.