Skip to content

Commit

Permalink
Promote PBS constants to optional parameters
Browse files Browse the repository at this point in the history
This change promotes PBS related constants to optional parameters in the
configuration file so that PBS flags can be set at runtime.

This is useful in running multiple benchcab instances with different job
parameters such as memory and the number of CPUs. This will also allow
us to easily find an optimal number of CPUs to use to maximise
performance.

This change also adds the ability to switch on and off multiprocessing
at runtime via an optional parameter in the config file.

Fixes #104
  • Loading branch information
SeanBryan51 committed Jul 28, 2023
1 parent 138b877 commit 08c04bb
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 39 deletions.
26 changes: 26 additions & 0 deletions benchcab/bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ def check_config(config: dict):
"that is compatible with the f90nml python package."
)

# the "pbs" key is optional
if "pbs" in config:
if not isinstance(config["pbs"], dict):
raise TypeError("The 'pbs' key must be a dictionary.")
# the "ncpus" key is optional
if "ncpus" in config["pbs"] and not isinstance(config["pbs"]["ncpus"], int):
raise TypeError("The 'ncpus' key must be an integer.")
# the "mem" key is optional
if "mem" in config["pbs"] and not isinstance(config["pbs"]["mem"], str):
raise TypeError("The 'mem' key must be a string.")
# the "walltime" key is optional
if "walltime" in config["pbs"] and not isinstance(
config["pbs"]["walltime"], str
):
raise TypeError("The 'walltime' key must be a string.")
# the "storage" key is optional
if "storage" in config["pbs"]:
if not isinstance(config["pbs"]["storage"], list) or any(
not isinstance(val, str) for val in config["pbs"]["storage"]
):
raise TypeError("The 'storage' key must be a list of strings.")

# the "multiprocessing" key is optional
if "multiprocessing" in config and not isinstance(config["multiprocessing"], bool):
raise TypeError("The 'multiprocessing' key must be a boolean.")

valid_experiments = (
list(internal.MEORG_EXPERIMENTS) + internal.MEORG_EXPERIMENTS["five-site-test"]
)
Expand Down
20 changes: 15 additions & 5 deletions benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ def fluxsite_submit_job(self) -> None:
project=self.config["project"],
config_path=self.args.config,
modules=self.config["modules"],
storage_flags=[], # TODO(Sean) add storage flags option to config
verbose=self.args.verbose,
skip_bitwise_cmp="fluxsite-bitwise-cmp" in self.args.skip,
benchcab_path=str(self.benchcab_exe_path),
pbs_config=self.config.get("pbs"),
)
file.write(contents)

Expand Down Expand Up @@ -211,8 +211,12 @@ def fluxsite_run_tasks(self):
"""Endpoint for `benchcab fluxsite-run-tasks`."""
tasks = self.tasks if self.tasks else self._initialise_tasks()
print("Running fluxsite tasks...")
if internal.MULTIPROCESS:
run_tasks_in_parallel(tasks, verbose=self.args.verbose)
multiprocess = self.config.get("multiprocessing", internal.DEFAULT_MULTIPROCESS)
if multiprocess:
ncpus = self.config.get("pbs", {}).get(

Check warning on line 216 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L214-L216

Added lines #L214 - L216 were not covered by tests
"ncpus", internal.DEFAULT_PBS["ncpus"]
)
run_tasks_in_parallel(tasks, n_processes=ncpus, verbose=self.args.verbose)

Check warning on line 219 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L219

Added line #L219 was not covered by tests
else:
run_tasks(tasks, verbose=self.args.verbose)
print("Successfully ran fluxsite tasks")
Expand All @@ -230,8 +234,14 @@ def fluxsite_bitwise_cmp(self):
comparisons = get_fluxsite_comparisons(tasks)

print("Running comparison tasks...")
if internal.MULTIPROCESS:
run_comparisons_in_parallel(comparisons, verbose=self.args.verbose)
multiprocess = self.config.get("multiprocessing", internal.DEFAULT_MULTIPROCESS)
if multiprocess:
ncpus = self.config.get("pbs", {}).get(

Check warning on line 239 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L237-L239

Added lines #L237 - L239 were not covered by tests
"ncpus", internal.DEFAULT_PBS["ncpus"]
)
run_comparisons_in_parallel(

Check warning on line 242 in benchcab/benchcab.py

View check run for this annotation

Codecov / codecov/patch

benchcab/benchcab.py#L242

Added line #L242 was not covered by tests
comparisons, n_processes=ncpus, verbose=self.args.verbose
)
else:
run_comparisons(comparisons, verbose=self.args.verbose)
print("Successfully ran comparison tasks")
Expand Down
6 changes: 4 additions & 2 deletions benchcab/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def run_comparisons(comparison_tasks: list[ComparisonTask], verbose=False) -> No


