Skip to content

Commit

Permalink
Returns to using a process-based tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
erikhuck committed May 1, 2024
1 parent 5d58d06 commit 86a95a0
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 93 deletions.
144 changes: 89 additions & 55 deletions src/gpu_tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,28 @@
import dataclasses as dclass
import platform
import time
import threading as thrd
import multiprocessing as mproc
import os
import typing as typ
import psutil
import subprocess as subp
import logging as log
import enum
import sys
import pickle as pkl
import uuid


class Tracker:
"""
Runs a thread in the background that tracks the compute time, maximum RAM, and maximum GPU RAM usage within a context manager or explicit ``start()`` and ``stop()`` methods.
Runs a sub-process that tracks the computational resources of a process (the calling process by default), including the compute time, maximum RAM, and maximum GPU RAM usage, within a context manager or explicit ``start()`` and ``stop()`` methods.
Calculated quantities are scaled depending on the units chosen for them (e.g. megabytes vs. gigabytes, hours vs. days, etc.).
:ivar MaxRAM max_ram: Description of the maximum RAM usage of the process, any descendents it may have, and the operating system overall.
:ivar MaxGPURAM max_gpu_ram: Description of the maximum GPU RAM usage of the process and any descendents it may have.
:ivar ComputeTime compute_time: Description of the real compute time i.e. the duration of tracking.
"""
_USAGE_FILE_TIME_DIFFERENCE = 10.0

class State(enum.Enum):
"""The state of the Tracker."""
NEW = 0
Expand All @@ -31,17 +34,15 @@ class State(enum.Enum):

def __init__(
self, sleep_time: float = 1.0, ram_unit: str = 'gigabytes', gpu_ram_unit: str = 'gigabytes', time_unit: str = 'hours',
disable_logs: bool = False, n_join_attempts: int = 5, join_timeout: float = 10.0, kill_if_join_fails: bool = False,
process_id: int | None = None):
disable_logs: bool = False, n_join_attempts: int = 5, join_timeout: float = 10.0, process_id: int | None = None):
"""
:param sleep_time: The number of seconds to sleep in between usage-collection iterations.
:param ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'.
:param gpu_ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'.
:param time_unit: One of 'seconds', 'minutes', 'hours', or 'days'.
:param disable_logs: If set, warnings are suppressed during tracking. Otherwise, the Tracker logs warnings as usual.
:param n_join_attempts: The number of times the tracker attempts to join its underlying thread.
:param join_timeout: The amount of time the tracker waits for its underlying thread to join.
:param kill_if_join_fails: If true, kill the process if the underlying thread fails to join.
:param n_join_attempts: The number of times the tracker attempts to join its underlying sub-process.
:param join_timeout: The amount of time the tracker waits for its underlying sub-process to join.
:param process_id: The ID of the process to track. Defaults to the current process.
:raises ValueError: Raised if invalid units are provided.
"""
Expand Down Expand Up @@ -69,26 +70,28 @@ def __init__(
'hours': 1 / (60 * 60),
'days': 1 / (60 * 60 * 24)
}[time_unit]
self._stop_event = thrd.Event()
self._thread = thrd.Thread(target=self._profile)
self._stop_event = mproc.Event()
self._tracking_process = mproc.Process(target=self._track)
self._core_percent_sums = {key: 0. for key in ['system', 'main', 'descendents', 'combined']}
self._cpu_percent_sums = {key: 0. for key in ['system', 'main', 'descendents', 'combined']}
self._tracking_iteration = 1
self.disable_logs = disable_logs
self.n_join_attempts = n_join_attempts
self.join_timeout = join_timeout
self.kill_if_join_fails = kill_if_join_fails
self.process_id = process_id if process_id is not None else os.getpid()
self._main_process = psutil.Process(self.process_id)
self._is_linux = platform.system().lower() == 'linux'
self.max_ram = MaxRAM(unit=ram_unit, system_capacity=psutil.virtual_memory().total * self._ram_coefficient)
self.max_gpu_ram = MaxGPURAM(unit=gpu_ram_unit, system_capacity=self._system_gpu_ram(measurement='total'))
self.cpu_utilization = CPUUtilization(system_core_count=psutil.cpu_count())
self.compute_time = ComputeTime(unit=time_unit)
max_ram = MaxRAM(unit=ram_unit, system_capacity=psutil.virtual_memory().total * self._ram_coefficient)
max_gpu_ram = MaxGPURAM(unit=gpu_ram_unit, system_capacity=self._system_gpu_ram(measurement='total'))
cpu_utilization = CPUUtilization(system_core_count=psutil.cpu_count())
compute_time = ComputeTime(unit=time_unit)
self.resource_usage = ResourceUsage(
max_ram=max_ram, max_gpu_ram=max_gpu_ram, cpu_utilization=cpu_utilization, compute_time=compute_time)
self._resource_usage_file = f'.{uuid.uuid1()}.pkl'
self.state = Tracker.State.NEW

