Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promote PBS constants to optional parameters #121

36 changes: 36 additions & 0 deletions benchcab/bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ def check_config(config: dict):
"that is compatible with the f90nml python package."
)

# the "fluxsite" key is optional
if "fluxsite" in config:
if not isinstance(config["fluxsite"], dict):
raise TypeError("The 'fluxsite' key must be a dictionary.")
# the "pbs" key is optional
if "pbs" in config["fluxsite"]:
if not isinstance(config["fluxsite"]["pbs"], dict):
raise TypeError("The 'pbs' key must be a dictionary.")
# the "ncpus" key is optional
if "ncpus" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["ncpus"], int
):
raise TypeError("The 'ncpus' key must be an integer.")
# the "mem" key is optional
if "mem" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["mem"], str
):
raise TypeError("The 'mem' key must be a string.")
# the "walltime" key is optional
if "walltime" in config["fluxsite"]["pbs"] and not isinstance(
config["fluxsite"]["pbs"]["walltime"], str
):
raise TypeError("The 'walltime' key must be a string.")
# the "storage" key is optional
if "storage" in config["fluxsite"]["pbs"]:
if not isinstance(config["fluxsite"]["pbs"]["storage"], list) or any(
not isinstance(val, str)
for val in config["fluxsite"]["pbs"]["storage"]
):
raise TypeError("The 'storage' key must be a list of strings.")
# the "multiprocessing" key is optional
if "multiprocessing" in config["fluxsite"] and not isinstance(
config["fluxsite"]["multiprocessing"], bool
):
raise TypeError("The 'multiprocessing' key must be a boolean.")

valid_experiments = (
list(internal.MEORG_EXPERIMENTS) + internal.MEORG_EXPERIMENTS["five-site-test"]
)
Expand Down
27 changes: 22 additions & 5 deletions benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@
project=self.config["project"],
config_path=self.args.config,
modules=self.config["modules"],
storage_flags=[], # TODO(Sean) add storage flags option to config
verbose=self.args.verbose,
skip_bitwise_cmp="fluxsite-bitwise-cmp" in self.args.skip,
benchcab_path=str(self.benchcab_exe_path),
pbs_config=self.config.get("pbs"),
)
file.write(contents)

Expand Down Expand Up @@ -211,8 +211,15 @@
"""Endpoint for `benchcab fluxsite-run-tasks`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
print("Running fluxsite tasks...")
if internal.MULTIPROCESS:
run_tasks_in_parallel(tasks, verbose=self.args.verbose)
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
ncpus = self.config.get("pbs", {}).get(

Check warning on line 219 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L214-L219

Added lines #L214 - L219 were not covered by tests
"ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"]
)
run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=self.args.verbose)

Check warning on line 222 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L222

Added line #L222 was not covered by tests
else:
run_tasks(tasks, verbose=self.args.verbose)
print("Successfully ran fluxsite tasks")
Expand All @@ -230,8 +237,18 @@
comparisons = get_fluxsite_comparisons(tasks)

print("Running comparison tasks...")
if internal.MULTIPROCESS:
run_comparisons_in_parallel(comparisons, verbose=self.args.verbose)
try:
multiprocess = self.config["fluxsite"]["multiprocess"]
except KeyError:
multiprocess = internal.FLUXSITE_DEFAULT_MULTIPROCESS
if multiprocess:
try:
ncpus = self.config["fluxsite"]["pbs"]["ncpus"]
except KeyError:
ncpus = internal.FLUXSITE_DEFAULT_PBS["ncpus"]
run_comparisons_in_parallel(

Check warning on line 249 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L240-L249

Added lines #L240 - L249 were not covered by tests
comparisons, n_processes=ncpus, verbose=self.args.verbose
)
else:
run_comparisons(comparisons, verbose=self.args.verbose)
print("Successfully ran comparison tasks")
Expand Down
6 changes: 4 additions & 2 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@


def run_comparisons_in_parallel(
comparison_tasks: list[ComparisonTask], verbose=False
comparison_tasks: list[ComparisonTask],
n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"],
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""

Expand All @@ -68,7 +70,7 @@
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):

Check warning on line 73 in benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

benchcab/comparison.py#L73

Added line #L73 was not covered by tests
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
Expand Down
12 changes: 8 additions & 4 deletions benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
from pathlib import Path
from typing import Any


_, NODENAME, _, _, _ = os.uname()
Expand All @@ -10,11 +11,14 @@

# Parameters for job script:
QSUB_FNAME = "benchmark_cable_qsub.sh"
NCPUS = 18
MEM = "30GB"
WALL_TIME = "6:00:00"
FLUXSITE_DEFAULT_PBS: Any = {
"ncpus": 18,
"mem": "30GB",
"walltime": "6:00:00",
"storage": [],
}
MPI = False
MULTIPROCESS = True
FLUXSITE_DEFAULT_MULTIPROCESS = True

# DIRECTORY PATHS/STRUCTURE:

Expand Down
6 changes: 4 additions & 2 deletions benchcab/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@
task.run(verbose=verbose)


def run_tasks_in_parallel(tasks: list[Task], verbose=False):
def run_tasks_in_parallel(
tasks: list[Task], n_processes=internal.FLUXSITE_DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""

task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):

Check warning on line 347 in benchcab/task.py

View check run for this annotation

Codecov / codecov/patch

benchcab/task.py#L347

Added line #L347 was not covered by tests
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)
Expand Down
23 changes: 18 additions & 5 deletions benchcab/utils/pbs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
"""Contains helper functions for manipulating PBS job scripts."""

from typing import Optional

from benchcab import internal


def render_job_script(
project: str,
config_path: str,
modules: list,
storage_flags: list,
benchcab_path: str,
verbose=False,
skip_bitwise_cmp=False,
pbs_config: Optional[dict] = None,
) -> str:
"""Returns the text for a PBS job script that executes all computationally expensive commands.

This includes things such as running CABLE and running bitwise comparison jobs
between model output files.
"""

if pbs_config is None:
pbs_config = internal.FLUXSITE_DEFAULT_PBS

module_load_lines = "\n".join(
f"module load {module_name}" for module_name in modules
)
verbose_flag = "-v" if verbose else ""
storage_flags = ["gdata/ks32", "gdata/hh5", f"gdata/{project}", *storage_flags]
ncpus = pbs_config.get("ncpus", internal.FLUXSITE_DEFAULT_PBS["ncpus"])
mem = pbs_config.get("mem", internal.FLUXSITE_DEFAULT_PBS["mem"])
walltime = pbs_config.get("walltime", internal.FLUXSITE_DEFAULT_PBS["walltime"])
storage_flags = [
"gdata/ks32",
"gdata/hh5",
f"gdata/{project}",
*pbs_config.get("storage", internal.FLUXSITE_DEFAULT_PBS["storage"]),
]
return f"""#!/bin/bash
#PBS -l wd
#PBS -l ncpus={internal.NCPUS}
#PBS -l mem={internal.MEM}
#PBS -l walltime={internal.WALL_TIME}
#PBS -l ncpus={ncpus}
#PBS -l mem={mem}
#PBS -l walltime={walltime}
#PBS -q normal
#PBS -P {project}
#PBS -j oe
Expand Down
46 changes: 45 additions & 1 deletion docs/user_guide/config_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,49 @@ The different running modes of `benchcab` are solely dependent on the options us

: NCI modules to use for compiling CABLE

### `fluxsite`
: Contains settings specific to fluxsite tests.
SeanBryan51 marked this conversation as resolved.
Show resolved Hide resolved

#### `pbs`

: Contains settings specific to the PBS scheduler at NCI for the PBS script running the CABLE simulations at FLUXNET sites and the bitwise comparison for these simulations.
SeanBryan51 marked this conversation as resolved.
Show resolved Hide resolved

##### `ncpus`

: The number of CPU cores to allocate for the PBS job, i.e. the `-l ncpus=<4>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `ncpus` is set to `18`.

##### `mem`

: The total memory limit for the PBS job, i.e. the `-l mem=<10GB>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `mem` is set to `30GB`.

##### `walltime`

: The wall clock time limit for the PBS job, i.e. the `-l walltime=<HH:MM:SS>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `walltime` is set to `6:00:00`.

##### `storage`

: A list of extra storage flags required for the PBS job, i.e. the `-l storage=<scratch/a00>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `storage` is set to `[]`.

#### `multiprocess`

: Enables or disables multiprocessing for executing embarrassingly parallel tasks.
: This key is **optional** and can be omitted from the config file. By default `multiprocess` is set to `True`.

Example:
```yaml
fluxsite:
  pbs:
    ncpus: 16
    mem: 64GB
    walltime: 00:01:00
    storage: [scratch/a00, gdata/xy11]
  multiprocess: True
```

## Simulations options

### `realisations`
Expand Down Expand Up @@ -138,4 +181,5 @@ science_configurations: [
[forty-two-me]: https://modelevaluation.org/experiment/display/urTKSXEsojdvEPwdR
[five-me]: https://modelevaluation.org/experiment/display/xNZx2hSvn4PMKAa9R
[f90nml-github]: https://github.com/marshallward/f90nml
[environment-modules]: https://modules.sourceforge.net/
[environment-modules]: https://modules.sourceforge.net/
[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
9 changes: 9 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def get_mock_config() -> dict:
}
},
],
"fluxsite": {
"pbs": {
"ncpus": 16,
"mem": "64G",
"walltime": "01:00:00",
"storage": ["gdata/foo123"],
},
"multiprocessing": True,
},
}
return config

Expand Down
83 changes: 83 additions & 0 deletions tests/test_bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,41 @@ def test_check_config():
config.pop("science_configurations")
check_config(config)

# Success case: test config without fluxsite key is valid
config = get_mock_config()
config.pop("fluxsite")
check_config(config)

# Success case: test config without multiprocessing key is valid
config = get_mock_config()
config["fluxsite"].pop("multiprocessing")
check_config(config)

# Success case: test config without pbs key is valid
config = get_mock_config()
config["fluxsite"].pop("pbs")
check_config(config)

# Success case: test config without ncpus key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("ncpus")
check_config(config)

# Success case: test config without mem key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("mem")
check_config(config)

# Success case: test config without walltime key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("walltime")
check_config(config)

# Success case: test config without storage key is valid
config = get_mock_config()
config["fluxsite"]["pbs"].pop("storage")
check_config(config)

# Failure case: test missing required keys raises an exception
with pytest.raises(
ValueError,
Expand Down Expand Up @@ -207,6 +242,54 @@ def test_check_config():
config["science_configurations"] = [r"cable_user%GS_SWITCH = 'medlyn'"]
check_config(config)

# Failure case: type of config["fluxsite"] is not a dict
with pytest.raises(TypeError, match="The 'fluxsite' key must be a dictionary."):
config = get_mock_config()
config["fluxsite"] = ["ncpus: 16\nmem: 64GB\n"]
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"] is not a dict
with pytest.raises(TypeError, match="The 'pbs' key must be a dictionary."):
config = get_mock_config()
config["fluxsite"]["pbs"] = "-l ncpus=16"
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"]["ncpus"] is not an int
with pytest.raises(TypeError, match="The 'ncpus' key must be an integer."):
config = get_mock_config()
config["fluxsite"]["pbs"]["ncpus"] = "16"
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"]["mem"] is not a string
with pytest.raises(TypeError, match="The 'mem' key must be a string."):
config = get_mock_config()
config["fluxsite"]["pbs"]["mem"] = 64
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"]["walltime"] is not a string
with pytest.raises(TypeError, match="The 'walltime' key must be a string."):
config = get_mock_config()
config["fluxsite"]["pbs"]["walltime"] = 60
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"]["storage"] is not a list
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["fluxsite"]["pbs"]["storage"] = "gdata/foo+gdata/bar"
check_config(config)

# Failure case: type of config["fluxsite"]["pbs"]["storage"] is not a list of strings
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["fluxsite"]["pbs"]["storage"] = [1, 2, 3]
check_config(config)

# Failure case: type of config["fluxsite"]["multiprocessing"] is not a bool
with pytest.raises(TypeError, match="The 'multiprocessing' key must be a boolean."):
config = get_mock_config()
config["fluxsite"]["multiprocessing"] = 1
check_config(config)


def test_read_config():
"""Tests for `read_config()`."""
Expand Down
Loading