def run_comparisons_in_parallel(
comparison_tasks: list[ComparisonTask], verbose=False
comparison_tasks: list[ComparisonTask],
n_processes=internal.DEFAULT_PBS["ncpus"],
verbose=False,
) -> None:
"""Runs bitwise comparison tasks in parallel across multiple processes."""

Expand All @@ -68,7 +70,7 @@ def run_comparisons_in_parallel(
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):

Check warning on line 73 in benchcab/comparison.py

View check run for this annotation

Codecov / codecov/patch

benchcab/comparison.py#L73

Added line #L73 was not covered by tests
proc = multiprocessing.Process(
target=worker_comparison, args=[task_queue, verbose]
)
Expand Down
7 changes: 3 additions & 4 deletions benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
from pathlib import Path
from typing import Any


_, NODENAME, _, _, _ = os.uname()
Expand All @@ -10,11 +11,9 @@

# Parameters for job script:
QSUB_FNAME = "benchmark_cable_qsub.sh"
NCPUS = 18
MEM = "30GB"
WALL_TIME = "6:00:00"
DEFAULT_PBS: Any = {"ncpus": 18, "mem": "30GB", "walltime": "6:00:00", "storage": []}
MPI = False
MULTIPROCESS = True
DEFAULT_MULTIPROCESS = True

# DIRECTORY PATHS/STRUCTURE:

Expand Down
6 changes: 4 additions & 2 deletions benchcab/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,15 +332,17 @@ def run_tasks(tasks: list[Task], verbose=False):
task.run(verbose=verbose)


def run_tasks_in_parallel(tasks: list[Task], verbose=False):
def run_tasks_in_parallel(
tasks: list[Task], n_processes=internal.DEFAULT_PBS["ncpus"], verbose=False
):
"""Runs tasks in `tasks` in parallel across multiple processes."""

task_queue: multiprocessing.Queue = multiprocessing.Queue()
for task in tasks:
task_queue.put(task)

processes = []
for _ in range(internal.NCPUS):
for _ in range(n_processes):

Check warning on line 345 in benchcab/task.py

View check run for this annotation

Codecov / codecov/patch

benchcab/task.py#L345

Added line #L345 was not covered by tests
proc = multiprocessing.Process(target=worker_run, args=[task_queue, verbose])
proc.start()
processes.append(proc)
Expand Down
23 changes: 18 additions & 5 deletions benchcab/utils/pbs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
"""Contains helper functions for manipulating PBS job scripts."""

from typing import Optional

from benchcab import internal


def render_job_script(
project: str,
config_path: str,
modules: list,
storage_flags: list,
benchcab_path: str,
verbose=False,
skip_bitwise_cmp=False,
pbs_config: Optional[dict] = None,
) -> str:
"""Returns the text for a PBS job script that executes all computationally expensive commands.
This includes things such as running CABLE and running bitwise comparison jobs
between model output files.
"""

if pbs_config is None:
pbs_config = internal.DEFAULT_PBS

