Skip to content

Commit

Permalink
Add memory and cpu polling to TestHarness, add --max-mem option
Browse files Browse the repository at this point in the history
  • Loading branch information
loganharbour committed Dec 19, 2024
1 parent dc0e9c8 commit 32ae08b
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 3 deletions.
33 changes: 32 additions & 1 deletion python/TestHarness/TestHarness.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,25 @@ def cleanup(self):
print(str(group[0]).ljust((self.options.term_cols - (len(group[1]) + 4)), ' '), f'[{group[1]}s]')
print('\n')

if self.options.largest_jobs:
valued_tups = [tup for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]]
sorted_tups = sorted(valued_tups, key=lambda tup: tup[0].getMaxMemoryUsage(), reverse=True)

print('\n%d largest jobs:' % self.options.largest_jobs)
print(('-' * (self.options.term_cols)))

# Copy the current options and force timing to be true so that
# we get memory when we call formatResult() below
options_with_timing = copy.deepcopy(self.options)
options_with_timing.timing = True

for tup in sorted_tups[0:self.options.largest_jobs]:
job = tup[0]
if not job.isSkip() and job.getMaxMemoryUsage() > 0:
print(util.formatResult(job, options_with_timing, caveats=True))
if len(sorted_tups) == 0:
print('No jobs were completed or no jobs contained resource usage.')

all_jobs = self.scheduler.retrieveJobs()

# Gather and print the jobs with race conditions after the jobs are finished
Expand Down Expand Up @@ -1065,6 +1084,7 @@ def parseCLArgs(self, argv):
parser.add_argument('-l', '--load-average', action='store', type=float, dest='load', help='Do not run additional tests if the load average is at least LOAD')
parser.add_argument('-t', '--timing', action='store_true', dest='timing', help='Report Timing information for passing tests')
parser.add_argument('--longest-jobs', action='store', dest='longest_jobs', type=int, default=0, help='Print the longest running jobs upon completion')
parser.add_argument('--largest-jobs', action='store', dest='largest_jobs', type=int, default=0, help='Print the largest (by max memory usage) jobs upon completion')
parser.add_argument('-s', '--scale', action='store_true', dest='scaling', help='Scale problems that have SCALE_REFINE set')
parser.add_argument('-i', nargs=1, action='store', type=str, dest='input_file_name', help='The test specification file to look for (default: tests)')
parser.add_argument('--libmesh_dir', nargs=1, action='store', type=str, dest='libmesh_dir', help='Currently only needed for bitten code coverage')
Expand Down Expand Up @@ -1131,6 +1151,10 @@ def parseCLArgs(self, argv):
hpcgroup.add_argument('--hpc-no-hold', nargs=1, action='store', type=bool, default=False, dest='hpc_no_hold', help='Do not pre-create hpc jobs to be held')
hpcgroup.add_argument('--pbs-queue', nargs=1, action='store', dest='hpc_queue', type=str, metavar='', help='Submit jobs to the specified queue')

# Options for resource limits
resourcegroup = parser.add_argument_group('Resource Options', 'Options for controlling resource limits')
resourcegroup.add_argument('--max-memory', dest='max_memory', action='store', type=str, default=None, help='Set maximum memory allowed per slot (default none, ex: 100MB)')

# Try to find the terminal size if we can
# Try/except here because the terminal size could fail w/o a display
term_cols = None
Expand All @@ -1142,7 +1166,7 @@ def parseCLArgs(self, argv):

# Optionally load in the environment controlled values
term_cols = int(os.getenv('MOOSE_TERM_COLS', term_cols))
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcst')
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcsmt')

# Terminal options
termgroup = parser.add_argument_group('Terminal Options', 'Options for controlling the formatting of terminal output')
Expand Down Expand Up @@ -1171,6 +1195,13 @@ def parseCLArgs(self, argv):
if type(value) == list and len(value) == 1:
setattr(self.options, key, value[0])

# Convert max_memory to bytes
if self.options.max_memory is not None:
try:
self.options.max_memory = util.convertMemoryToBytes(self.options.max_memory)
except:
print(f'ERROR: Failed to parse --max-memory="{self.options.max_memory}"')
sys.exit(1)
self.checkAndUpdateCLArgs()

## Called after options are parsed from the command line
Expand Down
78 changes: 77 additions & 1 deletion python/TestHarness/runners/Runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
#* Licensed under LGPL 2.1, please see LICENSE for details
#* https://www.gnu.org/licenses/lgpl-2.1.html

import os, json
import os, threading, time
from collections import namedtuple
from TestHarness import OutputInterface, util

class Runner(OutputInterface):
# Helper struct for storing information about sampled resource usage
ResourceUsage = namedtuple('ResourceUsage', 'cpu_percent mem_bytes')

