diff --git a/sem/__init__.py b/sem/__init__.py index a46e88e..7d2be6a 100644 --- a/sem/__init__.py +++ b/sem/__init__.py @@ -4,11 +4,11 @@ from .lptrunner import LptRunner from .gridrunner import BUILD_GRID_PARAMS, SIMULATION_GRID_PARAMS from .database import DatabaseManager -from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files +from .utils import list_param_combinations, automatic_parser, stdout_automatic_parser, only_load_some_files, get_command_from_result, CallbackBase from .cli import cli __all__ = ('CampaignManager', 'SimulationRunner', 'ParallelRunner', 'LptRunner', 'DatabaseManager', 'list_param_combinations', 'automatic_parser', - 'only_load_some_files') + 'only_load_some_files', 'get_command_from_result', 'CallbackBase') name = 'sem' diff --git a/sem/cli.py b/sem/cli.py index e289083..c082c0c 100644 --- a/sem/cli.py +++ b/sem/cli.py @@ -235,9 +235,11 @@ def command(results_dir, result_id): click.echo("Simulation command:") click.echo(sem.utils.get_command_from_result(campaign.db.get_script(), + campaign.dir, result)) click.echo("Debug command:") click.echo(sem.utils.get_command_from_result(campaign.db.get_script(), + campaign.dir, result, debug=True)) diff --git a/sem/manager.py b/sem/manager.py index 01e7e7a..e7136fc 100644 --- a/sem/manager.py +++ b/sem/manager.py @@ -290,7 +290,7 @@ def check_and_fill_parameters(self, param_list, needs_rngrun): # Simulation running # ###################### - def run_simulations(self, param_list, show_progress=True, stop_on_errors=True): + def run_simulations(self, param_list, show_progress=True, callbacks: list = None, stop_on_errors=True):
""" Run several simulations specified by a list of parameter combinations. @@ -305,6 +305,10 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True): can be either a string or a number). show_progress (bool): whether or not to show a progress bar with percentage and expected remaining time. + callbacks (list): list of objects extending CallbackBase to be + triggered during the run. + stop_on_errors (bool): whether or not to stop the execution of the simulations + if an error occurs. """ # Make sure we have a runner to run simulations with. @@ -315,7 +319,7 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True): " for this CampaignManager.") # Return if the list is empty - if param_list == []: + if not param_list: return self.check_and_fill_parameters(param_list, needs_rngrun=True) @@ -339,12 +343,14 @@ def run_simulations(self, param_list, show_progress=True, stop_on_errors=True): # computation is performed on this line. results = self.runner.run_simulations(param_list, self.db.get_data_dir(), + callbacks=callbacks, stop_on_errors=stop_on_errors) # Wrap the result generator in the progress bar generator. if show_progress: result_generator = tqdm(results, total=len(param_list), unit='simulation', + smoothing=0, desc='Running simulations') else: result_generator = results @@ -377,8 +383,7 @@ def run_and_save_results(self, result_generator, batch_results=True): self.db.insert_results(results_batch) self.db.write_to_disk() - def get_missing_simulations(self, param_list, runs=None, - with_time_estimate=False): + def get_missing_simulations(self, param_list, runs=None, with_time_estimate=False): """ Return a list of the simulations among the required ones that are not available in the database. 
@@ -389,6 +394,7 @@ def get_missing_simulations(self, param_list, runs=None, runs (int): an integer representing how many repetitions are wanted for each parameter combination, None if the dictionaries in param_list already feature the desired RngRun value. + with_time_estimate (bool): a boolean representing ... """ params_to_simulate = [] @@ -451,6 +457,7 @@ def get_missing_simulations(self, param_list, runs=None, def run_missing_simulations(self, param_list, runs=None, condition_checking_function=None, + callbacks=None, stop_on_errors=True): """ Run the simulations from the parameter list that are not yet available @@ -470,6 +477,10 @@ def run_missing_simulations(self, param_list, runs=None, runs (int): the number of runs to perform for each parameter combination. This parameter is only allowed if the param_list specification doesn't feature an 'RngRun' key already. + callbacks (list): list of objects extending CallbackBase to be + triggered during the run. + stop_on_errors (bool): whether or not to stop the execution of the simulations + if an error occurs. """ # Expand the parameter specification param_list = list_param_combinations(param_list) @@ -503,10 +514,12 @@ def run_missing_simulations(self, param_list, runs=None, self.get_missing_simulations(param_list, runs, with_time_estimate=True), + callbacks=callbacks, stop_on_errors=stop_on_errors) else: self.run_simulations( self.get_missing_simulations(param_list, runs), + callbacks=callbacks, stop_on_errors=stop_on_errors) ##################### diff --git a/sem/parallelrunner.py b/sem/parallelrunner.py index 03fc48c..c90c79a 100644 --- a/sem/parallelrunner.py +++ b/sem/parallelrunner.py @@ -1,28 +1,46 @@ from .runner import SimulationRunner -from multiprocessing import Pool - +from .utils import CallbackBase +from multiprocessing.pool import ThreadPool as Pool +# We use ThreadPool to share the process memory among the different simulations to enable the use of callbacks.
+# This may eventually be improved with a fine-grained solution that checks whether callbacks are present class ParallelRunner(SimulationRunner): """ A Runner which can perform simulations in parallel on the current machine. """ + data_folder: str = None + stop_on_errors: bool = False + callbacks: [CallbackBase] = None - def run_simulations(self, parameter_list, data_folder, stop_on_errors=False): + def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False): """ This function runs multiple simulations in parallel. Args: parameter_list (list): list of parameter combinations to simulate. data_folder (str): folder in which to create output folders. + callbacks (list): list of callbacks to be triggered + stop_on_errors (bool): check whether simulation has to stop on errors or not """ + + if callbacks is not None: + for cb in callbacks: + cb.on_simulation_start(len(parameter_list)) + cb.controlled_by_parent = True + self.data_folder = data_folder self.stop_on_errors = stop_on_errors + self.callbacks = callbacks with Pool(processes=self.max_parallel_processes) as pool: for result in pool.imap_unordered(self.launch_simulation, parameter_list): yield result + if callbacks is not None: + for cb in callbacks: + cb.on_simulation_end() + def launch_simulation(self, parameter): """ Launch a single simulation, using SimulationRunner's facilities.
@@ -35,4 +53,5 @@ def launch_simulation(self, parameter): """ return next(SimulationRunner.run_simulations(self, [parameter], self.data_folder, + callbacks=self.callbacks, stop_on_errors=self.stop_on_errors)) diff --git a/sem/runner.py b/sem/runner.py index 651e130..3cca5ae 100644 --- a/sem/runner.py +++ b/sem/runner.py @@ -8,6 +8,7 @@ import sys from importlib.machinery import SourceFileLoader import types +from .utils import CallbackBase from tqdm import tqdm @@ -72,7 +73,7 @@ def __init__(self, path, script, optimized=True, skip_configuration=False, build_status_fname = ".lock-ns3_%s_build" % sys.platform build_status_path = os.path.join(path, build_status_fname) else: - build_status_fname = "build-status.py" + build_status_fname = "build.py" if optimized: build_status_path = os.path.join(path, 'build/optimized/build-status.py') @@ -289,17 +290,25 @@ def get_available_parameters(self): # Simulation running # ###################### - def run_simulations(self, parameter_list, data_folder, stop_on_errors=False): + def run_simulations(self, parameter_list, data_folder, callbacks: [CallbackBase] = None, stop_on_errors=False): """ Run several simulations using a certain combination of parameters. - Yields results as simulations are completed. + Yield results as simulations are completed. Args: parameter_list (list): list of parameter combinations to simulate. data_folder (str): folder in which to save subfolders containing simulation output. 
+ callbacks (list): list of callbacks to be triggered + stop_on_errors (bool): if true, when a simulation outputs an error the whole campaign will be stopped """ + + # Log simulation start if not already done by parent class + if callbacks is not None: + for cb in callbacks: + if not cb.is_controlled_by_parent(): + cb.on_simulation_start(len(parameter_list)) for _, parameter in enumerate(parameter_list): @@ -314,13 +323,19 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False): parameter.items()] # Run from dedicated temporary folder - current_result['meta']['id'] = str(uuid.uuid4()) + sim_uuid = str(uuid.uuid4()) + current_result['meta']['id'] = sim_uuid temp_dir = os.path.join(data_folder, current_result['meta']['id']) os.makedirs(temp_dir) start = time.time() # Time execution stdout_file_path = os.path.join(temp_dir, 'stdout') stderr_file_path = os.path.join(temp_dir, 'stderr') + + if callbacks is not None: + for cb in callbacks: + cb.on_run_start(parameter, sim_uuid) + with open(stdout_file_path, 'w') as stdout_file, open( stderr_file_path, 'w') as stderr_file: return_code = subprocess.call(command, cwd=temp_dir, @@ -329,11 +344,16 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False): stderr=stderr_file) end = time.time() # Time execution + if callbacks is not None: + for cb in callbacks: + cb.on_run_end(sim_uuid, return_code, end - start) + if return_code != 0: + with open(stdout_file_path, 'r') as stdout_file, open( stderr_file_path, 'r') as stderr_file: - complete_command = sem.utils.get_command_from_result(self.script, current_result) - complete_command_debug = sem.utils.get_command_from_result(self.script, current_result, debug=True) + complete_command = sem.utils.get_command_from_result(self.script, self.path, current_result) + complete_command_debug = sem.utils.get_command_from_result(self.script, self.path, current_result, debug=True) error_message = ('\nSimulation exited with an
error.\n' 'Params: %s\n' 'Stderr: %s\n' @@ -349,9 +369,15 @@ def run_simulations(self, parameter_list, data_folder, stop_on_errors=False): complete_command_debug)) if stop_on_errors: raise Exception(error_message) - print(error_message) + print(error_message) current_result['meta']['elapsed_time'] = end-start current_result['meta']['exitcode'] = return_code yield current_result + + # Log simulation end if not already done by parent class + if callbacks is not None: + for cb in callbacks: + if not cb.is_controlled_by_parent(): + cb.on_simulation_end() diff --git a/sem/utils.py b/sem/utils.py index 233737e..ffbd40e 100644 --- a/sem/utils.py +++ b/sem/utils.py @@ -1,9 +1,12 @@ import io +import os import math import copy import warnings from itertools import product from functools import wraps +from abc import ABC, abstractmethod +from typing import Dict, Any import matplotlib.pyplot as plt import numpy as np @@ -99,21 +102,25 @@ def list_param_combinations(param_ranges): return [param_ranges_copy] -def get_command_from_result(script, result, debug=False): +def get_command_from_result(script, path, result, debug=False): """ Return the command that is needed to obtain a certain result. Args: params (dict): Dictionary containing parameter: value pairs. + path (str): The path to the ns-3 folder, used to discern which + build system (waf/CMake) is used debug (bool): Whether the command should include the debugging template. 
""" + command = "./ns3 run " if os.path.exists(os.path.join(path, "ns3")) else "python3 ./waf --run " + if not debug: - command = "python3 waf --run \"" + script + " " + " ".join( + command += "\"" + script + " " + " ".join( ['--%s=%s' % (param, value) for param, value in result['params'].items()]) + "\"" else: - command = "python3 waf --run " + script + " --command-template=\"" +\ + command += script + " --command-template=\"" +\ "gdb --args %s " + " ".join(['--%s=%s' % (param, value) for param, value in result['params'].items()]) + "\"" @@ -305,6 +312,88 @@ def compute_sensitivity_analysis( return salib_analyze_function(problem, results) +class CallbackBase(ABC): + """ + Base class for SEM callbacks. + :param verbose: Verbosity level: 0 for no output, 1 for info messages, 2 for debug messages + """ + + def __init__(self, verbose: int = 0): + super().__init__() + # Number of time the callback was called + self.controlled_by_parent = False # type: bool + self.n_runs_over = 0 # type: int + self.n_runs_over_no_errors = 0 # type: int + self.n_runs_over_errors = 0 # type: int + self.n_runs_total = 0 # type: int + self.run_sim_times = [] + self.verbose = verbose + + def init_callback(self, controlled_by_parent) -> None: + """ + Initialize the callback. + """ + self.controlled_by_parent = controlled_by_parent + + def is_controlled_by_parent(self) -> bool: + """ + Whether this runner is aware of all simulations (false) + or it has been triggered by a multithread runner and is thus + aware of a subset of all runs only (true). 
+ """ + return self.controlled_by_parent + + def on_simulation_start(self, n_runs_total) -> None: + self.n_runs_total = n_runs_total + self._on_simulation_start() + + @abstractmethod + def _on_simulation_start(self) -> None: + pass + + def on_run_start(self, configuration, sim_uuid) -> None: + """ + Args: + configuration (dict): dictionary representing the combination of parameters simulated in this specific run. + sim_uuid (str): unique identifier string for the simulation. This value is used to name the result folder, + and it is referenced in the result JSON file. + """ + self._on_run_start(configuration, sim_uuid) + + @abstractmethod + def _on_run_start(self, configuration: dict, sim_uuid: str) -> None: + pass + + @abstractmethod + def _on_run_end(self, sim_uuid: str, return_code: int, sim_time: float) -> bool: + """ + :return: Boolean status flag; the simulation runner currently discards it. + # TODO maybe it does not make a lot of sense since this will be eventually overridden by the callback user + """ + return True + + def on_run_end(self, sim_uuid: str, return_code: int, sim_time: float) -> bool: + """ + This method will be called when each simulation run finishes + # TODO maybe it does not make a lot of sense since this will be eventually overridden by the callback user + :return: The value returned by _on_run_end. 
+ """ + self.n_runs_over += 1 + self.run_sim_times.append(sim_time) + if return_code == 0: + self.n_runs_over_no_errors += 1 # type: int + else: + self.n_runs_over_errors += 1 # type: int + + return self._on_run_end(sim_uuid, return_code, sim_time) + + def on_simulation_end(self) -> None: + self._on_simulation_end() + + @abstractmethod + def _on_simulation_end(self) -> None: + pass + # def interactive_plot(campaign, param_ranges, result_parsing_function, x_axis, # runs=None): # # Average over RngRuns if param_ranges does not contain RngRun diff --git a/tests/conftest.py b/tests/conftest.py index e48f381..fd233fb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,7 +42,7 @@ def ns_3_compiled(tmpdir): # Relocate build by running the same command in the new directory if subprocess.call([build_program, 'configure', '--disable-gtk', - '--build-profile=optimized', '--enable-modules=core', + '--build-profile=optimized', '--enable-modules=core', '--enable-examples', '--out=build/optimized'], cwd=ns_3_tempdir, stdout=subprocess.DEVNULL, @@ -70,7 +70,7 @@ def ns_3_compiled_debug(tmpdir): # Relocate build by running the same command in the new directory if subprocess.call([build_program, 'configure', '--disable-gtk', - '--build-profile=debug', '--enable-modules=core', + '--build-profile=debug', '--enable-modules=core', '--enable-examples', '--out=build'], cwd=ns_3_tempdir, stdout=subprocess.DEVNULL, @@ -85,6 +85,26 @@ def ns_3_compiled_debug(tmpdir): return ns_3_tempdir +@pytest.fixture(scope='function') +def ns_3_compiled_examples(): + # Configure and build WAF-based ns-3 + build_program = get_build_program(ns_3_examples) + + if subprocess.call([build_program, 'configure', '--disable-gtk', + '--build-profile=optimized', '--enable-modules=core', + '--out=build/optimized'], + cwd=ns_3_examples, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT) != 0: + raise Exception("Examples configuration failed.") + + if subprocess.call([build_program, 'build'], + 
cwd=ns_3_examples, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) > 0: + raise Exception("Examples build failed.") + + return ns_3_examples @pytest.fixture(scope='function') def config(tmpdir, ns_3_compiled): diff --git a/tests/test_runner.py b/tests/test_runner.py index 9c9e2db..18d3672 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -6,11 +6,21 @@ # Runner creation # ################### +""" +First param: Runner type +Second param: Whether to use optimized build +Third param: Whether to use the CMake-version of ns-3 +""" +@pytest.fixture(scope='function', params=[['ParallelRunner', True, True], + ['ParallelRunner', True, False]]) +def runner(ns_3_compiled, ns_3_compiled_debug, ns_3_compiled_examples, config, request): + ns_3_folder = ns_3_compiled + if request.param[1] is False: + ns_3_folder = ns_3_compiled_debug + if request.param[2] is True: + assert(request.param[1] is True) + ns_3_folder = ns_3_compiled_examples -@pytest.fixture(scope='function', params=[['ParallelRunner', True], - ['ParallelRunner', False]]) -def runner(ns_3_compiled, ns_3_compiled_debug, config, request): - ns_3_folder = ns_3_compiled if request.param[1] is True else ns_3_compiled_debug if request.param[0] == 'SimulationRunner': return SimulationRunner(ns_3_folder, config['script'], optimized=request.param[1]) @@ -18,7 +28,6 @@ def runner(ns_3_compiled, ns_3_compiled_debug, config, request): return ParallelRunner(ns_3_folder, config['script'], optimized=request.param[1]) - def test_get_available_parameters(runner, config): # Try getting the available parameters of the script assert runner.get_available_parameters() == config['params'] @@ -26,13 +35,15 @@ def test_get_available_parameters(runner, config): @pytest.mark.parametrize('runner', [ - ['SimulationRunner', True], - ['ParallelRunner', True], + ['SimulationRunner', True, True], + ['ParallelRunner', True, True], + ['ParallelRunner', True, False], ], indirect=True) def test_run_simulations(runner, config, 
parameter_combination): - # Make sure that simulations run without any issue + # Make sure that simulations run without any issue, + # with CMake optimized and debug builds, and Waf optimized builds data_dir = os.path.join(config['campaign_dir'], 'data') list(runner.run_simulations([parameter_combination], data_dir)) diff --git a/tests/test_utils.py b/tests/test_utils.py index add9ba6..bb596d5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,9 +1,24 @@ -from sem import list_param_combinations, automatic_parser, stdout_automatic_parser +from sem import list_param_combinations, automatic_parser, stdout_automatic_parser, get_command_from_result, CampaignManager import json import numpy as np +import pytest from operator import getitem +@pytest.fixture(scope='function', params=[['compiled', False]]) +def ns_3_compiled_folder_and_command(ns_3_compiled, ns_3_compiled_examples, request): + if request.param[0] == 'compiled': + if request.param[1] is False: + return [ns_3_compiled, False, './ns3 run \"hash-example --dict=/usr/share/dict/american-english --time=False --RngRun=0\"'] + # elif request.param[1] is True: + # return [ns_3_compiled, True, './ns3 run \"hash-example --dict=/usr/share/dict/american-english --time=False --RngRun=0\"'] + elif request.param[0] == 'compiled_examples': + if request.param[1] is False: + return [ns_3_compiled_examples, False, 'python3 ./waf --run \"hash-example --dict=/usr/share/dict/american-english --time=False --RngRun=0\"'] + # elif request.param[1] is True: + # return [ns_3_compiled_examples, True, 'python3 ./waf --run \"hash-example --dict=/usr/share/dict/american-english --time=False --RngRun=0\"'] + + def test_list_param_combinations(): # Two possible combinations d1 = {'a': [1, 2]} @@ -101,3 +116,30 @@ def test_automatic_parser(result): [6, 7, 8, 9, 10]]) assert parsed['stderr'] == [] + +@pytest.mark.parametrize('ns_3_compiled_folder_and_command', + [ + ['compiled', False], + ['compiled_examples', False] + ], + 
indirect=True) +def test_get_cmnd_from_result(ns_3_compiled_folder_and_command, config, parameter_combination): + + # Create an ns-3 campaign to run simulations and obtain a result + ns_3_folder = ns_3_compiled_folder_and_command[0] + hardcoded_command = ns_3_compiled_folder_and_command[2] + + cmpgn = CampaignManager.new(ns_3_folder, + config['script'], + config['campaign_dir'], + overwrite=True, + skip_configuration=True) + + cmpgn.run_simulations([parameter_combination], show_progress=False) + + # Retrieve the results, and compare the output of get_command_from_result + # with the expected command + result = cmpgn.db.get_complete_results()[0] + cmnd = get_command_from_result( + config['script'], ns_3_folder, result) + assert (hardcoded_command == cmnd)