def __repr__(self):
return (f'State: {self.state.name}')
return f'State: {self.state.name}'

def _log_warning(self, warning: str):
if not self.disable_logs:
Expand Down Expand Up @@ -151,8 +154,8 @@ def _update_gpu_ram(self, attr: str, process_ids: set[int], nvidia_smi_output: s
megabytes_used = int(megabytes_used.replace('MiB', '').strip())
curr_gpu_ram += megabytes_used
curr_gpu_ram *= self._gpu_ram_coefficient
max_gpu_ram = getattr(self.max_gpu_ram, attr)
setattr(self.max_gpu_ram, attr, max(max_gpu_ram, curr_gpu_ram))
max_gpu_ram = getattr(self.resource_usage.max_gpu_ram, attr)
setattr(self.resource_usage.max_gpu_ram, attr, max(max_gpu_ram, curr_gpu_ram))

def _system_gpu_ram(self, measurement: str) -> float:
command = f'nvidia-smi --query-gpu=memory.{measurement} --format=csv,noheader'
Expand All @@ -163,7 +166,7 @@ def _system_gpu_ram(self, measurement: str) -> float:
return ram_sum * self._gpu_ram_coefficient

def _update_cpu_utilization(self, percentages: list[float], attr: str):
cpu_percentages: CPUPercentages = getattr(self.cpu_utilization, attr)
cpu_percentages: CPUPercentages = getattr(self.resource_usage.cpu_utilization, attr)

def update_percentages(percent: float, percent_type: str, percent_sums: dict[str, float]):
percent_sums[attr] += percent
Expand All @@ -172,7 +175,7 @@ def update_percentages(percent: float, percent_type: str, percent_sums: dict[str
max_percent: float = getattr(cpu_percentages, f'max_{percent_type}_percent')
setattr(cpu_percentages, f'max_{percent_type}_percent', max(max_percent, percent))
core_percent = sum(percentages)
cpu_percent = core_percent / self.cpu_utilization.system_core_count
cpu_percent = core_percent / self.resource_usage.cpu_utilization.system_core_count
update_percentages(percent=core_percent, percent_type='core', percent_sums=self._core_percent_sums)
update_percentages(percent=cpu_percent, percent_type='cpu', percent_sums=self._cpu_percent_sums)

Expand All @@ -188,54 +191,64 @@ def get_n_threads(process: psutil.Process):
n_threads_list = self._map_processes(processes, get_n_threads)
n_threads = sum(n_threads_list)
attr = f'{attr}_n_threads'
max_n_threads = getattr(self.cpu_utilization, attr)
setattr(self.cpu_utilization, attr, max(n_threads, max_n_threads))
max_n_threads = getattr(self.resource_usage.cpu_utilization, attr)
setattr(self.resource_usage.cpu_utilization, attr, max(n_threads, max_n_threads))

def _get_descendents(self) -> list[psutil.Process]:
return [process for process in self._main_process.children(recursive=True) if process.pid != self._tracking_process.pid]

def _profile(self):
def _track(self):
"""
Continuously tracks computational resource usage until the end of tracking is triggered, either by exiting the context manager or by a call to stop()
"""
start_time = time.time()
while not self._stop_event.is_set():
start_time = _testable_time()
# Simulate a do-while loop so that the tracking is executed at least once.
while True:
try:
# Get the maximum RAM usage.
self._update_ram(rss_values=self.max_ram.main, processes=[self._main_process])
self._update_ram(rss_values=self.max_ram.descendents, processes=self._main_process.children(recursive=True))
self._update_ram(rss_values=self.resource_usage.max_ram.main, processes=[self._main_process])
self._update_ram(rss_values=self.resource_usage.max_ram.descendents, processes=self._get_descendents())
# Call children() each time it's used to get an updated list in case the children changed since the above call.
self._update_ram(
rss_values=self.max_ram.combined, processes=[self._main_process] + self._main_process.children(recursive=True))
self.max_ram.system = max(self.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient)
self._update_ram(rss_values=self.resource_usage.max_ram.combined, processes=[self._main_process] + self._get_descendents())
self.resource_usage.max_ram.system = max(
self.resource_usage.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient)
# Get the maximum GPU RAM usage.
memory_used_command = 'nvidia-smi --query-compute-apps=pid,used_gpu_memory --format=csv,noheader'
nvidia_smi_output = subp.check_output(memory_used_command.split(), stderr=subp.STDOUT).decode()
if nvidia_smi_output:
process_ids = {self.process_id}
self._update_gpu_ram(attr='main', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output)
process_ids = {process.pid for process in self._main_process.children(recursive=True)}
process_ids = {process.pid for process in self._get_descendents()}
self._update_gpu_ram(attr='descendents', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output)
process_ids.add(self.process_id)
self._update_gpu_ram(attr='combined', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output)
self.max_gpu_ram.system = max(self.max_gpu_ram.system, self._system_gpu_ram(measurement='used'))
self.resource_usage.max_gpu_ram.system = max(
self.resource_usage.max_gpu_ram.system, self._system_gpu_ram(measurement='used'))
# Get the mean and maximum CPU usages.
# noinspection PyTypeChecker
system_core_percentages: list[float] = psutil.cpu_percent(percpu=True)
self._update_cpu_utilization(percentages=system_core_percentages, attr='system')
self._update_cpu_utilization_by_process(processes=[self._main_process], attr='main')
self._update_cpu_utilization_by_process(processes=self._main_process.children(recursive=True), attr='descendents')
self._update_cpu_utilization_by_process(processes=self._get_descendents(), attr='descendents')
self._update_cpu_utilization_by_process(
processes=[self._main_process] + self._main_process.children(recursive=True), attr='combined')
processes=[self._main_process] + self._get_descendents(), attr='combined')
self._update_n_threads(processes=[self._main_process], attr='main')
self._update_n_threads(processes=self._main_process.children(recursive=True), attr='descendents')
self._update_n_threads(processes=[self._main_process] + self._main_process.children(recursive=True), attr='combined')
self._update_n_threads(processes=self._get_descendents(), attr='descendents')
self._update_n_threads(processes=[self._main_process] + self._get_descendents(), attr='combined')
# Update compute time
self.compute_time.time = (time.time() - start_time) * self._time_coefficient
self.resource_usage.compute_time.time = (_testable_time() - start_time) * self._time_coefficient
self._tracking_iteration += 1
with open(self._resource_usage_file, 'wb') as file:
pkl.dump(self.resource_usage, file)
if self._stop_event.is_set():
# Tracking has completed.
break
_testable_sleep(self.sleep_time)
except psutil.NoSuchProcess:
self._log_warning('Failed to track a process that does not exist. '
'This possibly resulted from the process completing before tracking could begin.')
'This possibly resulted from the process completing before it could be tracked.')
except Exception as error:
self._log_warning('The following uncaught exception occurred in the Tracker\'s thread:')
self._log_warning('The following uncaught exception occurred in the tracking process:')
print(error)

def __enter__(self) -> Tracker:
Expand All @@ -244,7 +257,7 @@ def __enter__(self) -> Tracker:
elif self.state == Tracker.State.STOPPED:
raise RuntimeError('Cannot start tracking when tracking has already stopped.')
self.state = Tracker.State.STARTED
self._thread.start()
self._tracking_process.start()
return self

def __exit__(self, *_):
Expand All @@ -255,19 +268,29 @@ def __exit__(self, *_):
n_join_attempts = 0
while n_join_attempts < self.n_join_attempts:
self._stop_event.set()
self._thread.join(timeout=self.join_timeout)
self._tracking_process.join(timeout=self.join_timeout)
n_join_attempts += 1
if self._thread.is_alive():
log.warning('Thread is still alive after join timout. Attempting to join again...')
if self._tracking_process.is_alive():
log.warning('The tracking process is still alive after join timout. Attempting to join again...')
else:
break
if self._thread.is_alive():
if self._tracking_process.is_alive():
log.warning(
f'Thread is still alive after {self.n_join_attempts} attempts to join. '
f'The thread will likely not end until the parent process ends.')
if self.kill_if_join_fails:
log.warning('The thread failed to join and kill_if_join_fails is set. Exiting ...')
sys.exit(1)
f'The tracking process is still alive after {self.n_join_attempts} attempts to join. '
f'Terminating the process by force...')
self._tracking_process.terminate()
self._tracking_process.close()
if os.path.isfile(self._resource_usage_file):
with open(self._resource_usage_file, 'rb') as file:
self.resource_usage = pkl.load(file)
time_since_modified = _testable_time() - os.path.getmtime(self._resource_usage_file)
if time_since_modified > Tracker._USAGE_FILE_TIME_DIFFERENCE:
log.warning(
f'Tracking is stopping and it has been {time_since_modified} seconds since the temporary tracking results file was '
f'last updated. Resource usage was not updated during that time.')
os.remove(self._resource_usage_file)
else:
raise RuntimeError('The temporary tracking results file does not exist. Tracking results cannot be obtained.')
self.state = Tracker.State.STOPPED

