Skip to content

Commit

Permalink
Merge pull request #121 from CABLE-LSM/104-promote-pbs-related-consta…
Browse files Browse the repository at this point in the history
…nts-to-parameters-in-the-config-file

Promote PBS constants to optional parameters
  • Loading branch information
SeanBryan51 authored Aug 10, 2023
2 parents 79e7fa4 + 309adca commit a35753f
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 39 deletions.
36 changes: 36 additions & 0 deletions benchcab/bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ def check_config(config: dict):
"that is compatible with the f90nml python package."
)

# the "fluxsite" key is optional
if "fluxsite" in config:
if not isinstance(config["fluxsite"], dict):
raise TypeError("The 'fluxsite' key must be a dictionary.")
# the "pbs" key is optional
if "pbs" in config["fluxsite"]:
if not isinstance(config["fluxsite"]["pbs"], dict):
raise TypeError("The 'pbs' key must be a dictionary.")
# the "ncpus" key is optional
if "ncpus" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["ncpus"], int
):
raise TypeError("The 'ncpus' key must be an integer.")
# the "mem" key is optional
if "mem" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["mem"], str
):
raise TypeError("The 'mem' key must be a string.")
# the "walltime" key is optional
if "walltime" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["walltime"], str
):
raise TypeError("The 'walltime' key must be a string.")
# the "storage" key is optional
if "storage" in config["fluxsite"]["pbs"]:
if not isinstance(config["fluxsite"]["pbs"]["storage"], list) or any(
not isinstance(val, str)
for val in config["fluxsite"]["pbs"]["storage"]
):
raise TypeError("The 'storage' key must be a list of strings.")
# the "multiprocessing" key is optional
if "multiprocessing" in config["fluxsite"] and not isinstance(
config["fluxsite"]["multiprocessing"], bool
):
raise TypeError("The 'multiprocessing' key must be a boolean.")

valid_experiments = (
list(internal.MEORG_EXPERIMENTS) + internal.MEORG_EXPERIMENTS["five-site-test"]
)
Expand Down
27 changes: 22 additions & 5 deletions benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ def fluxsite_submit_job(self) -> None:
project=self.config["project"],
config_path=self.args.config,
modules=self.config["modules"],
storage_flags=[], # TODO(Sean) add storage flags option to config
verbose=self.args.verbose,
skip_bitwise_cmp="fluxsite-bitwise-cmp" in self.args.skip,
benchcab_path=str(self.benchcab_exe_path),
pbs_config=self.config.get("pbs"),
)
file.write(contents)

Expand Down Expand Up @@ -211,8 +211,15 @@ def fluxsite_run_tasks(self):
"""Endpoint for `benchcab fluxsite-run-tasks`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
print("Running fluxsite tasks...")
if internal.MULTIPROCESS:
run_tasks_in_parallel(tasks, verbose=self.args.verbose)
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
ncpus = self.config.get("pbs", {}).get(
"ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"]
)
run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=self.args.verbose)
else:
run_tasks(tasks, verbose=self.args.verbose)
print("Successfully ran fluxsite tasks")
Expand All @@ -230,8 +237,18 @@ def fluxsite_bitwise_cmp(self):
comparisons = get_fluxsite_comparisons(tasks)

print("Running comparison tasks...")
if internal.MULTIPROCESS:
run_comparisons_in_parallel(comparisons, verbose=self.args.verbose)
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
try:
ncpus = self.config["fluxsite"]["pbs"]["ncpus"]
except KeyError:
ncpus = internal.FLUXSITE_DEFAULT_PBS["ncpus"]
run_comparisons_in_parallel(
comparisons, n_processes=ncpus, verbose=self.args.verbose
)
else:
run_comparisons(comparisons, verbose=self.args.verbose)
print("Successfully ran comparison tasks")
Expand Down
6 changes: 4 additions & 2 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> No


def run_comparisons_in_parallel(
comparison_tasks: list[ComparisonTask], verbose=False
comparison_tasks: list[ComparisonTask],
n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"],
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""

Expand All @@ -68,7 +70,7 @@ def run_comparisons_in_parallel(
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
Expand Down
12 changes: 8 additions & 4 deletions benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
from pathlib import Path
from typing import Any


_, NODENAME, _, _, _ = os.uname()
Expand All @@ -10,11 +11,14 @@

# Parameters for job script:
QSUB_FNAME = "benchmark_cable_qsub.sh"
NCPUS = 18
MEM = "30GB"
WALL_TIME = "6:00:00"
FLUXSITE_DEFAULT_PBS: Any = {
"ncpus": 18,
"mem": "30GB",
"walltime": "6:00:00",
"storage": [],
}
MPI = False
MULTIPROCESS = True
FLUXSITE_DEFAULT_MULTIPROCESS = True

# DIRECTORY PATHS/STRUCTURE:

Expand Down
6 changes: 4 additions & 2 deletions benchcab/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ def run_tasks(tasks: list[Task], verbose=False):
task.run(verbose=verbose)


def run_tasks_in_parallel(tasks: list[Task], verbose=False):
def run_tasks_in_parallel(
tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""

task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)
Expand Down
23 changes: 18 additions & 5 deletions benchcab/utils/pbs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
"""Contains helper functions for manipulating PBS job scripts."""

from typing import Optional

from benchcab import internal


def render_job_script(
project: str,
config_path: str,
modules: list,
storage_flags: list,
benchcab_path: str,
verbose=False,
skip_bitwise_cmp=False,
pbs_config: Optional[dict] = None,
) -> str:
"""Returns the text for a PBS job script that executes all computationally expensive commands.
This includes things such as running CABLE and running bitwise comparison jobs
between model output files.
"""

if pbs_config is None:
pbs_config = internal.FLUXSITE_DEFAULT_PBS

module_load_lines = "\n".join(
f"module load {module_name}" for module_name in modules
)
verbose_flag = "-v" if verbose else ""
storage_flags = ["gdata/ks32", "gdata/hh5", f"gdata/{project}", *storage_flags]
ncpus = pbs_config.get("ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"])
mem = pbs_config.get("mem", internal.FLUXSITE_DEFAULT_PBS["mem"])
walltime = pbs_config.get("walltime", internal.FLUXSITE_DEFAULT_PBS["walltime"])
storage_flags = [
"gdata/ks32",
"gdata/hh5",
f"gdata/{project}",
*pbs_config.get("storage", internal.FLUXSITE_DEFAULT_PBS["storage"]),
]
return f"""#!/bin/bash
#PBS -l wd
#PBS -l ncpus={internal.NCPUS}
#PBS -l mem={internal.MEM}
#PBS -l walltime={internal.WALL_TIME}
#PBS -l ncpus={ncpus}
#PBS -l mem={mem}
#PBS -l walltime={walltime}
#PBS -q normal
#PBS -P {project}
#PBS -j oe
Expand Down
48 changes: 47 additions & 1 deletion docs/user_guide/config_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,51 @@ The different running modes of `benchcab` are solely dependent on the options us

: NCI modules to use for compiling CABLE

### `fluxsite`
: Contains settings specific to fluxsite tests.
: This key is **optional**. Default settings for the fluxsite tests will be used if it is not present

#### `pbs`

: Contains settings specific to the PBS scheduler at NCI for the PBS script running the CABLE simulations at FLUXNET sites and the bitwise comparison for these simulations.
: This key is **optional**. Default values for the PBS settings will apply if it is not specified.

##### `ncpus`

: The number of CPU cores to allocate for the PBS job, i.e. the `-l ncpus=<4>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `ncpus` is set to `18`.

##### `mem`

: The total memory limit for the PBS job, i.e. the `-l mem=<10GB>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `mem` is set to `30GB`.

##### `walltime`

