Implement a new --failing-and-slow-first command line argument to test runner. #24624

Open · wants to merge 6 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -35,3 +35,6 @@ coverage.xml
# ...except the templates.
!/tools/run_python.ps1
!/tools/run_python_compiler.ps1

# Test runner previous run results for sorting the next run
__previous_test_run_results.json
71 changes: 65 additions & 6 deletions test/parallel_testsuite.py
@@ -3,6 +3,7 @@
# University of Illinois/NCSA Open Source License. Both these licenses can be
# found in the LICENSE file.

import json
import multiprocessing
import os
import sys
@@ -19,7 +20,12 @@
seen_class = set()


def run_test(test):
def run_test(test, failfast_event):
# If failfast mode is in effect and any test has already failed,
# then we should abort executing further tests immediately.
if failfast_event is not None and failfast_event.is_set():
return None

olddir = os.getcwd()
result = BufferedParallelTestResult()
temp_dir = tempfile.mkdtemp(prefix='emtest_')
@@ -29,10 +35,16 @@ def run_test(test):
seen_class.add(test.__class__)
test.__class__.setUpClass()
test(result)

# Alert all other workers in the multiprocessing pool that they should stop executing further tests.
if failfast_event is not None and result.test_result not in ['success', 'skipped']:
failfast_event.set()
except unittest.SkipTest as e:
result.addSkip(test, e)
except Exception as e:
result.addError(test, e)
if failfast_event is not None:
failfast_event.set()
# Before attempting to delete the tmp dir make sure the current
# working directory is not within it.
os.chdir(olddir)
@@ -46,9 +58,11 @@ class ParallelTestSuite(unittest.BaseTestSuite):
Creates worker threads, manages the task queue, and combines the results.
"""

def __init__(self, max_cores):
def __init__(self, max_cores, options):
super().__init__()
self.max_cores = max_cores
self.failfast = options.failfast
self.failing_and_slow_first = options.failing_and_slow_first

def addTest(self, test):
super().addTest(test)
@@ -61,12 +75,48 @@ def run(self, result):
# inherited by the child process, but can lead to hard-to-debug windows-only
# issues.
# multiprocessing.set_start_method('spawn')
tests = list(self.reversed_tests())

# If we are running with --failing-and-slow-first, then the test list has been
# pre-sorted based on previous test run results. Otherwise run the tests in
# reverse alphabetical order.
tests = list(self if self.failing_and_slow_first else self.reversed_tests())
use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()))
print('Using %s parallel test processes' % use_cores)
pool = multiprocessing.Pool(use_cores)
results = [pool.apply_async(run_test, (t,)) for t in tests]
results = [r.get() for r in results]
with multiprocessing.Manager() as manager:
pool = multiprocessing.Pool(use_cores)
failfast_event = manager.Event() if self.failfast else None
results = [pool.apply_async(run_test, (t, failfast_event)) for t in tests]
results = [r.get() for r in results]
results = [r for r in results if r is not None]

try:
previous_test_run_results = json.load(open('__previous_test_run_results.json'))
except FileNotFoundError:
previous_test_run_results = {}

if self.failing_and_slow_first:
for r in results:
# Save a test result record with the specific suite name (e.g. "core0.test_foo")
test_failed = r.test_result not in ['success', 'skipped']
fail_frequency = previous_test_run_results[r.test_name]['fail_frequency'] if r.test_name in previous_test_run_results else int(test_failed)
fail_frequency = (fail_frequency + int(test_failed)) / 2
previous_test_run_results[r.test_name] = {
'result': r.test_result,
'duration': r.test_duration,
'fail_frequency': fail_frequency,
}
# Also save a test result record without the suite name (e.g. just "test_foo"). This enables different suite runs to
# order tests for quick --failfast termination, in case a test fails in multiple suites.
test_in_any_suite = r.test_name.split(' ')[0]
fail_frequency = previous_test_run_results[test_in_any_suite]['fail_frequency'] if test_in_any_suite in previous_test_run_results else int(test_failed)
fail_frequency = (fail_frequency + int(test_failed)) / 2
previous_test_run_results[test_in_any_suite] = {
'result': r.test_result,
'duration': r.test_duration,
'fail_frequency': fail_frequency,
}

json.dump(previous_test_run_results, open('__previous_test_run_results.json', 'w'), indent=2)
pool.close()
pool.join()
return self.combine_results(result, results)
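
Note on the bookkeeping above: each record written to __previous_test_run_results.json stores the last result, the last duration, and a running fail_frequency that is re-averaged with the newest outcome on every run, so recent outcomes dominate. Below is a minimal standalone sketch of that update rule with made-up outcomes (update_record is an illustrative helper, not part of this PR):

```python
# Illustrative sketch (not PR code) of the fail_frequency update used above:
# new_frequency = (old_frequency + (1 if the test failed else 0)) / 2,
# i.e. an exponential moving average that weights recent runs most heavily.
def update_record(previous, result, duration):
    failed = int(result not in ('success', 'skipped'))
    old_freq = previous['fail_frequency'] if previous else failed
    return {
        'result': result,
        'duration': duration,
        'fail_frequency': (old_freq + failed) / 2,
    }


record = None
for outcome in ['failed', 'failed', 'success', 'success']:
    record = update_record(record, outcome, duration=1.0)
    print(record['fail_frequency'])  # prints 1.0, 1.0, 0.5, 0.25
```

Two consecutive passes halve the stored frequency each time, so a flaky test decays toward zero only after it has been passing for a while.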
@@ -104,6 +154,8 @@ class BufferedParallelTestResult:
def __init__(self):
self.buffered_result = None
self.test_duration = 0
self.test_result = 'errored'
self.test_name = ''

@property
def test(self):
@@ -122,6 +174,7 @@ def updateResult(self, result):
result.core_time += self.test_duration

def startTest(self, test):
self.test_name = str(test)
self.start_time = time.perf_counter()

def stopTest(self, test):
@@ -134,28 +187,34 @@ def addSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... ok (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestSuccess(test)
self.test_result = 'success'

def addExpectedFailure(self, test, err):
if hasattr(time, 'perf_counter'):
print(test, '... expected failure (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestExpectedFailure(test, err)
self.test_result = 'expected failure'

def addUnexpectedSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... unexpected success (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
self.buffered_result = BufferedTestUnexpectedSuccess(test)
self.test_result = 'unexpected success'

def addSkip(self, test, reason):
print(test, "... skipped '%s'" % reason, file=sys.stderr)
self.buffered_result = BufferedTestSkip(test, reason)
self.test_result = 'skipped'

def addFailure(self, test, err):
print(test, '... FAIL', file=sys.stderr)
self.buffered_result = BufferedTestFailure(test, err)
self.test_result = 'failed'

def addError(self, test, err):
print(test, '... ERROR', file=sys.stderr)
self.buffered_result = BufferedTestError(test, err)
self.test_result = 'errored'


class BufferedTestBase:
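
For reviewers unfamiliar with the fail-fast mechanism added to run_test and ParallelTestSuite.run above, here is a minimal, self-contained sketch of the pattern (not the PR's code; task names and the failure condition are made up): a Manager-backed Event is shared with every pool worker, so tasks that start after the first failure can bail out immediately.

```python
# Minimal illustration of the fail-fast pattern used above: one shared Event,
# set by the first failing task, makes later tasks return early with None.
import multiprocessing


def run_task(name, failfast_event):
    # Skip the work entirely if an earlier task has already failed.
    if failfast_event is not None and failfast_event.is_set():
        return None
    failed = (name == 'task3')  # made-up failure condition
    if failed:
        failfast_event.set()  # tell the other workers to stop picking up work
    return (name, 'failed' if failed else 'ok')


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        event = manager.Event()
        with multiprocessing.Pool(2) as pool:
            async_results = [pool.apply_async(run_task, (f'task{i}', event)) for i in range(8)]
            results = [r.get() for r in async_results]
    # Tasks skipped after the failure come back as None, mirroring run_test above.
    print([r for r in results if r is not None])
```

A plain multiprocessing.Event cannot be pickled into Pool.apply_async arguments, which is presumably why the PR creates the Event through multiprocessing.Manager: the proxy it returns is picklable.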
90 changes: 81 additions & 9 deletions test/runner.py
@@ -21,6 +21,7 @@
import atexit
import fnmatch
import glob
import json
import logging
import math
import operator
@@ -30,6 +31,7 @@
import sys
import unittest
import time
from functools import cmp_to_key

# Setup

@@ -270,8 +272,75 @@ def error_on_legacy_suite_names(args):
utils.exit_with_error('`%s` test suite has been replaced with `%s`', a, new)


def load_test_suites(args, modules, start_at, repeat):
found_start = not start_at
def create_test_run_sorter(failfast):
try:
previous_test_run_results = json.load(open('__previous_test_run_results.json'))
except FileNotFoundError:
previous_test_run_results = {}

def read_approx_fail_freq(test_name):
if test_name in previous_test_run_results and 'fail_frequency' in previous_test_run_results[test_name]:
# Quantize the float value to relatively fine-grained buckets for sorting
return round(previous_test_run_results[test_name]['fail_frequency'] * 20) / 20
return 0

def sort_tests_failing_and_slowest_first_comparator(x, y):
x = str(x)
y = str(y)

# Look at the number of times this test has failed, and order by failure count first.
# Only do this with --failfast, when we want to fail early (otherwise sorting by the last run's test duration is more productive).
if failfast:
x_fail_freq = read_approx_fail_freq(x)
y_fail_freq = read_approx_fail_freq(y)
if x_fail_freq != y_fail_freq:
return y_fail_freq - x_fail_freq

# Look at the number of times this test has failed overall in any suite, and order by failure count first
x_fail_freq = read_approx_fail_freq(x.split(' ')[0])
y_fail_freq = read_approx_fail_freq(y.split(' ')[0])
if x_fail_freq != y_fail_freq:
return y_fail_freq - x_fail_freq

if x in previous_test_run_results:
X = previous_test_run_results[x]

# if test Y has not been run even once, run Y before X
if y not in previous_test_run_results:
return 1
Y = previous_test_run_results[y]

# If both X and Y have been run before, order the tests based on their previous result (failures first, skips very last).
# N.b. it is important to sandwich all skipped tests between failures and successes. This maximizes the chances that when
# a failing test is detected, the other cores will fail fast as well. (Successful tests are run slowest-first to help
# scheduling.)
order_by_result = {'errored': 0, 'failed': 1, 'expected failure': 2, 'unexpected success': 3, 'skipped': 4, 'success': 5}
x_result = order_by_result[X['result']]
y_result = order_by_result[Y['result']]
if x_result != y_result:
return x_result - y_result

# Finally, order by test duration from last run
if X['duration'] != Y['duration']:
if X['result'] == 'success':
# If both tests were successful tests, run the slower test first to improve parallelism
return Y['duration'] - X['duration']
else:
# If both tests were failing tests, run the quicker test first to improve --failfast detection time
return X['duration'] - Y['duration']

# if test X has not been run even once, but Y has, run X before Y
if y in previous_test_run_results:
return -1

# Neither test has been run before, so run them in alphabetical order
return (x > y) - (x < y)

return sort_tests_failing_and_slowest_first_comparator
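
To make the comparator above easier to follow, here is a small standalone example (made-up test names and frequencies; approx, compare and fail_freq are illustrative stand-ins, not PR code) showing how a cmp-style function plugs into sorted() via functools.cmp_to_key, and how the 0.05-wide quantization buckets from read_approx_fail_freq make nearly equal frequencies compare as equal:

```python
# Illustrative sketch: a cmp-style comparator (negative/zero/positive return
# value) adapted for sorted() with cmp_to_key, using quantized fail frequencies.
from functools import cmp_to_key

fail_freq = {'test_a': 0.26, 'test_b': 0.24, 'test_c': 0.80}  # made-up history


def approx(name):
    # Quantize to 0.05-wide buckets: 0.26 and 0.24 both become 0.25.
    return round(fail_freq.get(name, 0) * 20) / 20


def compare(x, y):
    # Higher (quantized) failure frequency sorts first; ties fall back to name.
    diff = approx(y) - approx(x)
    if diff:
        return diff
    return (x > y) - (x < y)


print(sorted(['test_b', 'test_d', 'test_a', 'test_c'], key=cmp_to_key(compare)))
# ['test_c', 'test_a', 'test_b', 'test_d']
```

Returning a float difference is fine here because cmp_to_key only inspects the sign of the comparator's return value.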


def load_test_suites(args, modules, options):
found_start = not options.start_at

loader = unittest.TestLoader()
error_on_legacy_suite_names(args)
@@ -291,20 +360,22 @@ def load_test_suites(args, modules, start_at, repeat):
if names_in_module:
loaded_tests = loader.loadTestsFromNames(sorted(names_in_module), m)
tests = flattened_tests(loaded_tests)
suite = suite_for_module(m, tests)
suite = suite_for_module(m, tests, options)
if options.failing_and_slow_first:
tests = sorted(tests, key=cmp_to_key(create_test_run_sorter(options.failfast)))
for test in tests:
if not found_start:
# Skip over tests until we find the start
if test.id().endswith(start_at):
if test.id().endswith(options.start_at):
found_start = True
else:
continue
for _x in range(repeat):
for _x in range(options.repeat):
total_tests += 1
suite.addTest(test)
suites.append((m.__name__, suite))
if not found_start:
utils.exit_with_error(f'unable to find --start-at test: {start_at}')
utils.exit_with_error(f'unable to find --start-at test: {options.start_at}')
if total_tests == 1 or parallel_testsuite.num_cores() == 1:
# TODO: perhaps leave it at 2 if it was 2 before?
common.EMTEST_SAVE_DIR = 1
@@ -318,13 +389,13 @@ def flattened_tests(loaded_tests):
return tests


def suite_for_module(module, tests):
def suite_for_module(module, tests, options):
suite_supported = module.__name__ in ('test_core', 'test_other', 'test_posixtest')
if not common.EMTEST_SAVE_DIR and not shared.DEBUG:
has_multiple_tests = len(tests) > 1
has_multiple_cores = parallel_testsuite.num_cores() > 1
if suite_supported and has_multiple_tests and has_multiple_cores:
return parallel_testsuite.ParallelTestSuite(len(tests))
return parallel_testsuite.ParallelTestSuite(len(tests), options)
return unittest.TestSuite()


@@ -394,6 +465,7 @@ def parse_args():
help='Command to launch web browser in which to run browser tests.')
parser.add_argument('tests', nargs='*')
parser.add_argument('--failfast', action='store_true')
parser.add_argument('--failing-and-slow-first', action='store_true', help='Run previously failing tests first, then the remaining tests sorted slowest first. Combine with --failfast for fast fail-early CI runs.')
parser.add_argument('--start-at', metavar='NAME', help='Skip all tests up until <NAME>')
parser.add_argument('--continue', dest='_continue', action='store_true',
help='Resume from the last run test.'
@@ -488,7 +560,7 @@ def prepend_default(arg):
if os.path.exists(common.LAST_TEST):
options.start_at = utils.read_file(common.LAST_TEST).strip()

suites, unmatched_tests = load_test_suites(tests, modules, options.start_at, options.repeat)
suites, unmatched_tests = load_test_suites(tests, modules, options)
if unmatched_tests:
print('ERROR: could not find the following tests: ' + ' '.join(unmatched_tests))
return 1
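
With both flags in place, a CI job might run something like `python test/runner.py core0 --failing-and-slow-first --failfast` (suite name chosen only for illustration): previously failing tests are scheduled first and the whole run aborts on the first new failure, while a run without --failfast instead prioritizes the slowest tests to improve parallel scheduling.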