diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 243ce6c..2ec8fbd 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -27,3 +27,8 @@ jobs: run: bash tests/install.sh - name: Test with pytest run: bash tests/run.sh + - name: Debug with tmate on failure + if: ${{ failure() }} + uses: mxschmitt/action-tmate@v3 + with: + limit-access-to-actor: true diff --git a/.gitignore b/.gitignore index a5ed996..603e21b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -.coverage +.coverage* .env/ .idea/ dist/ diff --git a/src/gpu_tracker/__main__.py b/src/gpu_tracker/__main__.py index 30d465c..1756345 100644 --- a/src/gpu_tracker/__main__.py +++ b/src/gpu_tracker/__main__.py @@ -2,10 +2,13 @@ Tracks the computational resource usage (RAM, GPU RAM, and compute time) of a process corresponding to a given shell command. Usage: + gpu-tracker -h | --help + gpu-tracker -v | --version gpu-tracker --execute= [--output=] [--format=] [--st=] [--ru=] [--gru=] [--tu=] [--disable-logs] Options: - -h --help Show this help message. + -h --help Show this help message and exit. + -v --version Show package version and exit. -e --execute= The command to run along with its arguments all within quotes e.g. "ls -l -a". -o --output= File path to store the computational-resource-usage measurements. If not set, prints measurements to the screen. -f --format= File format of the output. Either 'json' or 'text'. Defaults to 'text'. @@ -21,10 +24,11 @@ import logging as log import sys from . import Tracker +from . import __version__ def main(): - args = doc.docopt(__doc__) + args = doc.docopt(__doc__, version=__version__) command = args['--execute'].split(' ') output = args['--output'] output_format = args['--format'] if args['--format'] is not None else 'text' @@ -37,7 +41,7 @@ def main(): } kwargs = { option_map[option]: value for option, value in args.items() if value is not None and option not in { - '--execute', '--output', '--format'}} + '--execute', '--output', '--format', '--help', '--version'}} if 'sleep_time' in kwargs.keys(): kwargs['sleep_time'] = float(kwargs['sleep_time']) try: diff --git a/src/gpu_tracker/tracker.py b/src/gpu_tracker/tracker.py index 4f5f105..67597c9 100644 --- a/src/gpu_tracker/tracker.py +++ b/src/gpu_tracker/tracker.py @@ -4,104 +4,152 @@ import dataclasses as dclass import platform import time -import threading as thrd +import multiprocessing as mproc import os import typing as typ import psutil import subprocess as subp import logging as log import enum -import sys - - -class Tracker: - """ - Runs a thread in the background that tracks the compute time, maximum RAM, and maximum GPU RAM usage within a context manager or explicit ``start()`` and ``stop()`` methods. - Calculated quantities are scaled depending on the units chosen for them (e.g. megabytes vs. gigabytes, hours vs. days, etc.). - - :ivar MaxRAM max_ram: Description of the maximum RAM usage of the process, any descendents it may have, and the operating system overall. - :ivar MaxGPURAM max_gpu_ram: Description of the maximum GPU RAM usage of the process and any descendents it may have. - :ivar ComputeTime compute_time: Description of the real compute time i.e. the duration of tracking. - """ - class State(enum.Enum): - """The state of the Tracker.""" - NEW = 0 - STARTED = 1 - STOPPED = 2 +import pickle as pkl +import uuid + + +class _TrackingProcess(mproc.Process): + _CPU_PERCENT_INTERVAL = 0.1 + _ram_unit2coefficient = { + 'bytes': 1.0, + 'kilobytes': 1e-3, + 'megabytes': 1e-6, + 'gigabytes': 1e-9, + 'terabytes': 1e-12 + } + _gpu_ram_unit2coefficient = { + 'bytes': 1e6, + 'kilobytes': 1e3, + 'megabytes': 1.0, + 'gigabytes': 1e-3, + 'terabytes': 1e-6 + } + _time_unit2coefficient = { + 'seconds': 1.0, + 'minutes': 1 / 60, + 'hours': 1 / (60 * 60), + 'days': 1 / (60 * 60 * 24) + } def __init__( - self, sleep_time: float = 1.0, ram_unit: str = 'gigabytes', gpu_ram_unit: str = 'gigabytes', time_unit: str = 'hours', - disable_logs: bool = False, n_join_attempts: int = 5, join_timeout: float = 10.0, kill_if_join_fails: bool = False, - process_id: int | None = None): - """ - :param sleep_time: The number of seconds to sleep in between usage-collection iterations. - :param ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'. - :param gpu_ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'. - :param time_unit: One of 'seconds', 'minutes', 'hours', or 'days'. - :param disable_logs: If set, warnings are suppressed during tracking. Otherwise, the Tracker logs warnings as usual. - :param n_join_attempts: The number of times the tracker attempts to join its underlying thread. - :param join_timeout: The amount of time the tracker waits for its underlying thread to join. - :param kill_if_join_fails: If true, kill the process if the underlying thread fails to join. - :param process_id: The ID of the process to track. Defaults to the current process. - :raises ValueError: Raised if invalid units are provided. - """ - Tracker._validate_mem_unit(ram_unit) - Tracker._validate_mem_unit(gpu_ram_unit) - Tracker._validate_unit(time_unit, valid_units={'seconds', 'minutes', 'hours', 'days'}, unit_type='time') - self.sleep_time = sleep_time - self._ram_coefficient: float = { - 'bytes': 1.0, - 'kilobytes': 1 / 1e3, - 'megabytes': 1 / 1e6, - 'gigabytes': 1 / 1e9, - 'terabytes': 1 / 1e12 - }[ram_unit] - self._gpu_ram_coefficient: float = { - 'bytes': 1e6, - 'kilobytes': 1e3, - 'megabytes': 1.0, - 'gigabytes': 1 / 1e3, - 'terabytes': 1 / 1e6 - }[gpu_ram_unit] - self._time_coefficient: float = { - 'seconds': 1.0, - 'minutes': 1 / 60, - 'hours': 1 / (60 * 60), - 'days': 1 / (60 * 60 * 24) - }[time_unit] - self._stop_event = thrd.Event() - self._thread = thrd.Thread(target=self._profile) + self, stop_event: mproc.Event, sleep_time: float, ram_unit: str, gpu_ram_unit: str, time_unit: str, + disable_logs: bool, main_process_id: int, resource_usage_file: str, extraneous_process_ids: set[int]): + super().__init__() + self._stop_event = stop_event + if sleep_time < _TrackingProcess._CPU_PERCENT_INTERVAL: + raise ValueError( + f'Sleep time of {sleep_time} is invalid. Must be at least {_TrackingProcess._CPU_PERCENT_INTERVAL} seconds.') + self._sleep_time = sleep_time + self._ram_coefficient = _TrackingProcess._validate_unit( + ram_unit, _TrackingProcess._ram_unit2coefficient, unit_type='RAM') + self._gpu_ram_coefficient = _TrackingProcess._validate_unit( + gpu_ram_unit, _TrackingProcess._gpu_ram_unit2coefficient, unit_type='GPU RAM') + self._time_coefficient = _TrackingProcess._validate_unit( + time_unit, _TrackingProcess._time_unit2coefficient, unit_type='time') + self._disable_logs = disable_logs + self._main_process_id = main_process_id self._core_percent_sums = {key: 0. for key in ['system', 'main', 'descendents', 'combined']} self._cpu_percent_sums = {key: 0. for key in ['system', 'main', 'descendents', 'combined']} self._tracking_iteration = 1 - self.disable_logs = disable_logs - self.n_join_attempts = n_join_attempts - self.join_timeout = join_timeout - self.kill_if_join_fails = kill_if_join_fails - self.process_id = process_id if process_id is not None else os.getpid() - self._main_process = psutil.Process(self.process_id) self._is_linux = platform.system().lower() == 'linux' - self.max_ram = MaxRAM(unit=ram_unit, system_capacity=psutil.virtual_memory().total * self._ram_coefficient) - self.max_gpu_ram = MaxGPURAM(unit=gpu_ram_unit, system_capacity=self._system_gpu_ram(measurement='total')) - self.cpu_utilization = CPUUtilization(system_core_count=psutil.cpu_count()) - self.compute_time = ComputeTime(unit=time_unit) - self.state = Tracker.State.NEW - - def __repr__(self): - return (f'State: {self.state.name}') - - def _log_warning(self, warning: str): - if not self.disable_logs: - log.warning(warning) - - @staticmethod - def _validate_mem_unit(unit: str): - Tracker._validate_unit(unit, valid_units={'bytes', 'kilobytes', 'megabytes', 'gigabytes', 'terabytes'}, unit_type='memory') - - @staticmethod - def _validate_unit(unit: str, valid_units: set[str], unit_type: str): - if unit not in valid_units: - raise ValueError(f'"{unit}" is not a valid {unit_type} unit. Valid values are {", ".join(sorted(valid_units))}') + self._nvidia_available = True + try: + subp.check_output('nvidia-smi') + except FileNotFoundError: + self._nvidia_available = False + self._log_warning( + 'The nvidia-smi command is not available. Please install the Nvidia drivers to track GPU usage. ' + 'Otherwise the Max GPU RAM values will remain 0.0') + max_ram = MaxRAM(unit=ram_unit, system_capacity=psutil.virtual_memory().total * self._ram_coefficient) + max_gpu_ram = MaxGPURAM( + unit=gpu_ram_unit, system_capacity=self._system_gpu_ram(measurement='total') if self._nvidia_available else 0.0) + cpu_utilization = CPUUtilization(system_core_count=psutil.cpu_count()) + compute_time = ComputeTime(unit=time_unit) + self._resource_usage = ResourceUsage( + max_ram=max_ram, max_gpu_ram=max_gpu_ram, cpu_utilization=cpu_utilization, compute_time=compute_time) + self._resource_usage_file = resource_usage_file + self._extraneous_process_ids = extraneous_process_ids + + def run(self): + """ + Continuously tracks computational resource usage until the end of tracking is triggered, either by exiting the context manager or by a call to stop() + """ + start_time = time.time() + self._extraneous_process_ids.add(self.pid) + get_cpu_percent = lambda process: process.cpu_percent() + try: + main_process = psutil.Process(self._main_process_id) + except psutil.NoSuchProcess: + main_process = None + self._log_warning(f'The target process of ID {self._main_process_id} ended before tracking could begin.') + self._stop_event.set() + # Simulate a do-while loop so that the tracking is executed at least once. + while True: + with open(self._resource_usage_file, 'wb') as file: + pkl.dump(self._resource_usage, file) + if self._stop_event.is_set(): + # Tracking has completed. + break + try: + descendent_processes = [ + process for process in main_process.children(recursive=True) if process.pid not in self._extraneous_process_ids] + combined_processes = [main_process] + descendent_processes + # The first call to cpu_percent returns a meaningless value of 0.0 and should be ignored. + # And it's recommended to wait a specified amount of time after the first call to cpu_percent. + # See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_percent + self._map_processes(processes=combined_processes, map_func=get_cpu_percent) + # Get the maximum RAM usage. + self._update_ram(rss_values=self._resource_usage.max_ram.main, processes=[main_process]) + self._update_ram(rss_values=self._resource_usage.max_ram.descendents, processes=descendent_processes) + self._update_ram(rss_values=self._resource_usage.max_ram.combined, processes=combined_processes) + self._resource_usage.max_ram.system = max( + self._resource_usage.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient) + # Get the maximum GPU RAM usage if available. + if self._nvidia_available: + memory_used_command = 'nvidia-smi --query-compute-apps=pid,used_gpu_memory --format=csv,noheader' + nvidia_smi_output = subp.check_output(memory_used_command.split(), stderr=subp.STDOUT).decode() + if nvidia_smi_output: + process_ids = {self._main_process_id} + self._update_gpu_ram(attr='main', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output) + process_ids = { + process_id for process_id in self._map_processes( + processes=descendent_processes, map_func=lambda process: process.pid)} + self._update_gpu_ram(attr='descendents', process_ids=process_ids, + nvidia_smi_output=nvidia_smi_output) + process_ids.add(self._main_process_id) + self._update_gpu_ram(attr='combined', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output) + self._resource_usage.max_gpu_ram.system = max( + self._resource_usage.max_gpu_ram.system, self._system_gpu_ram(measurement='used')) + # Get the mean and maximum CPU usages. + self._update_n_threads(processes=[main_process], attr='main') + self._update_n_threads(processes=descendent_processes, attr='descendents') + self._update_n_threads(processes=combined_processes, attr='combined') + # noinspection PyTypeChecker + system_core_percentages: list[float] = psutil.cpu_percent(percpu=True) + self._update_cpu_utilization(percentages=system_core_percentages, attr='system') + time.sleep(_TrackingProcess._CPU_PERCENT_INTERVAL) + main_percentage = main_process.cpu_percent() + descendent_percentages = self._map_processes(processes=descendent_processes, map_func=get_cpu_percent) + self._update_cpu_utilization(percentages=[main_percentage], attr='main') + self._update_cpu_utilization(percentages=descendent_percentages, attr='descendents') + self._update_cpu_utilization(percentages=[main_percentage] + descendent_percentages, attr='combined') + # Update compute time. + self._resource_usage.compute_time.time = (time.time() - start_time) * self._time_coefficient + self._tracking_iteration += 1 + time.sleep(self._sleep_time - _TrackingProcess._CPU_PERCENT_INTERVAL) + except psutil.NoSuchProcess as error: + self._log_warning(f'Failed to track a process (PID: {error.pid}) that does not exist. ' + f'This possibly resulted from the process completing before it could be tracked.') + except Exception as error: + self._log_warning('The following uncaught exception occurred in the tracking process:') + print(error) def _map_processes(self, processes: list[psutil.Process], map_func: typ.Callable[[psutil.Process], typ.Any]) -> list: mapped_list = list[list]() @@ -114,10 +162,8 @@ def _map_processes(self, processes: list[psutil.Process], map_func: typ.Callable def _update_ram(self, rss_values: RSSValues, processes: list[psutil.Process]): if self._is_linux: - def get_memory_maps(process: psutil.Process) -> list: - return process.memory_maps(grouped=False) - - memory_maps_list: list[list] = self._map_processes(processes, map_func=get_memory_maps) + memory_maps_list: list[list] = self._map_processes( + processes, map_func=lambda process: process.memory_maps(grouped=False)) private_rss = 0 path_to_shared_rss = dict[str, float]() for memory_maps in memory_maps_list: @@ -135,9 +181,7 @@ def get_memory_maps(process: psutil.Process) -> list: rss_values.shared_rss = max(rss_values.shared_rss, shared_rss) total_rss = private_rss + shared_rss else: - def get_rss(process: psutil.Process) -> int: - return process.memory_info().rss - total_rss = sum(self._map_processes(processes, map_func=get_rss)) + total_rss = sum(self._map_processes(processes, map_func=lambda process: process.memory_info().rss)) total_rss *= self._ram_coefficient rss_values.total_rss = max(rss_values.total_rss, total_rss) @@ -151,8 +195,8 @@ def _update_gpu_ram(self, attr: str, process_ids: set[int], nvidia_smi_output: s megabytes_used = int(megabytes_used.replace('MiB', '').strip()) curr_gpu_ram += megabytes_used curr_gpu_ram *= self._gpu_ram_coefficient - max_gpu_ram = getattr(self.max_gpu_ram, attr) - setattr(self.max_gpu_ram, attr, max(max_gpu_ram, curr_gpu_ram)) + max_gpu_ram = getattr(self._resource_usage.max_gpu_ram, attr) + setattr(self._resource_usage.max_gpu_ram, attr, max(max_gpu_ram, curr_gpu_ram)) def _system_gpu_ram(self, measurement: str) -> float: command = f'nvidia-smi --query-gpu=memory.{measurement} --format=csv,noheader' @@ -163,7 +207,7 @@ def _system_gpu_ram(self, measurement: str) -> float: return ram_sum * self._gpu_ram_coefficient def _update_cpu_utilization(self, percentages: list[float], attr: str): - cpu_percentages: CPUPercentages = getattr(self.cpu_utilization, attr) + cpu_percentages: CPUPercentages = getattr(self._resource_usage.cpu_utilization, attr) def update_percentages(percent: float, percent_type: str, percent_sums: dict[str, float]): percent_sums[attr] += percent @@ -171,72 +215,79 @@ def update_percentages(percent: float, percent_type: str, percent_sums: dict[str setattr(cpu_percentages, f'mean_{percent_type}_percent', mean_percent) max_percent: float = getattr(cpu_percentages, f'max_{percent_type}_percent') setattr(cpu_percentages, f'max_{percent_type}_percent', max(max_percent, percent)) + core_percent = sum(percentages) - cpu_percent = core_percent / self.cpu_utilization.system_core_count + cpu_percent = core_percent / self._resource_usage.cpu_utilization.system_core_count update_percentages(percent=core_percent, percent_type='core', percent_sums=self._core_percent_sums) update_percentages(percent=cpu_percent, percent_type='cpu', percent_sums=self._cpu_percent_sums) - def _update_cpu_utilization_by_process(self, processes: list[psutil.Process], attr: str): - def get_cpu_percent(process: psutil.Process) -> float: - return process.cpu_percent() - percentages = self._map_processes(processes, map_func=get_cpu_percent) - self._update_cpu_utilization(percentages, attr) - def _update_n_threads(self, processes: list[psutil.Process], attr: str): - def get_n_threads(process: psutil.Process): - return process.num_threads() - n_threads_list = self._map_processes(processes, get_n_threads) + n_threads_list = self._map_processes(processes, map_func=lambda process: process.num_threads()) n_threads = sum(n_threads_list) attr = f'{attr}_n_threads' - max_n_threads = getattr(self.cpu_utilization, attr) - setattr(self.cpu_utilization, attr, max(n_threads, max_n_threads)) + max_n_threads = getattr(self._resource_usage.cpu_utilization, attr) + setattr(self._resource_usage.cpu_utilization, attr, max(n_threads, max_n_threads)) + + @staticmethod + def _validate_unit(unit: str, unit2coefficient: dict[str, float], unit_type: str) -> float: + valid_units = set(unit2coefficient.keys()) + if unit not in valid_units: + raise ValueError(f'"{unit}" is not a valid {unit_type} unit. Valid values are {", ".join(sorted(valid_units))}') + return unit2coefficient[unit] + + def _log_warning(self, warning: str): + if not self._disable_logs: + log.warning(warning) + + +class Tracker: + """ + Runs a sub-process that tracks computational resources of the calling process. Including the compute time, maximum RAM, and maximum GPU RAM usage within a context manager or explicit ``start()`` and ``stop()`` methods. + Calculated quantities are scaled depending on the units chosen for them (e.g. megabytes vs. gigabytes, hours vs. days, etc.). + + :ivar resource_usage: Data class containing the max_ram (Description of the maximum RAM usage of the process, any descendents it may have, and the operating system overall), max_gpu_ram (Description of the maximum GPU RAM usage of the process and any descendents it may have), and compute_time (Description of the real compute time i.e. the duration of tracking) attributes. + """ + _USAGE_FILE_TIME_DIFFERENCE = 10.0 - def _profile(self): + class State(enum.Enum): + """The state of the Tracker.""" + NEW = 0 + STARTED = 1 + STOPPED = 2 + + def __init__( + self, sleep_time: float = 1.0, ram_unit: str = 'gigabytes', gpu_ram_unit: str = 'gigabytes', time_unit: str = 'hours', + disable_logs: bool = False, process_id: int | None = None, n_join_attempts: int = 5, join_timeout: float = 10.0): """ - Continuously tracks computational resource usage until the end of tracking is triggered, either by exiting the context manager or by a call to stop() + :param sleep_time: The number of seconds to sleep in between usage-collection iterations. + :param ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'. + :param gpu_ram_unit: One of 'bytes', 'kilobytes', 'megabytes', 'gigabytes', or 'terabytes'. + :param time_unit: One of 'seconds', 'minutes', 'hours', or 'days'. + :param disable_logs: If set, warnings are suppressed during tracking. Otherwise, the Tracker logs warnings as usual. + :param process_id: The ID of the process to track. Defaults to the current process. + :param n_join_attempts: The number of times the tracker attempts to join its underlying sub-process. + :param join_timeout: The amount of time the tracker waits for its underlying sub-process to join. + :raises ValueError: Raised if invalid units are provided. """ - start_time = time.time() - while not self._stop_event.is_set(): - try: - # Get the maximum RAM usage. - self._update_ram(rss_values=self.max_ram.main, processes=[self._main_process]) - self._update_ram(rss_values=self.max_ram.descendents, processes=self._main_process.children(recursive=True)) - # Call children() each time it's used to get an updated list in case the children changed since the above call. - self._update_ram( - rss_values=self.max_ram.combined, processes=[self._main_process] + self._main_process.children(recursive=True)) - self.max_ram.system = max(self.max_ram.system, psutil.virtual_memory().used * self._ram_coefficient) - # Get the maximum GPU RAM usage. - memory_used_command = 'nvidia-smi --query-compute-apps=pid,used_gpu_memory --format=csv,noheader' - nvidia_smi_output = subp.check_output(memory_used_command.split(), stderr=subp.STDOUT).decode() - if nvidia_smi_output: - process_ids = {self.process_id} - self._update_gpu_ram(attr='main', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output) - process_ids = {process.pid for process in self._main_process.children(recursive=True)} - self._update_gpu_ram(attr='descendents', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output) - process_ids.add(self.process_id) - self._update_gpu_ram(attr='combined', process_ids=process_ids, nvidia_smi_output=nvidia_smi_output) - self.max_gpu_ram.system = max(self.max_gpu_ram.system, self._system_gpu_ram(measurement='used')) - # Get the mean and maximum CPU usages. - # noinspection PyTypeChecker - system_core_percentages: list[float] = psutil.cpu_percent(percpu=True) - self._update_cpu_utilization(percentages=system_core_percentages, attr='system') - self._update_cpu_utilization_by_process(processes=[self._main_process], attr='main') - self._update_cpu_utilization_by_process(processes=self._main_process.children(recursive=True), attr='descendents') - self._update_cpu_utilization_by_process( - processes=[self._main_process] + self._main_process.children(recursive=True), attr='combined') - self._update_n_threads(processes=[self._main_process], attr='main') - self._update_n_threads(processes=self._main_process.children(recursive=True), attr='descendents') - self._update_n_threads(processes=[self._main_process] + self._main_process.children(recursive=True), attr='combined') - # Update compute time - self.compute_time.time = (time.time() - start_time) * self._time_coefficient - self._tracking_iteration += 1 - _testable_sleep(self.sleep_time) - except psutil.NoSuchProcess: - self._log_warning('Failed to track a process that does not exist. ' - 'This possibly resulted from the process completing before tracking could begin.') - except Exception as error: - self._log_warning('The following uncaught exception occurred in the Tracker\'s thread:') - print(error) + current_process_id = os.getpid() + current_process = psutil.Process(current_process_id) + # Sometimes a "resource tracker" process is started after creating an Event object. + # We want to exclude the resource tracker process(es) from the processes being tracked if it's (they're) created. + # See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + legit_child_ids = {process.pid for process in current_process.children()} + self._stop_event = mproc.Event() + extraneous_ids = {process.pid for process in current_process.children()} - legit_child_ids + self._resource_usage_file = f'.{uuid.uuid1()}.pkl' + self._tracking_process = _TrackingProcess( + self._stop_event, sleep_time, ram_unit, gpu_ram_unit, time_unit, disable_logs, + process_id if process_id is not None else current_process_id, self._resource_usage_file, extraneous_ids) + self.resource_usage = None + self.n_join_attempts = n_join_attempts + self.join_timeout = join_timeout + self.state = Tracker.State.NEW + + def __repr__(self): + return f'State: {self.state.name}' def __enter__(self) -> Tracker: if self.state == Tracker.State.STARTED: @@ -244,7 +295,7 @@ def __enter__(self) -> Tracker: elif self.state == Tracker.State.STOPPED: raise RuntimeError('Cannot start tracking when tracking has already stopped.') self.state = Tracker.State.STARTED - self._thread.start() + self._tracking_process.start() return self def __exit__(self, *_): @@ -255,19 +306,29 @@ def __exit__(self, *_): n_join_attempts = 0 while n_join_attempts < self.n_join_attempts: self._stop_event.set() - self._thread.join(timeout=self.join_timeout) + self._tracking_process.join(timeout=self.join_timeout) n_join_attempts += 1 - if self._thread.is_alive(): - log.warning('Thread is still alive after join timout. Attempting to join again...') + if self._tracking_process.is_alive(): + log.warning('The tracking process is still alive after join timout. Attempting to join again...') else: break - if self._thread.is_alive(): + if self._tracking_process.is_alive(): log.warning( - f'Thread is still alive after {self.n_join_attempts} attempts to join. ' - f'The thread will likely not end until the parent process ends.') - if self.kill_if_join_fails: - log.warning('The thread failed to join and kill_if_join_fails is set. Exiting ...') - sys.exit(1) + f'The tracking process is still alive after {self.n_join_attempts} attempts to join. ' + f'Terminating the process by force...') + self._tracking_process.terminate() + self._tracking_process.close() + if os.path.isfile(self._resource_usage_file): + with open(self._resource_usage_file, 'rb') as file: + self.resource_usage = pkl.load(file) + time_since_modified = time.time() - os.path.getmtime(self._resource_usage_file) + if time_since_modified > Tracker._USAGE_FILE_TIME_DIFFERENCE: + log.warning( + f'Tracking is stopping and it has been {time_since_modified} seconds since the temporary tracking results file was ' + f'last updated. Resource usage was not updated during that time.') + os.remove(self._resource_usage_file) + else: + raise RuntimeError('The temporary tracking results file does not exist. Tracking results cannot be obtained.') self.state = Tracker.State.STOPPED def start(self): @@ -306,21 +367,16 @@ def _format_float(dictionary: dict): :param dictionary: The dictionary to format. """ for key, value in dictionary.items(): - if type(value) == float: + if isinstance(value, float): dictionary[key] = float(f'{value:.3f}') - elif type(value) == dict: + elif isinstance(value, dict): Tracker._format_float(value) def to_json(self) -> dict[str, dict]: """ Constructs a dictionary of the computational-resource-usage measurements and their units. """ - return { - 'max_ram': dclass.asdict(self.max_ram), - 'max_gpu_ram': dclass.asdict(self.max_gpu_ram), - 'cpu_utilization': dclass.asdict(self.cpu_utilization), - 'compute_time': dclass.asdict(self.compute_time) - } + return dclass.asdict(self.resource_usage) @dclass.dataclass @@ -399,9 +455,9 @@ class ComputeTime: time: float = 0. -def _testable_sleep(sleep_time: float): - """ The time.sleep() function causes issues when mocked in tests, so we create this wrapper that can be safely mocked. - - :return: The result of time.sleep() - """ - return time.sleep(sleep_time) # pragma: no cover +@dclass.dataclass +class ResourceUsage: + max_ram: MaxRAM + max_gpu_ram: MaxGPURAM + cpu_utilization: CPUUtilization + compute_time: ComputeTime diff --git a/tests/install.sh b/tests/install.sh index b37f879..0189375 100644 --- a/tests/install.sh +++ b/tests/install.sh @@ -5,4 +5,4 @@ python3 -m venv .env/ source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin python3 -m pip install --upgrade pip python3 -m pip install pytest pytest-mock pytest-cov sphinx sphinx-rtd-theme notebook -python3 -m pip install -e . \ No newline at end of file +python3 -m pip install -e . diff --git a/tests/run.sh b/tests/run.sh index 173fb57..e9fbdab 100644 --- a/tests/run.sh +++ b/tests/run.sh @@ -1,2 +1,2 @@ source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin -python3 -m pytest tests --cov --cov-branch --cov-report=term-missing \ No newline at end of file +python3 -m pytest tests --cov --cov-branch --cov-report=term-missing diff --git a/tests/test_tracker.py b/tests/test_tracker.py index bc8613a..8841aa7 100644 --- a/tests/test_tracker.py +++ b/tests/test_tracker.py @@ -1,6 +1,6 @@ -import typing as typ import gpu_tracker as gput import json +import os import pytest as pt @@ -14,8 +14,8 @@ def get_use_context_manager(request) -> bool: yield request.param -def double_list(_list: list) -> list: - return [item for item in _list for _ in range(2)] +def multiply_list(_list: list, multiple=2) -> list: + return [item for item in _list for _ in range(multiple)] test_tracker_data = [ @@ -38,20 +38,9 @@ def is_set(self) -> bool: self.count += 1 return self.count > 3 - class ThreadMock: - def __init__(self, target: typ.Callable): - self.target = target - self.start = mocker.MagicMock(wraps=self.start) - self.join = mocker.MagicMock() - self.is_alive = mocker.MagicMock(return_value=False) - - def start(self): - self.target() - - system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value=operating_system) - EventMock = mocker.patch('gpu_tracker.tracker.thrd.Event', wraps=EventMock) - ThreadMock = mocker.patch('gpu_tracker.tracker.thrd.Thread', wraps=ThreadMock) - process_id = 12 + system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value=operating_system) + EventMock = mocker.patch('gpu_tracker.tracker.mproc.Event', wraps=EventMock) + main_process_id = 12 child1_id = 21 child2_id = 22 process_rams = [440400, 440400, 550500] @@ -72,13 +61,12 @@ def start(self): child2_shared_dirty = [[79, 408], [537, 1517, 1937], [1908, 405, 1126, 1436]] child2_shared_clean = [[376, 1412], [1621, 241, 734], [1565, 1348, 1877, 775]] child2_paths = [['heap', 'stack'], ['heap', 'stack', 'so3'], ['heap', 'stack', 'so5', 'so6']] - getpid_mock = mocker.patch('gpu_tracker.tracker.os.getpid', return_value=process_id) def get_process_mock( pid: int, rams: list[int], private_dirty: list[list[int]], private_clean: list[list[int]], shared_dirty: list[list[int]], shared_clean: list[list[int]], paths: list[list[str]], cpu_percentages: list[float], num_threads: list[float], children: list[mocker.MagicMock] | None = None) -> mocker.MagicMock: - memory_maps_side_effect = list[list[mocker.MagicMock()]]() + memory_maps_side_effect = list[list[mocker.MagicMock]]() for private_dirty, private_clean, shared_dirty, shared_clean, paths in zip( private_dirty, private_clean, shared_dirty, shared_clean, paths): memory_map_mocks = list[mocker.MagicMock]() @@ -89,9 +77,9 @@ def get_process_mock( path=path) memory_map_mocks.append(memory_map_mock) memory_maps_side_effect.extend([memory_map_mocks, memory_map_mocks]) - rams = double_list(rams) - cpu_percentages = double_list(cpu_percentages) - num_threads = double_list(num_threads) + rams = multiply_list(rams) + cpu_percentages = multiply_list(cpu_percentages) + num_threads = multiply_list(num_threads) return mocker.MagicMock( pid=pid, memory_info=mocker.MagicMock(side_effect=[mocker.MagicMock(rss=ram) for ram in rams]), @@ -107,16 +95,37 @@ def get_process_mock( pid=child2_id, rams=child2_rams, private_dirty=child2_private_dirty, private_clean=child2_private_clean, shared_dirty=child2_shared_dirty, shared_clean=child2_shared_clean, paths=child2_paths, cpu_percentages=[45.6, 22.5, 43.5], num_threads=[4, 2, 3]) - process_mock = get_process_mock( - pid=process_id, rams=process_rams, private_dirty=process_private_dirty, private_clean=process_private_clean, + tracking_process_pid = 666 + tracking_process_mock = mocker.MagicMock(pid=tracking_process_pid) + resource_tracker_pid = 13 + resource_tracker_mock = mocker.MagicMock(pid=resource_tracker_pid) + main_process_mock = get_process_mock( + pid=main_process_id, rams=process_rams, private_dirty=process_private_dirty, private_clean=process_private_clean, shared_dirty=process_shared_dirty, shared_clean=process_shared_clean, paths=process_paths, cpu_percentages=[60.4, 198.9, 99.8], - num_threads=[0, 0, 2], children=[child1_mock, child2_mock]) - ProcessMock = mocker.patch('gpu_tracker.tracker.psutil.Process', return_value=process_mock) + num_threads=[0, 0, 2], children=[child1_mock, child2_mock, tracking_process_mock, resource_tracker_mock]) + child_mocks = [mocker.MagicMock(pid=pid) for pid in (child1_id, child2_id)] + current_process_mock = mocker.MagicMock( + children=mocker.MagicMock(side_effect=[child_mocks, [resource_tracker_mock] + child_mocks])) + PsProcessMock = mocker.patch('gpu_tracker.tracker.psutil.Process', side_effect=[current_process_mock, main_process_mock]) + + def start_mock(self): + start_mock.called = True + self.pid = tracking_process_pid + self.run() + + start_mock.called = False + mocker.patch.object(gput.tracker._TrackingProcess, 'start', new=start_mock) + mocker.patch.object(gput.tracker._TrackingProcess, 'pid', new=None) + mocker.patch.object(gput.tracker._TrackingProcess, 'join') + mocker.patch.object(gput.tracker._TrackingProcess, 'is_alive', return_value=False) + mocker.patch.object(gput.tracker._TrackingProcess, 'terminate') + mocker.patch.object(gput.tracker._TrackingProcess, 'close') virtual_memory_mock = mocker.patch( 'gpu_tracker.tracker.psutil.virtual_memory', side_effect=[ mocker.MagicMock(total=67 * 1e9), mocker.MagicMock(used=30 * 1e9), mocker.MagicMock(used=31 * 1e9), mocker.MagicMock(used=29 * 1e9)]) nvidia_smi_outputs = [ + b'', b'12198 MiB\n12198 MiB', b'', b'', @@ -128,8 +137,9 @@ def get_process_mock( cpu_count_mock = mocker.patch('gpu_tracker.tracker.psutil.cpu_count', return_value=4) cpu_percent_mock = mocker.patch( 'gpu_tracker.tracker.psutil.cpu_percent', side_effect=[[67.5, 27.3, 77.8, 97.9], [57.6, 58.2, 23.5, 99.8], [78.3, 88.3, 87.2, 22.5]]) - time_mock = mocker.patch('gpu_tracker.tracker.time.time', side_effect=[800, 900, 1000, 1100]) - sleep_mock = mocker.patch('gpu_tracker.tracker._testable_sleep') + os_mock = mocker.patch('gpu_tracker.tracker.os', wraps=os, getpid=mocker.MagicMock(return_value=main_process_id)) + time_mock = mocker.patch( + 'gpu_tracker.tracker.time', time=mocker.MagicMock(side_effect=[800, 900, 1000, 1100, 0]), sleep=mocker.MagicMock()) log_spy = mocker.spy(gput.tracker.log, 'warning') sleep_time = 1.5 join_timeout = 5.5 @@ -143,33 +153,39 @@ def get_process_mock( sleep_time=sleep_time, join_timeout=join_timeout, ram_unit=ram_unit, gpu_ram_unit=gpu_ram_unit, time_unit=time_unit) tracker.start() tracker.stop() + assert start_mock.called + assert not os.path.isfile(tracker._resource_usage_file) assert not log_spy.called _assert_args_list(virtual_memory_mock, [()] * 4) system_mock.assert_called_once_with() EventMock.assert_called_once_with() - ThreadMock.assert_called_once_with(target=tracker._profile) - tracker._thread.start.assert_called_once_with() _assert_args_list(mock=tracker._stop_event.is_set, expected_args_list=[()] * 4) - _assert_args_list(mock=getpid_mock, expected_args_list=[()]) - _assert_args_list(mock=ProcessMock, expected_args_list=[(process_id,)]) - _assert_args_list(mock=process_mock.children, expected_args_list=[{'recursive': True}] * 20, use_kwargs=True) + _assert_args_list(mock=PsProcessMock, expected_args_list=[(main_process_id,)] * 2) + _assert_args_list(current_process_mock.children, [()] * 2) + _assert_args_list(mock=main_process_mock.children, expected_args_list=[{'recursive': True}] * 3, use_kwargs=True) if operating_system == 'Linux': - _assert_args_list(mock=process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) + _assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) _assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) _assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True) else: - _assert_args_list(mock=process_mock.memory_info, expected_args_list=[()] * 6) + _assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 6) _assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 6) _assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 6) - assert len(check_output_mock.call_args_list) == 7 - _assert_args_list(mock=time_mock, expected_args_list=[()] * 4) - _assert_args_list(mock=sleep_mock, expected_args_list=[(sleep_time,)] * 3) + assert len(check_output_mock.call_args_list) == 8 + os_mock.getpid.assert_called_once_with() + _assert_args_list(mock=time_mock.time, expected_args_list=[()] * 5) + cpu_percent_interval = gput.tracker._TrackingProcess._CPU_PERCENT_INTERVAL + true_sleep_time = sleep_time - cpu_percent_interval + _assert_args_list( + mock=time_mock.sleep, expected_args_list=[(cpu_percent_interval,), (true_sleep_time,)] * 3) tracker._stop_event.set.assert_called_once_with() - tracker._thread.join.assert_called_once_with(timeout=join_timeout) - _assert_args_list(mock=tracker._thread.is_alive, expected_args_list=[()] * 2) - expected_measurements_file = f'tests/data/{use_context_manager}-{operating_system}-{ram_unit}-{gpu_ram_unit}-{time_unit}' + tracker._tracking_process.join.assert_called_once_with(timeout=join_timeout) + _assert_args_list(mock=tracker._tracking_process.is_alive, expected_args_list=[()] * 2) + assert not tracker._tracking_process.terminate.called + tracker._tracking_process.close.assert_called_once_with() cpu_count_mock.assert_called_once_with() _assert_args_list(cpu_percent_mock, [()] * 3) + expected_measurements_file = f'tests/data/{use_context_manager}-{operating_system}-{ram_unit}-{gpu_ram_unit}-{time_unit}' with open(f'{expected_measurements_file}.txt', 'r') as file: expected_tracker_str = file.read() assert expected_tracker_str == str(tracker) @@ -183,48 +199,51 @@ def _assert_args_list(mock, expected_args_list: list[tuple | dict], use_kwargs: assert actual_args_list == expected_args_list -@pt.mark.parametrize('kill_if_join_fails', [True, False]) -def test_warnings(mocker, kill_if_join_fails: bool, caplog): +def test_warnings(mocker, caplog): n_join_attempts = 3 join_timeout = 5.2 - mocker.patch('gpu_tracker.tracker.thrd.Event', return_value=mocker.MagicMock(set=mocker.MagicMock())) - mocker.patch( - 'gpu_tracker.tracker.thrd.Thread', - return_value=mocker.MagicMock(start=mocker.MagicMock(), is_alive=mocker.MagicMock(return_value=True), join=mocker.MagicMock()) - ) - exit_mock = mocker.patch('gpu_tracker.tracker.sys.exit') - check_output_mock = mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=[b'']) - system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value='linux') - with gput.Tracker(kill_if_join_fails=kill_if_join_fails, n_join_attempts=n_join_attempts, join_timeout=join_timeout) as tracker: - pass - check_output_mock.assert_called_once() - system_mock.assert_called_once() - _assert_args_list(mock=tracker._stop_event.set, expected_args_list=[()] * n_join_attempts) - _assert_args_list(mock=tracker._thread.join, expected_args_list=[{'timeout': join_timeout}] * n_join_attempts, use_kwargs=True) - _assert_args_list(mock=tracker._thread.is_alive, expected_args_list=[()] * (n_join_attempts + 1)) - expected_warnings = ['Thread is still alive after join timout. Attempting to join again...'] * n_join_attempts + subprocess_mock = mocker.patch('gpu_tracker.tracker.subp', check_output=mocker.MagicMock(side_effect=FileNotFoundError)) + mocker.patch('gpu_tracker.tracker.time', time=mocker.MagicMock(return_value=23.0)) + mocker.patch('gpu_tracker.tracker.os.path.getmtime', return_value=12.0) + mocker.patch.object(gput.tracker._TrackingProcess, 'is_alive', return_value=True) + join_spy = mocker.spy(gput.tracker._TrackingProcess, 'join') + terminate_spy = mocker.spy(gput.tracker._TrackingProcess, 'terminate') + close_spy = mocker.spy(gput.tracker._TrackingProcess, 'close') + with gput.Tracker(n_join_attempts=n_join_attempts, join_timeout=join_timeout) as tracker: + set_spy = mocker.spy(tracker._stop_event, 'set') + subprocess_mock.check_output.assert_called_once() + _assert_args_list(mock=set_spy, expected_args_list=[()] * n_join_attempts) + _assert_args_list( + mock=join_spy, expected_args_list=[{'timeout': join_timeout}] * n_join_attempts, use_kwargs=True) + _assert_args_list(mock=tracker._tracking_process.is_alive, expected_args_list=[()] * (n_join_attempts + 1)) + terminate_spy.assert_called_once() + close_spy.assert_called_once() + expected_warnings = [ + 'The nvidia-smi command is not available. Please install the Nvidia drivers to track GPU usage. ' + 'Otherwise the Max GPU RAM values will remain 0.0'] + expected_warnings += ['The tracking process is still alive after join timout. Attempting to join again...'] * n_join_attempts expected_warnings.append( - 'Thread is still alive after 3 attempts to join. The thread will likely not end until the parent process ends.') - if kill_if_join_fails: - expected_warnings.append('The thread failed to join and kill_if_join_fails is set. Exiting ...') - exit_mock.assert_called_once_with(1) - else: - assert not exit_mock.called + 'The tracking process is still alive after 3 attempts to join. Terminating the process by force...') + expected_warnings.append( + 'Tracking is stopping and it has been 11.0 seconds since the temporary tracking results file was last updated. ' + 'Resource usage was not updated during that time.') for expected_warning, record in zip(expected_warnings, caplog.records): assert record.levelname == 'WARNING' assert record.message == expected_warning + assert not os.path.isfile(tracker._resource_usage_file) def test_validate_unit(): + with pt.raises(ValueError) as error: + gput.Tracker(sleep_time=0.0) + assert str(error.value) == 'Sleep time of 0.0 is invalid. Must be at least 0.1 seconds.' with pt.raises(ValueError) as error: gput.Tracker(ram_unit='milibytes') - assert str(error.value) == '"milibytes" is not a valid memory unit. Valid values are bytes, gigabytes, kilobytes, megabytes, terabytes' + assert str(error.value) == '"milibytes" is not a valid RAM unit. Valid values are bytes, gigabytes, kilobytes, megabytes, terabytes' def test_state(mocker): - mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=[b'']) - mocker.patch('gpu_tracker.tracker.platform.system', return_value='linux') - + mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=FileNotFoundError) tracker = gput.Tracker() assert tracker.__repr__() == 'State: NEW' with pt.raises(RuntimeError) as error: