From baabb4ae5da25d50f99a3165a28ba9e4fb5ceb35 Mon Sep 17 00:00:00 2001 From: ychiat35 Date: Thu, 10 Nov 2022 09:32:43 +0100 Subject: [PATCH] Extend OARCluster implementation to let OAR take into account the memory parameter (#595) * let OAR take into account the memory parameter * reformat test_oar.py * update following PR comments * let OAR take into account the memory parameter --- dask_jobqueue/jobqueue.yaml | 17 +++++---- dask_jobqueue/oar.py | 47 ++++++++++++++++++++++++ dask_jobqueue/tests/test_job.py | 16 ++++---- dask_jobqueue/tests/test_oar.py | 65 +++++++++++++++++++++++++++++++-- 4 files changed, 125 insertions(+), 20 deletions(-) diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index 6432c282..3ce3dc04 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -26,7 +26,8 @@ jobqueue: job-extra-directives: [] job-directives-skip: [] log-directory: null - + mem-core-property-name: null + # Scheduler options scheduler-options: {} @@ -57,7 +58,7 @@ jobqueue: job-extra-directives: [] job-directives-skip: [] log-directory: null - + # Scheduler options scheduler-options: {} @@ -88,7 +89,7 @@ jobqueue: job-directives-skip: [] log-directory: null resource-spec: null - + # Scheduler options scheduler-options: {} @@ -120,7 +121,7 @@ jobqueue: job-extra-directives: [] job-directives-skip: [] log-directory: null - + # Scheduler options scheduler-options: {} @@ -151,7 +152,7 @@ jobqueue: job-extra-directives: [] job-directives-skip: [] log-directory: null - + # Scheduler options scheduler-options: {} @@ -185,7 +186,7 @@ jobqueue: log-directory: null lsf-units: null use-stdin: True # (bool) How jobs are launched, i.e. 'bsub jobscript.sh' or 'bsub < jobscript.sh' - + # Scheduler options scheduler-options: {} @@ -215,7 +216,7 @@ jobqueue: cancel-command-extra: [] # Extra condor_rm arguments log-directory: null shebang: "#!/usr/bin/env condor_submit" - + # Scheduler options scheduler-options: {} @@ -239,6 +240,6 @@ jobqueue: job-extra-directives: [] job-directives-skip: [] log-directory: null - + # Scheduler options scheduler-options: {} diff --git a/dask_jobqueue/oar.py b/dask_jobqueue/oar.py index cce0e224..36125e11 100644 --- a/dask_jobqueue/oar.py +++ b/dask_jobqueue/oar.py @@ -1,5 +1,8 @@ import logging import shlex +import warnings + +from dask.utils import parse_bytes import dask @@ -24,6 +27,7 @@ def __init__( project=None, resource_spec=None, walltime=None, + mem_core_property_name=None, config_name=None, **base_class_kwargs ): @@ -76,6 +80,36 @@ def __init__( # Add extra header directives header_lines.extend(["#OAR %s" % arg for arg in self.job_extra_directives]) + # Memory + if self.worker_memory is not None: + if mem_core_property_name is None: + mem_core_property_name = dask.config.get( + "jobqueue.%s.mem-core-property-name" % self.config_name + ) + if mem_core_property_name is None: + warn = ( + "OAR Job memory reserved resources will not be set according to Dask Worker memory limit, " + "which can cause crashes." + ) + warnings.warn(warn, category=UserWarning) + else: + # OAR expects MiB as memory unit + oar_memory = int( + parse_bytes(self.worker_memory / self.worker_cores) / 2**20 + ) + # OAR needs to have the properties on a single line, with SQL syntax + # If there are several "#OAR -p" lines, only the last one will be taken into account by OAR + last_job_property = return_last_job_property(self.job_extra_directives) + if last_job_property is not None: + header_lines.append( + "#OAR -p '%s AND %s>=%s'" + % (last_job_property, mem_core_property_name, oar_memory) + ) + else: + header_lines.append( + "#OAR -p %s>=%s" % (mem_core_property_name, oar_memory) + ) + self.job_header = "\n".join(header_lines) logger.debug("Job script: \n %s" % self.job_script()) @@ -123,6 +157,12 @@ class OARCluster(JobQueueCluster): Deprecated: use ``job_extra_directives`` instead. This parameter will be removed in a future version. job_extra_directives : list List of other OAR options, for example `-t besteffort`. Each option will be prepended with the #OAR prefix. + mem_core_property_name : str + The memory per core property name of your OAR cluster (usually named `memcore` or `mem_core`). + Existing properties can be listed by executing `oarnodes` command. + Note that the memory per core property might not exist on your cluster. + In this case, do not specify a value for mem_core_property_name parameter. + If this parameter is None, you will be warned that the memory parameter will not be taken into account by OAR. Examples -------- @@ -140,3 +180,10 @@ class OARCluster(JobQueueCluster): job=job_parameters, cluster=cluster_parameters ) job_cls = OARJob + + +def return_last_job_property(job_extra_directives): + for directive in reversed(job_extra_directives): + if directive.startswith("-p"): + return directive.replace("-p ", "") + return None diff --git a/dask_jobqueue/tests/test_job.py b/dask_jobqueue/tests/test_job.py index 8ac62a3d..f24e1148 100644 --- a/dask_jobqueue/tests/test_job.py +++ b/dask_jobqueue/tests/test_job.py @@ -142,8 +142,8 @@ def test_header_lines_dont_skip_extra_directives(): def test_deprecation_header_skip(Cluster): import warnings - # test issuing of warning - warnings.simplefilter("always") + # test issuing of warning but ignore UserWarning + warnings.simplefilter("ignore", UserWarning) job_cls = Cluster.job_cls with warnings.catch_warnings(record=True) as w: @@ -240,8 +240,8 @@ def test_docstring_cluster(Cluster): def test_deprecation_env_extra(Cluster): import warnings - # test issuing of warning - warnings.simplefilter("always") + # test issuing of warning but ignore UserWarning + warnings.simplefilter("ignore", UserWarning) job_cls = Cluster.job_cls with warnings.catch_warnings(record=True) as w: @@ -304,8 +304,8 @@ def test_deprecation_env_extra(Cluster): def test_deprecation_extra(Cluster): import warnings - # test issuing of warning - warnings.simplefilter("always") + # test issuing of warning but ignore UserWarning + warnings.simplefilter("ignore", UserWarning) job_cls = Cluster.job_cls with warnings.catch_warnings(record=True) as w: @@ -371,8 +371,8 @@ def test_deprecation_job_extra(Cluster): import warnings - # test issuing of warning - warnings.simplefilter("always") + # test issuing of warning but ignore UserWarning + warnings.simplefilter("ignore", UserWarning) job_cls = Cluster.job_cls with warnings.catch_warnings(record=True) as w: diff --git a/dask_jobqueue/tests/test_oar.py b/dask_jobqueue/tests/test_oar.py index 20a4b8ee..abc24499 100644 --- a/dask_jobqueue/tests/test_oar.py +++ b/dask_jobqueue/tests/test_oar.py @@ -28,16 +28,42 @@ def test_header(): assert "#OAR -q regular" in cluster.job_header assert "#OAR -t besteffort" in cluster.job_header - with OARCluster(cores=4, memory="8GB") as cluster: + with OARCluster(cores=4, memory="8GB", mem_core_property_name="memcore") as cluster: assert "#OAR -n dask-worker" in cluster.job_header assert "walltime=" in cluster.job_header + assert "#OAR -p memcore" in cluster.job_header assert "#OAR --project" not in cluster.job_header assert "#OAR -q" not in cluster.job_header + with OARCluster( + walltime="00:02:00", + processes=4, + cores=8, + memory="8GiB", + mem_core_property_name="mem_core", + ) as cluster: + assert "#OAR -n dask-worker" in cluster.job_header + assert "#OAR -l /nodes=1/core=8,walltime=00:02:00" in cluster.job_header + assert "#OAR -p mem_core>=1024" in cluster.job_header + + with OARCluster( + cores=4, + memory="28MiB", + job_extra_directives=["-p gpu_count=1"], + mem_core_property_name="mem_core", + ) as cluster: + assert "#OAR -n dask-worker" in cluster.job_header + assert "walltime=" in cluster.job_header + assert "#OAR -p 'gpu_count=1 AND mem_core>=7'" in cluster.job_header + def test_job_script(): with OARCluster( - walltime="00:02:00", processes=4, cores=8, memory="28GB" + walltime="00:02:00", + processes=4, + cores=8, + memory="28GB", + mem_core_property_name="memcore", ) as cluster: job_script = cluster.job_script() assert "#OAR" in job_script @@ -45,11 +71,10 @@ def test_job_script(): formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") assert f"--memory-limit {formatted_bytes}" in job_script assert "#OAR -l /nodes=1/core=8,walltime=00:02:00" in job_script + assert "#OAR -p memcore" in job_script assert "#OAR --project" not in job_script assert "#OAR -q" not in job_script - assert "export " not in job_script - assert ( "{} -m distributed.cli.dask_worker tcp://".format(sys.executable) in job_script @@ -118,8 +143,40 @@ def test_config_name_oar_takes_custom_config(): "job-cpu": None, "job-mem": None, "resource-spec": None, + "mem-core-property-name": "memcore", } with dask.config.set({"jobqueue.oar-config-name": conf}): with OARCluster(config_name="oar-config-name") as cluster: assert cluster.job_name == "myname" + + +def test_mem_core_property_name_none_warning(): + import warnings + + # test issuing of warning + warnings.simplefilter("always") + + job_cls = OARCluster.job_cls + with warnings.catch_warnings(record=True) as w: + # should give a warning + job = job_cls(cores=1, memory="1 GB") + assert len(w) == 1 + assert issubclass(w[0].category, UserWarning) + assert ( + "OAR Job memory reserved resources will not be set according to Dask Worker memory limit" + in str(w[0].message) + ) + job_script = job.job_script() + assert "#OAR -p" not in job_script + + with warnings.catch_warnings(record=True) as w: + # should not give a warning + job = job_cls( + cores=1, + memory="1 GB", + mem_core_property_name="memcore", + ) + assert len(w) == 0 + job_script = job.job_script() + assert "#OAR -p memcore" in job_script