module_load_lines = "\n".join(
f"module load {module_name}" for module_name in modules
)
verbose_flag = "-v" if verbose else ""
storage_flags = ["gdata/ks32", "gdata/hh5", f"gdata/{project}", *storage_flags]
ncpus = pbs_config.get("ncpus", internal.DEFAULT_PBS["ncpus"])
mem = pbs_config.get("mem", internal.DEFAULT_PBS["mem"])
walltime = pbs_config.get("walltime", internal.DEFAULT_PBS["walltime"])
storage_flags = [
"gdata/ks32",
"gdata/hh5",
f"gdata/{project}",
*pbs_config.get("storage", internal.DEFAULT_PBS["storage"]),
]
return f"""#!/bin/bash
#PBS -l wd
#PBS -l ncpus={internal.NCPUS}
#PBS -l mem={internal.MEM}
#PBS -l walltime={internal.WALL_TIME}
#PBS -l ncpus={ncpus}
#PBS -l mem={mem}
#PBS -l walltime={walltime}
#PBS -q normal
#PBS -P {project}
#PBS -j oe
Expand Down
32 changes: 31 additions & 1 deletion docs/user_guide/config_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,35 @@ The different running modes of `benchcab` are solely dependent on the options us

: NCI modules to use for compiling CABLE

### `multiprocessing`

: Enables or disables multiprocessing for executing embarrassingly parallel tasks.
: This key is **optional** and can be omitted from the config file. By default `multiprocessing` is set to `True`.

### `pbs`

: Settings specific to the PBS scheduler at NCI.

#### `ncpus`

: The number of CPU cores to allocate for the main job script, i.e. the `-l ncpus=<4>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `ncpus` is set to `18`.

#### `mem`

: The total memory limit for the main job script, i.e. the `-l mem=<10GB>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `mem` is set to `30GB`.

#### `walltime`

: The wall clock time limit for the main job script, i.e. `-l walltime=<HH:MM:SS>` PBS flag (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `walltime` is set to `6:00:00`.

#### `storage`

: A list of extra storage flags required for the main job script, i.e. `-l storage=<scratch/a00>` (see [PBS Directives Explained][nci-pbs-directives]).
: This key is **optional** and can be omitted from the config file. By default `storage` is set to `[]`.

## Simulations options

### `realisations`
Expand Down Expand Up @@ -138,4 +167,5 @@ science_configurations: [
[forty-two-me]: https://modelevaluation.org/experiment/display/urTKSXEsojdvEPwdR
[five-me]: https://modelevaluation.org/experiment/display/xNZx2hSvn4PMKAa9R
[f90nml-github]: https://github.com/marshallward/f90nml
[environment-modules]: https://modules.sourceforge.net/
[environment-modules]: https://modules.sourceforge.net/
[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
7 changes: 7 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ def get_mock_config() -> dict:
}
},
],
"pbs": {
"ncpus": 16,
"mem": "64G",
"walltime": "01:00:00",
"storage": ["gdata/foo123"],
},
"multiprocessing": True,
}
return config

Expand Down
72 changes: 72 additions & 0 deletions tests/test_bench_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ def test_check_config():
config.pop("science_configurations")
check_config(config)

# Success case: test config without multiprocessing key is valid
config = get_mock_config()
config.pop("multiprocessing")
check_config(config)

# Success case: test config without pbs key is valid
config = get_mock_config()
config.pop("pbs")
check_config(config)

# Success case: test config without ncpus key is valid
config = get_mock_config()
config["pbs"].pop("ncpus")
check_config(config)

# Success case: test config without mem key is valid
config = get_mock_config()
config["pbs"].pop("mem")
check_config(config)

# Success case: test config without walltime key is valid
config = get_mock_config()
config["pbs"].pop("walltime")
check_config(config)

# Success case: test config without storage key is valid
config = get_mock_config()
config["pbs"].pop("storage")
check_config(config)

# Failure case: test missing required keys raises an exception
with pytest.raises(
ValueError,
Expand Down Expand Up @@ -207,6 +237,48 @@ def test_check_config():
config["science_configurations"] = [r"cable_user%GS_SWITCH = 'medlyn'"]
check_config(config)

# Failure case: type of config["pbs"] is not a dict
with pytest.raises(TypeError, match="The 'pbs' key must be a dictionary."):
config = get_mock_config()
config["pbs"] = "-l ncpus=16"
check_config(config)

# Failure case: type of config["pbs"]["ncpus"] is not an int
with pytest.raises(TypeError, match="The 'ncpus' key must be an integer."):
config = get_mock_config()
config["pbs"]["ncpus"] = "16"
check_config(config)

# Failure case: type of config["pbs"]["mem"] is not a string
with pytest.raises(TypeError, match="The 'mem' key must be a string."):
config = get_mock_config()
config["pbs"]["mem"] = 64
check_config(config)

# Failure case: type of config["pbs"]["walltime"] is not a string
with pytest.raises(TypeError, match="The 'walltime' key must be a string."):
config = get_mock_config()
config["pbs"]["walltime"] = 60
check_config(config)

# Failure case: type of config["pbs"]["storage"] is not a list
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["pbs"]["storage"] = "gdata/foo+gdata/bar"
check_config(config)

# Failure case: type of config["pbs"]["storage"] is not a list of strings
with pytest.raises(TypeError, match="The 'storage' key must be a list of strings."):
config = get_mock_config()
config["pbs"]["storage"] = [1, 2, 3]
check_config(config)

# Failure case: type of config["multiprocessing"] is not a bool
with pytest.raises(TypeError, match="The 'multiprocessing' key must be a boolean."):
config = get_mock_config()
config["multiprocessing"] = 1
check_config(config)


def test_read_config():
"""Tests for `read_config()`."""
Expand Down
Loading

0 comments on commit 08c04bb

Please sign in to comment.