Skip to content

Commit

Permalink
Merge pull request #1 from aneoconsulting/qd/stats
Browse files Browse the repository at this point in the history
feat: first implementation of statistics
  • Loading branch information
qdelamea-aneo authored Mar 6, 2024
2 parents 5e9444e + e3112ff commit fafe9c6
Show file tree
Hide file tree
Showing 15 changed files with 699 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
- name: Get Cover
uses: orgoro/coverage@6d7a2607343d2abeab89ef40b54ec9785134e313
with:
coverageFile: packages/python/coverage.xml
coverageFile: coverage.xml
token: ${{ secrets.GITHUB_TOKEN }}

- name: Archive code coverage results html
Expand Down
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@






# armonik_analytics
# ArmoniK Analytics

Set of tools for analyzing workload execution on ArmoniK.

Expand Down
67 changes: 67 additions & 0 deletions examples/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import argparse
import json

import grpc
import matplotlib.pyplot as plt

from armonik.client import TaskFieldFilter
from armonik_analytics import ArmoniKStatistics
from armonik_analytics.metrics import (
AvgThroughput,
TotalElapsedTime,
TimestampsTransition,
TasksInStatusOverTime,
)

from armonik_analytics.utils import TaskTimestamps


def plot_metrics(stats):
plt.figure()
for metric_name in stats.values.keys():
if metric_name.endswith("OverTime"):
values = stats.values[metric_name]
X = values[0, :]
Y = values[1, :]
X = [(x - X[0]).total_seconds() for x in X]
plt.plot(X, Y, label=metric_name)
plt.savefig("metrics.png")


def print_metrics(stats):
print(
json.dumps(
{name: value for name, value in stats.values.items() if not name.endswith("OverTime")}
)
)


if __name__ == "__main__":
parser = argparse.ArgumentParser("Compute statistics for tasks of a given session.")
parser.add_argument("--endpoint", "-e", type=str, help="ArmoniK controle plane endpoint")
parser.add_argument("--session-id", "-s", type=str, help="ID of the session")
args = parser.parse_args()

args.endpoint = args.endpoint.removeprefix("http://")

with grpc.insecure_channel(args.endpoint) as channel:
stats = ArmoniKStatistics(
channel=channel,
task_filter=TaskFieldFilter.SESSION_ID == args.session_id,
metrics=[
AvgThroughput(),
TotalElapsedTime(),
TimestampsTransition(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED),
TimestampsTransition(TaskTimestamps.SUBMITTED, TaskTimestamps.RECEIVED),
TimestampsTransition(TaskTimestamps.RECEIVED, TaskTimestamps.ACQUIRED),
TimestampsTransition(TaskTimestamps.ACQUIRED, TaskTimestamps.FETCHED),
TimestampsTransition(TaskTimestamps.FETCHED, TaskTimestamps.STARTED),
TimestampsTransition(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED),
TimestampsTransition(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED),
TasksInStatusOverTime(TaskTimestamps.ENDED),
],
)
stats.compute()

plot_metrics(stats)
print_metrics(stats)
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ license = {text = "Apache Software License 2.0"}
classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = []
dependencies = [
'armonik>=3.16.1',
'numpy',
]

[project.urls]
Homepage = "https://github.com/aneoconsulting/ArmoniK.Analytics"
Expand Down Expand Up @@ -43,3 +46,6 @@ tests = [
'pytest-cov',
'pytest-benchmark[histogram]',
]
samples = [
'matplotlib'
]
6 changes: 6 additions & 0 deletions src/armonik_analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
from .stats import ArmoniKStatistics


__version__ = "0.1.0"


__all__ = ["__version__", "ArmoniKStatistics"]
2 changes: 0 additions & 2 deletions src/armonik_analytics/hello.py

This file was deleted.

13 changes: 13 additions & 0 deletions src/armonik_analytics/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .base import ArmoniKMetric
from .common import AvgThroughput, TotalElapsedTime
from .time_series import TasksInStatusOverTime
from .transitions import TimestampsTransition


__all__ = [
"ArmoniKMetric",
"AvgThroughput",
"TotalElapsedTime",
"TasksInStatusOverTime",
"TimestampsTransition",
]
42 changes: 42 additions & 0 deletions src/armonik_analytics/metrics/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from abc import ABC, abstractproperty
from datetime import datetime

from armonik.common import Task


class ArmoniKMetric(ABC):
"""
Abstract base class for ArmoniK metrics.
"""

def update(self, total: int, tasks: list[Task]) -> None:
"""
Abstract method to be override.
Update the metric with a given task batch.
Args:
total (int): Total number of task on which the metric is computed.
tasks (list[Task]): A task batch.
"""
pass

def complete(self, start: datetime, end: datetime) -> None:
"""
Complete the metric computation.
Args:
start (datetime): The start datetime.
end (datetime): The end datetime.
"""
pass

@abstractproperty
def values(self) -> any:
"""
Abstract method to be override.
Property to access the values of the metric.
Return:
any: The values of the metric.
"""
pass
73 changes: 73 additions & 0 deletions src/armonik_analytics/metrics/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from datetime import datetime

from .base import ArmoniKMetric
from armonik.common import Task


class TotalElapsedTime(ArmoniKMetric):
"""
A metric to compute the total elapsed time between the first task and the last task.
"""

def __init__(self) -> None:
self.elapsed = None

def complete(self, start: datetime, end: datetime) -> None:
"""
Calculate the total elapsed time.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.elapsed = (end - start).total_seconds()

@property
def values(self) -> float:
"""
Return the total elapsed time as the metric value.
Return:
int: The total elasped time.
"""
return self.elapsed


class AvgThroughput(ArmoniKMetric):
"""
A metric to compute the average throughput.
"""

def __init__(self) -> None:
self.throughput = None
self.total = None

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the total number of tasks.
Args:
total (int): Total number of tasks.
tasks (list[Task]): A task batch.
"""
self.total = total

def complete(self, start: datetime, end: datetime) -> None:
"""
Calculate the average throughput.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.throughput = self.total / (end - start).total_seconds()

@property
def values(self) -> float:
"""
Return the average throughput as the metric value.
Return:
int: The average throughput.
"""
return self.throughput
96 changes: 96 additions & 0 deletions src/armonik_analytics/metrics/time_series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from datetime import datetime

import numpy as np
from armonik.common import Task

from .base import ArmoniKMetric
from ..utils import TaskTimestamps


class TasksInStatusOverTime(ArmoniKMetric):
"""
A metric to track tasks in a particular status over time.
"""

def __init__(
self, timestamp: TaskTimestamps, next_timestamp: TaskTimestamps | None = None
) -> None:
"""
Initialize the metric.
Args:
timestamp (str): The current timestamp of the tasks.
next_timestamp (str, optional): The next timestamp of the tasks. Defaults to None.
"""
self.timestamp = timestamp
self.next_timestamp = next_timestamp
self.timestamps = None
self.index = 0

@property
def timestamp(self) -> TaskTimestamps:
return self.__timestamp

@timestamp.setter
def timestamp(self, __value: TaskTimestamps) -> None:
if __value not in TaskTimestamps:
raise ValueError(f"{__value} is not a valid timestamp.")
self.__timestamp = __value

@property
def next_timestamp(self) -> TaskTimestamps:
return self.__next_timestamp

@next_timestamp.setter
def next_timestamp(self, __value: TaskTimestamps) -> None:
if __value is not None:
assert __value in TaskTimestamps
if __value < self.timestamp:
raise ValueError(
f"Inconsistent timestamp order '{self.timestamp.name}' is not prior to '{__value.name}'."
)
self.__next_timestamp = __value

def update(self, total: int, tasks: list[Task]) -> None:
"""
Update the metric.
Args:
total (int): Total number of tasks.
tasks (list[Task]): A task batch.
"""
n_tasks = len(tasks)
if self.timestamps is None:
n = (2 * total) + 1 if self.next_timestamp else total + 1
self.timestamps = np.memmap("timestamps.dat", dtype=object, mode="w+", shape=(2, n))
self.index = 1
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks],
n_tasks * [1],
]
self.index += n_tasks
if self.next_timestamp:
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks],
n_tasks * [-1],
]
self.index += n_tasks

def complete(self, start: datetime, end: datetime) -> None:
"""
Complete the metric calculation.
Args:
start (datetime): The start time.
end (datetime): The end time.
"""
self.timestamps[:, 0] = (start, 0)
self.timestamps = self.timestamps[:, self.timestamps[0, :].argsort()]
self.timestamps[1, :] = np.cumsum(self.timestamps[1, :])

@property
def values(self):
"""
Return the timestamps as the metric values.
"""
return self.timestamps
Loading

0 comments on commit fafe9c6

Please sign in to comment.