Skip to content

Commit

Permalink
Merge pull request #39 from MoseleyBioinformaticsLab/process
Browse files Browse the repository at this point in the history
Returns to using a process-based tracker
  • Loading branch information
erikhuck authored May 15, 2024
2 parents 5d58d06 + 3108741 commit 924beec
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 251 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ jobs:
run: bash tests/install.sh
- name: Test with pytest
run: bash tests/run.sh
- name: Debug with tmate on failure
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3
with:
limit-access-to-actor: true
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.coverage
.coverage*
.env/
.idea/
dist/
Expand Down
10 changes: 7 additions & 3 deletions src/gpu_tracker/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
Tracks the computational resource usage (RAM, GPU RAM, and compute time) of a process corresponding to a given shell command.
Usage:
gpu-tracker -h | --help
gpu-tracker -v | --version
gpu-tracker --execute=<command> [--output=<output>] [--format=<format>] [--st=<sleep-time>] [--ru=<ram-unit>] [--gru=<gpu-ram-unit>] [--tu=<time-unit>] [--disable-logs]
Options:
-h --help Show this help message.
-h --help Show this help message and exit.
-v --version Show package version and exit.
-e --execute=<command> The command to run along with its arguments all within quotes e.g. "ls -l -a".
-o --output=<output> File path to store the computational-resource-usage measurements. If not set, prints measurements to the screen.
-f --format=<format> File format of the output. Either 'json' or 'text'. Defaults to 'text'.
Expand All @@ -21,10 +24,11 @@
import logging as log
import sys
from . import Tracker
from . import __version__


def main():
args = doc.docopt(__doc__)
args = doc.docopt(__doc__, version=__version__)
command = args['--execute'].split(' ')
output = args['--output']
output_format = args['--format'] if args['--format'] is not None else 'text'
Expand All @@ -37,7 +41,7 @@ def main():
}
kwargs = {
option_map[option]: value for option, value in args.items() if value is not None and option not in {
'--execute', '--output', '--format'}}
'--execute', '--output', '--format', '--help', '--version'}}
if 'sleep_time' in kwargs.keys():
kwargs['sleep_time'] = float(kwargs['sleep_time'])
try:
Expand Down
408 changes: 232 additions & 176 deletions src/gpu_tracker/tracker.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tests/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ python3 -m venv .env/
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pip install --upgrade pip
python3 -m pip install pytest pytest-mock pytest-cov sphinx sphinx-rtd-theme notebook
python3 -m pip install -e .
python3 -m pip install -e .
2 changes: 1 addition & 1 deletion tests/run.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pytest tests --cov --cov-branch --cov-report=term-missing
python3 -m pytest tests --cov --cov-branch --cov-report=term-missing
157 changes: 88 additions & 69 deletions tests/test_tracker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing as typ
import gpu_tracker as gput
import json
import os
import pytest as pt


Expand All @@ -14,8 +14,8 @@ def get_use_context_manager(request) -> bool:
yield request.param


def double_list(_list: list) -> list:
return [item for item in _list for _ in range(2)]
def multiply_list(_list: list, multiple=2) -> list:
return [item for item in _list for _ in range(multiple)]


test_tracker_data = [
Expand All @@ -38,20 +38,9 @@ def is_set(self) -> bool:
self.count += 1
return self.count > 3

class ThreadMock:
def __init__(self, target: typ.Callable):
self.target = target
self.start = mocker.MagicMock(wraps=self.start)
self.join = mocker.MagicMock()
self.is_alive = mocker.MagicMock(return_value=False)

def start(self):
self.target()

system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value=operating_system)
EventMock = mocker.patch('gpu_tracker.tracker.thrd.Event', wraps=EventMock)
ThreadMock = mocker.patch('gpu_tracker.tracker.thrd.Thread', wraps=ThreadMock)
process_id = 12
system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value=operating_system)
EventMock = mocker.patch('gpu_tracker.tracker.mproc.Event', wraps=EventMock)
main_process_id = 12
child1_id = 21
child2_id = 22
process_rams = [440400, 440400, 550500]
Expand All @@ -72,13 +61,12 @@ def start(self):
child2_shared_dirty = [[79, 408], [537, 1517, 1937], [1908, 405, 1126, 1436]]
child2_shared_clean = [[376, 1412], [1621, 241, 734], [1565, 1348, 1877, 775]]
child2_paths = [['heap', 'stack'], ['heap', 'stack', 'so3'], ['heap', 'stack', 'so5', 'so6']]
getpid_mock = mocker.patch('gpu_tracker.tracker.os.getpid', return_value=process_id)