def start(self):
Expand Down Expand Up @@ -315,12 +338,7 @@ def to_json(self) -> dict[str, dict]:
"""
Constructs a dictionary of the computational-resource-usage measurements and their units.
"""
return {
'max_ram': dclass.asdict(self.max_ram),
'max_gpu_ram': dclass.asdict(self.max_gpu_ram),
'cpu_utilization': dclass.asdict(self.cpu_utilization),
'compute_time': dclass.asdict(self.compute_time)
}
return dclass.asdict(self.resource_usage)


@dclass.dataclass
Expand Down Expand Up @@ -399,9 +417,25 @@ class ComputeTime:
time: float = 0.


@dclass.dataclass
class ResourceUsage:
    """
    Groups the resource-usage measurements of a Tracker into a single object.
    The Tracker pickles this object to a temporary file while tracking and loads it back from that file when tracking stops.

    :param max_ram: The maximum RAM usage measurements.
    :param max_gpu_ram: The maximum GPU RAM usage measurements.
    :param cpu_utilization: The CPU utilization measurements.
    :param compute_time: The compute time i.e. the duration of tracking.
    """
    max_ram: MaxRAM
    max_gpu_ram: MaxGPURAM
    cpu_utilization: CPUUtilization
    compute_time: ComputeTime


def _testable_sleep(sleep_time: float):
""" The time.sleep() function causes issues when mocked in tests, so we create this wrapper that can be safely mocked.
:return: The result of time.sleep()
"""
return time.sleep(sleep_time) # pragma: no cover


def _testable_time():
""" The time.time() function causes issues when mocked in tests, so we create this wrapper that can be safely mocked.
:return: The result of time.time()
"""
return time.time() # pragma: no cover
2 changes: 1 addition & 1 deletion tests/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ python3 -m venv .env/
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pip install --upgrade pip
python3 -m pip install pytest pytest-mock pytest-cov sphinx sphinx-rtd-theme notebook
python3 -m pip install -e .
python3 -m pip install -e .
2 changes: 1 addition & 1 deletion tests/run.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pytest tests --cov --cov-branch --cov-report=term-missing
python3 -m pytest tests --cov --cov-branch --cov-report=term-missing
Loading

0 comments on commit 86a95a0

Please sign in to comment.