diff --git a/benchcab/bench_config.py b/benchcab/bench_config.py index d902ee2a..509f551e 100644 --- a/benchcab/bench_config.py +++ b/benchcab/bench_config.py @@ -43,6 +43,42 @@ def check_config(config: dict): "that is compatible with the f90nml python package." ) + # the "fluxsite" key is optional + if "fluxsite" in config: + if not isinstance(config["fluxsite"], dict): + raise TypeError("The 'fluxsite' key must be a dictionary.") + # the "pbs" key is optional + if "pbs" in config["fluxsite"]: + if not isinstance(config["fluxsite"]["pbs"], dict): + raise TypeError("The 'pbs' key must be a dictionary.") + # the "ncpus" key is optional + if "ncpus" in config["fluxsite"]["pbs"] and not isinstance( + config["fluxsite"]["pbs"]["ncpus"], int + ): + raise TypeError("The 'ncpus' key must be an integer.") + # the "mem" key is optional + if "mem" in config["fluxsite"]["pbs"] and not isinstance( + config["fluxsite"]["pbs"]["mem"], str + ): + raise TypeError("The 'mem' key must be a string.") + # the "walltime" key is optional + if "walltime" in config["fluxsite"]["pbs"] and not isinstance( + config["fluxsite"]["pbs"]["walltime"], str + ): + raise TypeError("The 'walltime' key must be a string.") + # the "storage" key is optional + if "storage" in config["fluxsite"]["pbs"]: + if not isinstance(config["fluxsite"]["pbs"]["storage"], list) or any( + not isinstance(val, str) + for val in config["fluxsite"]["pbs"]["storage"] + ): + raise TypeError("The 'storage' key must be a list of strings.") + # the "multiprocessing" key is optional + if "multiprocessing" in config["fluxsite"] and not isinstance( + config["fluxsite"]["multiprocessing"], bool + ): + raise TypeError("The 'multiprocessing' key must be a boolean.") + valid_experiments = ( list(internal.MEORG_EXPERIMENTS) + internal.MEORG_EXPERIMENTS["five-site-test"] ) diff --git a/benchcab/benchcab.py b/benchcab/benchcab.py index 6b04fc11..b6850f66 100644 --- a/benchcab/benchcab.py +++ b/benchcab/benchcab.py @@ -132,10 +132,10 @@ def fluxsite_submit_job(self) -> None: project=self.config["project"], config_path=self.args.config, modules=self.config["modules"], - storage_flags=[], # TODO(Sean) add storage flags option to config verbose=self.args.verbose, skip_bitwise_cmp="fluxsite-bitwise-cmp" in self.args.skip, benchcab_path=str(self.benchcab_exe_path), + pbs_config=self.config.get("pbs"), ) file.write(contents) @@ -211,8 +211,15 @@ def fluxsite_run_tasks(self): """Endpoint for `benchcab fluxsite-run-tasks`.""" tasks = self.tasks if self.tasks else self._initialise_tasks() print("Running fluxsite tasks...") - if internal.MULTIPROCESS: - run_tasks_in_parallel(tasks, verbose=self.args.verbose) + try: + multiprocess = self.config["fluxsite"]["multiprocess"] + except KeyError: + multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS + if multiprocess: + ncpus = self.config.get("pbs", {}).get( + "ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"] + ) + run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=self.args.verbose) else: run_tasks(tasks, verbose=self.args.verbose) print("Successfully ran fluxsite tasks") @@ -230,8 +237,18 @@ def fluxsite_bitwise_cmp(self): comparisons = get_fluxsite_comparisons(tasks) print("Running comparison tasks...") - if internal.MULTIPROCESS: - run_comparisons_in_parallel(comparisons, verbose=self.args.verbose) + try: + multiprocess = self.config["fluxsite"]["multiprocess"] + except KeyError: + multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS + if multiprocess: + try: + ncpus = self.config["fluxsite"]["pbs"]["ncpus"] + except KeyError: + ncpus = internal.FLUXSITE_DEFAULT_PBS["ncpus"] + run_comparisons_in_parallel( + comparisons, n_processes=ncpus, verbose=self.args.verbose + ) else: run_comparisons(comparisons, verbose=self.args.verbose) print("Successfully ran comparison tasks") diff --git a/benchcab/comparison.py b/benchcab/comparison.py index 4a2b08f8..627bbe3f 100644 --- a/benchcab/comparison.py +++ b/benchcab/comparison.py @@ -59,7 +59,9 @@ def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> No def run_comparisons_in_parallel( - comparison_tasks: list[ComparisonTask], verbose=False + comparison_tasks: list[ComparisonTask], + n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], + verbose=False, ) -> None: """Runs bitwise comparison tasks in parallel across multiple processes.""" @@ -68,7 +70,7 @@ def run_comparisons_in_parallel( task_queue.put(task) processes = [] - for _ in range(internal.NCPUS): + for _ in range(n_processes): proc = multiprocessing.Process( target=worker_comparison, args=[task_queue, verbose] ) diff --git a/benchcab/internal.py b/benchcab/internal.py index f24e061d..daf0de37 100644 --- a/benchcab/internal.py +++ b/benchcab/internal.py @@ -2,6 +2,7 @@ import os from pathlib import Path +from typing import Any _, NODENAME, _, _, _ = os.uname() @@ -10,11 +11,14 @@ # Parameters for job script: QSUB_FNAME = "benchmark_cable_qsub.sh" -NCPUS = 18 -MEM = "30GB" -WALL_TIME = "6:00:00" +FLUXSITE_DEFAULT_PBS: Any = { + "ncpus": 18, + "mem": "30GB", + "walltime": "6:00:00", + "storage": [], +} MPI = False -MULTIPROCESS = True +FLUXSITE_DEFAULT_MULTIPROCESS = True # DIRECTORY PATHS/STRUCTURE: diff --git a/benchcab/task.py b/benchcab/task.py index b91d3731..30395d50 100644 --- a/benchcab/task.py +++ b/benchcab/task.py @@ -334,7 +334,9 @@ def run_tasks(tasks: list[Task], verbose=False): task.run(verbose=verbose) -def run_tasks_in_parallel(tasks: list[Task], verbose=False): +def run_tasks_in_parallel( + tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False +): """Runs tasks in `tasks` in parallel across multiple processes.""" task_queue: multiprocessing.Queue = multiprocessing.Queue() @@ -342,7 +344,7 @@ def run_tasks_in_parallel(tasks: list[Task], verbose=False): task_queue.put(task) processes = [] - for _ in range(internal.NCPUS): + for _ in range(n_processes): proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose]) proc.start() processes.append(proc) diff --git a/benchcab/utils/pbs.py b/benchcab/utils/pbs.py index 607efe78..c02abb25 100644 --- a/benchcab/utils/pbs.py +++ b/benchcab/utils/pbs.py @@ -1,5 +1,7 @@ """Contains helper functions for manipulating PBS job scripts.""" +from typing import Optional + from benchcab import internal @@ -7,10 +9,10 @@ def render_job_script( project: str, config_path: str, modules: list, - storage_flags: list, benchcab_path: str, verbose=False, skip_bitwise_cmp=False, + pbs_config: Optional[dict] = None, ) -> str: """Returns the text for a PBS job script that executes all computationally expensive commands. @@ -18,16 +20,27 @@ def render_job_script( between model output files. """ + if pbs_config is None: + pbs_config = internal.FLUXSITE_DEFAULT_PBS + module_load_lines = "\n".join( f"module load {module_name}" for module_name in modules ) verbose_flag = "-v" if verbose else "" - storage_flags = ["gdata/ks32", "gdata/hh5", f"gdata/{project}", *storage_flags] + ncpus = pbs_config.get("ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"]) + mem = pbs_config.get("mem", internal.FLUXSITE_DEFAULT_PBS["mem"]) + walltime = pbs_config.get("walltime", internal.FLUXSITE_DEFAULT_PBS["walltime"]) + storage_flags = [ + "gdata/ks32", + "gdata/hh5", + f"gdata/{project}", + *pbs_config.get("storage", internal.FLUXSITE_DEFAULT_PBS["storage"]), + ] return f"""#!/bin/bash #PBS -l wd -#PBS -l ncpus={internal.NCPUS} -#PBS -l mem={internal.MEM} -#PBS -l walltime={internal.WALL_TIME} +#PBS -l ncpus={ncpus} +#PBS -l mem={mem} +#PBS -l walltime={walltime} #PBS -q normal #PBS -P {project} #PBS -j oe diff --git a/docs/user_guide/config_options.md b/docs/user_guide/config_options.md index 0bfaeb6f..3e2f4ec8 100644 --- a/docs/user_guide/config_options.md +++ b/docs/user_guide/config_options.md @@ -43,6 +43,51 @@ The different running modes of `benchcab` are solely dependent on the options us : NCI modules to use for compiling CABLE +### `fluxsite` +: Contains settings specific to fluxsite tests. +: This key is **optional**. Default settings for the fluxsite tests will be used if it is not present + +#### `pbs` + +: Contains settings specific to the PBS scheduler at NCI for the PBS script running the CABLE simulations at FLUXNET sites and the bitwise comparison for these simulations. +: This key is **optional**. Default values for the PBS settings will apply if it is not specified. + +##### `ncpus` + +: The number of CPU cores to allocate for the PBS job, i.e. the `-l ncpus=<4>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]). +: This key is **optional** and can be omitted from the config file. By default `ncpus` is set to `18`. + +##### `mem` + +: The total memory limit for the PBS job, i.e. the `-l mem=<10GB>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]). +: This key is **optional** and can be omitted from the config file. By default `mem` is set to `30GB`. + +##### `walltime` + +: The wall clock time limit for the PBS job, i.e. `-l walltime=` PBS flag (see [PBS Directives Explained][nci-pbs-directives]). +: This key is **optional** and can be omitted from the config file. By default `walltime` is set to `6:00:00`. + +##### `storage` + +: A list of extra storage flags required for the PBS job, i.e. `-l storage=` (see [PBS Directives Explained][nci-pbs-directives]). +: This key is **optional** and can be omitted from the config file. By default `storage` is set to `[]`. + +#### `multiprocess` + +: Enables or disables multiprocessing for executing embarrassingly parallel tasks. +: This key is **optional** and can be omitted from the config file. By default `multiprocess` is set to `True`. + +Example: +```yaml +fluxsite: + pbs: + ncpus: 16 + mem: 64GB + walltime: 00:01:00 + storage: [scratch/a00, gdata/xy11] + multiprocess: True +``` + ## Simulations options ### `realisations` @@ -138,4 +183,5 @@ science_configurations: [ [forty-two-me]: https://modelevaluation.org/experiment/display/urTKSXEsojdvEPwdR [five-me]: https://modelevaluation.org/experiment/display/xNZx2hSvn4PMKAa9R [f90nml-github]: https://github.com/marshallward/f90nml -[environment-modules]: https://modules.sourceforge.net/ \ No newline at end of file +[environment-modules]: https://modules.sourceforge.net/ +[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained \ No newline at end of file diff --git a/tests/common.py b/tests/common.py index 73bc5a61..a135249b 100644 --- a/tests/common.py +++ b/tests/common.py @@ -55,6 +55,15 @@ def get_mock_config() -> dict: } }, ], + "fluxsite": { + "pbs": { + "ncpus": 16, + "mem": "64G", + "walltime": "01:00:00", + "storage": ["gdata/foo123"], + }, + "multiprocessing": True, + }, } return config diff --git a/tests/test_bench_config.py b/tests/test_bench_config.py index 505e7103..02a7adf9 100644 --- a/tests/test_bench_config.py +++ b/tests/test_bench_config.py @@ -55,6 +55,41 @@ def test_check_config(): config.pop("science_configurations") check_config(config) + # Success case: test config without fluxsite key is valid + config = get_mock_config() + config.pop("fluxsite") + check_config(config) + + # Success case: test config without multiprocessing key is valid + config = get_mock_config() + config["fluxsite"].pop("multiprocessing") + check_config(config) + + # Success case: test config without pbs key is valid + config = get_mock_config() + config["fluxsite"].pop("pbs") + check_config(config) + + # Success case: test config without ncpus key is valid + config = get_mock_config() + config["fluxsite"]["pbs"].pop("ncpus") + check_config(config) + + # Success case: test config without mem key is valid + config = get_mock_config() + config["fluxsite"]["pbs"].pop("mem") + check_config(config) + + # Success case: test config without walltime key is valid + config = get_mock_config() + config["fluxsite"]["pbs"].pop("walltime") + check_config(config) + + # Success case: test config without storage key is valid + config = get_mock_config() + config["fluxsite"]["pbs"].pop("storage") + check_config(config) + # Failure case: test missing required keys raises an exception with pytest.raises( ValueError, @@ -207,6 +242,54 @@ def test_check_config(): config["science_configurations"] = [r"cable_user%GS_SWITCH = 'medlyn'"] check_config(config) + # Failure case: type of config["fluxsite"] is not a dict + with pytest.raises(TypeError, match="The 'fluxsite' key must be a dictionary."): + config = get_mock_config() + config["fluxsite"] = ["ncpus: 16\nmem: 64GB\n"] + check_config(config) + + # Failure case: type of config["pbs"] is not a dict + with pytest.raises(TypeError, match="The 'pbs' key must be a dictionary."): + config = get_mock_config() + config["fluxsite"]["pbs"] = "-l ncpus=16" + check_config(config) + + # Failure case: type of config["pbs"]["ncpus"] is not an int + with pytest.raises(TypeError, match="The 'ncpus' key must be an integer."): + config = get_mock_config() + config["fluxsite"]["pbs"]["ncpus"] = "16" + check_config(config) + + # Failure case: type of config["pbs"]["mem"] is not a string + with pytest.raises(TypeError, match="The 'mem' key must be a string."): + config = get_mock_config() + config["fluxsite"]["pbs"]["mem"] = 64 + check_config(config) + + # Failure case: type of config["pbs"]["walltime"] is not a string + with pytest.raises(TypeError, match="The 'walltime' key must be a string."): + config = get_mock_config() + config["fluxsite"]["pbs"]["walltime"] = 60 + check_config(config) + + # Failure case: type of config["pbs"]["storage"] is not a list + with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."): + config = get_mock_config() + config["fluxsite"]["pbs"]["storage"] = "gdata/foo+gdata/bar" + check_config(config) + + # Failure case: type of config["pbs"]["storage"] is not a list of strings + with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."): + config = get_mock_config() + config["fluxsite"]["pbs"]["storage"] = [1, 2, 3] + check_config(config) + + # Failure case: type of config["multiprocessing"] is not a bool + with pytest.raises(TypeError, match="The 'multiprocessing' key must be a boolean."): + config = get_mock_config() + config["fluxsite"]["multiprocessing"] = 1 + check_config(config) + def test_read_config(): """Tests for `read_config()`.""" diff --git a/tests/test_pbs.py b/tests/test_pbs.py index c3561d0f..cb678b17 100644 --- a/tests/test_pbs.py +++ b/tests/test_pbs.py @@ -12,19 +12,18 @@ def test_render_job_script(): project="tm70", config_path="/path/to/config.yaml", modules=["foo", "bar", "baz"], - storage_flags=["scratch/tm70"], benchcab_path="/absolute/path/to/benchcab", ) == ( f"""#!/bin/bash #PBS -l wd -#PBS -l ncpus={internal.NCPUS} -#PBS -l mem={internal.MEM} -#PBS -l walltime={internal.WALL_TIME} +#PBS -l ncpus={internal.FLUXSITE_DEFAULT_PBS["ncpus"]} +#PBS -l mem={internal.FLUXSITE_DEFAULT_PBS["mem"]} +#PBS -l walltime={internal.FLUXSITE_DEFAULT_PBS["walltime"]} #PBS -q normal #PBS -P tm70 #PBS -j oe #PBS -m e -#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+scratch/tm70 +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70 module purge module load foo @@ -50,20 +49,19 @@ def test_render_job_script(): project="tm70", config_path="/path/to/config.yaml", modules=["foo", "bar", "baz"], - storage_flags=["scratch/tm70"], verbose=True, benchcab_path="/absolute/path/to/benchcab", ) == ( f"""#!/bin/bash #PBS -l wd -#PBS -l ncpus={internal.NCPUS} -#PBS -l mem={internal.MEM} -#PBS -l walltime={internal.WALL_TIME} +#PBS -l ncpus={internal.FLUXSITE_DEFAULT_PBS["ncpus"]} +#PBS -l mem={internal.FLUXSITE_DEFAULT_PBS["mem"]} +#PBS -l walltime={internal.FLUXSITE_DEFAULT_PBS["walltime"]} #PBS -q normal #PBS -P tm70 #PBS -j oe #PBS -m e -#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+scratch/tm70 +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70 module purge module load foo @@ -89,20 +87,58 @@ def test_render_job_script(): project="tm70", config_path="/path/to/config.yaml", modules=["foo", "bar", "baz"], - storage_flags=["scratch/tm70"], skip_bitwise_cmp=True, benchcab_path="/absolute/path/to/benchcab", ) == ( f"""#!/bin/bash #PBS -l wd -#PBS -l ncpus={internal.NCPUS} -#PBS -l mem={internal.MEM} -#PBS -l walltime={internal.WALL_TIME} +#PBS -l ncpus={internal.FLUXSITE_DEFAULT_PBS["ncpus"]} +#PBS -l mem={internal.FLUXSITE_DEFAULT_PBS["mem"]} +#PBS -l walltime={internal.FLUXSITE_DEFAULT_PBS["walltime"]} +#PBS -q normal +#PBS -P tm70 +#PBS -j oe +#PBS -m e +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70 + +module purge +module load foo +module load bar +module load baz + +/absolute/path/to/benchcab fluxsite-run-tasks --config=/path/to/config.yaml +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxsite-run-tasks failed. Exiting...' + exit 1 +fi + +""" + ) + + # Success case: specify parameters in pbs_config + assert render_job_script( + project="tm70", + config_path="/path/to/config.yaml", + modules=["foo", "bar", "baz"], + skip_bitwise_cmp=True, + benchcab_path="/absolute/path/to/benchcab", + pbs_config={ + "ncpus": 4, + "mem": "16GB", + "walltime": "00:00:30", + "storage": ["gdata/foo"], + }, + ) == ( + """#!/bin/bash +#PBS -l wd +#PBS -l ncpus=4 +#PBS -l mem=16GB +#PBS -l walltime=00:00:30 #PBS -q normal #PBS -P tm70 #PBS -j oe #PBS -m e -#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+scratch/tm70 +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+gdata/foo module purge module load foo @@ -118,20 +154,20 @@ def test_render_job_script(): """ ) - # Success case: test with storage_flags set to [] + # Success case: if the pbs_config is empty, use the default values assert render_job_script( project="tm70", config_path="/path/to/config.yaml", modules=["foo", "bar", "baz"], - storage_flags=[], skip_bitwise_cmp=True, benchcab_path="/absolute/path/to/benchcab", + pbs_config={}, ) == ( f"""#!/bin/bash #PBS -l wd -#PBS -l ncpus={internal.NCPUS} -#PBS -l mem={internal.MEM} -#PBS -l walltime={internal.WALL_TIME} +#PBS -l ncpus={internal.FLUXSITE_DEFAULT_PBS["ncpus"]} +#PBS -l mem={internal.FLUXSITE_DEFAULT_PBS["mem"]} +#PBS -l walltime={internal.FLUXSITE_DEFAULT_PBS["walltime"]} #PBS -q normal #PBS -P tm70 #PBS -j oe