diff --git a/.conda/benchcab-dev.yaml b/.conda/benchcab-dev.yaml
index 9b915012..2fbe1411 100644
--- a/.conda/benchcab-dev.yaml
+++ b/.conda/benchcab-dev.yaml
@@ -8,4 +8,6 @@ dependencies:
   - netcdf4
   - numpy
   - pytest
+  - coverage
   - pyyaml
+  - flatdict
diff --git a/.conda/meta.yaml b/.conda/meta.yaml
index 2fdd45f3..0ba08259 100644
--- a/.conda/meta.yaml
+++ b/.conda/meta.yaml
@@ -25,3 +25,4 @@ requirements:
     - netCDF4
     - PyYAML
     - f90nml
+    - flatdict
diff --git a/benchcab/bench_config.py b/benchcab/bench_config.py
index 0e70d33f..a3cc4ce4 100644
--- a/benchcab/bench_config.py
+++ b/benchcab/bench_config.py
@@ -1,12 +1,9 @@
-"""
-A module containing all *_config() functions.
-
-"""
+"""A module containing all *_config() functions."""
 
 from pathlib import Path
 import yaml
 
-from benchcab.internal import MEORG_EXPERIMENTS, DEFAULT_SCIENCE_CONFIGURATIONS
+from benchcab import internal
 
 
 def check_config(config: dict):
@@ -15,12 +12,10 @@ def check_config(config: dict):
 
     If the config is invalid, an exception is raised. Otherwise, do nothing.
     """
-    required_keys = ["realisations", "project", "modules", "experiment"]
-    if any(key not in config for key in required_keys):
+    if any(key not in config for key in internal.CONFIG_REQUIRED_KEYS):
         raise ValueError(
             "The config file does not list all required entries. "
-            "Those are: "
-            ", ".join(required_keys)
+            "Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS)
         )
 
     if not isinstance(config["project"], str):
@@ -46,12 +41,13 @@ def check_config(config: dict):
             "that is compatible with the f90nml python package."
         )
 
-    valid_experiments = list(MEORG_EXPERIMENTS) + MEORG_EXPERIMENTS["five-site-test"]
+    valid_experiments = (
+        list(internal.MEORG_EXPERIMENTS) + internal.MEORG_EXPERIMENTS["five-site-test"]
+    )
     if config["experiment"] not in valid_experiments:
         raise ValueError(
             "The 'experiment' key is invalid.\n"
-            "Valid experiments are: "
-            ", ".join(valid_experiments)
+            "Valid experiments are: " + ", ".join(valid_experiments)
         )
 
     if not isinstance(config["realisations"], list):
@@ -119,6 +115,6 @@ def read_config(config_path: str) -> dict:
         branch.setdefault("build_script", "")
 
     # Add "science_configurations" if not provided and set to default value
-    config.setdefault("science_configurations", DEFAULT_SCIENCE_CONFIGURATIONS)
+    config.setdefault("science_configurations", internal.DEFAULT_SCIENCE_CONFIGURATIONS)
 
     return config
diff --git a/benchcab/benchcab.py b/benchcab/benchcab.py
index affaa444..97735cb1 100644
--- a/benchcab/benchcab.py
+++ b/benchcab/benchcab.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
 """Contains the main program entry point for `benchcab`."""
 
 import sys
@@ -7,24 +5,32 @@
 from benchcab.job_script import create_job_script, submit_job
 from benchcab.bench_config import read_config
 from benchcab.benchtree import setup_fluxnet_directory_tree, setup_src_dir
-from benchcab.build_cable import build_cable
+from benchcab.build_cable import default_build, custom_build
 from benchcab.get_cable import (
     checkout_cable,
     checkout_cable_auxiliary,
-    archive_rev_number,
+    svn_info_show_item,
+    next_path,
 )
 from benchcab.internal import (
     validate_environment,
     get_met_sites,
+    CWD,
     MULTIPROCESS,
     SITE_LOG_DIR,
     SITE_TASKS_DIR,
     SITE_OUTPUT_DIR,
 )
-from benchcab.task import get_fluxnet_tasks, get_fluxnet_comparisons, Task
+from benchcab.task import (
+    get_fluxnet_tasks,
+    get_fluxnet_comparisons,
+    run_tasks,
+    run_tasks_in_parallel,
+    run_comparisons,
+    run_comparisons_in_parallel,
+    Task,
+)
 from benchcab.cli import generate_parser
-from benchcab.run_cable_site import run_tasks, run_tasks_in_parallel
-from benchcab.run_comparison import run_comparisons, run_comparisons_in_parallel
 from benchcab.environment_modules import module_load, module_is_loaded
@@ -52,23 +58,42 @@ def _initialise_tasks(self) -> list[Task]:
 
     def checkout(self):
         """Endpoint for `benchcab checkout`."""
+        setup_src_dir()
+
         print("Checking out repositories...")
+        rev_number_log = ""
         for branch in self.config["realisations"]:
-            checkout_cable(branch, verbose=self.args.verbose)
+            path_to_repo = checkout_cable(branch, verbose=self.args.verbose)
+            rev_number_log += (
+                f"{branch['name']} last changed revision: "
+                f"{svn_info_show_item(path_to_repo, 'last-changed-revision')}\n"
+            )
+
+        # TODO(Sean) we should archive revision numbers for CABLE-AUX
         checkout_cable_auxiliary(self.args.verbose)
-        archive_rev_number()
+
+        rev_number_log_path = CWD / next_path("rev_number-*.log")
+        print(f"Writing revision number info to {rev_number_log_path.relative_to(CWD)}")
+        with open(rev_number_log_path, "w", encoding="utf-8") as file:
+            file.write(rev_number_log)
+
         print("")
 
     def build(self):
         """Endpoint for `benchcab build`."""
         for branch in self.config["realisations"]:
-            build_cable(
-                branch["build_script"],
-                branch["name"],
-                self.config["modules"],
-                verbose=self.args.verbose,
-            )
+            if branch["build_script"]:
+                custom_build(
+                    branch["build_script"], branch["name"], verbose=self.args.verbose
+                )
+            else:
+                default_build(
+                    branch["name"],
+                    self.config["modules"],
+                    verbose=self.args.verbose,
+                )
+            print(f"Successfully compiled CABLE for realisation {branch['name']}")
         print("")
 
     def fluxnet_setup_work_directory(self):
diff --git a/benchcab/benchtree.py b/benchcab/benchtree.py
index 58df2f13..1dfaefa9 100644
--- a/benchcab/benchtree.py
+++ b/benchcab/benchtree.py
@@ -7,79 +7,77 @@
 from benchcab.task import Task
 
 
-def clean_directory_tree(root_dir=internal.CWD):
-    """Remove pre-existing directories in `root_dir`."""
-    src_dir = Path(root_dir, internal.SRC_DIR)
+def clean_directory_tree():
+    """Remove pre-existing directories in current working directory."""
+    src_dir = Path(internal.CWD, internal.SRC_DIR)
     if src_dir.exists():
         shutil.rmtree(src_dir)
 
-    run_dir = Path(root_dir, internal.RUN_DIR)
+    run_dir = Path(internal.CWD, internal.RUN_DIR)
     if run_dir.exists():
         shutil.rmtree(run_dir)
 
 
-def setup_src_dir(root_dir=internal.CWD):
+def setup_src_dir():
     """Make `src` directory."""
-    src_dir = Path(root_dir, internal.SRC_DIR)
+    src_dir = Path(internal.CWD, internal.SRC_DIR)
     if not src_dir.exists():
-        print(f"Creating {src_dir.relative_to(root_dir)} directory: {src_dir}")
+        print(f"Creating {src_dir.relative_to(internal.CWD)} directory: {src_dir}")
         os.makedirs(src_dir)
 
 
-def setup_fluxnet_directory_tree(
-    fluxnet_tasks: list[Task], root_dir=internal.CWD, verbose=False
-):
+def setup_fluxnet_directory_tree(fluxnet_tasks: list[Task], verbose=False):
     """Generate the directory structure used by `benchcab`."""
-    run_dir = Path(root_dir, internal.RUN_DIR)
+    run_dir = Path(internal.CWD, internal.RUN_DIR)
     if not run_dir.exists():
         os.makedirs(run_dir)
 
-    site_run_dir = Path(root_dir, internal.SITE_RUN_DIR)
+    site_run_dir = Path(internal.CWD, internal.SITE_RUN_DIR)
     if not site_run_dir.exists():
         os.makedirs(site_run_dir)
 
-    site_log_dir = Path(root_dir, internal.SITE_LOG_DIR)
+    site_log_dir = Path(internal.CWD, internal.SITE_LOG_DIR)
     if not site_log_dir.exists():
         print(
-            f"Creating {site_log_dir.relative_to(root_dir)} directory: {site_log_dir}"
+            f"Creating {site_log_dir.relative_to(internal.CWD)} directory: {site_log_dir}"
         )
         os.makedirs(site_log_dir)
 
-    site_output_dir = Path(root_dir, internal.SITE_OUTPUT_DIR)
+    site_output_dir = Path(internal.CWD, internal.SITE_OUTPUT_DIR)
     if not site_output_dir.exists():
         print(
-            f"Creating {site_output_dir.relative_to(root_dir)} directory: {site_output_dir}"
+            f"Creating {site_output_dir.relative_to(internal.CWD)} directory: {site_output_dir}"
         )
         os.makedirs(site_output_dir)
 
-    site_tasks_dir = Path(root_dir, internal.SITE_TASKS_DIR)
+    site_tasks_dir = Path(internal.CWD, internal.SITE_TASKS_DIR)
     if not site_tasks_dir.exists():
         print(
-            f"Creating {site_tasks_dir.relative_to(root_dir)} directory: {site_tasks_dir}"
+            f"Creating {site_tasks_dir.relative_to(internal.CWD)} directory: {site_tasks_dir}"
         )
         os.makedirs(site_tasks_dir)
 
-    site_analysis_dir = Path(root_dir, internal.SITE_ANALYSIS_DIR)
+    site_analysis_dir = Path(internal.CWD, internal.SITE_ANALYSIS_DIR)
     if not site_analysis_dir.exists():
         print(
-            f"Creating {site_analysis_dir.relative_to(root_dir)} directory: {site_analysis_dir}"
+            f"Creating {site_analysis_dir.relative_to(internal.CWD)} directory: {site_analysis_dir}"
        )
         os.makedirs(site_analysis_dir)
 
-    site_bitwise_cmp_dir = Path(root_dir, internal.SITE_BITWISE_CMP_DIR)
+    site_bitwise_cmp_dir = Path(internal.CWD, internal.SITE_BITWISE_CMP_DIR)
     if not site_bitwise_cmp_dir.exists():
         print(
-            f"Creating {site_bitwise_cmp_dir.relative_to(root_dir)} directory: "
+            f"Creating {site_bitwise_cmp_dir.relative_to(internal.CWD)} directory: "
             f"{site_bitwise_cmp_dir}"
         )
         os.makedirs(site_bitwise_cmp_dir)
 
     print("Creating task directories...")
     for task in fluxnet_tasks:
-        task_dir = Path(root_dir, internal.SITE_TASKS_DIR, task.get_task_name())
+        task_dir = Path(internal.CWD, internal.SITE_TASKS_DIR, task.get_task_name())
         if not task_dir.exists():
             if verbose:
-                print(f"Creating {task_dir.relative_to(root_dir)}: " f"{task_dir}")
+                print(f"Creating {task_dir.relative_to(internal.CWD)}: " f"{task_dir}")
             os.makedirs(task_dir)
diff --git a/benchcab/build_cable.py b/benchcab/build_cable.py
index 8e3d452d..4823c6e0 100755
--- a/benchcab/build_cable.py
+++ b/benchcab/build_cable.py
@@ -1,220 +1,115 @@
-#!/usr/bin/env python
-
-"""
-Build CABLE executables ...
-
-That's all folks.
-"""
-
-__author__ = "Martin De Kauwe"
-__version__ = "1.0 (09.03.2019)"
-__email__ = "mdekauwe@gmail.com"
+"""A module containing functions for building CABLE."""
 
 import os
+import contextlib
 import stat
 import subprocess
-from pathlib import Path
-
-from benchcab.internal import CWD, SRC_DIR, MPI
-
-
-def add_module_load(lines, nindent, modules):
-    """Read in the environment file using config data.
-    Add lines to load each module listed in environment file
-    at the end of the list of strings, lines
-
-    lines: list of strings
-    nindent: integer, number of indent spaces to add for each line"""
-
-    loclines = lines.copy()
-
-    # Append new lines to the list of lines for each module
-    for mod in modules:
-        # Add newline if not in "mod"
-        if "\n" not in mod:
-            mod = mod + "\n"
-        toadd = "".join([" " * nindent, "module load ", mod])
-        loclines.append(toadd)
-
-    return loclines
-
-
-def find_purge_line(filelines, filename: Path):
-    """Find the line with module purge in the list of file lines.
-    Check there is only 1 such line. Return the index of the line.
-
-    filelines: list of strings such as returned by readlines()
-    filename: name of input file"""
-
-    purge_line = [
-        purge_ind for purge_ind, ll in enumerate(filelines) if "module purge" in ll
-    ]
-    # Check we found only 1 module purge line.
-    assert (
-        len(purge_line) == 1
-    ), f"{filename} should contain exactly one line with 'module purge'"
-    purge_line = purge_line[0]
-
-    return purge_line
-
-
-def change_build_lines(filelines, modules, filename: Path):
-    """Get the lines from the build script and modify them:
-    - remove all the module load and module add lines
-    - read in the environment file for Gadi
-    - insert the module load lines for the modules in the env. file
-    filelines: list of strings such as returned by readlines()
-    filename: name of input file"""
-
-    # Remove any line loading a module
-    nomodulelines = [
-        ll
-        for ll in filelines
-        if all([substring not in ll for substring in ["module load", "module add"]])
-    ]
-
-    # Find the line with "module purge"
-    purge_line = find_purge_line(nomodulelines, filename=filename)
-
-    # Get the indentation right: copy the indentation from the module purge line
-    nindent = nomodulelines[purge_line].find("module purge")
-
-    outlines = nomodulelines[: purge_line + 1]  # Take all lines until module purge
-
-    # append lines to load the correct modules
-    outlines = add_module_load(outlines, nindent, modules)
-
-    # add the end of the file as in the original file
-    outlines.extend(nomodulelines[purge_line + 1 :])
-
-    return outlines
-
-
-def adjust_build_script(build_file_path: Path, modules: list):
-    """Customise the build script with the modules listed in the configuration file."""
-
-    f = open(build_file_path, "r")
-    lines = f.readlines()
-    f.close()
+import shlex
+import shutil
+import pathlib
 
-    if MPI:
-        ofname = build_file_path.parent / "my_build_mpi.ksh"
-    else:
-        ofname = build_file_path.parent / "my_build.ksh"
-    of = open(ofname, "w")
+from benchcab import internal
+from benchcab import environment_modules
 
-    # We find all the "module load" lines and remove them from
-    # the list of lines.
-    # Then after the line "module purge", we add a line for
-    # each module listed in gadi_env.sh
-    outlines = change_build_lines(lines, modules=modules, filename=build_file_path)
-    of.writelines(outlines)
-    of.close()
 
+@contextlib.contextmanager
+def chdir(newdir: pathlib.Path):
+    """Context manager `cd`."""
+    prevdir = pathlib.Path.cwd()
+    os.chdir(newdir.expanduser())
+    try:
+        yield
+    finally:
+        os.chdir(prevdir)
 
-    # Add executable permissions to the new build script for the user
-    st = os.stat(ofname)
-    os.chmod(ofname, st.st_mode | stat.S_IEXEC)
 
-    return ofname
+def patch_build_script(file_path):
+    """Remove lines from `file_path` that call the environment modules package."""
+    with open(file_path, "r", encoding="utf-8") as file:
+        contents = file.read()
+    with open(file_path, "w", encoding="utf-8") as file:
+        for line in contents.splitlines(True):
+            cmds = shlex.split(line, comments=True)
+            if "module" not in cmds:
+                file.write(line)
 
-def clean_if_needed():
-    """Clean a previous compilation if latest executable doesn't have the name we want."""
-
-    wanted_exe = f"cable{'-mpi'*MPI}"
-
-    exe_list = [Path("cable-mpi"), Path("cable")]
-    exe_found = [this_exe for this_exe in exe_list if this_exe.is_file()]
-
-    clean_compil = False
-    if len(exe_found) > 0:
-        newest_exe = max(exe_found, key=lambda x: x.stat().st_mtime)
-        clean_compil = newest_exe != wanted_exe
-
-    # Clean compilation if needed
-    if clean_compil:
-        cmd = f"rm -fr .tmp"
-        error = subprocess.call(cmd, shell=True)
-        if error == 1:
-            raise ("Error cleaning previous compilation")
 
+def default_build(branch_name: str, modules: list, verbose=False):
+    """Build CABLE using the default script.
+
+    This loads the modules specified in the configuration file.
+    """
+    print(
+        f"Compiling CABLE {'with MPI' if internal.MPI else 'serially'} for "
+        f"realisation {branch_name}..."
+    )
 
-def default_build(branch_name: str, modules: list, root_dir=CWD, verbose=False):
-    """Build CABLE using the default script.
-    This loads the modules specified in the configuration file."""
+    default_script_path = (
+        internal.CWD / internal.SRC_DIR / branch_name / "offline" / "build3.sh"
+    )
 
-    default_script_path = Path(root_dir, SRC_DIR, branch_name, "offline", "build3.sh")
     if not default_script_path.is_file():
-        raise RuntimeError(
-            f"The default build script, {default_script_path}, could not be found."
+        raise FileNotFoundError(
+            f"The default build script, {default_script_path}, could not be found. "
             "Do you need to specify a different build script with the "
             "'build_script' option in config.yaml?",
         )
 
-    build_script_path = adjust_build_script(default_script_path, modules)
+    tmp_script_path = default_script_path.parent / "tmp-build3.sh"
 
-    os.chdir(build_script_path.parent)
-    clean_if_needed()
-    cmd = f"{build_script_path} {'mpi'*MPI}"
     if verbose:
-        print(cmd)
-    error = subprocess.call(
-        cmd, shell=True, stdout=None if verbose else subprocess.DEVNULL
-    )
-    if error == 1:
-        raise ("Error building executable with default script")
-
-    os.chdir(CWD)
-
-
-def custom_build(
-    config_build_script: str, branch_name: str, root_dir=CWD, verbose=False
-):
-    """Build CABLE with a script provided in configuration file"""
+        print(f"  Copying {default_script_path} to {tmp_script_path}")
+    shutil.copy(default_script_path, tmp_script_path)
+
+    if verbose:
+        print(f"  chmod +x {tmp_script_path}")
+    tmp_script_path.chmod(tmp_script_path.stat().st_mode | stat.S_IEXEC)
 
-    build_script_path = Path(root_dir, SRC_DIR, branch_name, config_build_script)
+    if verbose:
+        print(
+            f"  Patching {tmp_script_path.name}: remove lines that call "
+            "environment modules"
+        )
+    patch_build_script(tmp_script_path)
 
-    if not build_script_path.is_file():
-        raise RuntimeError(
-            f"The build script specified in the config.yaml file, {build_script_path}, "
-            "is not a valid file."
+    if verbose:
+        print("  Loading modules: " + " ".join(modules))
+    environment_modules.module_load(*modules)
+
+    with chdir(default_script_path.parent):
+        cmd = f"./{tmp_script_path.name}" + (" mpi" if internal.MPI else "")
+        if verbose:
+            print(f"  {cmd}")
+        subprocess.run(
+            cmd,
+            shell=True,
+            check=True,
+            stdout=None if verbose else subprocess.DEVNULL,
+            stderr=subprocess.STDOUT,
        )
 
-    os.chdir(build_script_path.parent)
-    cmd = f"{build_script_path}"
     if verbose:
-        print(cmd)
-    error = subprocess.call(
-        cmd, shell=True, stdout=None if verbose else subprocess.DEVNULL
-    )
-    if error == 1:
-        raise ("Error building executable with custom script")
+        print("  Unloading modules: " + " ".join(modules))
+    environment_modules.module_unload(*modules)
 
-    os.chdir(CWD)
 
+def custom_build(config_build_script: str, branch_name: str, verbose=False):
+    """Build CABLE with a script provided in the configuration file."""
+    print(
+        "Compiling CABLE using custom build script for " f"realisation {branch_name}..."
+    )
 
-def build_cable(
-    config_build_script: str,
-    branch_name: str,
-    modules: list,
-    root_dir=CWD,
-    verbose=False,
-):
-    if config_build_script:
-        # Use provided script as is
-        print(
-            "Compiling CABLE using custom build script for "
-            f"realisation {branch_name}..."
-        )
-        custom_build(config_build_script, branch_name, root_dir, verbose=verbose)
+    build_script_path = (
+        internal.CWD / internal.SRC_DIR / branch_name / config_build_script
+    )
 
-    else:
-        # Use default script with provided module versions
-        print(
-            f"Compiling CABLE {'with MPI' if MPI else 'serially'} for "
-            f"realisation {branch_name}..."
+    with chdir(build_script_path.parent):
+        cmd = f"./{build_script_path.name}"
+        if verbose:
+            print(f"  {cmd}")
+        subprocess.run(
+            cmd,
+            shell=True,
+            check=True,
+            stdout=None if verbose else subprocess.DEVNULL,
+            stderr=subprocess.STDOUT,
         )
-        default_build(branch_name, modules, root_dir, verbose=verbose)
-
-    print(f"Successfully compiled CABLE for realisation {branch_name}")
diff --git a/benchcab/environment_modules.py b/benchcab/environment_modules.py
index 8ed2a531..5d4beb14 100644
--- a/benchcab/environment_modules.py
+++ b/benchcab/environment_modules.py
@@ -14,6 +14,10 @@
 # when running pytest locally (outside of Gadi)
 
 
+class EnvironmentModulesError(Exception):
+    """Custom exception class for environment modules errors."""
+
+
 def module_is_avail(*args: str):
     """Wrapper around `module is-avail modulefile...`"""
     return module("is-avail", *args)
@@ -26,4 +30,11 @@ def module_is_loaded(*args: str):
 
 def module_load(*args: str):
     """Wrapper around `module load modulefile...`"""
-    module("load", *args)
+    if not module("load", *args):
+        raise EnvironmentModulesError("Failed to load modules: " + " ".join(args))
+
+
+def module_unload(*args: str):
+    """Wrapper around `module unload modulefile...`"""
+    if not module("unload", *args):
+        raise EnvironmentModulesError("Failed to unload modules: " + " ".join(args))
diff --git a/benchcab/get_cable.py b/benchcab/get_cable.py
index 2a3c9842..d9f6a8b2 100755
--- a/benchcab/get_cable.py
+++ b/benchcab/get_cable.py
@@ -1,23 +1,10 @@
-#!/usr/bin/env python
+"""A module containing functions for checking out CABLE repositories."""
 
-"""
-Get the head of the CABLE trunk, the user branch and CABLE-AUX
-
-That's all folks.
-"""
-
-__author__ = "Martin De Kauwe"
-__version__ = "1.0 (09.03.2019)"
-__email__ = "mdekauwe@gmail.com"
-
-import os
 import subprocess
-import getpass
 from typing import Union
 from pathlib import Path
 
 from benchcab import internal
-from benchcab.internal import CWD, SRC_DIR, HOME_DIR, CABLE_SVN_ROOT, CABLE_AUX_DIR
 
 
 def next_path(path_pattern, sep="-"):
@@ -35,37 +22,14 @@ def next_path(path_pattern, sep="-"):
 
     new_file_index = 1
     common_filename, _ = loc_pattern.stem.split(sep)
 
-    pattern_files_sorted = sorted(Path(".").glob(path_pattern))
-    if len(pattern_files_sorted):
+    pattern_files_sorted = sorted(internal.CWD.glob(path_pattern))
+    if pattern_files_sorted != []:
         common_filename, last_file_index = pattern_files_sorted[-1].stem.split(sep)
         new_file_index = int(last_file_index) + 1
 
     return f"{common_filename}{sep}{new_file_index}{loc_pattern.suffix}"
 
 
-def archive_rev_number():
-    """Archives previous rev_number.log"""
-
-    revision_file = Path("rev_number.log")
-    if revision_file.exists():
-        new_revision_file = next_path("rev_number-*.log")
-        print(f"Writing revision number info to {new_revision_file}")
-        revision_file.replace(new_revision_file)
-
-
-def need_pass() -> bool:
-    """If the user requires a password for SVN, return `True`.
-    Otherwise return `False`."""
-    try:
-        return os.listdir(f"{HOME_DIR}/.subversion/auth/svn.simple/") == []
-    except FileNotFoundError:
-        return False
-
-
-def get_password() -> str:
-    """Prompt user for a password."""
-    return "'" + getpass.getpass("Password:") + "'"
-
-
 def svn_info_show_item(path: Union[Path, str], item: str) -> str:
     """A wrapper around `svn info --show-item <item> <path>`."""
     cmd = f"svn info --show-item {item} {path}"
@@ -73,50 +37,48 @@ def svn_info_show_item(path: Union[Path, str], item: str) -> str:
     return out.stdout.strip()
 
 
-def checkout_cable_auxiliary(verbose=False):
+def checkout_cable_auxiliary(verbose=False) -> Path:
     """Checkout CABLE-AUX."""
 
-    # TODO(Sean) we should archive revision numbers for CABLE-AUX
-
-    cable_aux_dir = Path(CWD / CABLE_AUX_DIR)
-    if cable_aux_dir.exists():
-        return
+    cable_aux_dir = Path(internal.CWD / internal.CABLE_AUX_DIR)
 
-    cmd = f"svn checkout {CABLE_SVN_ROOT}/branches/Share/CABLE-AUX {cable_aux_dir}"
-
-    if need_pass():
-        cmd += f" --password {get_password()}"
+    cmd = f"svn checkout {internal.CABLE_SVN_ROOT}/branches/Share/CABLE-AUX {cable_aux_dir}"
 
     if verbose:
         print(cmd)
 
     subprocess.run(
-        cmd, shell=True, check=True, stdout=None if verbose else subprocess.DEVNULL
+        cmd,
+        shell=True,
+        check=True,
+        stdout=None if verbose else subprocess.DEVNULL,
+        stderr=subprocess.STDOUT,
     )
 
+    revision = svn_info_show_item(cable_aux_dir, "revision")
+    print(f"Successfully checked out CABLE-AUX at revision {revision}")
+
     # Check relevant files exist in repository:
 
-    if not Path.exists(CWD / internal.GRID_FILE):
+    if not Path.exists(internal.CWD / internal.GRID_FILE):
         raise RuntimeError(
             f"Error checking out CABLE-AUX: cannot find file '{internal.GRID_FILE}'"
         )
 
-    if not Path.exists(CWD / internal.PHEN_FILE):
+    if not Path.exists(internal.CWD / internal.PHEN_FILE):
         raise RuntimeError(
             f"Error checking out CABLE-AUX: cannot find file '{internal.PHEN_FILE}'"
         )
 
-    if not Path.exists(CWD / internal.CNPBIOME_FILE):
+    if not Path.exists(internal.CWD / internal.CNPBIOME_FILE):
         raise RuntimeError(
             f"Error checking out CABLE-AUX: cannot find file '{internal.CNPBIOME_FILE}'"
         )
 
-    rev_number = svn_info_show_item(
-        f"{CABLE_SVN_ROOT}/branches/Share/CABLE-AUX", "revision"
-    )
-    print(f"Successfully checked out CABLE-AUX at revision {rev_number}")
+    return cable_aux_dir
 
 
-def checkout_cable(branch_config: dict, verbose=False):
+def checkout_cable(branch_config: dict, verbose=False) -> Path:
     """Checkout a branch of CABLE."""
     # TODO(Sean) do nothing if the repository has already been checked out?
     # This also relates the 'clean' feature.
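
(An illustrative aside, not part of the changeset: the `next_path` helper kept above computes the next free name in a numbered file sequence, and the reworked `checkout` endpoint in `benchcab.py` now uses it to write `rev_number-1.log`, `rev_number-2.log`, and so on instead of renaming an existing `rev_number.log`. A minimal standalone sketch of the naming scheme; the function name and `directory` parameter here are hypothetical, introduced only for the demo:)

```python
from pathlib import Path

def next_numbered_name(directory: Path, pattern: str = "rev_number-*.log", sep: str = "-") -> str:
    """Return the next free file name matching `pattern`, e.g. 'rev_number-3.log'."""
    stem, _ = Path(pattern).stem.split(sep)  # "rev_number-*" -> ("rev_number", "*")
    matches = sorted(directory.glob(pattern))  # lexicographic sort, as in next_path()
    if not matches:
        return f"{stem}{sep}1{Path(pattern).suffix}"
    _, last_index = matches[-1].stem.split(sep)  # "rev_number-2" -> ("rev_number", "2")
    return f"{stem}{sep}{int(last_index) + 1}{Path(pattern).suffix}"

# With rev_number-1.log and rev_number-2.log present in `directory`:
# next_numbered_name(directory) == "rev_number-3.log"
```
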
@@ -127,11 +89,8 @@
     if branch_config["revision"] > 0:
         cmd += f" -r {branch_config['revision']}"
 
-    if need_pass():
-        cmd += f" --password {get_password()}"
-
-    path_to_repo = Path(CWD, SRC_DIR, branch_config["name"])
-    cmd += f" {CABLE_SVN_ROOT}/{branch_config['path']} {path_to_repo}"
+    path_to_repo = Path(internal.CWD, internal.SRC_DIR, branch_config["name"])
+    cmd += f" {internal.CABLE_SVN_ROOT}/{branch_config['path']} {path_to_repo}"
 
     if verbose:
         print(cmd)
@@ -141,14 +100,10 @@
         shell=True,
         check=True,
         stdout=None if verbose else subprocess.DEVNULL,
+        stderr=subprocess.STDOUT,
     )
 
-    # Write last change revision number to rev_number.log file
-    last_changed_rev_number = svn_info_show_item(path_to_repo, "last-changed-revision")
-    with open(f"{CWD}/rev_number.log", "a", encoding="utf-8") as fout:
-        fout.write(
-            f"{branch_config['name']} last changed revision: {last_changed_rev_number}\n"
-        )
+    revision = svn_info_show_item(path_to_repo, "revision")
+    print(f"Successfully checked out {branch_config['name']} at revision {revision}")
 
-    rev_number = svn_info_show_item(path_to_repo, "revision")
-    print(f"Successfully checked out {branch_config['name']} at revision {rev_number}")
+    return path_to_repo
diff --git a/benchcab/internal.py b/benchcab/internal.py
index 8772a778..83e63c67 100644
--- a/benchcab/internal.py
+++ b/benchcab/internal.py
@@ -9,8 +9,7 @@
 
 _, NODENAME, _, _, _ = os.uname()
 
-# Default config file names
-DEFAULT_CONFIG = "config.yaml"
+CONFIG_REQUIRED_KEYS = ["realisations", "project", "modules", "experiment"]
 
 # Parameters for job script:
 QSUB_FNAME = "benchmark_cable_qsub.sh"
diff --git a/benchcab/job_script.py b/benchcab/job_script.py
index 64a89248..0ab332a6 100644
--- a/benchcab/job_script.py
+++ b/benchcab/job_script.py
@@ -1,11 +1,19 @@
 """Contains functions for job script creation and submission on Gadi."""
 
 import os
-import sys
 import subprocess
 from pathlib import Path
 
-from benchcab.internal import QSUB_FNAME, NCPUS, MEM, WALL_TIME
+from benchcab import internal
+
+
+def get_local_storage_flag(path: Path) -> str:
+    """Returns the PBS storage flag for a path on the Gadi file system."""
+    if str(path).startswith("/scratch"):
+        return f"scratch/{path.parts[2]}"
+    if str(path).startswith("/g/data"):
+        return f"gdata/{path.parts[3]}"
+    raise RuntimeError("Current directory structure unknown on Gadi.")
 
 
 def create_job_script(
@@ -23,38 +31,28 @@ def create_job_script(
 
     """
 
+    job_script_path = internal.CWD / internal.QSUB_FNAME
     module_load_lines = "\n".join(
-        f"module add {module_name}" for module_name in modules
+        f"module load {module_name}" for module_name in modules
     )
     verbose_flag = "-v" if verbose else ""
 
-    # Add the local directory to the storage flag for PBS
-    curdir = Path.cwd().parts
-    if "scratch" in curdir:
-        curdir_root = "scratch"
-        curdir_proj = curdir[2]
-    elif "g" in curdir and "data" in curdir:
-        curdir_root = "gdata"
-        curdir_proj = curdir[3]
-    else:
-        print("Current directory structure unknown on Gadi")
-        sys.exit(1)
-
     print(
-        f"Creating PBS job script to run FLUXNET tasks on compute nodes: {QSUB_FNAME}"
+        "Creating PBS job script to run FLUXNET tasks on compute "
+        f"nodes: {job_script_path.relative_to(internal.CWD)}"
     )
-    with open(QSUB_FNAME, "w", encoding="utf-8") as file:
+    with open(job_script_path, "w", encoding="utf-8") as file:
         file.write(
             f"""#!/bin/bash
 #PBS -l wd
-#PBS -l ncpus={NCPUS}
-#PBS -l mem={MEM}
-#PBS -l walltime={WALL_TIME}
+#PBS -l ncpus={internal.NCPUS}
+#PBS -l mem={internal.MEM}
+#PBS -l walltime={internal.WALL_TIME}
 #PBS -q normal
 #PBS -P {project}
 #PBS -j oe
 #PBS -m e
-#PBS -l storage=gdata/ks32+gdata/hh5+gdata/{project}+{curdir_root}/{curdir_proj}
+#PBS -l storage=gdata/ks32+gdata/hh5+gdata/{project}+{get_local_storage_flag(internal.CWD)}
 
 module purge
 module use /g/data/hh5/public/modules
@@ -75,17 +73,18 @@ def create_job_script(
 
 """
         )
 
-    os.chmod(QSUB_FNAME, 0o755)
-
 
 def submit_job():
     """Submits the job script specified by `QSUB_FNAME`."""
-    cmd = f"qsub {QSUB_FNAME}"
-    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=False)
-    if proc.returncode != 0:
+    job_script_path = internal.CWD / internal.QSUB_FNAME
+    cmd = f"qsub {job_script_path}"
+    try:
+        proc = subprocess.run(
+            cmd, shell=True, check=True, capture_output=True, text=True
+        )
+        print(f"PBS job submitted: {proc.stdout.strip()}")
+    except subprocess.CalledProcessError as exc:
         print("Error when submitting job to NCI queue")
-        print(proc.stderr)
-        sys.exit(1)
-
-    print(f"PBS job submitted: {proc.stdout.strip()}")
+        print(exc.stderr)
+        raise
diff --git a/benchcab/run_cable_site.py b/benchcab/run_cable_site.py
deleted file mode 100644
index fd550ec5..00000000
--- a/benchcab/run_cable_site.py
+++ /dev/null
@@ -1,148 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Run CABLE either for a single site, a subset, or all the flux sites pointed to
-in the met directory
-
-- Only intended for biophysics
-- Set mpi = True if doing a number of flux sites
-
-That's all folks.
-"""
-__author__ = "Martin De Kauwe"
-__version__ = "1.0 (16.06.2020)"
-__email__ = "mdekauwe@gmail.com"
-
-import os
-import subprocess
-from pathlib import Path
-import multiprocessing
-import queue
-import netCDF4
-
-from benchcab.get_cable import svn_info_show_item
-from benchcab.internal import (
-    CWD,
-    SRC_DIR,
-    SITE_TASKS_DIR,
-    SITE_OUTPUT_DIR,
-    CABLE_EXE,
-    CABLE_NML,
-    CABLE_STDOUT_FILENAME,
-    NCPUS,
-)
-from benchcab.task import Task
-
-
-def run_tasks(tasks: list[Task], verbose=False):
-    """Runs tasks in `tasks` serially."""
-    for task in tasks:
-        run_task(task, verbose=verbose)
-
-
-def run_tasks_in_parallel(tasks: list[Task], verbose=False):
-    """Runs tasks in `tasks` in parallel across multiple processes."""
-
-    task_queue: multiprocessing.Queue = multiprocessing.Queue()
-    for task in tasks:
-        task_queue.put(task)
-
-    processes = []
-    for _ in range(NCPUS):
-        proc = multiprocessing.Process(target=worker, args=[task_queue, verbose])
-        proc.start()
-        processes.append(proc)
-
-    for proc in processes:
-        proc.join()
-
-
-def worker(task_queue: multiprocessing.Queue, verbose=False):
-    """Runs tasks in `task_queue` until the queue is emptied."""
-    while True:
-        try:
-            task = task_queue.get_nowait()
-        except queue.Empty:
-            return
-        run_task(task, verbose=verbose)
-
-
-def run_task(task: Task, verbose=False):
-    """Run the CABLE executable for the given task."""
-    task_name = task.get_task_name()
-    task_dir = CWD / SITE_TASKS_DIR / task_name
-    if verbose:
-        print(
-            f"Running task {task_name}... CABLE standard output "
-            f"saved in {task_dir / CABLE_STDOUT_FILENAME}"
-        )
-
-    if verbose:
-        print(f"  cd {task_dir}")
-    os.chdir(task_dir)
-
-    cmd = f"./{CABLE_EXE} {CABLE_NML} > {CABLE_STDOUT_FILENAME} 2>&1"
-    try:
-        if verbose:
-            print(f"  {cmd}")
-        subprocess.run(cmd, shell=True, check=True)
-    except subprocess.CalledProcessError:
-        print(f"Error: CABLE returned an error for task {task_name}")
-        return
-
-    output_file = CWD / SITE_OUTPUT_DIR / task.get_output_filename()
-    if verbose:
-        print(f"  Adding attributes to output file: {output_file}")
-    add_attributes_to_output_file(
-        output_file=output_file,
-        nml_file=Path(CWD / SITE_TASKS_DIR / task_name / CABLE_NML),
-        sci_config=task.sci_config,
-        url=svn_info_show_item(CWD / SRC_DIR / task.branch_name, "url"),
-        rev=svn_info_show_item(CWD / SRC_DIR / task.branch_name, "revision"),
-    )
-
-    if verbose:
-        print(f"  cd {CWD}")
-    os.chdir(CWD)
-
-
-def add_attributes_to_output_file(output_file, nml_file, sci_config, url, rev):
-    """Adds global attributes to netcdf output file.
-
-    Attributes include branch url, branch revision number and key value pairs in
-    the namelist file used to run cable.
-    """
-
-    # TODO(Sean) remove science configurations as these are in the namelist file
-
-    nc_output = netCDF4.Dataset(output_file, "r+")
-
-    # Add SVN info to output file
-    nc_output.setncattr("cable_branch", url)
-    nc_output.setncattr("svn_revision_number", rev)
-
-    # Add science configurations to output file
-    for key, value in sci_config.items():
-        nc_output.setncattr("SCI_CONFIG", f"{key}_{value}")
-
-    # Add namelist to output file
-    with open(nml_file, "r", encoding="utf-8") as file:
-        namelist = file.readlines()
-    for line in namelist:
-        if line.strip() == "":
-            # skip blank lines
-            continue
-        if line.strip().startswith("!"):
-            # skip lines that are comments
-            continue
-        if line.startswith("&"):
-            # skip start of namelist
-            continue
-        if "=" not in line:
-            # skip lines without key = value statement
-            continue
-        key = str(line.strip().split("=")[0]).rstrip()
-        val = str(line.strip().split("=")[1]).rstrip()
-        nc_output.setncattr(key, val)
-
-    nc_output.close()
diff --git a/benchcab/run_comparison.py b/benchcab/run_comparison.py
deleted file mode 100644
index e9959486..00000000
--- a/benchcab/run_comparison.py
+++ /dev/null
@@ -1,82 +0,0 @@
-"""Contains functions for running bitwise comparisons."""
-
-import subprocess
-import multiprocessing
-import queue
-
-from benchcab.task import Task
-from benchcab.internal import CWD, SITE_OUTPUT_DIR, SITE_BITWISE_CMP_DIR, NCPUS
-
-
-def get_comparison_name(task_a: Task, task_b: Task) -> str:
-    """Returns the naming convention used for bitwise comparisons.
-
-    Assumes `met_forcing_file` and `sci_conf_id` attributes are
-    common to both tasks.
- """ - met_forcing_base_filename = task_a.met_forcing_file.split(".")[0] - return ( - f"{met_forcing_base_filename}_S{task_a.sci_conf_id}" - f"_R{task_a.branch_id}_R{task_b.branch_id}" - ) - - -def run_comparisons(comparisons: list[tuple[Task, Task]], verbose=False): - """Runs bitwise comparison tasks serially.""" - for task_a, task_b in comparisons: - run_comparison(task_a, task_b, verbose=verbose) - - -def run_comparisons_in_parallel(comparisons: list[tuple[Task, Task]], verbose=False): - """Runs bitwise comparison tasks in parallel across multiple processes.""" - - task_queue: multiprocessing.Queue = multiprocessing.Queue() - for pair in comparisons: - task_queue.put(pair) - - processes = [] - for _ in range(NCPUS): - proc = multiprocessing.Process(target=worker, args=[task_queue, verbose]) - proc.start() - processes.append(proc) - - for proc in processes: - proc.join() - - -def worker(task_queue: multiprocessing.Queue, verbose=False): - """Runs bitwise comparison tasks in `task_queue` until the queue is emptied.""" - while True: - try: - task_a, task_b = task_queue.get_nowait() - except queue.Empty: - return - run_comparison(task_a, task_b, verbose=verbose) - - -def run_comparison(task_a: Task, task_b: Task, verbose=False): - """Executes `nccmp -df` between the NetCDF output file of `task_a` and of `task_b`.""" - task_a_output = CWD / SITE_OUTPUT_DIR / task_a.get_output_filename() - task_b_output = CWD / SITE_OUTPUT_DIR / task_b.get_output_filename() - output_file = ( - CWD / SITE_BITWISE_CMP_DIR / f"{get_comparison_name(task_a, task_b)}.txt" - ) - if verbose: - print( - f"Comparing files {task_a_output.name} and {task_b_output.name} bitwise..." - ) - cmd = f"nccmp -df {task_a_output} {task_b_output} 2>&1" - if verbose: - print(f" {cmd}") - proc = subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True) - if proc.returncode != 0: - with open(output_file, "w", encoding="utf-8") as file: - file.write(proc.stdout) - print( - f"Failure: files {task_a_output.name} {task_b_output.name} differ. " - f"Results of diff have been written to {output_file}" - ) - else: - print( - f"Success: files {task_a_output.name} {task_b_output.name} are identitical" - ) diff --git a/benchcab/task.py b/benchcab/task.py index ca321901..8b2587f1 100644 --- a/benchcab/task.py +++ b/benchcab/task.py @@ -1,12 +1,22 @@ -"""Contains the `Task` class definition.""" +"""A module containing functions and data structures for running fluxnet tasks.""" + import os import shutil +import subprocess +import multiprocessing +import queue +import dataclasses from pathlib import Path from typing import TypeVar, Dict, Any + +import flatdict +import netCDF4 import f90nml from benchcab import internal +import benchcab.get_cable + # fmt: off # pylint: disable=invalid-name,missing-function-docstring,line-too-long @@ -32,24 +42,39 @@ def deep_update(mapping: Dict[KeyType, Any], *updating_mappings: Dict[KeyType, A # fmt: on +def patch_namelist(nml_path: Path, patch: dict): + """Writes a namelist patch specified by `patch` to `nml_path`. + + The `patch` dictionary must comply with the `f90nml` api. 
+ """ + + if not nml_path.exists(): + f90nml.write(patch, nml_path) + return + + nml = f90nml.read(nml_path) + # remove namelist file as f90nml cannot write to an existing file + nml_path.unlink() + f90nml.write(deep_update(nml, patch), nml_path) + + +f90_logical_repr = {True: ".true.", False: ".false."} + + +class CableError(Exception): + """Custom exception class for CABLE errors.""" + + +@dataclasses.dataclass class Task: - """A class used to represent a single fluxsite task.""" - - def __init__( - self, - branch_id: int, - branch_name: str, - branch_patch: dict, - met_forcing_file: str, - sci_conf_id: int, - sci_config: dict, - ) -> None: - self.branch_id = branch_id - self.branch_name = branch_name - self.branch_patch = branch_patch - self.met_forcing_file = met_forcing_file - self.sci_conf_id = sci_conf_id - self.sci_config = sci_config + """A class used to represent a single fluxnet task.""" + + branch_id: int + branch_name: str + branch_patch: dict + met_forcing_file: str + sci_conf_id: int + sci_config: dict def get_task_name(self) -> str: """Returns the file name convention used for this task.""" @@ -64,14 +89,84 @@ def get_log_filename(self) -> str: """Returns the file name convention used for the log file.""" return f"{self.get_task_name()}_log.txt" - def clean_task(self, root_dir=internal.CWD, verbose=False): + def setup_task(self, verbose=False): + """Does all file manipulations to run cable in the task directory. + + These include: + 1. cleaning output, namelist, log files and cable executables if they exist + 2. copying namelist files (cable.nml, pft_params.nml and cable_soil_parm.nml) + into the `runs/site/tasks/` directory. + 3. copying the cable executable from the source directory + 4. make appropriate adjustments to namelist files + 5. 
apply a branch patch if specified + """ + + if verbose: + print(f"Setting up task: {self.get_task_name()}") + + self.clean_task(verbose=verbose) + self.fetch_files(verbose=verbose) + + nml_path = ( + internal.CWD + / internal.SITE_TASKS_DIR + / self.get_task_name() + / internal.CABLE_NML + ) + + if verbose: + print(f" Adding base configurations to CABLE namelist file {nml_path}") + patch_namelist( + nml_path, + { + "cable": { + "filename": { + "met": str(internal.MET_DIR / self.met_forcing_file), + "out": str( + internal.CWD + / internal.SITE_OUTPUT_DIR + / self.get_output_filename() + ), + "log": str( + internal.CWD + / internal.SITE_LOG_DIR + / self.get_log_filename() + ), + "restart_out": " ", + "type": str(internal.CWD / internal.GRID_FILE), + }, + "output": { + "restart": False, + }, + "fixedCO2": internal.CABLE_FIXED_CO2_CONC, + "casafile": { + "phen": str(internal.CWD / internal.PHEN_FILE), + "cnpbiome": str(internal.CWD / internal.CNPBIOME_FILE), + }, + "spinup": False, + } + }, + ) + + if verbose: + print(f" Adding science configurations to CABLE namelist file {nml_path}") + patch_namelist(nml_path, self.sci_config) + + if self.branch_patch: + if verbose: + print( + f" Adding branch specific configurations to CABLE namelist file {nml_path}" + ) + patch_namelist(nml_path, self.branch_patch) + + def clean_task(self, verbose=False): """Cleans output files, namelist files, log files and cable executables if they exist.""" if verbose: print(" Cleaning task") task_name = self.get_task_name() - task_dir = Path(root_dir, internal.SITE_TASKS_DIR, task_name) + task_dir = Path(internal.CWD, internal.SITE_TASKS_DIR, task_name) if Path.exists(task_dir / internal.CABLE_EXE): os.remove(task_dir / internal.CABLE_EXE) @@ -86,16 +181,16 @@ def clean_task(self, root_dir=internal.CWD, verbose=False): os.remove(task_dir / internal.CABLE_SOIL_NML) output_file = self.get_output_filename() - if Path.exists(root_dir / internal.SITE_OUTPUT_DIR / output_file): - os.remove(root_dir / internal.SITE_OUTPUT_DIR / output_file) + if Path.exists(internal.CWD / internal.SITE_OUTPUT_DIR / output_file): + os.remove(internal.CWD / internal.SITE_OUTPUT_DIR / output_file) log_file = self.get_log_filename() - if Path.exists(root_dir / internal.SITE_LOG_DIR / log_file): - os.remove(root_dir / internal.SITE_LOG_DIR / log_file) + if Path.exists(internal.CWD / internal.SITE_LOG_DIR / log_file): + os.remove(internal.CWD / internal.SITE_LOG_DIR / log_file) return self - def fetch_files(self, root_dir=internal.CWD, verbose=False): + def fetch_files(self, verbose=False): """Retrieves all files necessary to run cable in the task directory. Namely: @@ -103,18 +198,20 @@ def fetch_files(self, root_dir=internal.CWD, verbose=False): - copies cable executable from source to 'runs/site/tasks/' directory. 
""" - task_dir = Path(root_dir, internal.SITE_TASKS_DIR, self.get_task_name()) + task_dir = Path(internal.CWD, internal.SITE_TASKS_DIR, self.get_task_name()) if verbose: print( - f" Copying namelist files from {root_dir / internal.NAMELIST_DIR} " + f" Copying namelist files from {internal.CWD / internal.NAMELIST_DIR} " f"to {task_dir}" ) - shutil.copytree(root_dir / internal.NAMELIST_DIR, task_dir, dirs_exist_ok=True) + shutil.copytree( + internal.CWD / internal.NAMELIST_DIR, task_dir, dirs_exist_ok=True + ) exe_src = ( - root_dir + internal.CWD / internal.SRC_DIR / self.branch_name / "offline" @@ -129,92 +226,76 @@ def fetch_files(self, root_dir=internal.CWD, verbose=False): return self - def adjust_namelist_file(self, root_dir=internal.CWD, verbose=False): - """Sets the base settings in the CABLE namelist file for this task.""" - - patch_nml = { - "cable": { - "filename": { - "met": str(internal.MET_DIR / self.met_forcing_file), - "out": str( - root_dir / internal.SITE_OUTPUT_DIR / self.get_output_filename() - ), - "log": str( - root_dir / internal.SITE_LOG_DIR / self.get_log_filename() - ), - "restart_out": " ", - "type": str(root_dir / internal.GRID_FILE), - }, - "output": { - "restart": False, - }, - "fixedCO2": internal.CABLE_FIXED_CO2_CONC, - "casafile": { - "phen": str(root_dir / internal.PHEN_FILE), - "cnpbiome": str(root_dir / internal.CNPBIOME_FILE), - }, - "spinup": False, - } - } - - patch_nml = deep_update(patch_nml, self.sci_config) - - if verbose: - # remove new line as we prepend the next message in patch_namelist_file() - print(" Adjusting namelist file: ", end="") - - self.patch_namelist_file(patch_nml, root_dir=root_dir, verbose=verbose) - - return self - - def patch_namelist_file(self, patch: dict, root_dir=internal.CWD, verbose=False): - """Writes a patch to the CABLE namelist file for this task. - - The `patch` dictionary must comply with the `f90nml` api. - """ - - task_dir = Path(root_dir, internal.SITE_TASKS_DIR, self.get_task_name()) - + def run(self, verbose=False): + """Runs a single fluxnet task.""" + task_name = self.get_task_name() + task_dir = internal.CWD / internal.SITE_TASKS_DIR / task_name if verbose: - # this message should not indent and start as lower case as we are - # appending to the previous message print( - f"applying patch to CABLE namelist file {task_dir / internal.CABLE_NML}" + f"Running task {task_name}... CABLE standard output " + f"saved in {task_dir / internal.CABLE_STDOUT_FILENAME}" ) + try: + self.run_cable(verbose=verbose) + self.add_provenance_info(verbose=verbose) + except CableError: + return - cable_nml = f90nml.read(str(task_dir / internal.CABLE_NML)) - # remove namelist file as f90nml cannot write to an existing file - os.remove(str(task_dir / internal.CABLE_NML)) - - f90nml.write(deep_update(cable_nml, patch), str(task_dir / internal.CABLE_NML)) + def run_cable(self, verbose=False): + """Run the CABLE executable for the given task. - return self + Raises `CableError` when CABLE returns a non-zero exit code. 
+ """ + task_name = self.get_task_name() + task_dir = internal.CWD / internal.SITE_TASKS_DIR / task_name + exe_path = task_dir / internal.CABLE_EXE + nml_path = task_dir / internal.CABLE_NML + stdout_path = task_dir / internal.CABLE_STDOUT_FILENAME + cmd = f"{exe_path} {nml_path} > {stdout_path} 2>&1" + try: + if verbose: + print(f" {cmd}") + subprocess.run(cmd, shell=True, check=True) + except subprocess.CalledProcessError as exc: + print(f"Error: CABLE returned an error for task {task_name}") + raise CableError from exc - def setup_task(self, root_dir=internal.CWD, verbose=False): - """Does all file manipulations to run cable in the task directory. + def add_provenance_info(self, verbose=False): + """Adds provenance information to global attributes of netcdf output file. - These include: - 1. cleaning output, namelist, log files and cable executables if they exist - 2. copying namelist files (cable.nml, pft_params.nml and cable_soil_parm.nml) - into the `runs/site/tasks/` directory. - 3. copying the cable executable from the source directory - 4. make appropriate adjustments to namelist files - 5. apply a branch patch if specified + Attributes include branch url, branch revision number and key value pairs in + the namelist file used to run cable. """ - + nc_output_path = ( + internal.CWD / internal.SITE_OUTPUT_DIR / self.get_output_filename() + ) + nml = f90nml.read( + internal.CWD + / internal.SITE_TASKS_DIR + / self.get_task_name() + / internal.CABLE_NML + ) if verbose: - print(f"Setting up task: {self.get_task_name()}") - - self.clean_task(root_dir=root_dir, verbose=verbose) - self.fetch_files(root_dir=root_dir, verbose=verbose) - self.adjust_namelist_file(root_dir=root_dir, verbose=verbose) - - if self.branch_patch: - if verbose: - # remove new line as we prepend the next message in patch_namelist_file() - print(" Adding branch specific namelist settings: ", end="") - self.patch_namelist_file( - self.branch_patch, root_dir=root_dir, verbose=verbose + print(f" Adding attributes to output file: {nc_output_path}") + with netCDF4.Dataset(nc_output_path, "r+") as nc_output: + nc_output.setncatts( + { + **{ + key: f90_logical_repr[val] if isinstance(val, bool) else val + for key, val in flatdict.FlatDict( + nml["cable"], delimiter="%" + ).items() + }, + **{ + "cable_branch": benchcab.get_cable.svn_info_show_item( + internal.CWD / internal.SRC_DIR / self.branch_name, "url" + ), + "svn_revision_number": benchcab.get_cable.svn_info_show_item( + internal.CWD / internal.SRC_DIR / self.branch_name, + "revision", + ), + }, + } ) @@ -239,6 +320,39 @@ def get_fluxnet_tasks( return tasks +def run_tasks(tasks: list[Task], verbose=False): + """Runs tasks in `tasks` serially.""" + for task in tasks: + task.run(verbose=verbose) + + +def run_tasks_in_parallel(tasks: list[Task], verbose=False): + """Runs tasks in `tasks` in parallel across multiple processes.""" + + task_queue: multiprocessing.Queue = multiprocessing.Queue() + for task in tasks: + task_queue.put(task) + + processes = [] + for _ in range(internal.NCPUS): + proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose]) + proc.start() + processes.append(proc) + + for proc in processes: + proc.join() + + +def worker_run(task_queue: multiprocessing.Queue, verbose=False): + """Runs tasks in `task_queue` until the queue is emptied.""" + while True: + try: + task = task_queue.get_nowait() + except queue.Empty: + return + task.run(verbose=verbose) + + def get_fluxnet_comparisons(tasks: list[Task]) -> list[tuple[Task, Task]]: 
"""Returns a list of pairs of fluxnet tasks to run comparisons with. @@ -264,3 +378,83 @@ def get_fluxnet_comparisons(tasks: list[Task]) -> list[tuple[Task, Task]]: # range(len(realisations)), 2 # ) ] + + +def get_comparison_name(task_a: Task, task_b: Task) -> str: + """Returns the naming convention used for bitwise comparisons. + + Assumes `met_forcing_file` and `sci_conf_id` attributes are + common to both tasks. + """ + met_forcing_base_filename = task_a.met_forcing_file.split(".")[0] + return ( + f"{met_forcing_base_filename}_S{task_a.sci_conf_id}" + f"_R{task_a.branch_id}_R{task_b.branch_id}" + ) + + +def run_comparisons(comparisons: list[tuple[Task, Task]], verbose=False): + """Runs bitwise comparison tasks serially.""" + for task_a, task_b in comparisons: + run_comparison(task_a, task_b, verbose=verbose) + + +def run_comparisons_in_parallel(comparisons: list[tuple[Task, Task]], verbose=False): + """Runs bitwise comparison tasks in parallel across multiple processes.""" + + task_queue: multiprocessing.Queue = multiprocessing.Queue() + for pair in comparisons: + task_queue.put(pair) + + processes = [] + for _ in range(internal.NCPUS): + proc = multiprocessing.Process( + target=worker_comparison, args=[task_queue, verbose] + ) + proc.start() + processes.append(proc) + + for proc in processes: + proc.join() + + +def worker_comparison(task_queue: multiprocessing.Queue, verbose=False): + """Runs bitwise comparison tasks in `task_queue` until the queue is emptied.""" + while True: + try: + task_a, task_b = task_queue.get_nowait() + except queue.Empty: + return + run_comparison(task_a, task_b, verbose=verbose) + + +def run_comparison(task_a: Task, task_b: Task, verbose=False): + """Executes `nccmp -df` between the NetCDF output file of `task_a` and of `task_b`.""" + task_a_output = ( + internal.CWD / internal.SITE_OUTPUT_DIR / task_a.get_output_filename() + ) + task_b_output = ( + internal.CWD / internal.SITE_OUTPUT_DIR / task_b.get_output_filename() + ) + output_file = ( + internal.CWD + / internal.SITE_BITWISE_CMP_DIR + / f"{get_comparison_name(task_a, task_b)}.txt" + ) + if verbose: + print( + f"Comparing files {task_a_output.name} and {task_b_output.name} bitwise..." + ) + cmd = f"nccmp -df {task_a_output} {task_b_output} 2>&1" + if verbose: + print(f" {cmd}") + proc = subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True) + if proc.returncode != 0: + with open(output_file, "w", encoding="utf-8") as file: + file.write(proc.stdout) + print( + f"Failure: files {task_a_output.name} {task_b_output.name} differ. " + f"Results of diff have been written to {output_file}" + ) + else: + print(f"Success: files {task_a_output.name} {task_b_output.name} are identical") diff --git a/docs/user_guide/expected_output.md b/docs/user_guide/expected_output.md index e6ae987b..502e0285 100644 --- a/docs/user_guide/expected_output.md +++ b/docs/user_guide/expected_output.md @@ -41,13 +41,9 @@ Running FLUXNET tasks... Successfully ran FLUXNET tasks Running comparison tasks... 
-Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S2_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S2_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S6_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S6_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S7_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S7_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S3_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S3_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S0_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S0_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S5_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S5_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S4_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S4_out.nc are identitical -Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S1_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S1_out.nc are identitical +Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S0_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S0_out.nc are identical +Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S1_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S1_out.nc are identical +Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S2_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S2_out.nc are identical +Success: files AU-Tum_2002-2017_OzFlux_Met_R0_S3_out.nc AU-Tum_2002-2017_OzFlux_Met_R1_S3_out.nc are identical Successfully ran comparison tasks ``` diff --git a/tests/common.py b/tests/common.py index 01952f49..6673d053 100644 --- a/tests/common.py +++ b/tests/common.py @@ -3,7 +3,7 @@ import os from pathlib import Path -TMP_DIR = Path(os.environ["TMPDIR"], "benchcab_tests") +MOCK_CWD = TMP_DIR = Path(os.environ["TMPDIR"], "benchcab_tests") def make_barebones_config() -> dict: diff --git a/tests/conftest.py b/tests/conftest.py index d673a877..af8546d6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,10 @@ """Contains pytest fixtures accessible to all tests in the same directory.""" import shutil +import unittest.mock import pytest -from .common import TMP_DIR +from .common import MOCK_CWD @pytest.fixture(autouse=True) @@ -11,12 +12,15 @@ def run_around_tests(): """`pytest` autouse fixture that runs around each test.""" # Setup: - if TMP_DIR.exists(): - shutil.rmtree(TMP_DIR) - TMP_DIR.mkdir() + if MOCK_CWD.exists(): + shutil.rmtree(MOCK_CWD) + MOCK_CWD.mkdir() + patcher = unittest.mock.patch("benchcab.internal.CWD", MOCK_CWD) + patcher.start() # Run the test: yield # Teardown: - shutil.rmtree(TMP_DIR) + patcher.stop() + shutil.rmtree(MOCK_CWD) diff --git a/tests/test_bench_config.py b/tests/test_bench_config.py index 994d6cb3..c2639983 100644 --- a/tests/test_bench_config.py +++ b/tests/test_bench_config.py @@ -7,7 +7,7 @@ from tests.common import TMP_DIR from tests.common import make_barebones_config from benchcab.bench_config import check_config, read_config -from benchcab.internal import DEFAULT_SCIENCE_CONFIGURATIONS +from benchcab import internal def test_check_config(): @@ -34,11 +34,7 @@ def test_check_config(): # Success case: test config when realisations contains more than two keys config = make_barebones_config() - config["realisations"].append( - { - "path": "path/to/my_new_branch", - } - ) + config["realisations"].append({"path": "path/to/my_new_branch"}) assert len(config["realisations"]) > 2 check_config(config) @@ -60,129 +56,183 @@ def test_check_config(): check_config(config) # Failure case: test config without project key raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The 
config file does not list all required entries. " + "Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS), + ): config = make_barebones_config() config.pop("project") check_config(config) # Failure case: test config without realisations key raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The config file does not list all required entries. " + "Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS), + ): config = make_barebones_config() config.pop("realisations") check_config(config) # Failure case: test config with empty realisations key raises an exception - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="The 'realisations' key cannot be empty."): config = make_barebones_config() config["realisations"] = [] check_config(config) # Failure case: test config without modules key raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The config file does not list all required entries. " + "Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS), + ): config = make_barebones_config() config.pop("modules") check_config(config) # Failure case: test config without experiment key raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The config file does not list all required entries. " + "Those are: " + ", ".join(internal.CONFIG_REQUIRED_KEYS), + ): config = make_barebones_config() config.pop("experiment") check_config(config) # Failure case: test config with invalid experiment key raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The 'experiment' key is invalid.\n" + "Valid experiments are: " + + ", ".join( + list(internal.MEORG_EXPERIMENTS) + + internal.MEORG_EXPERIMENTS["five-site-test"] + ), + ): config = make_barebones_config() config["experiment"] = "foo" check_config(config) # Failure case: test config with invalid experiment key (not a subset of # five-site-test) raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The 'experiment' key is invalid.\n" + "Valid experiments are: " + + ", ".join( + list(internal.MEORG_EXPERIMENTS) + + internal.MEORG_EXPERIMENTS["five-site-test"] + ), + ): config = make_barebones_config() config["experiment"] = "CH-Dav" check_config(config) # Failure case: 'path' key is missing in branch configuration - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="Realisation '1' must specify the `path` field." + ): config = make_barebones_config() config["realisations"][1].pop("path") check_config(config) # Failure case: test config with empty science_configurations key # raises an exception - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="The 'science_configurations' key cannot be empty." 
+ ): config = make_barebones_config() config["science_configurations"] = [] check_config(config) # Failure case: project key is not a string - with pytest.raises(TypeError): + with pytest.raises(TypeError, match="The 'project' key must be a string."): config = make_barebones_config() config["project"] = 123 check_config(config) # Failure case: realisations key is not a list - with pytest.raises(TypeError): + with pytest.raises(TypeError, match="The 'realisations' key must be a list."): config = make_barebones_config() config["realisations"] = {"foo": "bar"} check_config(config) # Failure case: realisations key is not a list of dict - with pytest.raises(TypeError): + with pytest.raises(TypeError, match="Realisation '0' must be a dictionary object."): config = make_barebones_config() - config["realisations"] = ["foo", "bar"] + config["realisations"] = ["foo"] check_config(config) # Failure case: type of name is not a string - with pytest.raises(TypeError): + with pytest.raises( + TypeError, match="The 'name' field in realisation '1' must be a string." + ): config = make_barebones_config() config["realisations"][1]["name"] = 1234 check_config(config) # Failure case: type of path is not a string - with pytest.raises(TypeError): + with pytest.raises( + TypeError, match="The 'path' field in realisation '1' must be a string." + ): config = make_barebones_config() config["realisations"][1]["path"] = 1234 check_config(config) # Failure case: type of revision key is not an integer - with pytest.raises(TypeError): + with pytest.raises( + TypeError, match="The 'revision' field in realisation '1' must be an integer." + ): config = make_barebones_config() config["realisations"][1]["revision"] = "-1" check_config(config) # Failure case: type of patch key is not a dictionary - with pytest.raises(TypeError): + with pytest.raises( + TypeError, + match="The 'patch' field in realisation '1' must be a dictionary that is " + "compatible with the f90nml python package.", + ): config = make_barebones_config() config["realisations"][1]["patch"] = r"cable_user%ENABLE_SOME_FEATURE = .FALSE." check_config(config) # Failure case: type of build_script key is not a string - with pytest.raises(TypeError): + with pytest.raises( + TypeError, match="The 'build_script' field in realisation '1' must be a string." + ): config = make_barebones_config() config["realisations"][1]["build_script"] = ["echo", "hello"] check_config(config) # Failure case: modules key is not a list - with pytest.raises(TypeError): + with pytest.raises(TypeError, match="The 'modules' key must be a list."): config = make_barebones_config() config["modules"] = "netcdf" check_config(config) # Failure case: experiment key is not a string - with pytest.raises(TypeError): + with pytest.raises(TypeError, match="The 'experiment' key must be a string."): config = make_barebones_config() config["experiment"] = 0 check_config(config) # Failure case: type of config["science_configurations"] is not a list - with pytest.raises(TypeError): + with pytest.raises( + TypeError, match="The 'science_configurations' key must be a list." 
+ ): config = make_barebones_config() config["science_configurations"] = r"cable_user%GS_SWITCH = 'medlyn'" check_config(config) # Failure case: type of config["science_configurations"] is not a list of dict - with pytest.raises(TypeError): + with pytest.raises( + TypeError, + match="Science config settings must be specified using a dictionary " + "that is compatible with the f90nml python package.", + ): config = make_barebones_config() config["science_configurations"] = [r"cable_user%GS_SWITCH = 'medlyn'"] check_config(config) @@ -259,4 +309,4 @@ def test_read_config(): res = read_config(filename) os.remove(filename) assert config != res - assert res["science_configurations"] == DEFAULT_SCIENCE_CONFIGURATIONS + assert res["science_configurations"] == internal.DEFAULT_SCIENCE_CONFIGURATIONS diff --git a/tests/test_benchtree.py b/tests/test_benchtree.py index f7212f78..1a9a5a48 100644 --- a/tests/test_benchtree.py +++ b/tests/test_benchtree.py @@ -1,9 +1,12 @@ """`pytest` tests for benchtree.py""" +import io +import contextlib +import shutil from pathlib import Path -from tests.common import TMP_DIR +from tests.common import MOCK_CWD from tests.common import make_barebones_config from benchcab.task import Task from benchcab.benchtree import ( @@ -42,24 +45,64 @@ def test_setup_directory_tree(): # Success case: generate fluxnet directory structure tasks = setup_mock_tasks() - setup_fluxnet_directory_tree(fluxnet_tasks=tasks, root_dir=TMP_DIR) - - assert len(list(TMP_DIR.glob("*"))) == 1 - assert Path(TMP_DIR, "runs").exists() - assert Path(TMP_DIR, "runs", "site").exists() - assert Path(TMP_DIR, "runs", "site", "logs").exists() - assert Path(TMP_DIR, "runs", "site", "outputs").exists() - assert Path(TMP_DIR, "runs", "site", "analysis", "bitwise-comparisons").exists() - assert Path(TMP_DIR, "runs", "site", "tasks").exists() - - assert Path(TMP_DIR, "runs", "site", "tasks", "site_foo_R0_S0").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_foo_R0_S1").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_bar_R0_S0").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_bar_R0_S1").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_foo_R1_S0").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_foo_R1_S1").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_bar_R1_S0").exists() - assert Path(TMP_DIR, "runs", "site", "tasks", "site_bar_R1_S1").exists() + setup_fluxnet_directory_tree(fluxnet_tasks=tasks) + + assert len(list(MOCK_CWD.glob("*"))) == 1 + assert Path(MOCK_CWD, "runs").exists() + assert Path(MOCK_CWD, "runs", "site").exists() + assert Path(MOCK_CWD, "runs", "site", "logs").exists() + assert Path(MOCK_CWD, "runs", "site", "outputs").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks").exists() + assert Path(MOCK_CWD, "runs", "site", "analysis", "bitwise-comparisons").exists() + + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_foo_R0_S0").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_foo_R0_S1").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_bar_R0_S0").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_bar_R0_S1").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_foo_R1_S0").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_foo_R1_S1").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_bar_R1_S0").exists() + assert Path(MOCK_CWD, "runs", "site", "tasks", "site_bar_R1_S1").exists() + + shutil.rmtree(MOCK_CWD / "runs") + + 
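The two stdout checks that follow rely on redirecting `sys.stdout` into an in-memory buffer and comparing the captured text against the expected messages. A minimal sketch of that capture idiom, assuming a hypothetical `greet` helper in place of the function under test:

    import contextlib
    import io

    def greet():
        print("hello")

    # redirect_stdout returns its target, so `buf` is the StringIO object
    with contextlib.redirect_stdout(io.StringIO()) as buf:
        greet()
    assert buf.getvalue() == "hello\n"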
# Success case: test non-verbose output + with contextlib.redirect_stdout(io.StringIO()) as buf: + setup_fluxnet_directory_tree(fluxnet_tasks=tasks) + assert buf.getvalue() == ( + f"Creating runs/site/logs directory: {MOCK_CWD}/runs/site/logs\n" + f"Creating runs/site/outputs directory: {MOCK_CWD}/runs/site/outputs\n" + f"Creating runs/site/tasks directory: {MOCK_CWD}/runs/site/tasks\n" + f"Creating runs/site/analysis directory: {MOCK_CWD}/runs/site/analysis\n" + f"Creating runs/site/analysis/bitwise-comparisons directory: {MOCK_CWD}" + "/runs/site/analysis/bitwise-comparisons\n" + f"Creating task directories...\n" + ) + + shutil.rmtree(MOCK_CWD / "runs") + + # Success case: test verbose output + with contextlib.redirect_stdout(io.StringIO()) as buf: + setup_fluxnet_directory_tree(fluxnet_tasks=tasks, verbose=True) + assert buf.getvalue() == ( + f"Creating runs/site/logs directory: {MOCK_CWD}/runs/site/logs\n" + f"Creating runs/site/outputs directory: {MOCK_CWD}/runs/site/outputs\n" + f"Creating runs/site/tasks directory: {MOCK_CWD}/runs/site/tasks\n" + f"Creating runs/site/analysis directory: {MOCK_CWD}/runs/site/analysis\n" + f"Creating runs/site/analysis/bitwise-comparisons directory: {MOCK_CWD}" + "/runs/site/analysis/bitwise-comparisons\n" + f"Creating task directories...\n" + f"Creating runs/site/tasks/site_foo_R0_S0: {MOCK_CWD}/runs/site/tasks/site_foo_R0_S0\n" + f"Creating runs/site/tasks/site_foo_R0_S1: {MOCK_CWD}/runs/site/tasks/site_foo_R0_S1\n" + f"Creating runs/site/tasks/site_bar_R0_S0: {MOCK_CWD}/runs/site/tasks/site_bar_R0_S0\n" + f"Creating runs/site/tasks/site_bar_R0_S1: {MOCK_CWD}/runs/site/tasks/site_bar_R0_S1\n" + f"Creating runs/site/tasks/site_foo_R1_S0: {MOCK_CWD}/runs/site/tasks/site_foo_R1_S0\n" + f"Creating runs/site/tasks/site_foo_R1_S1: {MOCK_CWD}/runs/site/tasks/site_foo_R1_S1\n" + f"Creating runs/site/tasks/site_bar_R1_S0: {MOCK_CWD}/runs/site/tasks/site_bar_R1_S0\n" + f"Creating runs/site/tasks/site_bar_R1_S1: {MOCK_CWD}/runs/site/tasks/site_bar_R1_S1\n" + ) + + shutil.rmtree(MOCK_CWD / "runs") def test_clean_directory_tree(): @@ -67,19 +110,19 @@ def test_clean_directory_tree(): # Success case: directory tree does not exist after clean tasks = setup_mock_tasks() - setup_fluxnet_directory_tree(fluxnet_tasks=tasks, root_dir=TMP_DIR) + setup_fluxnet_directory_tree(fluxnet_tasks=tasks) - clean_directory_tree(root_dir=TMP_DIR) - assert not Path(TMP_DIR, "runs").exists() + clean_directory_tree() + assert not Path(MOCK_CWD, "runs").exists() - setup_src_dir(root_dir=TMP_DIR) - clean_directory_tree(root_dir=TMP_DIR) - assert not Path(TMP_DIR, "src").exists() + setup_src_dir() + clean_directory_tree() + assert not Path(MOCK_CWD, "src").exists() def test_setup_src_dir(): """Tests for `setup_src_dir()`.""" # Success case: make src directory - setup_src_dir(root_dir=TMP_DIR) - assert Path(TMP_DIR, "src").exists() + setup_src_dir() + assert Path(MOCK_CWD, "src").exists() diff --git a/tests/test_build_cable.py b/tests/test_build_cable.py new file mode 100644 index 00000000..993cd3ed --- /dev/null +++ b/tests/test_build_cable.py @@ -0,0 +1,221 @@ +"""`pytest` tests for build_cable.py""" + +import subprocess +import unittest.mock +import io +import contextlib +import pytest + +from benchcab import internal +from benchcab.build_cable import patch_build_script, default_build, custom_build +from benchcab import environment_modules +from .common import MOCK_CWD + + +def test_patch_build_script(): + """Tests for `patch_build_script()`.""" + file_path = MOCK_CWD / "test-build.sh" 
+ with open(file_path, "w", encoding="utf-8") as file: + file.write( + """#!/bin/bash +module add bar +module purge + +host_gadi() +{ + . /etc/bashrc + module purge + module add intel-compiler/2019.5.281 + module add netcdf/4.6.3 + module load foo + modules + echo foo && module load + echo foo # module load + # module load foo + + if [[ $1 = 'mpi' ]]; then + module add intel-mpi/2019.5.281 + fi +} +""" + ) + + patch_build_script(file_path) + + with open(file_path, "r", encoding="utf-8") as file: + assert file.read() == ( + """#!/bin/bash + +host_gadi() +{ + . /etc/bashrc + modules + echo foo # module load + # module load foo + + if [[ $1 = 'mpi' ]]; then + fi +} +""" + ) + + +def test_default_build(): + """Tests for `default_build()`.""" + offline_dir = MOCK_CWD / internal.SRC_DIR / "test-branch" / "offline" + offline_dir.mkdir(parents=True) + build_script_path = offline_dir / "build3.sh" + build_script_path.touch() + + # Success case: execute the default build command + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run: + default_build("test-branch", ["foo", "bar"]) + mock_subprocess_run.assert_called_once_with( + "./tmp-build3.sh", + shell=True, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + + # Success case: test non-verbose output + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout( + io.StringIO() + ) as buf: + default_build("test-branch", ["foo", "bar"]) + assert buf.getvalue() == ( + "Compiling CABLE serially for realisation test-branch...\n" + ) + + # Success case: test verbose output + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout( + io.StringIO() + ) as buf: + default_build("test-branch", ["foo", "bar"], verbose=True) + mock_subprocess_run.assert_called_once_with( + "./tmp-build3.sh", + shell=True, + check=True, + stdout=None, + stderr=subprocess.STDOUT, + ) + assert buf.getvalue() == ( + "Compiling CABLE serially for realisation test-branch...\n" + f" Copying {build_script_path} to {build_script_path.parent}/tmp-build3.sh\n" + f" chmod +x {build_script_path.parent}/tmp-build3.sh\n" + " Patching tmp-build3.sh: remove lines that call environment " + "modules\n" + f" Loading modules: foo bar\n" + f" ./tmp-build3.sh\n" + f" Unloading modules: foo bar\n" + ) + + # Failure case: cannot load modules + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ) as mock_module_load, unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run: + mock_module_load.side_effect = environment_modules.EnvironmentModulesError + with pytest.raises(environment_modules.EnvironmentModulesError): + default_build("test-branch", ["foo", "bar"]) + + # Failure case: cannot unload modules + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ) as mock_module_unload, unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run: + 
mock_module_unload.side_effect = environment_modules.EnvironmentModulesError + with pytest.raises(environment_modules.EnvironmentModulesError): + default_build("test-branch", ["foo", "bar"]) + + # Failure case: cannot find default build script + build_script_path.unlink() + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run: + with pytest.raises( + FileNotFoundError, + match=f"The default build script, {MOCK_CWD}/src/test-branch/offline/build3.sh, " + "could not be found. Do you need to specify a different build script with the " + "'build_script' option in config.yaml?", + ): + default_build("test-branch", ["foo", "bar"], verbose=True) + + +def test_custom_build(): + """Tests for `custom_build()`.""" + offline_dir = MOCK_CWD / internal.SRC_DIR / "test-branch" / "offline" + offline_dir.mkdir(parents=True) + build_script_path = offline_dir / "custom-build.sh" + build_script_path.touch() + + # Success case: execute custom build command + with unittest.mock.patch( + "benchcab.environment_modules.module_load" + ), unittest.mock.patch( + "benchcab.environment_modules.module_unload" + ), unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run: + custom_build(build_script_path, "test-branch") + mock_subprocess_run.assert_called_once_with( + f"./{build_script_path.name}", + shell=True, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + + # Success case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + custom_build(build_script_path, "test-branch") + assert buf.getvalue() == ( + "Compiling CABLE using custom build script for realisation test-branch...\n" + ) + + # Success case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + custom_build(build_script_path, "test-branch", verbose=True) + mock_subprocess_run.assert_called_once_with( + f"./{build_script_path.name}", + shell=True, + check=True, + stdout=None, + stderr=subprocess.STDOUT, + ) + assert buf.getvalue() == ( + "Compiling CABLE using custom build script for realisation test-branch...\n" + f" ./{build_script_path.name}\n" + ) diff --git a/tests/test_get_cable.py b/tests/test_get_cable.py new file mode 100644 index 00000000..c435621f --- /dev/null +++ b/tests/test_get_cable.py @@ -0,0 +1,251 @@ +"""`pytest` tests for get_cable.py""" + +import subprocess +import unittest.mock +import shutil +import io +import contextlib +import pytest + +from benchcab import internal +from benchcab.get_cable import ( + checkout_cable, + checkout_cable_auxiliary, + svn_info_show_item, + next_path, +) +from .common import MOCK_CWD + + +def setup_mock_branch_config() -> dict: + """Returns a mock branch config.""" + return { + "name": "trunk", + "revision": 9000, + "path": "trunk", + "patch": {}, + "build_script": "", + } + + +def mock_svn_info_show_item(*args, **kwargs): # pylint: disable=unused-argument + """Side effect function used to mock `svn_info_show_item()`""" + item = args[1] + return {"url": "/url/to/test-branch", "revision": "123"}[item] + + +def test_checkout_cable(): + """Tests for `checkout_cable()`.""" + + with unittest.mock.patch( + "benchcab.get_cable.svn_info_show_item", mock_svn_info_show_item + ): + # Success case: checkout mock branch repository 
from SVN + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + branch_config = setup_mock_branch_config() + path = checkout_cable(branch_config) + assert path == MOCK_CWD / "src" / "trunk" + mock_subprocess_run.assert_called_once_with( + "svn checkout -r 9000 https://trac.nci.org.au/svn/cable/trunk " + f"{MOCK_CWD}/src/trunk", + shell=True, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + + # Success case: specify default revision number + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + branch_config = setup_mock_branch_config() + branch_config["revision"] = -1 + path = checkout_cable(branch_config) + assert path == MOCK_CWD / "src" / "trunk" + mock_subprocess_run.assert_called_once_with( + f"svn checkout https://trac.nci.org.au/svn/cable/trunk {MOCK_CWD}/src/trunk", + shell=True, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + + # Success case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + checkout_cable(branch_config) + assert buf.getvalue() == "Successfully checked out trunk at revision 123\n" + + # Success case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + checkout_cable(branch_config, verbose=True) + mock_subprocess_run.assert_called_once_with( + "svn checkout https://trac.nci.org.au/svn/cable/trunk " + f"{MOCK_CWD}/src/trunk", + shell=True, + check=True, + stdout=None, + stderr=subprocess.STDOUT, + ) + assert buf.getvalue() == ( + f"svn checkout https://trac.nci.org.au/svn/cable/trunk {MOCK_CWD}/src/trunk\n" + "Successfully checked out trunk at revision 123\n" + ) + + +def test_checkout_cable_auxiliary(): + """Tests for `checkout_cable_auxiliary()`.""" + + grid_file_path = MOCK_CWD / internal.GRID_FILE + phen_file_path = MOCK_CWD / internal.PHEN_FILE + cnpbiome_file_path = MOCK_CWD / internal.CNPBIOME_FILE + + # Generate mock files in CABLE-AUX as a side effect + def touch_files(*args, **kwargs): # pylint: disable=unused-argument + grid_file_path.parent.mkdir(parents=True, exist_ok=True) + grid_file_path.touch() + phen_file_path.parent.mkdir(parents=True, exist_ok=True) + phen_file_path.touch() + cnpbiome_file_path.parent.mkdir(parents=True, exist_ok=True) + cnpbiome_file_path.touch() + + with unittest.mock.patch( + "benchcab.get_cable.svn_info_show_item", mock_svn_info_show_item + ): + # Success case: checkout CABLE-AUX repository + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + mock_subprocess_run.side_effect = touch_files + checkout_cable_auxiliary() + mock_subprocess_run.assert_called_once_with( + "svn checkout https://trac.nci.org.au/svn/cable/branches/Share/CABLE-AUX " + f"{MOCK_CWD}/{internal.CABLE_AUX_DIR}", + shell=True, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + # Success case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + mock_subprocess_run.side_effect = touch_files + checkout_cable_auxiliary() + assert buf.getvalue() == "Successfully checked out CABLE-AUX at revision 123\n" + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + # Success case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, 
contextlib.redirect_stdout(io.StringIO()) as buf: + mock_subprocess_run.side_effect = touch_files + checkout_cable_auxiliary(verbose=True) + mock_subprocess_run.assert_called_once_with( + "svn checkout https://trac.nci.org.au/svn/cable/branches/Share/CABLE-AUX " + f"{MOCK_CWD}/{internal.CABLE_AUX_DIR}", + shell=True, + check=True, + stdout=None, + stderr=subprocess.STDOUT, + ) + assert buf.getvalue() == ( + "svn checkout https://trac.nci.org.au/svn/cable/branches/Share/CABLE-AUX " + f"{MOCK_CWD}/{internal.CABLE_AUX_DIR}\n" + "Successfully checked out CABLE-AUX at revision 123\n" + ) + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + with unittest.mock.patch("subprocess.run"): + # Failure case: missing grid file in CABLE-AUX repository + touch_files() + grid_file_path.unlink() + with pytest.raises( + RuntimeError, + match=f"Error checking out CABLE-AUX: cannot find file '{internal.GRID_FILE}'", + ): + checkout_cable_auxiliary() + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + # Failure case: missing phen file in CABLE-AUX repository + touch_files() + phen_file_path.unlink() + with pytest.raises( + RuntimeError, + match=f"Error checking out CABLE-AUX: cannot find file '{internal.PHEN_FILE}'", + ): + checkout_cable_auxiliary() + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + # Failure case: missing cnpbiome file in CABLE-AUX repository + touch_files() + cnpbiome_file_path.unlink() + with pytest.raises( + RuntimeError, + match=f"Error checking out CABLE-AUX: cannot find file '{internal.CNPBIOME_FILE}'", + ): + checkout_cable_auxiliary() + shutil.rmtree(MOCK_CWD / internal.CABLE_AUX_DIR) + + +def test_svn_info_show_item(): + """Tests for `svn_info_show_item()`.""" + + # Success case: run command for mock item + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{"stdout": "standard output from command"} + ) + mock_subprocess_run.return_value = mock_completed_process + ret = svn_info_show_item("foo", "some-mock-item") + assert ret == "standard output from command" + mock_subprocess_run.assert_called_once_with( + "svn info --show-item some-mock-item foo", + shell=True, + capture_output=True, + text=True, + check=True, + ) + + # Success case: test leading and trailing white space is removed from standard output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{"stdout": " standard output from command\n "} + ) + mock_subprocess_run.return_value = mock_completed_process + ret = svn_info_show_item("foo", "some-mock-item") + assert ret == "standard output from command" + mock_subprocess_run.assert_called_once_with( + "svn info --show-item some-mock-item foo", + shell=True, + capture_output=True, + text=True, + check=True, + ) + + +def test_next_path(): + """Tests for `next_path()`.""" + + pattern = "rev_number-*.log" + + # Success case: get next path in 'empty' CWD + assert len(list(MOCK_CWD.glob(pattern))) == 0 + ret = next_path(pattern) + assert ret == "rev_number-1.log" + + # Success case: get next path in 'non-empty' CWD + ret_path = MOCK_CWD / ret + ret_path.touch() + assert len(list(MOCK_CWD.glob(pattern))) == 1 + ret = next_path(pattern) + assert ret == "rev_number-2.log" diff --git a/tests/test_job_script.py b/tests/test_job_script.py new file mode 100644 index 
00000000..a3f43f64 --- /dev/null +++ b/tests/test_job_script.py @@ -0,0 +1,219 @@ +"""`pytest` tests for job_script.py""" + +import unittest.mock +import io +import subprocess +import contextlib +from pathlib import Path +import pytest + +from benchcab import internal +from benchcab.job_script import get_local_storage_flag, create_job_script, submit_job +from .common import MOCK_CWD + + +def test_get_local_storage_flag(): + """Tests for `get_local_storage_flag()`.""" + + # Success case: scratch dir storage flag + assert get_local_storage_flag(Path("/scratch/tm70/foo")) == "scratch/tm70" + + # Success case: gdata storage flag + assert get_local_storage_flag(Path("/g/data/tm70/foo")) == "gdata/tm70" + + # Failure case: invalid path + with pytest.raises( + RuntimeError, match="Current directory structure unknown on Gadi." + ): + get_local_storage_flag(Path("/home/189/foo")) + + +def test_create_job_script(): + """Tests for `create_job_script()`.""" + + # Success case: test default job script creation + with unittest.mock.patch( + "benchcab.job_script.get_local_storage_flag" + ) as mock_get_local_storage_flag: + mock_get_local_storage_flag.return_value = "storage_flag" + create_job_script( + project="tm70", + config_path="/path/to/config.yaml", + modules=["foo", "bar", "baz"], + ) + with open(MOCK_CWD / internal.QSUB_FNAME, "r", encoding="utf-8") as file: + assert ( + file.read() + == f"""#!/bin/bash +#PBS -l wd +#PBS -l ncpus={internal.NCPUS} +#PBS -l mem={internal.MEM} +#PBS -l walltime={internal.WALL_TIME} +#PBS -q normal +#PBS -P tm70 +#PBS -j oe +#PBS -m e +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+storage_flag + +module purge +module use /g/data/hh5/public/modules +module load conda/analysis3-unstable +module load foo +module load bar +module load baz + +benchcab fluxnet-run-tasks --config=/path/to/config.yaml +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxnet-run-tasks failed. Exiting...' + exit 1 +fi + +benchcab fluxnet-bitwise-cmp --config=/path/to/config.yaml +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxnet-bitwise-cmp failed. Exiting...' + exit 1 +fi +""" + ) + + # Success case: skip fluxnet-bitwise-cmp step + with unittest.mock.patch( + "benchcab.job_script.get_local_storage_flag" + ) as mock_get_local_storage_flag: + mock_get_local_storage_flag.return_value = "storage_flag" + create_job_script( + project="tm70", + config_path="/path/to/config.yaml", + modules=["foo", "bar", "baz"], + skip_bitwise_cmp=True, + ) + with open(MOCK_CWD / internal.QSUB_FNAME, "r", encoding="utf-8") as file: + assert ( + file.read() + == f"""#!/bin/bash +#PBS -l wd +#PBS -l ncpus={internal.NCPUS} +#PBS -l mem={internal.MEM} +#PBS -l walltime={internal.WALL_TIME} +#PBS -q normal +#PBS -P tm70 +#PBS -j oe +#PBS -m e +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+storage_flag + +module purge +module use /g/data/hh5/public/modules +module load conda/analysis3-unstable +module load foo +module load bar +module load baz + +benchcab fluxnet-run-tasks --config=/path/to/config.yaml +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxnet-run-tasks failed. Exiting...' 
+ exit 1 +fi + +""" + ) + + # Success case: test standard output + with unittest.mock.patch( + "benchcab.job_script.get_local_storage_flag" + ) as mock_get_local_storage_flag: + mock_get_local_storage_flag.return_value = "storage_flag" + with contextlib.redirect_stdout(io.StringIO()) as buf: + create_job_script( + project="tm70", + config_path="/path/to/config.yaml", + modules=["foo", "bar", "baz"], + ) + assert buf.getvalue() == ( + "Creating PBS job script to run FLUXNET tasks on compute " + f"nodes: {internal.QSUB_FNAME}\n" + ) + + # Success case: enable verbose flag + with unittest.mock.patch( + "benchcab.job_script.get_local_storage_flag" + ) as mock_get_local_storage_flag: + mock_get_local_storage_flag.return_value = "storage_flag" + create_job_script( + project="tm70", + config_path="/path/to/config.yaml", + modules=["foo", "bar", "baz"], + verbose=True, + ) + with open(MOCK_CWD / internal.QSUB_FNAME, "r", encoding="utf-8") as file: + assert ( + file.read() + == f"""#!/bin/bash +#PBS -l wd +#PBS -l ncpus={internal.NCPUS} +#PBS -l mem={internal.MEM} +#PBS -l walltime={internal.WALL_TIME} +#PBS -q normal +#PBS -P tm70 +#PBS -j oe +#PBS -m e +#PBS -l storage=gdata/ks32+gdata/hh5+gdata/tm70+storage_flag + +module purge +module use /g/data/hh5/public/modules +module load conda/analysis3-unstable +module load foo +module load bar +module load baz + +benchcab fluxnet-run-tasks --config=/path/to/config.yaml -v +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxnet-run-tasks failed. Exiting...' + exit 1 +fi + +benchcab fluxnet-bitwise-cmp --config=/path/to/config.yaml -v +if [ $? -ne 0 ]; then + echo 'Error: benchcab fluxnet-bitwise-cmp failed. Exiting...' + exit 1 +fi +""" + ) + + +def test_submit_job(): + """Tests for `submit_job()`.""" + + # Success case: submit PBS job + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + submit_job() + mock_subprocess_run.assert_called_once_with( + f"qsub {MOCK_CWD/internal.QSUB_FNAME}", + shell=True, + check=True, + capture_output=True, + text=True, + ) + + # Success case: test standard output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock(stdout="standard output from qsub") + mock_subprocess_run.return_value = mock_completed_process + with contextlib.redirect_stdout(io.StringIO()) as buf: + submit_job() + assert buf.getvalue() == "PBS job submitted: standard output from qsub\n" + + # Failure case: qsub non-zero exit code + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + mock_subprocess_run.side_effect = subprocess.CalledProcessError( + 1, "dummy-cmd", stderr="standard error from qsub" + ) + with contextlib.redirect_stdout(io.StringIO()) as buf: + with pytest.raises(subprocess.CalledProcessError): + submit_job() + assert buf.getvalue() == ( + "Error when submitting job to NCI queue\n" "standard error from qsub\n" + ) diff --git a/tests/test_task.py b/tests/test_task.py index 5529c5c4..fbdc0112 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,12 +1,27 @@ """`pytest` tests for task.py""" +import unittest.mock +import subprocess from pathlib import Path +import io +import contextlib +import pytest import f90nml - -from tests.common import TMP_DIR -from benchcab.task import Task +import netCDF4 + +from benchcab.task import ( + patch_namelist, + get_fluxnet_tasks, + get_fluxnet_comparisons, + get_comparison_name, + run_comparison, + Task, + CableError, +) 
from benchcab import internal from benchcab.benchtree import setup_fluxnet_directory_tree +from .common import MOCK_CWD, make_barebones_config + def setup_mock_task() -> Task: """Returns a mock `Task` instance.""" @@ -16,44 +31,48 @@ def setup_mock_task() -> Task: branch_patch={"cable": {"some_branch_specific_setting": True}}, met_forcing_file="forcing-file.nc", sci_conf_id=0, - sci_config={"cable": {"some_setting": True}} + sci_config={"cable": {"some_setting": True}}, ) return task def setup_mock_namelists_directory(): - """Setup a mock namelists directory in TMP_DIR.""" - Path(TMP_DIR, internal.NAMELIST_DIR).mkdir() + """Setup a mock namelists directory in MOCK_CWD.""" + Path(MOCK_CWD, internal.NAMELIST_DIR).mkdir() - cable_nml_path = Path(TMP_DIR, internal.NAMELIST_DIR, internal.CABLE_NML) + cable_nml_path = Path(MOCK_CWD, internal.NAMELIST_DIR, internal.CABLE_NML) cable_nml_path.touch() assert cable_nml_path.exists() - cable_soil_nml_path = Path(TMP_DIR, internal.NAMELIST_DIR, internal.CABLE_SOIL_NML) + cable_soil_nml_path = Path(MOCK_CWD, internal.NAMELIST_DIR, internal.CABLE_SOIL_NML) cable_soil_nml_path.touch() assert cable_soil_nml_path.exists() - cable_vegetation_nml_path = Path(TMP_DIR, internal.NAMELIST_DIR, internal.CABLE_VEGETATION_NML) + cable_vegetation_nml_path = Path( + MOCK_CWD, internal.NAMELIST_DIR, internal.CABLE_VEGETATION_NML + ) cable_vegetation_nml_path.touch() assert cable_vegetation_nml_path.exists() def do_mock_checkout_and_build(): """Setup mock repository that has been checked out and built.""" - Path(TMP_DIR, internal.SRC_DIR, "test-branch", "offline").mkdir(parents=True) + Path(MOCK_CWD, internal.SRC_DIR, "test-branch", "offline").mkdir(parents=True) - cable_exe_path = Path(TMP_DIR, internal.SRC_DIR, "test-branch", "offline", internal.CABLE_EXE) + cable_exe_path = Path( + MOCK_CWD, internal.SRC_DIR, "test-branch", "offline", internal.CABLE_EXE + ) cable_exe_path.touch() assert cable_exe_path.exists() def do_mock_run(task: Task): """Make mock log files and output files as if benchcab has just been run.""" - output_path = Path(TMP_DIR, internal.SITE_OUTPUT_DIR, task.get_output_filename()) + output_path = Path(MOCK_CWD, internal.SITE_OUTPUT_DIR, task.get_output_filename()) output_path.touch() assert output_path.exists() - log_path = Path(TMP_DIR, internal.SITE_LOG_DIR, task.get_log_filename()) + log_path = Path(MOCK_CWD, internal.SITE_LOG_DIR, task.get_log_filename()) log_path.touch() assert log_path.exists() @@ -86,19 +105,26 @@ def test_fetch_files(): task = setup_mock_task() setup_mock_namelists_directory() - setup_fluxnet_directory_tree([task], root_dir=TMP_DIR) + setup_fluxnet_directory_tree([task]) do_mock_checkout_and_build() - task.fetch_files(root_dir=TMP_DIR) - - assert Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_NML).exists() - assert Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_VEGETATION_NML).exists() - assert Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_SOIL_NML).exists() - assert Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_EXE).exists() + task.fetch_files() + + assert Path( + MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_NML + ).exists() + assert Path( + MOCK_CWD, + internal.SITE_TASKS_DIR, + task.get_task_name(), + internal.CABLE_VEGETATION_NML, + ).exists() + assert Path( + MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_SOIL_NML + ).exists() + assert Path( + 
MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_EXE + ).exists() def test_clean_task(): @@ -108,89 +134,499 @@ def test_clean_task(): task = setup_mock_task() setup_mock_namelists_directory() - setup_fluxnet_directory_tree([task], root_dir=TMP_DIR) + setup_fluxnet_directory_tree([task]) do_mock_checkout_and_build() - task.fetch_files(root_dir=TMP_DIR) + task.fetch_files() do_mock_run(task) - task.clean_task(root_dir=TMP_DIR) + task.clean_task() + + assert not Path( + MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_NML + ).exists() + assert not Path( + MOCK_CWD, + internal.SITE_TASKS_DIR, + task.get_task_name(), + internal.CABLE_VEGETATION_NML, + ).exists() + assert not Path( + MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_SOIL_NML + ).exists() + assert not Path( + MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name(), internal.CABLE_EXE + ).exists() + assert not Path( + MOCK_CWD, internal.SITE_OUTPUT_DIR, task.get_output_filename() + ).exists() + assert not Path(MOCK_CWD, internal.SITE_LOG_DIR, task.get_log_filename()).exists() + + +def test_patch_namelist(): + """Tests for `patch_namelist()`.""" + + nml_path = MOCK_CWD / "test.nml" + + # Success case: patch non-existing namelist file + assert not nml_path.exists() + patch_namelist(nml_path, {"cable": {"file": "/path/to/file", "bar": 123}}) + assert f90nml.read(nml_path) == { + "cable": { + "file": "/path/to/file", + "bar": 123, + } + } + + # Success case: patch non-empty namelist file + patch_namelist(nml_path, {"cable": {"some": {"parameter": True}, "bar": 456}}) + assert f90nml.read(nml_path) == { + "cable": { + "file": "/path/to/file", + "bar": 456, + "some": {"parameter": True}, + } + } - assert not Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_NML).exists() - assert not Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_VEGETATION_NML).exists() - assert not Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_SOIL_NML).exists() - assert not Path(TMP_DIR, internal.SITE_TASKS_DIR, - task.get_task_name(), internal.CABLE_EXE).exists() - assert not Path(TMP_DIR, internal.SITE_OUTPUT_DIR, task.get_output_filename()).exists() - assert not Path(TMP_DIR, internal.SITE_LOG_DIR, task.get_log_filename()).exists() + # Success case: empty patch does nothing + prev = f90nml.read(nml_path) + patch_namelist(nml_path, {}) + assert f90nml.read(nml_path) == prev -def test_adjust_namelist_file(): - """Tests for `adjust_namelist_file()`.""" +def test_setup_task(): + """Tests for `setup_task()`.""" - # Success case: adjust cable namelist file task = setup_mock_task() - task_dir = Path(TMP_DIR, internal.SITE_TASKS_DIR, task.get_task_name()) + task_dir = Path(MOCK_CWD, internal.SITE_TASKS_DIR, task.get_task_name()) - setup_fluxnet_directory_tree([task], root_dir=TMP_DIR) + setup_mock_namelists_directory() + setup_fluxnet_directory_tree([task]) + do_mock_checkout_and_build() - # Create mock namelist file in task directory: - nml = { - 'cable': { - 'filename': { - 'met': "/path/to/met/file", - 'foo': 123 - }, - 'bar': 123 - } + # Success case: test all settings are patched into task namelist file + task.setup_task() + res_nml = f90nml.read(str(task_dir / internal.CABLE_NML)) + assert res_nml["cable"] == { + "filename": { + "met": str(internal.MET_DIR / "forcing-file.nc"), + "out": str( + MOCK_CWD / internal.SITE_OUTPUT_DIR / task.get_output_filename() + ), + "log": str(MOCK_CWD / internal.SITE_LOG_DIR / 
task.get_log_filename()), + "restart_out": " ", + "type": str(MOCK_CWD / internal.GRID_FILE), + }, + "output": {"restart": False}, + "fixedco2": internal.CABLE_FIXED_CO2_CONC, + "casafile": { + "phen": str(MOCK_CWD / internal.PHEN_FILE), + "cnpbiome": str(MOCK_CWD / internal.CNPBIOME_FILE), + }, + "spinup": False, + "some_setting": True, + "some_branch_specific_setting": True, } - f90nml.write(nml, str(task_dir / internal.CABLE_NML)) + # Success case: test non-verbose output + with contextlib.redirect_stdout(io.StringIO()) as buf: + task.setup_task() + assert not buf.getvalue() + + # Success case: test verbose output + with contextlib.redirect_stdout(io.StringIO()) as buf: + task.setup_task(verbose=True) + assert buf.getvalue() == ( + "Setting up task: forcing-file_R1_S0\n" + " Cleaning task\n" + f" Copying namelist files from {MOCK_CWD}/namelists to " + f"{MOCK_CWD / 'runs/site/tasks/forcing-file_R1_S0'}\n" + f" Copying CABLE executable from {MOCK_CWD}/src/test-branch/" + f"offline/cable to {MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable\n" + " Adding base configurations to CABLE namelist file " + f"{MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable.nml\n" + " Adding science configurations to CABLE namelist file " + f"{MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable.nml\n" + " Adding branch specific configurations to CABLE namelist file " + f"{MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable.nml\n" + ) + - task.adjust_namelist_file(root_dir=TMP_DIR) +def test_run_cable(): + """Tests for `run_cable()`.""" - res_nml = f90nml.read(str(task_dir / internal.CABLE_NML)) + task = setup_mock_task() + task_dir = MOCK_CWD / internal.SITE_TASKS_DIR / task.get_task_name() + task_dir.mkdir(parents=True) + exe_path = task_dir / internal.CABLE_EXE + exe_path.touch() + nml_path = task_dir / internal.CABLE_NML + nml_path.touch() + + # Success case: run CABLE executable in subprocess + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + task.run_cable() + mock_subprocess_run.assert_called_once_with( + f"{exe_path} {nml_path} > {task_dir / internal.CABLE_STDOUT_FILENAME} 2>&1", + shell=True, + check=True, + ) + + # Success case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + task.run_cable() + assert not buf.getvalue() + + # Success case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, contextlib.redirect_stdout(io.StringIO()) as buf: + task.run_cable(verbose=True) + assert buf.getvalue() == ( + f" {MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable " + f"{MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/cable.nml " + f"> {MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/out.txt 2>&1\n" + ) - met_file_path = Path(internal.MET_DIR, "forcing-file.nc") - output_path = Path(TMP_DIR, internal.SITE_OUTPUT_DIR, task.get_output_filename()) - log_path = Path(TMP_DIR, internal.SITE_LOG_DIR, task.get_log_filename()) - grid_file_path = Path(TMP_DIR, internal.GRID_FILE) - phen_file_path = Path(TMP_DIR, internal.PHEN_FILE) - cnpbiome_file_path = Path(TMP_DIR, internal.CNPBIOME_FILE) - - assert res_nml['cable']['filename']['met'] == str(met_file_path) - assert res_nml['cable']['filename']['out'] == str(output_path) - assert res_nml['cable']['filename']['log'] == str(log_path) - assert res_nml['cable']['filename']['restart_out'] == " " - assert res_nml['cable']['filename']['type'] == str(grid_file_path) - assert res_nml['cable']['output']['restart'] is 
False - assert res_nml['cable']['fixedCO2'] == internal.CABLE_FIXED_CO2_CONC - assert res_nml['cable']['casafile']['phen'] == str(phen_file_path) - assert res_nml['cable']['casafile']['cnpbiome'] == str(cnpbiome_file_path) - assert res_nml['cable']['spinup'] is False - assert res_nml['cable']['some_setting'] is True - - assert res_nml['cable']['filename']['foo'] == 123, "assert existing derived types are preserved" - assert res_nml['cable']['bar'] == 123, "assert existing top-level parameters are preserved" + # Failure case: raise CableError on subprocess non-zero exit code + with unittest.mock.patch("subprocess.run") as mock_subprocess_run: + mock_subprocess_run.side_effect = subprocess.CalledProcessError( + returncode=1, cmd="cmd" + ) + with pytest.raises(CableError): + task.run_cable() -def test_setup_task(): - """Tests for `setup_task()`.""" +def test_add_provenance_info(): + """Tests for `add_provenance_info()`.""" - # Success case: test branch specific settings are patched into task namelist file task = setup_mock_task() - task_dir = Path(TMP_DIR, internal.SITE_TASKS_DIR, task.get_task_name()) + task_dir = MOCK_CWD / internal.SITE_TASKS_DIR / task.get_task_name() + task_dir.mkdir(parents=True) + site_output_dir = MOCK_CWD / internal.SITE_OUTPUT_DIR + site_output_dir.mkdir() - setup_mock_namelists_directory() - setup_fluxnet_directory_tree([task], root_dir=TMP_DIR) - do_mock_checkout_and_build() + # Create mock namelist file in task directory: + f90nml.write( + {"cable": {"filename": {"met": "/path/to/met/file", "foo": 123}, "bar": True}}, + task_dir / internal.CABLE_NML, + ) - task.setup_task(root_dir=TMP_DIR) + # Create mock netcdf output file as if CABLE had just been run, closing the + # handle so add_provenance_info() can reopen the file: + nc_output_path = site_output_dir / task.get_output_filename() + netCDF4.Dataset(nc_output_path, "w").close() + + def mock_svn_info_show_item(*args, **kwargs): # pylint: disable=unused-argument + item = args[1] + return {"url": "/url/to/test-branch", "revision": "123"}[item] + + # Success case: add global attributes to netcdf file + with unittest.mock.patch( + "benchcab.get_cable.svn_info_show_item", mock_svn_info_show_item + ): + task.add_provenance_info() + + with netCDF4.Dataset(str(nc_output_path), "r") as nc_output: + atts = vars(nc_output) + assert atts["cable_branch"] == "/url/to/test-branch" + assert atts["svn_revision_number"] == "123" + assert atts[r"filename%met"] == "/path/to/met/file" + assert atts[r"filename%foo"] == 123 + assert atts[r"bar"] == ".true."
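The assertions above depend on netCDF global attributes surviving a write/reopen cycle. A standalone sketch of that round trip with the netCDF4 API, using a hypothetical scratch file name and illustrative attribute values:

    import netCDF4

    # hypothetical scratch file; attribute names mirror those asserted above
    with netCDF4.Dataset("example_out.nc", "w") as ds:
        ds.setncatts({"cable_branch": "/url/to/test-branch",
                      "svn_revision_number": "123"})

    with netCDF4.Dataset("example_out.nc", "r") as ds:
        assert ds.getncattr("cable_branch") == "/url/to/test-branch"
        assert ds.getncattr("svn_revision_number") == "123"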
+ + # Success case: test non-verbose output + with unittest.mock.patch( + "benchcab.get_cable.svn_info_show_item", mock_svn_info_show_item + ), contextlib.redirect_stdout(io.StringIO()) as buf: + task.add_provenance_info() + assert not buf.getvalue() + + # Success case: test verbose output + with unittest.mock.patch( + "benchcab.get_cable.svn_info_show_item", mock_svn_info_show_item + ), contextlib.redirect_stdout(io.StringIO()) as buf: + task.add_provenance_info(verbose=True) + assert buf.getvalue() == ( + " Adding attributes to output file: " + f"{MOCK_CWD}/runs/site/outputs/forcing-file_R1_S0_out.nc\n" + ) - res_nml = f90nml.read(str(task_dir / internal.CABLE_NML)) - assert res_nml['cable']['some_branch_specific_setting'] is True +def test_run(): + """Tests for `run()`.""" + + task = setup_mock_task() + + # Success case: run CABLE and add attributes to netcdf output file + with unittest.mock.patch( + "benchcab.task.Task.run_cable" + ) as mock_run_cable, unittest.mock.patch( + "benchcab.task.Task.add_provenance_info" + ) as mock_add_provenance_info: + task.run() + mock_run_cable.assert_called_once() + mock_add_provenance_info.assert_called_once() + + # Success case: do not add attributes to netcdf file on failure + with unittest.mock.patch( + "benchcab.task.Task.run_cable" + ) as mock_run_cable, unittest.mock.patch( + "benchcab.task.Task.add_provenance_info" + ) as mock_add_provenance_info: + mock_run_cable.side_effect = CableError + task.run() + mock_run_cable.assert_called_once() + mock_add_provenance_info.assert_not_called() + + # Success case: test non-verbose output + with unittest.mock.patch( + "benchcab.task.Task.run_cable" + ) as mock_run_cable, unittest.mock.patch( + "benchcab.task.Task.add_provenance_info" + ) as mock_add_provenance_info: + with contextlib.redirect_stdout(io.StringIO()) as buf: + task.run() + mock_run_cable.assert_called_once_with(verbose=False) + mock_add_provenance_info.assert_called_once_with(verbose=False) + assert not buf.getvalue() + + # Success case: test verbose output + with unittest.mock.patch( + "benchcab.task.Task.run_cable" + ) as mock_run_cable, unittest.mock.patch( + "benchcab.task.Task.add_provenance_info" + ) as mock_add_provenance_info: + with contextlib.redirect_stdout(io.StringIO()) as buf: + task.run(verbose=True) + mock_run_cable.assert_called_once_with(verbose=True) + mock_add_provenance_info.assert_called_once_with(verbose=True) + assert buf.getvalue() == ( + "Running task forcing-file_R1_S0... 
CABLE standard output saved in " + f"{MOCK_CWD}/runs/site/tasks/forcing-file_R1_S0/out.txt\n" + ) + + +def test_get_fluxnet_tasks(): + """Tests for `get_fluxnet_tasks()`.""" + + # Success case: get task list for two branches, two met + # sites and two science configurations + config = make_barebones_config() + branch_a, branch_b = config["realisations"] + met_site_a, met_site_b = "foo", "bar" + sci_a, sci_b = config["science_configurations"] + + assert get_fluxnet_tasks( + config["realisations"], + config["science_configurations"], + [met_site_a, met_site_b], + ) == [ + Task(0, branch_a["name"], branch_a["patch"], met_site_a, 0, sci_a), + Task(0, branch_a["name"], branch_a["patch"], met_site_a, 1, sci_b), + Task(0, branch_a["name"], branch_a["patch"], met_site_b, 0, sci_a), + Task(0, branch_a["name"], branch_a["patch"], met_site_b, 1, sci_b), + Task(1, branch_b["name"], branch_b["patch"], met_site_a, 0, sci_a), + Task(1, branch_b["name"], branch_b["patch"], met_site_a, 1, sci_b), + Task(1, branch_b["name"], branch_b["patch"], met_site_b, 0, sci_a), + Task(1, branch_b["name"], branch_b["patch"], met_site_b, 1, sci_b), + ] + + +def test_get_fluxnet_comparisons(): + """Tests for `get_fluxnet_comparisons()`.""" + + # Success case: comparisons for two branches with two tasks + # met0_S0_R0 met0_S0_R1 + config = make_barebones_config() + science_configurations = [{"foo": "bar"}] + met_sites = ["foo.nc"] + tasks = get_fluxnet_tasks(config["realisations"], science_configurations, met_sites) + assert len(tasks) == 2 + comparisons = get_fluxnet_comparisons(tasks) + assert len(comparisons) == 1 + assert all( + (task_a.sci_conf_id, task_a.met_forcing_file) + == (task_b.sci_conf_id, task_b.met_forcing_file) + for task_a, task_b in comparisons + ) + assert (comparisons[0][0].branch_id, comparisons[0][1].branch_id) == (0, 1) + + # Success case: comparisons for three branches with three tasks + # met0_S0_R0 met0_S0_R1 met0_S0_R2 + config = make_barebones_config() + config["realisations"] += ( + { + "name": "new-branch", + "revision": -1, + "path": "path/to/new-branch", + "patch": {}, + "build_script": "", + }, + ) + science_configurations = [{"foo": "bar"}] + met_sites = ["foo.nc"] + tasks = get_fluxnet_tasks(config["realisations"], science_configurations, met_sites) + assert len(tasks) == 3 + comparisons = get_fluxnet_comparisons(tasks) + assert len(comparisons) == 3 + assert all( + (task_a.sci_conf_id, task_a.met_forcing_file) + == (task_b.sci_conf_id, task_b.met_forcing_file) + for task_a, task_b in comparisons + ) + assert (comparisons[0][0].branch_id, comparisons[0][1].branch_id) == (0, 1) + assert (comparisons[1][0].branch_id, comparisons[1][1].branch_id) == (0, 2) + assert (comparisons[2][0].branch_id, comparisons[2][1].branch_id) == (1, 2) + + +def test_get_comparison_name(): + """Tests for `get_comparison_name()`.""" + # Success case: check comparison name convention + task_a = Task(0, "branch-a", {}, "foo", 0, {}) + task_b = Task(1, "branch-b", {}, "foo", 0, {}) + assert get_comparison_name(task_a, task_b) == "foo_S0_R0_R1" + + +def test_run_comparison(): + """Tests for `run_comparison()`.""" + bitwise_cmp_dir = MOCK_CWD / internal.SITE_BITWISE_CMP_DIR + bitwise_cmp_dir.mkdir(parents=True) + output_dir = MOCK_CWD / internal.SITE_OUTPUT_DIR + task_a = Task(0, "branch-a", {}, "foo", 0, {}) + task_b = Task(1, "branch-b", {}, "foo", 0, {}) + + # Success case: run comparison + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" 
+ ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 0, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + run_comparison(task_a, task_b) + mock_subprocess_run.assert_called_once_with( + f"nccmp -df {output_dir / task_a.get_output_filename()} " + f"{output_dir / task_b.get_output_filename()} 2>&1", + shell=True, + check=False, + capture_output=True, + text=True, + ) + + # Success case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 0, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + with contextlib.redirect_stdout(io.StringIO()) as buf: + run_comparison(task_a, task_b) + assert buf.getvalue() == ( + f"Success: files {task_a.get_output_filename()} " + f"{task_b.get_output_filename()} are identical\n" + ) + + # Success case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 0, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + with contextlib.redirect_stdout(io.StringIO()) as buf: + run_comparison(task_a, task_b, verbose=True) + assert buf.getvalue() == ( + f"Comparing files {task_a.get_output_filename()} and " + f"{task_b.get_output_filename()} bitwise...\n" + f" nccmp -df {output_dir / task_a.get_output_filename()} " + f"{output_dir / task_b.get_output_filename()} 2>&1\n" + f"Success: files {task_a.get_output_filename()} " + f"{task_b.get_output_filename()} are identical\n" + ) + + # Failure case: run comparison with non-zero exit code + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 1, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + run_comparison(task_a, task_b) + stdout_file = bitwise_cmp_dir / f"{get_comparison_name(task_a, task_b)}.txt" + with open(stdout_file, "r", encoding="utf-8") as file: + assert file.read() == "standard output from subprocess" + + # Failure case: test non-verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 1, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + with contextlib.redirect_stdout(io.StringIO()) as buf: + run_comparison(task_a, task_b) + stdout_file = bitwise_cmp_dir / f"{get_comparison_name(task_a, task_b)}.txt" + assert buf.getvalue() == ( + f"Failure: files {task_a.get_output_filename()} " + f"{task_b.get_output_filename()} differ. 
Results of diff " + f"have been written to {stdout_file}\n" + ) + + # Failure case: test verbose output + with unittest.mock.patch( + "subprocess.run" + ) as mock_subprocess_run, unittest.mock.patch( + "subprocess.CompletedProcess" + ) as mock_completed_process: + mock_completed_process.configure_mock( + **{ + "returncode": 1, + "stdout": "standard output from subprocess", + } + ) + mock_subprocess_run.return_value = mock_completed_process + with contextlib.redirect_stdout(io.StringIO()) as buf: + run_comparison(task_a, task_b, verbose=True) + stdout_file = bitwise_cmp_dir / f"{get_comparison_name(task_a, task_b)}.txt" + assert buf.getvalue() == ( + f"Comparing files {task_a.get_output_filename()} and " + f"{task_b.get_output_filename()} bitwise...\n" + f" nccmp -df {output_dir / task_a.get_output_filename()} " + f"{output_dir / task_b.get_output_filename()} 2>&1\n" + f"Failure: files {task_a.get_output_filename()} " + f"{task_b.get_output_filename()} differ. Results of diff " + f"have been written to {stdout_file}\n" + )
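The subprocess-driven tests above all share one pattern: patch `subprocess.run` and hand back a canned result so that no external command ever executes. A minimal sketch of the pattern, with an illustrative `nccmp` command line standing in for the real invocation:

    import subprocess
    import unittest.mock

    # canned result standing in for a real nccmp invocation
    fake = subprocess.CompletedProcess(
        args="nccmp -df a.nc b.nc", returncode=1, stdout="files differ"
    )
    with unittest.mock.patch("subprocess.run", return_value=fake) as mock_run:
        result = subprocess.run(
            "nccmp -df a.nc b.nc",
            shell=True, check=False, capture_output=True, text=True,
        )
    mock_run.assert_called_once()
    assert result.returncode == 1
    assert result.stdout == "files differ"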