"""
Base class for running a process via a command.
Expand All @@ -33,6 +37,11 @@ def __init__(self, job, options):
# their output separately
self.run_output = OutputInterface()

# Resource usage over time for the spawned process
self.resource_usage = None
# Thread that collects resource usage (if any)
self.resource_usage_thread = None

def getRunOutput(self):
""" Get the OutputInterface object for the actual run """
return self.run_output
Expand Down Expand Up @@ -109,3 +118,70 @@ def readOutput(self, stream):
if output and output[-1] != '\n':
output += '\n'
return output

def startResourcePoll(self):
"""
To be called in the derived Runner to start the resource usage poller
"""
self.resource_usage = []
self.resource_usage_thread = threading.Thread(target=self._pollResources)
self.resource_usage_thread.start()

def _pollResources(self):
"""
Internal method for a while loop that calls pollResourceUsage()
to capture resource usage as the process continues
Will kill the running process if a resource limit is set and said limit
is exceeded.
"""
while True:
time.sleep(0.1)
usage = None
try:
usage = self.pollResourceUsage()
except:
return
if usage is None:
return
self.resource_usage.append(usage)

slots = self.job.getSlots()

if self.options.max_memory is not None:
allowed_mem = slots * self.options.max_memory
if usage.mem_bytes > allowed_mem:
usage_human = util.humanMemory(usage.mem_bytes)
allowed_human = util.humanMemory(allowed_mem)
self.appendOutput(f'Process killed; memory usage {usage_human} exceeded {allowed_human}')
self.job.setStatus(self.job.error, 'EXCEEDED MEM')
self.kill()
return

# allowed_precent = float((self.job.getSlots() + 1) * 100)
# if usage.cpu_percent > allowed_precent:
# self.appendOutput(f'Process killed; cpu usage {allowed_precent}% exceeded allowed {allowed_precent}%')
# self.job.setStatus(self.job.error, 'EXCEEDED CPU')
# self.kill()
# return

def pollResourceUsage(self):
"""
To be overridden in the derived Runner to collect the resource usage at
this current time.
Should return a ResourceUsage object.
"""
return None

def getMaxMemoryUsage(self):
"""
Get the max memory usage (in bytes) of the spawned process if it was
able to be captured
"""
if self.resource_usage is None:
return None
max_mem = 0
for usage in self.resource_usage:
max_mem = max(max_mem, usage.mem_bytes)
return max_mem
17 changes: 16 additions & 1 deletion python/TestHarness/runners/SubprocessRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from tempfile import SpooledTemporaryFile
from signal import SIGTERM
from TestHarness.runners.Runner import Runner
from TestHarness import util

class SubprocessRunner(Runner):
"""
Expand Down Expand Up @@ -70,14 +69,30 @@ def spawn(self, timer):
except Exception as e:
raise Exception('Error in launching a new task') from e

self.startResourcePoll()
timer.start('runner_run')

def pollResourceUsage(self):
try:
if self.process is None or self.process.poll() is not None:
return None
cmd = ['ps', '-o', 'pcpu,rss', '--no-headers', f'{self.process.pid}']
process = subprocess.run(cmd, text=True, capture_output=True)
if process.returncode == 0 and process.stdout is not None:
result = process.stdout.strip().split()
usage = self.ResourceUsage(cpu_percent=float(result[0]),
mem_bytes=int(result[1]) * 1024)
return usage
except:
return None

def wait(self, timer):
self.process.wait()

timer.stop('runner_run')

self.exit_code = self.process.poll()
self.process = None

# This should have been cleared before the job started
if self.getRunOutput().hasOutput():
Expand Down
6 changes: 6 additions & 0 deletions python/TestHarness/schedulers/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ def getTiming(self):
return self.timer.totalTime()
return 0.0

def getMaxMemoryUsage(self):
""" Return max memory usage of the runner process, if available """
if self._runner is not None:
return self._runner.getMaxMemoryUsage()
return None

def getStatus(self):
return self.job_status.getStatus()

Expand Down
54 changes: 54 additions & 0 deletions python/TestHarness/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,23 @@ def formatResult(job, options, result='', color=True, **kwargs):
f_time = '[' + '{0: <6}'.format('%0.*fs' % (precision, actual)) + ']'
formatCase(f_key, (f_time, None), formatted_results)

if str(f_key).lower() == 'm' and options.timing:
value = 0
suffix = 'MB'
if not job.isSkip():
max_mem = job.getMaxMemoryUsage()
if max_mem is None or max_mem == 0:
value = None
else:
value, suffix = humanMemory(max_mem, split=True)
if isinstance(value, (float, int)):
int_len = len(str(int(value)))
precision = min(3, max(0,(4-int_len)))
f_max_mem = f'[{value:<5.{precision}f}{suffix}]'
else:
f_max_mem = f'[?????{suffix}]'
formatCase(f_key, (f_max_mem, None), formatted_results)

# Decorate Caveats
if job.getCaveats() and caveat_index is not None and 'caveats' in kwargs and kwargs['caveats']:
caveats = ','.join(job.getCaveats())
Expand Down Expand Up @@ -890,3 +907,40 @@ def outputHeader(header, ending=True):
begin_sep = '#' * 80
end_sep = f'{begin_sep}\n' if ending else ''
return f'{begin_sep}\n{header}\n{end_sep}'

# Conversions for memory sizes
byte_conversions = {'B': 1,
'KB': 1024,
'MB':1024**2,
'GB':1024**3,
'TB':1024**4}

def convertMemoryToBytes(size_str: str) -> int:
"""
Converts the given size string (100B, 100MB, etc)
into an integer number of bytes
"""
search = re.fullmatch(r'(\d+(?:.\d+)?)([A-Z]+)', size_str)
if search is None:
raise ValueError(f'Failed to parse memory size from "{size_str}"')
value = search.group(1)
unit = search.group(2)
if unit not in byte_conversions:
raise ValueError(f'Unknown memory unit "{unit}"')
return float(value) * byte_conversions[unit]

def humanMemory(bytes: int, digits=2, split=False) -> str:
"""
Convert the given size in bytes to a human readable memory value
The split option returns the value and unit separately.
"""
value = f'{bytes}B'
for unit, conversion in byte_conversions.items():
scaled_value = float(bytes) / float(conversion)
value = f'{scaled_value:.{digits}f}{unit}'
if scaled_value < 999:
break
if split:
return scaled_value, unit
return value

0 comments on commit 32ae08b

Please sign in to comment.