From be6a32acd7dd0f40d91329c62b8158317e884e38 Mon Sep 17 00:00:00 2001
From: Oliver Chang
Date: Fri, 8 Nov 2024 17:10:57 +1100
Subject: [PATCH] Don't call extend_report_with_coverage_gains in apply_async
 callback. (#709)

Per
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async,
callbacks should return immediately; otherwise they block the thread that
handles results, and with it the entire Pool's progress.

For large experiments, this is likely what causes our throughput to slow
to a crawl as the experiment runs: every benchmark experiment that
finishes triggers this expensive calculation.

From debugging with GDB on
https://github.com/google/oss-fuzz-gen/pull/692, it looks like a large
number of worker processes are stuck waiting to report their results:

```
(gdb) py-bt
Traceback (most recent call first):
  File "/usr/lib/python3.11/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 376, in put
    with self._wlock:
  File "/usr/lib/python3.11/multiprocessing/pool.py", line 131, in worker
    put((job, i, result))
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
```

This partially reverts https://github.com/google/oss-fuzz-gen/pull/566.
Instead, we create a new sub-process that periodically runs the
calculation in the background, so nothing is blocked.
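To illustrate the failure mode, here is a minimal, self-contained sketch
(not part of this change): every apply_async callback runs on the Pool's
single result-handler thread, so a slow callback serializes result
handling and leaves workers blocked on the result queue, exactly as in
the GDB trace above.

```
# Illustrative repro, not project code: each slow_callback() invocation
# runs on the Pool's result-handler thread, one at a time.
import time
from multiprocessing import Pool


def work(i):
  return i * i  # the actual work finishes almost instantly


def slow_callback(result):
  # Stand-in for an expensive per-result step such as
  # extend_report_with_coverage_gains().
  time.sleep(5)
  print('handled', result)


if __name__ == '__main__':
  with Pool(4) as pool:
    tasks = [
        pool.apply_async(work, (i,), callback=slow_callback)
        for i in range(8)
    ]
    pool.close()
    # work() completes in milliseconds, but join() takes ~40 seconds:
    # eight callbacks at ~5 seconds each, strictly serialized.
    pool.join()
```

With a callback that returns immediately, the same program finishes in
well under a second.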
---
 run_all_experiments.py | 46 +++++++++++++++++++++++++++++-------------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git a/run_all_experiments.py b/run_all_experiments.py
index 857c4303a..9496509a4 100755
--- a/run_all_experiments.py
+++ b/run_all_experiments.py
@@ -23,7 +23,7 @@
 import time
 import traceback
 from datetime import timedelta
-from multiprocessing import Pool
+from multiprocessing import Pool, Process
 from typing import Any
 
 import run_one_experiment
@@ -60,8 +60,6 @@
 LOG_FMT = ('%(asctime)s.%(msecs)03d %(levelname)s '
            '%(module)s - %(funcName)s: %(message)s')
 
-EXPERIMENT_RESULTS = []
-
 
 class Result:
   benchmark: benchmarklib.Benchmark
@@ -335,15 +333,24 @@ def extend_report_with_coverage_gains() -> None:
                                   comparative_cov_gains)
 
 
-def _print_and_dump_experiment_result(result: Result):
+def extend_report_with_coverage_gains_process():
+  """A process that continuously runs to update coverage gains in the
+  background."""
+  while True:
+    time.sleep(300)  # 5 minutes.
+    try:
+      extend_report_with_coverage_gains()
+    except Exception:
+      logger.error('Failed to extend report with coverage gains')
+      traceback.print_exc()
+
+
+def _print_experiment_result(result: Result):
   """Prints the |result| of a single experiment."""
   logger.info('\n**** Finished benchmark %s, %s ****\n%s',
               result.benchmark.project,
               result.benchmark.function_signature, result.result)
 
-  EXPERIMENT_RESULTS.append(result)
-  extend_report_with_coverage_gains()
-
 
 def _print_experiment_results(results: list[Result],
                               cov_gain: dict[str, dict[str, Any]]):
@@ -503,7 +510,7 @@ def _process_total_coverage_gain() -> dict[str, dict[str, Any]]:
 
 
 def main():
-  global WORK_DIR, EXPERIMENT_RESULTS
+  global WORK_DIR
   args = parse_args()
   _setup_logging(args.log_level)
 
@@ -528,27 +535,38 @@ def main():
               len(experiment_targets), str(NUM_EXP))
 
   # Set global variables that are updated throughout experiment runs.
-  EXPERIMENT_RESULTS = []
   WORK_DIR = args.work_dir
 
+  coverage_gains_process = Process(
+      target=extend_report_with_coverage_gains_process)
+  coverage_gains_process.start()
+
+  experiment_results = []
   if NUM_EXP == 1:
     for target_benchmark in experiment_targets:
       result = run_experiments(target_benchmark, args)
-      _print_and_dump_experiment_result(result)
+      _print_experiment_result(result)
+      experiment_results.append(result)
   else:
     experiment_tasks = []
     with Pool(NUM_EXP, maxtasksperchild=1) as p:
       for target_benchmark in experiment_targets:
-        experiment_task = p.apply_async(
-            run_experiments, (target_benchmark, args),
-            callback=_print_and_dump_experiment_result)
+        experiment_task = p.apply_async(run_experiments,
+                                        (target_benchmark, args),
+                                        callback=_print_experiment_result)
         experiment_tasks.append(experiment_task)
         time.sleep(args.delay)
+
+      experiment_results = [task.get() for task in experiment_tasks]
+
       # Signal that no more work will be submitted to the pool.
       p.close()
       # Wait for all workers to complete.
       p.join()
 
+  # Do a final coverage aggregation.
+  coverage_gains_process.kill()
+  extend_report_with_coverage_gains()
 
   # Capture time at end
@@ -559,7 +577,7 @@ def main():
               str(timedelta(seconds=end - start)))
 
   coverage_gain_dict = _process_total_coverage_gain()
-  _print_experiment_results(EXPERIMENT_RESULTS, coverage_gain_dict)
+  _print_experiment_results(experiment_results, coverage_gain_dict)
 
 
 if __name__ == '__main__':
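
A note on the shutdown choice above: stopping the updater with
Process.kill() is simple and tolerable here because main() performs one
final extend_report_with_coverage_gains() afterwards. For reference, a
cooperative variant is sketched below; it is illustrative only, and
do_expensive_update / periodic_worker are hypothetical names standing in
for the real functions.

```
# Illustrative alternative, not part of this patch: the same periodic
# background worker, but stoppable via a multiprocessing.Event instead
# of kill(). do_expensive_update() is a hypothetical stand-in for
# extend_report_with_coverage_gains().
import traceback
from multiprocessing import Event, Process


def do_expensive_update():
  print('aggregating coverage gains...')


def periodic_worker(stop, interval=300.0):
  # Event.wait() returns True as soon as stop is set, so shutdown is
  # prompt; on timeout it returns False and we run another update,
  # mirroring the patch's sleep/try/except loop.
  while not stop.wait(interval):
    try:
      do_expensive_update()
    except Exception:
      traceback.print_exc()


if __name__ == '__main__':
  stop = Event()
  worker = Process(target=periodic_worker, args=(stop,))
  worker.start()
  # ... run the experiments ...
  stop.set()     # ask the worker to finish its loop
  worker.join()  # clean exit, no kill() needed
```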