Skip to content

Commit

Permalink
Extend OARCluster implementation to let OAR take into account the mem…
Browse files Browse the repository at this point in the history
…ory parameter (dask#595)

* let OAR take into account the memory parameter

* reformat test_oar.py

* update following PR comments

* let OAR take into account the memory parameter
  • Loading branch information
ychiat35 authored Nov 10, 2022
1 parent e02cf1d commit baabb4a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 20 deletions.
17 changes: 9 additions & 8 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ jobqueue:
job-extra-directives: []
job-directives-skip: []
log-directory: null

mem-core-property-name: null

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -57,7 +58,7 @@ jobqueue:
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -88,7 +89,7 @@ jobqueue:
job-directives-skip: []
log-directory: null
resource-spec: null

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -120,7 +121,7 @@ jobqueue:
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -151,7 +152,7 @@ jobqueue:
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -185,7 +186,7 @@ jobqueue:
log-directory: null
lsf-units: null
use-stdin: True # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh'

# Scheduler options
scheduler-options: {}

Expand Down Expand Up @@ -215,7 +216,7 @@ jobqueue:
cancel-command-extra: [] # Extra condor_rm arguments
log-directory: null
shebang: "#!/usr/bin/env condor_submit"

# Scheduler options
scheduler-options: {}

Expand All @@ -239,6 +240,6 @@ jobqueue:
job-extra-directives: []
job-directives-skip: []
log-directory: null

# Scheduler options
scheduler-options: {}
47 changes: 47 additions & 0 deletions dask_jobqueue/oar.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging
import shlex
import warnings

from dask.utils import parse_bytes

import dask

Expand All @@ -24,6 +27,7 @@ def __init__(
project=None,
resource_spec=None,
walltime=None,
mem_core_property_name=None,
config_name=None,
**base_class_kwargs
):
Expand Down Expand Up @@ -76,6 +80,36 @@ def __init__(
# Add extra header directives
header_lines.extend(["#OAR %s" % arg for arg in self.job_extra_directives])

# Memory
if self.worker_memory is not None:
if mem_core_property_name is None:
mem_core_property_name = dask.config.get(
"jobqueue.%s.mem-core-property-name" % self.config_name
)
if mem_core_property_name is None:
warn = (
"OAR Job memory reserved resources will not be set according to Dask Worker memory limit, "
"which can cause crashes."
)
warnings.warn(warn, category=UserWarning)
else:
# OAR expects MiB as memory unit
oar_memory = int(
parse_bytes(self.worker_memory / self.worker_cores) / 2**20
)
# OAR needs to have the properties on a single line, with SQL syntax
# If there are several "#OAR -p" lines, only the last one will be taken into account by OAR
last_job_property = return_last_job_property(self.job_extra_directives)
if last_job_property is not None:
header_lines.append(
"#OAR -p '%s AND %s>=%s'"
% (last_job_property, mem_core_property_name, oar_memory)
)
else:
header_lines.append(
"#OAR -p %s>=%s" % (mem_core_property_name, oar_memory)
)

self.job_header = "\n".join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())
Expand Down Expand Up @@ -123,6 +157,12 @@ class OARCluster(JobQueueCluster):
Deprecated: use ``job_extra_directives`` instead. This parameter will be removed in a future version.
job_extra_directives : list
List of other OAR options, for example `-t besteffort`. Each option will be prepended with the #OAR prefix.
mem_core_property_name : str
The memory per core property name of your OAR cluster (usually named `memcore` or `mem_core`).
Existing properties can be listed by executing `oarnodes` command.
Note that the memory per core property might not exist on your cluster.
    In this case, do not specify a value for the ``mem_core_property_name`` parameter.
If this parameter is None, you will be warned that the memory parameter will not be taken into account by OAR.
Examples
--------
Expand All @@ -140,3 +180,10 @@ class OARCluster(JobQueueCluster):
job=job_parameters, cluster=cluster_parameters
)
job_cls = OARJob


def return_last_job_property(job_extra_directives):
    """Return the property expression of the last ``-p`` directive, if any.

    OAR only honors the final ``#OAR -p`` line of a job script, so the
    memory property appended by ``OARJob`` must be merged with the last
    user-supplied ``-p`` directive instead of being emitted separately.

    Parameters
    ----------
    job_extra_directives : list of str
        Extra OAR directives, e.g. ``["-t besteffort", "-p gpu_count=1"]``.

    Returns
    -------
    str or None
        The property expression with the leading ``-p `` marker removed
        (e.g. ``"gpu_count=1"``), or ``None`` when no ``-p`` directive
        is present.
    """
    for directive in reversed(job_extra_directives):
        # Require the full "-p " marker: a bare startswith("-p") would also
        # match unrelated options such as "-pfoo" or a lone "-p".
        if directive.startswith("-p "):
            # Strip only the leading marker; str.replace("-p ", "") would
            # also delete any later occurrence of "-p " inside the
            # property expression itself.
            return directive[len("-p "):]
    return None
16 changes: 8 additions & 8 deletions dask_jobqueue/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def test_header_lines_dont_skip_extra_directives():
def test_deprecation_header_skip(Cluster):
import warnings