: The wall clock time limit for the PBS job, i.e. `-l walltime=<HH:MM:SS>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `walltime` is set to `6:00:00`.

##### `storage`

: A list of extra storage flags required for the PBS job, i.e. `-l storage=<scratch/a00>` (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `storage` is set to `[]`.

#### `multiprocess`

: Enables or disables multiprocessing for executing embarrassingly parallel tasks.
: This key is **optional** and can be omitted from the config file. By default `multiprocess` is set to `True`.

Example:
```yaml
fluxsite:
pbs:
ncpus: 16
mem: 64GB
walltime: 00:01:00
storage: [scratch/a00, gdata/xy11]
multiprocess: True
```
## Simulations options
### `realisations`
Expand Down Expand Up @@ -138,4 +183,5 @@ science_configurations: [
[forty-two-me]: https://modelevaluation.org/experiment/display/urTKSXEsojdvEPwdR
[five-me]: https://modelevaluation.org/experiment/display/xNZx2hSvn4PMKAa9R
[f90nml-github]: https://github.com/marshallward/f90nml
[environment-modules]: https://modules.sourceforge.net/
[environment-modules]: https://modules.sourceforge.net/
[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
9 changes: 9 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def get_mock_config() -> dict:
}
},
],
"fluxsite": {
"pbs": {
"ncpus": 16,
"mem": "64G",
"walltime": "01:00:00",
"storage": ["gdata/foo123"],
},
"multiprocessing": True,
},
}
return config

Expand Down
83 changes: 83 additions & 0 deletions tests/test_bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,41 @@ def test_check_config():
config.pop("science_configurations")
check_config(config)

# Success case: test config without fluxsite key is valid
config = get_mock_config()
config.pop("fluxsite")
check_config(config)

# Success case: test config without multiprocessing key is valid
config = get_mock_config()
config["fluxsite"].pop("multiprocessing")
check_config(config)

# Success case: test config without pbs key is valid
config = get_mock_config()
config["fluxsite"].pop("pbs")
check_config(config)

# Success case: test config without ncpus key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("ncpus")
check_config(config)

# Success case: test config without mem key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("mem")
check_config(config)

# Success case: test config without walltime key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("walltime")
check_config(config)

# Success case: test config without storage key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("storage")
check_config(config)

# Failure case: test missing required keys raises an exception
with pytest.raises(
ValueError,
Expand Down Expand Up @@ -207,6 +242,54 @@ def test_check_config():
config["science_configurations"] = [r"cable_user%GS_SWITCH = 'medlyn'"]
check_config(config)

# Failure case: type of config["fluxsite"] is not a dict
with pytest.raises(TypeError, match="The 'fluxsite' key must be a dictionary."):
config = get_mock_config()
config["fluxsite"] = ["ncpus: 16\nmem: 64GB\n"]
check_config(config)

# Failure case: type of config["pbs"] is not a dict
with pytest.raises(TypeError, match="The 'pbs' key must be a dictionary."):
config = get_mock_config()
config["fluxsite"]["pbs"] = "-l ncpus=16"
check_config(config)

# Failure case: type of config["pbs"]["ncpus"] is not an int
with pytest.raises(TypeError, match="The 'ncpus' key must be an integer."):
config = get_mock_config()
config["fluxsite"]["pbs"]["ncpus"] = "16"
check_config(config)

# Failure case: type of config["pbs"]["mem"] is not a string
with pytest.raises(TypeError, match="The 'mem' key must be a string."):
config = get_mock_config()
config["fluxsite"]["pbs"]["mem"] = 64
check_config(config)

# Failure case: type of config["pbs"]["walltime"] is not a string
with pytest.raises(TypeError, match="The 'walltime' key must be a string."):
config = get_mock_config()
config["fluxsite"]["pbs"]["walltime"] = 60
check_config(config)

# Failure case: type of config["pbs"]["storage"] is not a list
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["fluxsite"]["pbs"]["storage"] = "gdata/foo+gdata/bar"
check_config(config)

# Failure case: type of config["pbs"]["storage"] is not a list of strings
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["fluxsite"]["pbs"]["storage"] = [1, 2, 3]
check_config(config)

# Failure case: type of config["multiprocessing"] is not a bool
with pytest.raises(TypeError, match="The 'multiprocessing' key must be a boolean."):
config = get_mock_config()
config["fluxsite"]["multiprocessing"] = 1
check_config(config)


def test_read_config():
"""Tests for `read_config()`."""
Expand Down
Loading

0 comments on commit a35753f

Please sign in to comment.