def get_process_mock(
pid: int, rams: list[int], private_dirty: list[list[int]], private_clean: list[list[int]], shared_dirty: list[list[int]],
shared_clean: list[list[int]], paths: list[list[str]], cpu_percentages: list[float], num_threads: list[float],
children: list[mocker.MagicMock] | None = None) -> mocker.MagicMock:
memory_maps_side_effect = list[list[mocker.MagicMock()]]()
memory_maps_side_effect = list[list[mocker.MagicMock]]()
for private_dirty, private_clean, shared_dirty, shared_clean, paths in zip(
private_dirty, private_clean, shared_dirty, shared_clean, paths):
memory_map_mocks = list[mocker.MagicMock]()
Expand All @@ -89,9 +77,9 @@ def get_process_mock(
path=path)
memory_map_mocks.append(memory_map_mock)
memory_maps_side_effect.extend([memory_map_mocks, memory_map_mocks])
rams = double_list(rams)
cpu_percentages = double_list(cpu_percentages)
num_threads = double_list(num_threads)
rams = multiply_list(rams)
cpu_percentages = multiply_list(cpu_percentages)
num_threads = multiply_list(num_threads)
return mocker.MagicMock(
pid=pid,
memory_info=mocker.MagicMock(side_effect=[mocker.MagicMock(rss=ram) for ram in rams]),
Expand All @@ -107,16 +95,37 @@ def get_process_mock(
pid=child2_id, rams=child2_rams, private_dirty=child2_private_dirty, private_clean=child2_private_clean,
shared_dirty=child2_shared_dirty, shared_clean=child2_shared_clean, paths=child2_paths, cpu_percentages=[45.6, 22.5, 43.5],
num_threads=[4, 2, 3])
process_mock = get_process_mock(
pid=process_id, rams=process_rams, private_dirty=process_private_dirty, private_clean=process_private_clean,
tracking_process_pid = 666
tracking_process_mock = mocker.MagicMock(pid=tracking_process_pid)
resource_tracker_pid = 13
resource_tracker_mock = mocker.MagicMock(pid=resource_tracker_pid)
main_process_mock = get_process_mock(
pid=main_process_id, rams=process_rams, private_dirty=process_private_dirty, private_clean=process_private_clean,
shared_dirty=process_shared_dirty, shared_clean=process_shared_clean, paths=process_paths, cpu_percentages=[60.4, 198.9, 99.8],
num_threads=[0, 0, 2], children=[child1_mock, child2_mock])
ProcessMock = mocker.patch('gpu_tracker.tracker.psutil.Process', return_value=process_mock)
num_threads=[0, 0, 2], children=[child1_mock, child2_mock, tracking_process_mock, resource_tracker_mock])
child_mocks = [mocker.MagicMock(pid=pid) for pid in (child1_id, child2_id)]
current_process_mock = mocker.MagicMock(
children=mocker.MagicMock(side_effect=[child_mocks, [resource_tracker_mock] + child_mocks]))
PsProcessMock = mocker.patch('gpu_tracker.tracker.psutil.Process', side_effect=[current_process_mock, main_process_mock])

def start_mock(self):
start_mock.called = True
self.pid = tracking_process_pid
self.run()

start_mock.called = False
mocker.patch.object(gput.tracker._TrackingProcess, 'start', new=start_mock)
mocker.patch.object(gput.tracker._TrackingProcess, 'pid', new=None)
mocker.patch.object(gput.tracker._TrackingProcess, 'join')
mocker.patch.object(gput.tracker._TrackingProcess, 'is_alive', return_value=False)
mocker.patch.object(gput.tracker._TrackingProcess, 'terminate')
mocker.patch.object(gput.tracker._TrackingProcess, 'close')
virtual_memory_mock = mocker.patch(
'gpu_tracker.tracker.psutil.virtual_memory', side_effect=[
mocker.MagicMock(total=67 * 1e9), mocker.MagicMock(used=30 * 1e9), mocker.MagicMock(used=31 * 1e9),
mocker.MagicMock(used=29 * 1e9)])
nvidia_smi_outputs = [
b'',
b'12198 MiB\n12198 MiB',
b'',
b'',
Expand All @@ -128,8 +137,9 @@ def get_process_mock(
cpu_count_mock = mocker.patch('gpu_tracker.tracker.psutil.cpu_count', return_value=4)
cpu_percent_mock = mocker.patch(
'gpu_tracker.tracker.psutil.cpu_percent', side_effect=[[67.5, 27.3, 77.8, 97.9], [57.6, 58.2, 23.5, 99.8], [78.3, 88.3, 87.2, 22.5]])
time_mock = mocker.patch('gpu_tracker.tracker.time.time', side_effect=[800, 900, 1000, 1100])
sleep_mock = mocker.patch('gpu_tracker.tracker._testable_sleep')
os_mock = mocker.patch('gpu_tracker.tracker.os', wraps=os, getpid=mocker.MagicMock(return_value=main_process_id))
time_mock = mocker.patch(
'gpu_tracker.tracker.time', time=mocker.MagicMock(side_effect=[800, 900, 1000, 1100, 0]), sleep=mocker.MagicMock())
log_spy = mocker.spy(gput.tracker.log, 'warning')
sleep_time = 1.5
join_timeout = 5.5
Expand All @@ -143,33 +153,39 @@ def get_process_mock(
sleep_time=sleep_time, join_timeout=join_timeout, ram_unit=ram_unit, gpu_ram_unit=gpu_ram_unit, time_unit=time_unit)
tracker.start()
tracker.stop()
assert start_mock.called
assert not os.path.isfile(tracker._resource_usage_file)
assert not log_spy.called
_assert_args_list(virtual_memory_mock, [()] * 4)
system_mock.assert_called_once_with()
EventMock.assert_called_once_with()
ThreadMock.assert_called_once_with(target=tracker._profile)
tracker._thread.start.assert_called_once_with()
_assert_args_list(mock=tracker._stop_event.is_set, expected_args_list=[()] * 4)
_assert_args_list(mock=getpid_mock, expected_args_list=[()])
_assert_args_list(mock=ProcessMock, expected_args_list=[(process_id,)])
_assert_args_list(mock=process_mock.children, expected_args_list=[{'recursive': True}] * 20, use_kwargs=True)
_assert_args_list(mock=PsProcessMock, expected_args_list=[(main_process_id,)] * 2)
_assert_args_list(current_process_mock.children, [()] * 2)
_assert_args_list(mock=main_process_mock.children, expected_args_list=[{'recursive': True}] * 3, use_kwargs=True)
if operating_system == 'Linux':
_assert_args_list(mock=process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
_assert_args_list(mock=main_process_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
_assert_args_list(mock=child1_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
_assert_args_list(mock=child2_mock.memory_maps, expected_args_list=[{'grouped': False}] * 6, use_kwargs=True)
else:
_assert_args_list(mock=process_mock.memory_info, expected_args_list=[()] * 6)
_assert_args_list(mock=main_process_mock.memory_info, expected_args_list=[()] * 6)
_assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 6)
_assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 6)
assert len(check_output_mock.call_args_list) == 7
_assert_args_list(mock=time_mock, expected_args_list=[()] * 4)
_assert_args_list(mock=sleep_mock, expected_args_list=[(sleep_time,)] * 3)
assert len(check_output_mock.call_args_list) == 8
os_mock.getpid.assert_called_once_with()
_assert_args_list(mock=time_mock.time, expected_args_list=[()] * 5)
cpu_percent_interval = gput.tracker._TrackingProcess._CPU_PERCENT_INTERVAL
true_sleep_time = sleep_time - cpu_percent_interval
_assert_args_list(
mock=time_mock.sleep, expected_args_list=[(cpu_percent_interval,), (true_sleep_time,)] * 3)
tracker._stop_event.set.assert_called_once_with()
tracker._thread.join.assert_called_once_with(timeout=join_timeout)
_assert_args_list(mock=tracker._thread.is_alive, expected_args_list=[()] * 2)
expected_measurements_file = f'tests/data/{use_context_manager}-{operating_system}-{ram_unit}-{gpu_ram_unit}-{time_unit}'
tracker._tracking_process.join.assert_called_once_with(timeout=join_timeout)
_assert_args_list(mock=tracker._tracking_process.is_alive, expected_args_list=[()] * 2)
assert not tracker._tracking_process.terminate.called
tracker._tracking_process.close.assert_called_once_with()
cpu_count_mock.assert_called_once_with()
_assert_args_list(cpu_percent_mock, [()] * 3)
expected_measurements_file = f'tests/data/{use_context_manager}-{operating_system}-{ram_unit}-{gpu_ram_unit}-{time_unit}'
with open(f'{expected_measurements_file}.txt', 'r') as file:
expected_tracker_str = file.read()
assert expected_tracker_str == str(tracker)
Expand All @@ -183,48 +199,51 @@ def _assert_args_list(mock, expected_args_list: list[tuple | dict], use_kwargs:
assert actual_args_list == expected_args_list


@pt.mark.parametrize('kill_if_join_fails', [True, False])
def test_warnings(mocker, kill_if_join_fails: bool, caplog):
def test_warnings(mocker, caplog):
n_join_attempts = 3
join_timeout = 5.2
mocker.patch('gpu_tracker.tracker.thrd.Event', return_value=mocker.MagicMock(set=mocker.MagicMock()))
mocker.patch(
'gpu_tracker.tracker.thrd.Thread',
return_value=mocker.MagicMock(start=mocker.MagicMock(), is_alive=mocker.MagicMock(return_value=True), join=mocker.MagicMock())
)
exit_mock = mocker.patch('gpu_tracker.tracker.sys.exit')
check_output_mock = mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=[b''])
system_mock = mocker.patch('gpu_tracker.tracker.platform.system', return_value='linux')
with gput.Tracker(kill_if_join_fails=kill_if_join_fails, n_join_attempts=n_join_attempts, join_timeout=join_timeout) as tracker:
pass
check_output_mock.assert_called_once()
system_mock.assert_called_once()
_assert_args_list(mock=tracker._stop_event.set, expected_args_list=[()] * n_join_attempts)
_assert_args_list(mock=tracker._thread.join, expected_args_list=[{'timeout': join_timeout}] * n_join_attempts, use_kwargs=True)
_assert_args_list(mock=tracker._thread.is_alive, expected_args_list=[()] * (n_join_attempts + 1))
expected_warnings = ['Thread is still alive after join timout. Attempting to join again...'] * n_join_attempts
subprocess_mock = mocker.patch('gpu_tracker.tracker.subp', check_output=mocker.MagicMock(side_effect=FileNotFoundError))
mocker.patch('gpu_tracker.tracker.time', time=mocker.MagicMock(return_value=23.0))
mocker.patch('gpu_tracker.tracker.os.path.getmtime', return_value=12.0)
mocker.patch.object(gput.tracker._TrackingProcess, 'is_alive', return_value=True)
join_spy = mocker.spy(gput.tracker._TrackingProcess, 'join')
terminate_spy = mocker.spy(gput.tracker._TrackingProcess, 'terminate')
close_spy = mocker.spy(gput.tracker._TrackingProcess, 'close')
with gput.Tracker(n_join_attempts=n_join_attempts, join_timeout=join_timeout) as tracker:
set_spy = mocker.spy(tracker._stop_event, 'set')
subprocess_mock.check_output.assert_called_once()
_assert_args_list(mock=set_spy, expected_args_list=[()] * n_join_attempts)
_assert_args_list(
mock=join_spy, expected_args_list=[{'timeout': join_timeout}] * n_join_attempts, use_kwargs=True)
_assert_args_list(mock=tracker._tracking_process.is_alive, expected_args_list=[()] * (n_join_attempts + 1))
terminate_spy.assert_called_once()
close_spy.assert_called_once()
expected_warnings = [
'The nvidia-smi command is not available. Please install the Nvidia drivers to track GPU usage. '
'Otherwise the Max GPU RAM values will remain 0.0']
expected_warnings += ['The tracking process is still alive after join timout. Attempting to join again...'] * n_join_attempts
expected_warnings.append(
'Thread is still alive after 3 attempts to join. The thread will likely not end until the parent process ends.')
if kill_if_join_fails:
expected_warnings.append('The thread failed to join and kill_if_join_fails is set. Exiting ...')
exit_mock.assert_called_once_with(1)
else:
assert not exit_mock.called
'The tracking process is still alive after 3 attempts to join. Terminating the process by force...')
expected_warnings.append(
'Tracking is stopping and it has been 11.0 seconds since the temporary tracking results file was last updated. '
'Resource usage was not updated during that time.')
for expected_warning, record in zip(expected_warnings, caplog.records):
assert record.levelname == 'WARNING'
assert record.message == expected_warning
assert not os.path.isfile(tracker._resource_usage_file)


def test_validate_unit():
with pt.raises(ValueError) as error:
gput.Tracker(sleep_time=0.0)
assert str(error.value) == 'Sleep time of 0.0 is invalid. Must be at least 0.1 seconds.'
with pt.raises(ValueError) as error:
gput.Tracker(ram_unit='milibytes')
assert str(error.value) == '"milibytes" is not a valid memory unit. Valid values are bytes, gigabytes, kilobytes, megabytes, terabytes'
assert str(error.value) == '"milibytes" is not a valid RAM unit. Valid values are bytes, gigabytes, kilobytes, megabytes, terabytes'


def test_state(mocker):
mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=[b''])
mocker.patch('gpu_tracker.tracker.platform.system', return_value='linux')

mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=FileNotFoundError)
tracker = gput.Tracker()
assert tracker.__repr__() == 'State: NEW'
with pt.raises(RuntimeError) as error:
Expand Down

0 comments on commit 924beec

Please sign in to comment.