Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for tasks with incomplete timestamps #3

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

def plot_metrics(stats):
plt.figure()
plt.xlabel("Time (s)")
plt.ylabel("Number of tasks")
plt.legend(loc="upper right")
plt.title("Tasks in status over time")
for metric_name in stats.values.keys():
if metric_name.endswith("OverTime"):
values = stats.values[metric_name]
Expand Down Expand Up @@ -58,6 +62,13 @@ def print_metrics(stats):
TimestampsTransition(TaskTimestamps.FETCHED, TaskTimestamps.STARTED),
TimestampsTransition(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED),
TimestampsTransition(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED),
TasksInStatusOverTime(TaskTimestamps.CREATED, TaskTimestamps.SUBMITTED),
TasksInStatusOverTime(TaskTimestamps.SUBMITTED, TaskTimestamps.RECEIVED),
TasksInStatusOverTime(TaskTimestamps.RECEIVED, TaskTimestamps.ACQUIRED),
TasksInStatusOverTime(TaskTimestamps.ACQUIRED, TaskTimestamps.FETCHED),
TasksInStatusOverTime(TaskTimestamps.FETCHED, TaskTimestamps.STARTED),
TasksInStatusOverTime(TaskTimestamps.STARTED, TaskTimestamps.PROCESSED),
TasksInStatusOverTime(TaskTimestamps.PROCESSED, TaskTimestamps.ENDED),
TasksInStatusOverTime(TaskTimestamps.ENDED),
],
)
Expand Down
3 changes: 0 additions & 3 deletions ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ fix = false
# Enable application of unsafe fixes.
unsafe-fixes = false

# Whether to show source code snippets when reporting lint violation.
show-source = false

# Enumerate all fixed violations.
show-fixes = true

Expand Down
4 changes: 4 additions & 0 deletions src/armonik_analytics/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def complete(self, start: datetime, end: datetime) -> None:
"""
pass

@property
def name(self) -> str:
return self.__class__.__name__

@abstractproperty
def values(self) -> any:
"""
Expand Down
23 changes: 17 additions & 6 deletions src/armonik_analytics/metrics/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ class TotalElapsedTime(ArmoniKMetric):
def __init__(self) -> None:
self.elapsed = None

def complete(self, start: datetime, end: datetime) -> None:
def complete(self, start: datetime | None, end: datetime | None) -> None:
"""
Calculate the total elapsed time.

Args:
start (datetime | None): The start time, or None if it is unknown.
end (datetime | None): The end time, or None if it is unknown.
"""
self.elapsed = (end - start).total_seconds()
if isinstance(start, datetime) and isinstance(end, datetime):
self.elapsed = (end - start).total_seconds()
else:
self.elapsed = None

@property
def values(self) -> float:
Expand All @@ -40,7 +43,7 @@ class AvgThroughput(ArmoniKMetric):

def __init__(self) -> None:
self.throughput = None
self.total = None
self.ended = None

def update(self, total: int, tasks: list[Task]) -> None:
"""
Expand All @@ -50,17 +53,25 @@ def update(self, total: int, tasks: list[Task]) -> None:
total (int): Total number of tasks.
tasks (list[Task]): A task batch.
"""
self.total = total
n_ended = len([t for t in tasks if t.ended_at])
self.ended = self.ended + n_ended if self.ended else n_ended

def complete(self, start: datetime, end: datetime) -> None:
def complete(self, start: datetime | None, end: datetime | None) -> None:
"""
Calculate the average throughput.

Args:
start (datetime | None): The start time, or None if it is unknown.
end (datetime | None): The end time, or None if it is unknown.
"""
self.throughput = self.total / (end - start).total_seconds()
if (
isinstance(self.ended, int)
and isinstance(start, datetime)
and isinstance(end, datetime)
):
self.throughput = self.ended / (end - start).total_seconds()
else:
self.throughput = None