# test issuing of warning
warnings.simplefilter("always")
# test issuing of warning but ignore UserWarning
warnings.simplefilter("ignore", UserWarning)

job_cls = Cluster.job_cls
with warnings.catch_warnings(record=True) as w:
Expand Down Expand Up @@ -240,8 +240,8 @@ def test_docstring_cluster(Cluster):
def test_deprecation_env_extra(Cluster):
import warnings

# test issuing of warning
warnings.simplefilter("always")
# test issuing of warning but ignore UserWarning
warnings.simplefilter("ignore", UserWarning)

job_cls = Cluster.job_cls
with warnings.catch_warnings(record=True) as w:
Expand Down Expand Up @@ -304,8 +304,8 @@ def test_deprecation_env_extra(Cluster):
def test_deprecation_extra(Cluster):
import warnings

# test issuing of warning
warnings.simplefilter("always")
# test issuing of warning but ignore UserWarning
warnings.simplefilter("ignore", UserWarning)

job_cls = Cluster.job_cls
with warnings.catch_warnings(record=True) as w:
Expand Down Expand Up @@ -371,8 +371,8 @@ def test_deprecation_job_extra(Cluster):

import warnings

# test issuing of warning
warnings.simplefilter("always")
# test issuing of warning but ignore UserWarning
warnings.simplefilter("ignore", UserWarning)

job_cls = Cluster.job_cls
with warnings.catch_warnings(record=True) as w:
Expand Down
65 changes: 61 additions & 4 deletions dask_jobqueue/tests/test_oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,53 @@ def test_header():
assert "#OAR -q regular" in cluster.job_header
assert "#OAR -t besteffort" in cluster.job_header

with OARCluster(cores=4, memory="8GB") as cluster:
with OARCluster(cores=4, memory="8GB", mem_core_property_name="memcore") as cluster:
assert "#OAR -n dask-worker" in cluster.job_header
assert "walltime=" in cluster.job_header
assert "#OAR -p memcore" in cluster.job_header
assert "#OAR --project" not in cluster.job_header
assert "#OAR -q" not in cluster.job_header

with OARCluster(
walltime="00:02:00",
processes=4,
cores=8,
memory="8GiB",
mem_core_property_name="mem_core",
) as cluster:
assert "#OAR -n dask-worker" in cluster.job_header
assert "#OAR -l /nodes=1/core=8,walltime=00:02:00" in cluster.job_header
assert "#OAR -p mem_core>=1024" in cluster.job_header

with OARCluster(
cores=4,
memory="28MiB",
job_extra_directives=["-p gpu_count=1"],
mem_core_property_name="mem_core",
) as cluster:
assert "#OAR -n dask-worker" in cluster.job_header
assert "walltime=" in cluster.job_header
assert "#OAR -p 'gpu_count=1 AND mem_core>=7'" in cluster.job_header


def test_job_script():
with OARCluster(
walltime="00:02:00", processes=4, cores=8, memory="28GB"
walltime="00:02:00",
processes=4,
cores=8,
memory="28GB",
mem_core_property_name="memcore",
) as cluster:
job_script = cluster.job_script()
assert "#OAR" in job_script
assert "#OAR -n dask-worker" in job_script
formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "")
assert f"--memory-limit {formatted_bytes}" in job_script
assert "#OAR -l /nodes=1/core=8,walltime=00:02:00" in job_script
assert "#OAR -p memcore" in job_script
assert "#OAR --project" not in job_script
assert "#OAR -q" not in job_script

assert "export " not in job_script

assert (
"{} -m distributed.cli.dask_worker tcp://".format(sys.executable)
in job_script
Expand Down Expand Up @@ -118,8 +143,40 @@ def test_config_name_oar_takes_custom_config():
"job-cpu": None,
"job-mem": None,
"resource-spec": None,
"mem-core-property-name": "memcore",
}

with dask.config.set({"jobqueue.oar-config-name": conf}):
with OARCluster(config_name="oar-config-name") as cluster:
assert cluster.job_name == "myname"


def test_mem_core_property_name_none_warning():
    import warnings

    # Surface every warning so the recorded count is deterministic.
    warnings.simplefilter("always")

    job_cls = OARCluster.job_cls

    # Without a memory-per-core property name, building a job must emit
    # exactly one UserWarning saying the memory limit won't be enforced,
    # and the job script must not contain any "#OAR -p" line.
    with warnings.catch_warnings(record=True) as caught:
        job = job_cls(cores=1, memory="1 GB")
        assert len(caught) == 1
        assert issubclass(caught[0].category, UserWarning)
        expected = (
            "OAR Job memory reserved resources will not be set "
            "according to Dask Worker memory limit"
        )
        assert expected in str(caught[0].message)
        assert "#OAR -p" not in job.job_script()

    # Supplying the property name silences the warning and adds the
    # corresponding "#OAR -p" resource constraint to the job script.
    with warnings.catch_warnings(record=True) as caught:
        job = job_cls(
            cores=1,
            memory="1 GB",
            mem_core_property_name="memcore",
        )
        assert not caught
        assert "#OAR -p memcore" in job.job_script()

0 comments on commit baabb4a

Please sign in to comment.