diff --git a/run_all_experiments.py b/run_all_experiments.py index 857c4303a..9496509a4 100755 --- a/run_all_experiments.py +++ b/run_all_experiments.py @@ -23,7 +23,7 @@ import time import traceback from datetime import timedelta -from multiprocessing import Pool +from multiprocessing import Pool, Process from typing import Any import run_one_experiment @@ -60,8 +60,6 @@ LOG_FMT = ('%(asctime)s.%(msecs)03d %(levelname)s ' '%(module)s - %(funcName)s: %(message)s') -EXPERIMENT_RESULTS = [] - class Result: benchmark: benchmarklib.Benchmark @@ -335,15 +333,24 @@ def extend_report_with_coverage_gains() -> None: comparative_cov_gains) -def _print_and_dump_experiment_result(result: Result): +def extend_report_with_coverage_gains_process(): + """A process that continuously runs to update coverage gains in the + background.""" + while True: + time.sleep(300) # 5 minutes. + try: + extend_report_with_coverage_gains() + except Exception: + logger.error('Failed to extend report with coverage gains') + traceback.print_exc() + + +def _print_experiment_result(result: Result): """Prints the |result| of a single experiment.""" logger.info('\n**** Finished benchmark %s, %s ****\n%s', result.benchmark.project, result.benchmark.function_signature, result.result) - EXPERIMENT_RESULTS.append(result) - extend_report_with_coverage_gains() - def _print_experiment_results(results: list[Result], cov_gain: dict[str, dict[str, Any]]): @@ -503,7 +510,7 @@ def _process_total_coverage_gain() -> dict[str, dict[str, Any]]: def main(): - global WORK_DIR, EXPERIMENT_RESULTS + global WORK_DIR args = parse_args() _setup_logging(args.log_level) @@ -528,27 +535,38 @@ def main(): len(experiment_targets), str(NUM_EXP)) # Set global variables that are updated throughout experiment runs. - EXPERIMENT_RESULTS = [] WORK_DIR = args.work_dir + + coverage_gains_process = Process( + target=extend_report_with_coverage_gains_process) + coverage_gains_process.start() + + experiment_results = [] if NUM_EXP == 1: for target_benchmark in experiment_targets: result = run_experiments(target_benchmark, args) - _print_and_dump_experiment_result(result) + _print_experiment_result(result) + experiment_results.append(result) else: experiment_tasks = [] with Pool(NUM_EXP, maxtasksperchild=1) as p: for target_benchmark in experiment_targets: - experiment_task = p.apply_async( - run_experiments, (target_benchmark, args), - callback=_print_and_dump_experiment_result) + experiment_task = p.apply_async(run_experiments, + (target_benchmark, args), + callback=_print_experiment_result) experiment_tasks.append(experiment_task) time.sleep(args.delay) + + experiment_results = [task.get() for task in experiment_tasks] + # Signal that no more work will be submitte to the pool. p.close() # Wait for all workers to complete. p.join() + # Do a final coverage aggregation. + coverage_gains_process.kill() extend_report_with_coverage_gains() # Capture time at end @@ -559,7 +577,7 @@ def main(): str(timedelta(seconds=end - start))) coverage_gain_dict = _process_total_coverage_gain() - _print_experiment_results(EXPERIMENT_RESULTS, coverage_gain_dict) + _print_experiment_results(experiment_results, coverage_gain_dict) if __name__ == '__main__':