@property
def values(self) -> float:
Expand Down
53 changes: 38 additions & 15 deletions src/armonik_analytics/metrics/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ def __init__(
self.timestamp = timestamp
self.next_timestamp = next_timestamp
self.timestamps = None
self.steps = None
self.index = 0

@property
def name(self) -> str:
return f"{self.timestamp.name.capitalize()}TasksOverTime"

@property
def timestamp(self) -> TaskTimestamps:
return self.__timestamp

@timestamp.setter
def timestamp(self, __value: TaskTimestamps) -> None:
if __value not in TaskTimestamps:
raise ValueError(f"{__value} is not a valid timestamp.")
__value = TaskTimestamps(__value)
self.__timestamp = __value

@property
Expand All @@ -44,7 +48,7 @@ def next_timestamp(self) -> TaskTimestamps:
@next_timestamp.setter
def next_timestamp(self, __value: TaskTimestamps) -> None:
if __value is not None:
assert __value in TaskTimestamps
__value = TaskTimestamps(__value)
if __value < self.timestamp:
raise ValueError(
f"Inconsistent timestamp order '{self.timestamp.name}' is not prior to '{__value.name}'."
Expand All @@ -62,35 +66,54 @@ def update(self, total: int, tasks: list[Task]) -> None:
n_tasks = len(tasks)
if self.timestamps is None:
n = (2 * total) + 1 if self.next_timestamp else total + 1
self.timestamps = np.memmap("timestamps.dat", dtype=object, mode="w+", shape=(2, n))
self.timestamps = np.memmap(
f"{self.name}_timestamps.dat", dtype=datetime, mode="w+", shape=(n,)
)
self.steps = np.memmap(f"{self.name}_steps.dat", dtype=np.int8, mode="w+", shape=(n,))
self.index = 1
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks],
n_tasks * [1],
self.timestamps[self.index : self.index + n_tasks] = [
getattr(t, f"{self.timestamp.name.lower()}_at") for t in tasks
]
self.steps[self.index : self.index + n_tasks] = n_tasks * [1]
self.index += n_tasks
if self.next_timestamp:
self.timestamps[:, self.index : self.index + n_tasks] = [
[getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks],
n_tasks * [-1],
self.timestamps[self.index : self.index + n_tasks] = [
getattr(t, f"{self.next_timestamp.name.lower()}_at") for t in tasks
]
self.steps[self.index : self.index + n_tasks] = n_tasks * [-1]
self.index += n_tasks

def complete(self, start: datetime, end: datetime) -> None:
def complete(self, start: datetime | None, end: datetime | None) -> None:
"""
Complete the metric calculation.

Args:
start (datetime | None): The start time, or None if it is unknown.
end (datetime | None): The end time, or None if it is unknown.
"""
self.timestamps[:, 0] = (start, 0)
self.timestamps = self.timestamps[:, self.timestamps[0, :].argsort()]
self.timestamps[1, :] = np.cumsum(self.timestamps[1, :])
if start is None or self.timestamps.shape[0] == 1:
self.timestamps = None
self.steps = None
return
# Add start date with no step
self.timestamps[0] = start
self.steps[0] = 0
# Remove inconsistent data (due to missing timestamps in task metadata)
inconsistent_values = np.atleast_1d(self.timestamps == np.array(None)).nonzero()[0]
self.timestamps = np.delete(self.timestamps, inconsistent_values)
self.steps = np.delete(self.steps, inconsistent_values)
# Sort the arrays by timestamp dates
sort_indices = self.timestamps.argsort()
self.timestamps = self.timestamps[sort_indices]
self.steps = self.steps[sort_indices]
# Compute the number of tasks in the given status over time (running sum of the +1/-1 steps)
self.steps = np.cumsum(self.steps)

@property
def values(self):
"""
Return the timestamps stacked with the cumulative task counts as the metric values, or None if no data is available.
"""
return self.timestamps
if self.timestamps is None:
return None
return np.vstack((self.timestamps, self.steps))
32 changes: 22 additions & 10 deletions src/armonik_analytics/metrics/transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ def __init__(self, timestamp_1: str, timestamp_2: str) -> None:
timestamp_2 (str): The second timestamp.
"""
self.timestamps = (timestamp_1, timestamp_2)
self.avg = 0
self.avg = None
self.min = None
self.max = None
self.__class__.__qualname__ = f"{self.timestamps[0].name.lower().capitalize()}To{self.timestamps[1].name.capitalize()}"

@property
def name(self) -> str:
return f"{self.timestamps[0].name.capitalize()}To{self.timestamps[1].name.capitalize()}"

@property
def timestamps(self) -> tuple[TaskTimestamps, TaskTimestamps]:
Expand All @@ -46,7 +49,7 @@ def timestamps(self, __value: tuple[TaskTimestamps, TaskTimestamps]) -> None:
ValueError: If the timestamps are not valid or in inconsistent order.
"""
for timestamp in __value:
assert timestamp in TaskTimestamps
TaskTimestamps(timestamp)
if __value[0] > __value[1]:
raise ValueError(
f"Inconsistent timestamp order '{__value[0].name}' is not prior to '{__value[1].name}'."
Expand All @@ -68,14 +71,23 @@ def update(self, total: int, tasks: list[Task]) -> None:
- getattr(t, f"{self.timestamps[0].name.lower()}_at")
).total_seconds()
for t in tasks
if (
getattr(t, f"{self.timestamps[1].name.lower()}_at") is not None
and getattr(t, f"{self.timestamps[0].name.lower()}_at") is not None
)
]
self.avg += np.sum(deltas) / total
min = np.min(deltas)
max = np.max(deltas)
if self.max is None or self.max < max:
self.max = max
if self.min is None or self.min > min:
self.min = min
if deltas:
self.avg = (
self.avg + np.sum(deltas) / len(deltas)
if self.avg
else np.sum(deltas) / len(deltas)
)
min = np.min(deltas)
max = np.max(deltas)
if self.max is None or self.max < max:
self.max = max
if self.min is None or self.min > min:
self.min = min

@property
def values(self) -> dict[str, float]:
Expand Down
18 changes: 11 additions & 7 deletions src/armonik_analytics/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ def compute(self) -> None:
while tasks:
for metric in self.metrics:
metric.update(total, tasks)
min_start = np.min([t.created_at for t in tasks])
max_end = np.max([t.ended_at for t in tasks])
if start is None or min_start < start:
start = min_start
if end is None or max_end > end:
end = max_end
starts = [t.created_at for t in tasks if t.created_at]
ends = [t.ended_at for t in tasks if t.ended_at]
if starts:
min_start = np.min(starts)
if start is None or min_start < start:
start = min_start
if ends:
max_end = np.max(ends)
if end is None or max_end > end:
end = max_end
page += 1
_, tasks = self.client.list_tasks(task_filter=self.filter, page=page)

Expand All @@ -59,4 +63,4 @@ def compute(self) -> None:
@property
def values(self):
"""Dict[str, Union[float, dict]]: A dictionary containing computed statistics."""
return {metric.__class__.__qualname__: metric.values for metric in self.metrics}
return {metric.name: metric.values for metric in self.metrics}
Loading
Loading