From bd86e21d509c9ab70f81f69651e36c5806e6b759 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Mon, 31 Jul 2017 15:36:31 -0400 Subject: [PATCH 01/36] Added test for slurm integration --- test_pbs_slurm.py | 73 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 test_pbs_slurm.py diff --git a/test_pbs_slurm.py b/test_pbs_slurm.py new file mode 100644 index 0000000..fe9050e --- /dev/null +++ b/test_pbs_slurm.py @@ -0,0 +1,73 @@ +import time +import unittest + + +from subprocess import Popen, PIPE +from nose.tools import assert_equal, assert_true + +pbs_string = """\ +#!/usr/bin/env /bin/bash + +#PBS -N arrayJob +#PBS -o arrayJob_%A_%a.out +#PBS -t 1-5 +#PBS -l walltime=01:00:00 +#PBS -l naccelerators=1 +#PBS -l proc={cons} +#PBS -M {mail} +#SBATCH --qos {priority} + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + +sbatch_string = """\ +#!/usr/bin/env -i /bin/zsh + +#SBATCH --job-name=arrayJob +#SBATCH --output=arrayJob_%A_%a.out +#SBATCH --array=0-5 +#SBATCH --time=01:00:00 +#SBATCH --gres=gpu +#SBATCH --constraint=gpu6gb + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + +class TestSlurm(unittest.TestCase): + + def test_pbs_slurm(self): + priorities = ['unkillable', 'high', 'low'] + # doesn't test 12 and 24 to avoid killing jobs just for testing + constraints = ['gpulowmem', 'gpu6gb', 'gpu8gb'] + mem = ['2G', '4G', '6G'] + mail = 'adrien.alitaiga@gmail.com' + # gpus = ['titanblack'] + + for priority in priorities: + for cons in constraints: + for m in mem: + string = pbs_string.format(**{ + 'cons': cons, + 'mail': mail, + 'priority': priority + }) + with open("test.pbs", "w") as text_file: + text_file.write(string) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + time.sleep(1) + + stdout, stderr = process.communicate() + print stdout + assert_true("Submitted batch job" in stdout) From 1168e3e0fe574e132afc75e4876145f1442340a7 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Tue, 8 Aug 2017 11:51:22 -0400 Subject: [PATCH 02/36] New test for priority --- array.pbs | 16 ++++ array.sbatch | 16 ++++ smartdispatch/tests/pbs_slurm_test.py | 116 ++++++++++++++++++++++++++ test_pbs_slurm.py | 73 ---------------- 4 files changed, 148 insertions(+), 73 deletions(-) create mode 100644 array.pbs create mode 100644 array.sbatch create mode 100644 smartdispatch/tests/pbs_slurm_test.py delete mode 100644 test_pbs_slurm.py diff --git a/array.pbs b/array.pbs new file mode 100644 index 0000000..4e4ff7a --- /dev/null +++ b/array.pbs @@ -0,0 +1,16 @@ +#!/usr/bin/env /bin/bash + +#PBS -N arrayJob +#PBS -o arrayJob_%A_%a.out +#PBS -t 1-5 +#PBS -l walltime=01:00:00 +#PBS -l naccelerators=1 +#PBS -l proc=gpu6gb + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi diff --git a/array.sbatch b/array.sbatch new file mode 100644 index 0000000..785bea4 --- /dev/null +++ b/array.sbatch @@ -0,0 +1,16 @@ +#!/usr/bin/env -i /bin/zsh + +#SBATCH --job-name=arrayJob +#SBATCH --output=arrayJob_%A_%a.out +#SBATCH --array=0-5 +#SBATCH --time=01:00:00 +#SBATCH --gres=gpu +#SBATCH --constraint=gpu6gb + 
+###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py new file mode 100644 index 0000000..42cccd7 --- /dev/null +++ b/smartdispatch/tests/pbs_slurm_test.py @@ -0,0 +1,116 @@ +import time +import unittest + + +from subprocess import Popen, PIPE +from nose.tools import assert_equal, assert_true + +pbs_string = """\ +#!/usr/bin/env /bin/bash + +#PBS -N arrayJob +#PBS -o arrayJob_%A_%a.out +#PBS -t 1-2 +#PBS -l walltime=01:00:00 +#PBS -l naccelerators=1 +{} + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + +sbatch_string = """\ +#!/usr/bin/env -i /bin/zsh + +#SBATCH --job-name=arrayJob +#SBATCH --output=arrayJob_%A_%a.out +#SBATCH --array=0-5 +#SBATCH --time=01:00:00 +#SBATCH --gres=gpu +#SBATCH --constraint=gpu6gb + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + + +class TestSlurm(unittest.TestCase): + + def test_priority(self): + priorities = ['high', 'low'] + for priority in priorities: + string = pbs_string.format( + "#SBATCH --qos={priority}".format(priority=priority) + ) + with open("test.pbs", "w") as text_file: + text_file.write(string) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + assert_true("Submitted batch job" in stdout) + job_id = stdout.split(" ")[-1] + + time.sleep(0.25) + process = Popen("squeue -u $USER -O qos", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + job_priorities = [prio.strip() for prio in stdout.split("\n")[1:] if prio != ''] + assert_true(all(prio == priority for prio in job_priorities)) + + # def test_pbs_slurm(self): + # priorities = ['unkillable', 'high', 'low'] + # # doesn't test 12 and 24 to avoid killing jobs just for testing + # constraints = ['gpulowmem', 'gpu6gb', 'gpu8gb'] + # mem = ['2G', '4G', '6G'] + # mail = 'adrien.alitaiga@gmail.com' + # # gpus = ['titanblack'] + # + # for priority in priorities: + # for cons in constraints: + # for m in mem: + # string = pbs_string.format(**{ + # 'cons': cons, + # 'mail': mail, + # 'priority': priority + # }) + # with open("test.pbs", "w") as text_file: + # text_file.write(string) + # process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + # time.sleep(1) + # + # stdout, stderr = process.communicate() + # print stdout + # assert_true("Submitted batch job" in stdout) + + +pbs_string2 = """\ +#!/usr/bin/env /bin/bash + +#PBS -N arrayJob +#PBS -o arrayJob_%A_%a.out +#PBS -t 1-5 +#PBS -l walltime=01:00:00 +#PBS -l naccelerators=1 +#PBS -l proc={cons} +#PBS -M {mail} +#SBATCH --qos {priority} + +###################### +# Begin work section # +###################### + +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +nvidia-smi +""" + +if __name__ == '__main__': + unittest.main() diff --git a/test_pbs_slurm.py b/test_pbs_slurm.py deleted file mode 100644 index fe9050e..0000000 --- a/test_pbs_slurm.py +++ /dev/null @@ -1,73 +0,0 @@ -import time -import unittest - - -from subprocess import Popen, PIPE -from nose.tools import 
assert_equal, assert_true - -pbs_string = """\ -#!/usr/bin/env /bin/bash - -#PBS -N arrayJob -#PBS -o arrayJob_%A_%a.out -#PBS -t 1-5 -#PBS -l walltime=01:00:00 -#PBS -l naccelerators=1 -#PBS -l proc={cons} -#PBS -M {mail} -#SBATCH --qos {priority} - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi -""" - -sbatch_string = """\ -#!/usr/bin/env -i /bin/zsh - -#SBATCH --job-name=arrayJob -#SBATCH --output=arrayJob_%A_%a.out -#SBATCH --array=0-5 -#SBATCH --time=01:00:00 -#SBATCH --gres=gpu -#SBATCH --constraint=gpu6gb - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi -""" - -class TestSlurm(unittest.TestCase): - - def test_pbs_slurm(self): - priorities = ['unkillable', 'high', 'low'] - # doesn't test 12 and 24 to avoid killing jobs just for testing - constraints = ['gpulowmem', 'gpu6gb', 'gpu8gb'] - mem = ['2G', '4G', '6G'] - mail = 'adrien.alitaiga@gmail.com' - # gpus = ['titanblack'] - - for priority in priorities: - for cons in constraints: - for m in mem: - string = pbs_string.format(**{ - 'cons': cons, - 'mail': mail, - 'priority': priority - }) - with open("test.pbs", "w") as text_file: - text_file.write(string) - process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - time.sleep(1) - - stdout, stderr = process.communicate() - print stdout - assert_true("Submitted batch job" in stdout) From 243b19179509da971938e50b531167e45c9b76da Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Tue, 8 Aug 2017 12:43:12 -0400 Subject: [PATCH 03/36] Added gres + memory tests --- smartdispatch/tests/pbs_slurm_test.py | 42 +++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index 42cccd7..6fae910 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/pbs_slurm_test.py @@ -60,10 +60,48 @@ def test_priority(self): job_id = stdout.split(" ")[-1] time.sleep(0.25) - process = Popen("squeue -u $USER -O qos", stdout=PIPE, stderr=PIPE, shell=True) + process = Popen("squeue -u $USER -j {} -O qos".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) stdout, stderr = process.communicate() job_priorities = [prio.strip() for prio in stdout.split("\n")[1:] if prio != ''] - assert_true(all(prio == priority for prio in job_priorities)) + assert_true(all(pri == priority for pri in job_priorities)) + + def test_gres(self): + gress = ['gpu', 'gpu:titanblack'] + for gres in gress: + string = pbs_string.format( + "#PBS -l gpu={gres}".format(gres=gres) + ) + with open("test.pbs", "w") as text_file: + text_file.write(string) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + assert_true("Submitted batch job" in stdout) + job_id = stdout.split(" ")[-1] + + time.sleep(0.25) + process = Popen("squeue -u $USER -j {} -O gres".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + job_gres = [gre.strip() for gre in stdout.split("\n")[1:] if gre != ''] + assert_true(all(gpu == gres for gpu in job_gres)) + + def test_memory(self): + mems = ['2G', '4G'] + for mem in mems: + string = pbs_string.format( + "#PBS -l mem_free={memory}".format(memory=mem) + ) + with open("test.pbs", "w") as 
text_file: + text_file.write(string) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + assert_true("Submitted batch job" in stdout) + job_id = stdout.split(" ")[-1] + + time.sleep(0.25) + process = Popen("squeue -u $USER -j {} -O minmemory".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + job_mems = [m.strip() for m in stdout.split("\n")[1:] if m != ''] + assert_true(all(me == mem for me in job_mems)) # def test_pbs_slurm(self): # priorities = ['unkillable', 'high', 'low'] From 0a162434525a9c202defb3378d4ff856109f5c78 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Wed, 9 Aug 2017 11:01:20 -0400 Subject: [PATCH 04/36] Refactored tests --- smartdispatch/tests/pbs_slurm_test.py | 124 ++++++++++---------------- 1 file changed, 47 insertions(+), 77 deletions(-) diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index 6fae910..f4cd65b 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/pbs_slurm_test.py @@ -12,7 +12,6 @@ #PBS -o arrayJob_%A_%a.out #PBS -t 1-2 #PBS -l walltime=01:00:00 -#PBS -l naccelerators=1 {} ###################### @@ -43,90 +42,61 @@ nvidia-smi """ +def test_param(self, param_array, command, flag, string=pbs_string): + for param in param_array: + command = pbs_string.format( + string.format(command.format(param)) + ) + with open("test.pbs", "w") as text_file: + text_file.write(command) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + assert_true("Submitted batch job" in stdout) + job_id = stdout.split(" ")[-1].strip() + + time.sleep(0.25) + process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + job_params = [c.strip() for c in stdout.split("\n")[1:] if c != ''] + assert_true(all(p == param for p in job_params)) class TestSlurm(unittest.TestCase): + def tearDown(self): + process = Popen("rm *.out", stdout=PIPE, stderr=PIPE, shell=True) + stdout, stderr = process.communicate() + def test_priority(self): - priorities = ['high', 'low'] - for priority in priorities: - string = pbs_string.format( - "#SBATCH --qos={priority}".format(priority=priority) - ) - with open("test.pbs", "w") as text_file: - text_file.write(string) - process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - assert_true("Submitted batch job" in stdout) - job_id = stdout.split(" ")[-1] - - time.sleep(0.25) - process = Popen("squeue -u $USER -j {} -O qos".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - job_priorities = [prio.strip() for prio in stdout.split("\n")[1:] if prio != ''] - assert_true(all(pri == priority for pri in job_priorities)) + test_param( + ['high', 'low'], + "#SBATCH --qos={}", + "qos", + pbs_string + ) def test_gres(self): - gress = ['gpu', 'gpu:titanblack'] - for gres in gress: - string = pbs_string.format( - "#PBS -l gpu={gres}".format(gres=gres) - ) - with open("test.pbs", "w") as text_file: - text_file.write(string) - process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - assert_true("Submitted batch job" in stdout) - job_id = stdout.split(" ")[-1] - - time.sleep(0.25) - process = Popen("squeue -u $USER -j {} -O gres".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) - 
stdout, stderr = process.communicate() - job_gres = [gre.strip() for gre in stdout.split("\n")[1:] if gre != ''] - assert_true(all(gpu == gres for gpu in job_gres)) + test_param( + ['titanblack], + "#PBS -l naccelerators={}", + "gres", + pbs_string + ) def test_memory(self): - mems = ['2G', '4G'] - for mem in mems: - string = pbs_string.format( - "#PBS -l mem_free={memory}".format(memory=mem) - ) - with open("test.pbs", "w") as text_file: - text_file.write(string) - process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - assert_true("Submitted batch job" in stdout) - job_id = stdout.split(" ")[-1] - - time.sleep(0.25) - process = Popen("squeue -u $USER -j {} -O minmemory".format(job_id), stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - job_mems = [m.strip() for m in stdout.split("\n")[1:] if m != ''] - assert_true(all(me == mem for me in job_mems)) - - # def test_pbs_slurm(self): - # priorities = ['unkillable', 'high', 'low'] - # # doesn't test 12 and 24 to avoid killing jobs just for testing - # constraints = ['gpulowmem', 'gpu6gb', 'gpu8gb'] - # mem = ['2G', '4G', '6G'] - # mail = 'adrien.alitaiga@gmail.com' - # # gpus = ['titanblack'] - # - # for priority in priorities: - # for cons in constraints: - # for m in mem: - # string = pbs_string.format(**{ - # 'cons': cons, - # 'mail': mail, - # 'priority': priority - # }) - # with open("test.pbs", "w") as text_file: - # text_file.write(string) - # process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - # time.sleep(1) - # - # stdout, stderr = process.communicate() - # print stdout - # assert_true("Submitted batch job" in stdout) + test_param( + ['2G', '4G'], + "#PBS -l mem={}", + "minmemory", + pbs_string + ) + + def test_nb_cpus(self): + test_param( + ["1", "2"], + "PBS -l ncpus={}", + "cpuspertask", # or numcpus + pbs_string + ) pbs_string2 = """\ From 6f3148993a500e8a09f2a1f0b5397af2100d8ac4 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Thu, 10 Aug 2017 13:25:29 -0400 Subject: [PATCH 05/36] small update --- array.pbs | 16 -------- array.sbatch | 16 -------- smartdispatch/tests/pbs_slurm_test.py | 59 +++++++++++++++------------ 3 files changed, 34 insertions(+), 57 deletions(-) delete mode 100644 array.pbs delete mode 100644 array.sbatch diff --git a/array.pbs b/array.pbs deleted file mode 100644 index 4e4ff7a..0000000 --- a/array.pbs +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env /bin/bash - -#PBS -N arrayJob -#PBS -o arrayJob_%A_%a.out -#PBS -t 1-5 -#PBS -l walltime=01:00:00 -#PBS -l naccelerators=1 -#PBS -l proc=gpu6gb - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi diff --git a/array.sbatch b/array.sbatch deleted file mode 100644 index 785bea4..0000000 --- a/array.sbatch +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env -i /bin/zsh - -#SBATCH --job-name=arrayJob -#SBATCH --output=arrayJob_%A_%a.out -#SBATCH --array=0-5 -#SBATCH --time=01:00:00 -#SBATCH --gres=gpu -#SBATCH --constraint=gpu6gb - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index f4cd65b..00bd549 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ 
b/smartdispatch/tests/pbs_slurm_test.py @@ -42,10 +42,10 @@ nvidia-smi """ -def test_param(self, param_array, command, flag, string=pbs_string): +def test_param(param_array, com, flag, string=pbs_string): for param in param_array: command = pbs_string.format( - string.format(command.format(param)) + string.format(com.format(param)) ) with open("test.pbs", "w") as text_file: text_file.write(command) @@ -58,12 +58,13 @@ def test_param(self, param_array, command, flag, string=pbs_string): process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) stdout, stderr = process.communicate() job_params = [c.strip() for c in stdout.split("\n")[1:] if c != ''] + # import ipdb; ipdb.set_trace() assert_true(all(p == param for p in job_params)) class TestSlurm(unittest.TestCase): def tearDown(self): - process = Popen("rm *.out", stdout=PIPE, stderr=PIPE, shell=True) + process = Popen("rm *.out test.pbs", stdout=PIPE, stderr=PIPE, shell=True) stdout, stderr = process.communicate() def test_priority(self): @@ -76,7 +77,7 @@ def test_priority(self): def test_gres(self): test_param( - ['titanblack], + ['titanblack'], "#PBS -l naccelerators={}", "gres", pbs_string @@ -94,31 +95,39 @@ def test_nb_cpus(self): test_param( ["1", "2"], "PBS -l ncpus={}", - "cpuspertask", # or numcpus + "numcpus", pbs_string ) + def test_constraint(self): + test_param( + ["gpu6gb", "gpu8gb"], + "PBS -l proc={}", + "feature", + pbs_string + ) -pbs_string2 = """\ -#!/usr/bin/env /bin/bash - -#PBS -N arrayJob -#PBS -o arrayJob_%A_%a.out -#PBS -t 1-5 -#PBS -l walltime=01:00:00 -#PBS -l naccelerators=1 -#PBS -l proc={cons} -#PBS -M {mail} -#SBATCH --qos {priority} - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi -""" +# +# pbs_string2 = """\ +# #!/usr/bin/env /bin/bash +# +# #PBS -N arrayJob +# #PBS -o arrayJob_%A_%a.out +# #PBS -t 1-5 +# #PBS -l walltime=01:00:00 +# #PBS -l naccelerators=1 +# #PBS -l proc={cons} +# #PBS -M {mail} +# #SBATCH --qos {priority} +# +# ###################### +# # Begin work section # +# ###################### +# +# echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +# echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID +# nvidia-smi +# """ if __name__ == '__main__': unittest.main() From 3b0dd0bba9c84e23111c2f7e872b8077c2c42131 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Fri, 1 Sep 2017 16:48:27 -0400 Subject: [PATCH 06/36] Python3 compatibility + PR comments --- smartdispatch/tests/pbs_slurm_test.py | 91 +++++++++++---------------- 1 file changed, 36 insertions(+), 55 deletions(-) diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index 00bd549..2283970 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/pbs_slurm_test.py @@ -1,16 +1,15 @@ +from glob import glob +import os import time import unittest - from subprocess import Popen, PIPE -from nose.tools import assert_equal, assert_true pbs_string = """\ #!/usr/bin/env /bin/bash #PBS -N arrayJob #PBS -o arrayJob_%A_%a.out -#PBS -t 1-2 #PBS -l walltime=01:00:00 {} @@ -28,10 +27,10 @@ #SBATCH --job-name=arrayJob #SBATCH --output=arrayJob_%A_%a.out -#SBATCH --array=0-5 #SBATCH --time=01:00:00 #SBATCH --gres=gpu #SBATCH --constraint=gpu6gb +{} ###################### # Begin work section # @@ -42,33 +41,36 @@ nvidia-smi """ -def test_param(param_array, com, flag, string=pbs_string): - for param in 
param_array: - command = pbs_string.format( - string.format(com.format(param)) - ) - with open("test.pbs", "w") as text_file: - text_file.write(command) - process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - assert_true("Submitted batch job" in stdout) - job_id = stdout.split(" ")[-1].strip() - - time.sleep(0.25) - process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() - job_params = [c.strip() for c in stdout.split("\n")[1:] if c != ''] - # import ipdb; ipdb.set_trace() - assert_true(all(p == param for p in job_params)) class TestSlurm(unittest.TestCase): def tearDown(self): - process = Popen("rm *.out test.pbs", stdout=PIPE, stderr=PIPE, shell=True) - stdout, stderr = process.communicate() + for file_name in (glob('*.out') + ["test.pbs"]): + os.remove(file_name) + + def _test_param(self, param_array, com, flag, string=pbs_string): + for param in param_array: + command = pbs_string.format( + string.format(com.format(param)) + ) + with open("test.pbs", "w") as text_file: + text_file.write(command) + process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) + stdout, _ = process.communicate() + stdout = stdout.decode() + print(stdout) + self.assertIn("Submitted batch job", stdout) + job_id = stdout.split(" ")[-1].strip() + + time.sleep(0.25) + process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) + stdout, _ = process.communicate() + job_params = [c.strip() for c in stdout.decode().split("\n")[1:] if c != ''] + # import ipdb; ipdb.set_trace() + self.assertSequenceEqual(job_params, [param for _ in range(len(job_params))]) def test_priority(self): - test_param( + self._test_param( ['high', 'low'], "#SBATCH --qos={}", "qos", @@ -76,15 +78,15 @@ def test_priority(self): ) def test_gres(self): - test_param( - ['titanblack'], + self._test_param( + ['k80'], "#PBS -l naccelerators={}", "gres", pbs_string ) def test_memory(self): - test_param( + self._test_param( ['2G', '4G'], "#PBS -l mem={}", "minmemory", @@ -92,42 +94,21 @@ def test_memory(self): ) def test_nb_cpus(self): - test_param( - ["1", "2"], - "PBS -l ncpus={}", + self._test_param( + ["2", "3"], + "#PBS -l mppdepth={}", + # "#SBATCH --cpus-per-task={}", "numcpus", pbs_string ) def test_constraint(self): - test_param( + self._test_param( ["gpu6gb", "gpu8gb"], - "PBS -l proc={}", + "#PBS -l proc={}", "feature", pbs_string ) -# -# pbs_string2 = """\ -# #!/usr/bin/env /bin/bash -# -# #PBS -N arrayJob -# #PBS -o arrayJob_%A_%a.out -# #PBS -t 1-5 -# #PBS -l walltime=01:00:00 -# #PBS -l naccelerators=1 -# #PBS -l proc={cons} -# #PBS -M {mail} -# #SBATCH --qos {priority} -# -# ###################### -# # Begin work section # -# ###################### -# -# echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -# echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -# nvidia-smi -# """ - if __name__ == '__main__': unittest.main() From e8e6ec0f67ec4e1465f68db2f0414261fe6dc0b1 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Fri, 1 Sep 2017 17:22:07 -0400 Subject: [PATCH 07/36] Fixed naccelerators issue --- smartdispatch/tests/pbs_slurm_test.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index 2283970..4a8b020 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/pbs_slurm_test.py @@ 
-28,8 +28,6 @@ #SBATCH --job-name=arrayJob #SBATCH --output=arrayJob_%A_%a.out #SBATCH --time=01:00:00 -#SBATCH --gres=gpu -#SBATCH --constraint=gpu6gb {} ###################### @@ -48,17 +46,17 @@ def tearDown(self): for file_name in (glob('*.out') + ["test.pbs"]): os.remove(file_name) - def _test_param(self, param_array, com, flag, string=pbs_string): - for param in param_array: - command = pbs_string.format( - string.format(com.format(param)) + def _test_param(self, param_array, command, flag, string=pbs_string, output_array=None): + output_array = output_array or param_array + for param, output in zip(param_array, output_array): + com = pbs_string.format( + string.format(command.format(param)) ) with open("test.pbs", "w") as text_file: - text_file.write(command) + text_file.write(com) process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) stdout, _ = process.communicate() stdout = stdout.decode() - print(stdout) self.assertIn("Submitted batch job", stdout) job_id = stdout.split(" ")[-1].strip() @@ -66,8 +64,7 @@ def _test_param(self, param_array, com, flag, string=pbs_string): process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True) stdout, _ = process.communicate() job_params = [c.strip() for c in stdout.decode().split("\n")[1:] if c != ''] - # import ipdb; ipdb.set_trace() - self.assertSequenceEqual(job_params, [param for _ in range(len(job_params))]) + self.assertSequenceEqual(job_params, [output for _ in range(len(job_params))]) def test_priority(self): self._test_param( @@ -79,15 +76,16 @@ def test_priority(self): def test_gres(self): self._test_param( - ['k80'], + ["1", "2"], "#PBS -l naccelerators={}", "gres", - pbs_string + pbs_string, + ["gpu:1", "gpu:2"] ) def test_memory(self): self._test_param( - ['2G', '4G'], + ["2G", "4G"], "#PBS -l mem={}", "minmemory", pbs_string From 04f0ff167e157ba67d9725eca7a1c8b58bd42220 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Tue, 19 Sep 2017 17:22:56 -0400 Subject: [PATCH 08/36] Updated tests to skip on Graham and Cedar smartdispatch modified to handle slurm clusters --- scripts/smart-dispatch | 10 +++++++--- smartdispatch/job_generator.py | 20 ++++++++++++++++++++ smartdispatch/pbs.py | 17 +++++++++++++++++ smartdispatch/tests/pbs_slurm_test.py | 14 +++++++++++--- smartdispatch/utils.py | 3 ++- 5 files changed, 57 insertions(+), 7 deletions(-) diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 86904fa..6c1c67c 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -29,7 +29,7 @@ TIMEOUT_EXIT_CODE = 124 AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))' # By default, 60s before the maximum walltime. 
AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER) AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"' -AUTORESUME_PROLOG = 'WORKER_PIDS=""' +AUTORESUME_PROLOG = 'WORKER_PIDS=""' AUTORESUME_EPILOG = """\ NEED_TO_RESUME=false for WORKER_PID in $WORKER_PIDS; do @@ -167,14 +167,17 @@ def main(): epilog = [AUTORESUME_EPILOG.format(launcher=LAUNCHER if args.launcher is None else args.launcher, path_job=path_job)] job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job) - + # generating default names per each jobs in each batch for pbs_id, pbs in enumerate(job_generator.pbs_list): proper_size_name = utils.jobname_generator(jobname, pbs_id) pbs.add_options(N=proper_size_name) - + if args.pbsFlags is not None: job_generator.add_pbs_flags(args.pbsFlags.split(' ')) + + if args.sbatchFlags is not None: + job_generator.add_sbatch_flags(args.sbatchFlags.split(' ')) pbs_filenames = job_generator.write_pbs_files(path_job_commands) # Launch the jobs @@ -206,6 +209,7 @@ def parse_arguments(): parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands") parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of PBS flags. Ex:--pbsFlags="-lfeature=k80 -t0-4"') + parser.add_argument('--sbatchFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. Ex:--sbatchFlags="-qos=high --output=file.out"') subparsers = parser.add_subparsers(dest="mode") launch_parser = subparsers.add_parser('launch', help="Launch jobs.") diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index d2db23c..f589d36 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -73,6 +73,16 @@ def add_pbs_flags(self, flags): pbs.add_resources(**resources) pbs.add_options(**options) + def add_sbatch_flags(self, flags): + options = {} + + for flag in flags: + split = flag.find('=') + options[flag[:split]] = flag[split+1:] + + for pbs in self.pbs_list: + pbs.add_sbatch_options(**options) + def _generate_base_pbs(self): """ Generates PBS files allowing the execution of every commands on the given queue. """ nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command @@ -171,3 +181,13 @@ def _add_cluster_specific_rules(self): for pbs in self.pbs_list: # Remove forbidden ppn option. Default is 2 cores per gpu. pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes']) + +class SlurmClusterGenerator(JobGenerator): + + def _add_cluster_specific_rules(self): + for pbs in self.pbs_list: + node_resource = pbs.resources.pop('nodes') + gpus = re.match(".*gpus=([0-9]+)", node_resource).group(1) + ppn = re.match(".*ppn=([0-9]+)", node_resource).group(1) + pbs.add_resources(naccelerators=gpus) + pbs.add_resources(ncpus=ppn) diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index f8d7982..2560c50 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -35,6 +35,8 @@ def __init__(self, queue_name, walltime): self.options = OrderedDict() self.add_options(q=queue_name) + self.sbatch_options = OrderedDict() + # Declares that all environment variables in the qsub command's environment are to be exported to the batch job. 
self.add_options(V="") @@ -62,6 +64,18 @@ def add_options(self, **options): self.options["-" + option_name] = option_value + def add_sbatch_options(self, **options): + """ Adds sbatch options to this PBS file. + + Parameters + ---------- + **options : dict + each key is the name of a SBATCH option (see `Options`) + """ + + for option_name, option_value in options.items(): + self.sbatch_options[option_name] = option_value + def add_resources(self, **resources): """ Adds resources to this PBS file. @@ -159,6 +173,9 @@ def __str__(self): for resource_name, resource_value in self.resources.items(): pbs += ["#PBS -l {0}={1}".format(resource_name, resource_value)] + for option_name, option_value in self.sbatch_options.items(): + pbs += ["#SBATCH {0}={1}".format(option_name, option_value)] + pbs += ["\n# Modules #"] for module in self.modules: pbs += ["module load " + module] diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/pbs_slurm_test.py index 4a8b020..961be34 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/pbs_slurm_test.py @@ -39,6 +39,13 @@ nvidia-smi """ +# Checking which cluster is running the tests first +process = Popen("sacctmgr list cluster", stdout=PIPE, stderr=PIPE, shell=True) +stdout, _ = process.communicate() +stdout = stdout.decode() +cluster = stdout.splitlines()[2].strip().split(' ')[0] +to_skip = cluster in ['graham', 'cedar'] +message = "Test does not run on cluster {}".format(cluster) class TestSlurm(unittest.TestCase): @@ -66,6 +73,7 @@ def _test_param(self, param_array, command, flag, string=pbs_string, output_arra job_params = [c.strip() for c in stdout.decode().split("\n")[1:] if c != ''] self.assertSequenceEqual(job_params, [output for _ in range(len(job_params))]) + @unittest.skipIf(to_skip, message) def test_priority(self): self._test_param( ['high', 'low'], @@ -94,12 +102,12 @@ def test_memory(self): def test_nb_cpus(self): self._test_param( ["2", "3"], - "#PBS -l mppdepth={}", - # "#SBATCH --cpus-per-task={}", - "numcpus", + "#PBS -l ncpus={}", + "mincpus", pbs_string ) + @unittest.skipIf(to_skip, message) def test_constraint(self): self._test_param( ["gpu6gb", "gpu8gb"], diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 9135780..cc2789a 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -17,7 +17,7 @@ def jobname_generator(jobname, job_id): Returns ------- str - The cropped version of the string. + The cropped version of the string. ''' # 64 - 1 since the total length including -1 should be less than 64 job_id = str(job_id) @@ -115,6 +115,7 @@ def detect_cluster(): output = Popen(["qstat", "-B"], stdout=PIPE).communicate()[0] except OSError: # If qstat is not available we assume that the cluster is unknown. 
+ # TODO: handle MILA + CEDAR + GRAHAM return None # Get server name from status server_name = output.split('\n')[2].split(' ')[0] From d7d03005ff83c3d4f0f0beac3bbc4c8cd74886c4 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Tue, 26 Sep 2017 13:44:22 -0400 Subject: [PATCH 09/36] Cleaned code with PR feedback --- smartdispatch/job_generator.py | 16 ++++++--- smartdispatch/pbs.py | 6 +++- smartdispatch/tests/test_job_generator.py | 3 +- smartdispatch/tests/test_pbs.py | 8 ++++- ...m_test.py => verify_slurms_pbs_wrapper.py} | 33 ++++--------------- smartdispatch/utils.py | 7 +++- 6 files changed, 38 insertions(+), 35 deletions(-) rename smartdispatch/tests/{pbs_slurm_test.py => verify_slurms_pbs_wrapper.py} (77%) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index f589d36..8444f2b 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -78,7 +78,12 @@ def add_sbatch_flags(self, flags): for flag in flags: split = flag.find('=') - options[flag[:split]] = flag[split+1:] + if flag.startswith('--'): + options[flag[2:split]] = flag[split+1:] + elif flag.startswith('-'): + options[flag[1:split]] = flag[split+1:] + else: + raise ValueError("Invalid SBATCH flag ({})".format(flag)) for pbs in self.pbs_list: pbs.add_sbatch_options(**options) @@ -182,12 +187,13 @@ def _add_cluster_specific_rules(self): # Remove forbidden ppn option. Default is 2 cores per gpu. pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes']) -class SlurmClusterGenerator(JobGenerator): +class SlurmJobGenerator(JobGenerator): def _add_cluster_specific_rules(self): for pbs in self.pbs_list: - node_resource = pbs.resources.pop('nodes') - gpus = re.match(".*gpus=([0-9]+)", node_resource).group(1) - ppn = re.match(".*ppn=([0-9]+)", node_resource).group(1) + gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1) + ppn = re.match(".*ppn=([0-9]+)", pbs.resources['nodes']).group(1) + pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes']) + pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes']) pbs.add_resources(naccelerators=gpus) pbs.add_resources(ncpus=ppn) diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index 2560c50..ef60efc 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -70,10 +70,14 @@ def add_sbatch_options(self, **options): Parameters ---------- **options : dict - each key is the name of a SBATCH option (see `Options`) + each key is the name of a SBATCH option """ for option_name, option_value in options.items(): + if len(option_name) == 1: + self.sbatch_options["-" + option_name] = option_value + else: + self.sbatch_options["--" + option_name] = option_value self.sbatch_options[option_name] = option_value def add_resources(self, **resources): diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 7214d9d..22c4b0b 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -1,4 +1,5 @@ from nose.tools import assert_true, assert_false, assert_equal, assert_raises +import unittest import os import tempfile @@ -7,6 +8,7 @@ from smartdispatch.job_generator import JobGenerator, job_generator_factory from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator from smartdispatch.job_generator import GuilliminJobGenerator, MammouthJobGenerator +from smartdispatch.job_generator import SlurmJobGenerator class TestJobGenerator(object): @@ -242,7 +244,6 @@ def 
test_pbs_split_2_job_nb_commands(self): assert_true("ppn=6" in str(self.pbs8[0])) assert_true("ppn=2" in str(self.pbs8[1])) - class TestJobGeneratorFactory(object): def setUp(self): diff --git a/smartdispatch/tests/test_pbs.py b/smartdispatch/tests/test_pbs.py index 6088052..41f567e 100644 --- a/smartdispatch/tests/test_pbs.py +++ b/smartdispatch/tests/test_pbs.py @@ -1,7 +1,6 @@ from nose.tools import assert_true, assert_equal, assert_raises from numpy.testing import assert_array_equal - from smartdispatch.pbs import PBS import unittest import tempfile @@ -38,6 +37,13 @@ def test_add_options(self): assert_equal(self.pbs.options["-A"], "option2") assert_equal(self.pbs.options["-B"], "option3") + def test_add_sbatch_options(self): + self.pbs.add_sbatch_options(a="value1") + assert_equal(self.pbs.sbatch_options["-a"], "value1") + self.pbs.add_sbatch_options(option1="value2", option2="value3") + assert_equal(self.pbs.sbatch_options["--option1"], "value2") + assert_equal(self.pbs.sbatch_options["--option2"], "value3") + def test_add_resources(self): assert_equal(len(self.pbs.resources), 1) assert_equal(self.pbs.resources["walltime"], self.walltime) diff --git a/smartdispatch/tests/pbs_slurm_test.py b/smartdispatch/tests/verify_slurms_pbs_wrapper.py similarity index 77% rename from smartdispatch/tests/pbs_slurm_test.py rename to smartdispatch/tests/verify_slurms_pbs_wrapper.py index 961be34..b3c3c50 100644 --- a/smartdispatch/tests/pbs_slurm_test.py +++ b/smartdispatch/tests/verify_slurms_pbs_wrapper.py @@ -2,9 +2,10 @@ import os import time import unittest - from subprocess import Popen, PIPE +from smartdispatch.utils import get_slurm_cluster_name + pbs_string = """\ #!/usr/bin/env /bin/bash @@ -22,28 +23,8 @@ nvidia-smi """ -sbatch_string = """\ -#!/usr/bin/env -i /bin/zsh - -#SBATCH --job-name=arrayJob -#SBATCH --output=arrayJob_%A_%a.out -#SBATCH --time=01:00:00 -{} - -###################### -# Begin work section # -###################### - -echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID -echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID -nvidia-smi -""" - # Checking which cluster is running the tests first -process = Popen("sacctmgr list cluster", stdout=PIPE, stderr=PIPE, shell=True) -stdout, _ = process.communicate() -stdout = stdout.decode() -cluster = stdout.splitlines()[2].strip().split(' ')[0] +cluster = get_slurm_cluster_name() to_skip = cluster in ['graham', 'cedar'] message = "Test does not run on cluster {}".format(cluster) @@ -53,14 +34,14 @@ def tearDown(self): for file_name in (glob('*.out') + ["test.pbs"]): os.remove(file_name) - def _test_param(self, param_array, command, flag, string=pbs_string, output_array=None): + def _test_param(self, param_array, command_template, flag, string=pbs_string, output_array=None): output_array = output_array or param_array for param, output in zip(param_array, output_array): - com = pbs_string.format( - string.format(command.format(param)) + param_command = pbs_string.format( + string.format(command_template.format(param)) ) with open("test.pbs", "w") as text_file: - text_file.write(com) + text_file.write(param_command) process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True) stdout, _ = process.communicate() stdout = stdout.decode() diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index cc2789a..3c598ad 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -115,7 +115,7 @@ def detect_cluster(): output = Popen(["qstat", "-B"], stdout=PIPE).communicate()[0] except OSError: # If qstat is not 
available we assume that the cluster is unknown. - # TODO: handle MILA + CEDAR + GRAHAM + cluster_name = get_slurm_cluster_name() return None # Get server name from status server_name = output.split('\n')[2].split(' ')[0] @@ -131,6 +131,11 @@ def detect_cluster(): cluster_name = "hades" return cluster_name +def get_slurm_cluster_name(): + stdout = Popen("sacctmgr list cluster", stdout=PIPE, shell=True).communicate()[0] + stdout = stdout.decode() + cluster_name = stdout.splitlines()[2].strip().split(' ')[0] + return cluster_name def get_launcher(cluster_name): if cluster_name == "helios": From c7bb2504528dec65f5a9f846911adb51e5a0b4b6 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Fri, 6 Oct 2017 17:08:33 -0400 Subject: [PATCH 10/36] Updated tests --- scripts/smart-dispatch | 2 +- smartdispatch/job_generator.py | 10 +++-- smartdispatch/pbs.py | 5 ++- smartdispatch/tests/test_job_generator.py | 49 +++++++++++++++++++++++ smartdispatch/utils.py | 1 + 5 files changed, 61 insertions(+), 6 deletions(-) diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 6c1c67c..8a15b8f 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -209,7 +209,7 @@ def parse_arguments(): parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands") parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of PBS flags. Ex:--pbsFlags="-lfeature=k80 -t0-4"') - parser.add_argument('--sbatchFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. Ex:--sbatchFlags="-qos=high --output=file.out"') + parser.add_argument('--sbatchFlags', type=str, help='ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. 
Ex:--sbatchFlags="--qos=high --ofile.out"') subparsers = parser.add_subparsers(dest="mode") launch_parser = subparsers.add_parser('launch', help="Launch jobs.") diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index 8444f2b..b219ea1 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -79,11 +79,13 @@ def add_sbatch_flags(self, flags): for flag in flags: split = flag.find('=') if flag.startswith('--'): - options[flag[2:split]] = flag[split+1:] - elif flag.startswith('-'): - options[flag[1:split]] = flag[split+1:] + if split == -1: + raise ValueError("Invalid SBATCH flag ({})".format(flag)) + options[flag[:split].lstrip("-")] = flag[split+1:] + elif flag.startswith('-') and split == -1: + options[flag[1:2]] = flag[2:] else: - raise ValueError("Invalid SBATCH flag ({})".format(flag)) + raise ValueError("Invalid SBATCH flag ({}, is it a PBS flag?)".format(flag)) for pbs in self.pbs_list: pbs.add_sbatch_options(**options) diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index ef60efc..df93028 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -178,7 +178,10 @@ def __str__(self): pbs += ["#PBS -l {0}={1}".format(resource_name, resource_value)] for option_name, option_value in self.sbatch_options.items(): - pbs += ["#SBATCH {0}={1}".format(option_name, option_value)] + if option_name.startswith('--'): + pbs += ["#SBATCH {0}={1}".format(option_name, option_value)] + else: + pbs += ["#SBATCH {0} {1}".format(option_name, option_value)] pbs += ["\n# Modules #"] for module in self.modules: diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 22c4b0b..ce36d25 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -13,6 +13,7 @@ class TestJobGenerator(object): pbs_flags = ['-lfeature=k80', '-lwalltime=42:42', '-lnodes=6:gpus=66', '-m', '-A123-asd-11', '-t10,20,30'] + sbatch_flags = ['--qos=high', '--output=file.out', '-Cminmemory'] def setUp(self): self.testing_dir = tempfile.mkdtemp() @@ -129,6 +130,32 @@ def test_add_pbs_flags_invalid(self): def test_add_pbs_flags_invalid_resource(self): assert_raises(ValueError, self._test_add_pbs_flags, '-l weeee') + def _test_add_sbatch_flags(self, flags): + job_generator = JobGenerator(self.queue, self.commands) + job_generator.add_sbatch_flags(flags) + options = [] + + for flag in flags: + if flag.startswith('--'): + options += [flag] + elif flag.startswith('-'): + options += [(flag[:2] + ' ' + flag[2:]).strip()] + + for pbs in job_generator.pbs_list: + pbs_str = pbs.__str__() + for flag in options: + assert_equal(pbs_str.count(flag), 1) + + def test_add_sbatch_flags(self): + for flag in self.sbatch_flags: + yield self._test_add_sbatch_flags, [flag] + + yield self._test_add_sbatch_flags, [flag] + + def test_add_sbatch_flag_invalid(self): + invalid_flags = ["--qos high", "gpu", "-lfeature=k80"] + for flag in invalid_flags: + assert_raises(ValueError, self._test_add_sbatch_flags, "--qos high") class TestGuilliminQueue(object): @@ -244,6 +271,28 @@ def test_pbs_split_2_job_nb_commands(self): assert_true("ppn=6" in str(self.pbs8[0])) assert_true("ppn=2" in str(self.pbs8[1])) +class TestSlurmQueue(object): + + def setUp(self): + self.walltime = "10:00" + self.cores = 42 + self.mem_per_node = 32 + self.nb_cores_per_node = 1 + self.nb_gpus_per_node = 2 + self.queue = Queue("slurm", "mila", self.walltime, self.nb_cores_per_node, self.nb_gpus_per_node, self.mem_per_node) + + self.commands = 
["echo 1", "echo 2", "echo 3", "echo 4"] + job_generator = SlurmJobGenerator(self.queue, self.commands) + self.pbs = job_generator.pbs_list + + def test_ppn_ncpus(self): + assert_true("ppn" not in str(self.pbs[0])) + assert_true("ncpus" in str(self.pbs[0])) + + def test_gpus_naccelerators(self): + assert_true("gpus" not in str(self.pbs[0])) + assert_true("naccelerators" in str(self.pbs[0])) + class TestJobGeneratorFactory(object): def setUp(self): diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 3c598ad..4839c00 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -115,6 +115,7 @@ def detect_cluster(): output = Popen(["qstat", "-B"], stdout=PIPE).communicate()[0] except OSError: # If qstat is not available we assume that the cluster is unknown. + # TODO: handle MILA + CEDAR + GRAHAM cluster_name = get_slurm_cluster_name() return None # Get server name from status From a20405bff0dee3500a342d18421640770861a83d Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Mon, 9 Oct 2017 20:35:15 -0400 Subject: [PATCH 11/36] Updated tests using mock --- smartdispatch/job_generator.py | 2 +- smartdispatch/tests/test_job_generator.py | 19 +++++++-- smartdispatch/tests/test_utils.py | 48 +++++++++++++++++++++-- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index b219ea1..af33e9c 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -80,7 +80,7 @@ def add_sbatch_flags(self, flags): split = flag.find('=') if flag.startswith('--'): if split == -1: - raise ValueError("Invalid SBATCH flag ({})".format(flag)) + raise ValueError("Invalid SBATCH flag ({}), no '=' character found' ".format(flag)) options[flag[:split].lstrip("-")] = flag[split+1:] elif flag.startswith('-') and split == -1: options[flag[1:2]] = flag[2:] diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index ce36d25..a82c43a 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -1,9 +1,14 @@ from nose.tools import assert_true, assert_false, assert_equal, assert_raises -import unittest - import os -import tempfile import shutil +import tempfile +import unittest + +try: + from mock import patch +except ImportError: + from unittest.mock import patch + from smartdispatch.queue import Queue from smartdispatch.job_generator import JobGenerator, job_generator_factory from smartdispatch.job_generator import HeliosJobGenerator, HadesJobGenerator @@ -155,7 +160,7 @@ def test_add_sbatch_flags(self): def test_add_sbatch_flag_invalid(self): invalid_flags = ["--qos high", "gpu", "-lfeature=k80"] for flag in invalid_flags: - assert_raises(ValueError, self._test_add_sbatch_flags, "--qos high") + assert_raises(ValueError, self._test_add_sbatch_flags, flag) class TestGuilliminQueue(object): @@ -285,9 +290,15 @@ def setUp(self): job_generator = SlurmJobGenerator(self.queue, self.commands) self.pbs = job_generator.pbs_list + with patch.object(SlurmJobGenerator,'_add_cluster_specific_rules', side_effect=lambda: None): + dummy_generator = SlurmJobGenerator(self.queue, self.commands) + self.dummy_pbs = dummy_generator.pbs_list + def test_ppn_ncpus(self): assert_true("ppn" not in str(self.pbs[0])) assert_true("ncpus" in str(self.pbs[0])) + assert_true("ppn" in str(self.dummy_pbs[0])) + assert_true("ncpus" not in str(self.dummy_pbs[0])) def test_gpus_naccelerators(self): assert_true("gpus" not in str(self.pbs[0])) diff --git 
a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 4eaef4e..295498b 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -1,11 +1,16 @@ # -*- coding: utf-8 -*- import unittest - -from smartdispatch import utils - +try: + from mock import patch + import mock +except ImportError: + from unittest.mock import patch + import unittest.mock from nose.tools import assert_equal, assert_true from numpy.testing import assert_array_equal +import subprocess +from smartdispatch import utils class PrintBoxedTests(unittest.TestCase): @@ -49,3 +54,40 @@ def test_slugify(): for arg, expected in testing_arguments: assert_equal(utils.slugify(arg), expected) + +command_output = """\ +Server Max Tot Que Run Hld Wat Trn Ext Com Status +---------------- --- --- --- --- --- --- --- --- --- ---------- +gpu-srv1.{} 0 1674 524 121 47 0 0 22 960 Idle +""" + +slurm_command = """\ + Cluster ControlHost ControlPort RPC Share GrpJobs GrpTRES GrpSubmit MaxJobs MaxTRES MaxSubmit MaxWall QOS Def QOS +---------- --------------- ------------ ----- --------- ------- ------------- --------- ------- ------------- --------- ----------- -------------------- --------- + {} 132.204.24.224 6817 7680 1 normal +""" + + +class ClusterIdentificationTest(unittest.TestCase): + + def test_detect_cluster(self): + server_name = ["hades", "m", "guil", "helios", "hades"] + clusters = ["hades", "mammouth", "guillimin", "helios"] + + for name, cluster in zip(server_name, clusters): + with patch('smartdispatch.utils.Popen') as mock_communicate: + mock_communicate.return_value.communicate.return_value = (command_output.format(name),) + self.assertEquals(utils.detect_cluster(), cluster) + + # def test_detect_mila_cluster(self): + # with patch('smartdispatch.utils.Popen') as mock_communicate: + # mock_communicate.return_value.communicate.side_effect = OSError + # self.assertIsNone(utils.detect_cluster()) + + def test_get_slurm_cluster_name(self): + clusters = ["graham", "cedar", "mila"] + + for cluster in clusters: + with patch('smartdispatch.utils.Popen') as mock_communicate: + mock_communicate.return_value.communicate.return_value = (slurm_command.format(cluster),) + self.assertEquals(utils.get_slurm_cluster_name(), cluster) From 5bde1c6867bd7a298c563b16d8e01670fed72936 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Tue, 10 Oct 2017 09:26:39 -0400 Subject: [PATCH 12/36] Refactor detect_cluster tests --- smartdispatch/tests/test_utils.py | 44 +++++++++++++++++-------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 295498b..26267d9 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -55,6 +55,7 @@ def test_slugify(): for arg, expected in testing_arguments: assert_equal(utils.slugify(arg), expected) + command_output = """\ Server Max Tot Que Run Hld Wat Trn Ext Com Status ---------------- --- --- --- --- --- --- --- --- --- ---------- @@ -69,25 +70,28 @@ def test_slugify(): class ClusterIdentificationTest(unittest.TestCase): + server_names = ["hades", "m", "guil", "helios", "hades"] + clusters = ["hades", "mammouth", "guillimin", "helios"] + command_output = command_output + + def __init__(self, *args, **kwargs): + super(ClusterIdentificationTest, self).__init__(*args, **kwargs) + self.detect_cluster = utils.detect_cluster def test_detect_cluster(self): - server_name = ["hades", "m", "guil", "helios", "hades"] - clusters = ["hades", "mammouth", 
"guillimin", "helios"] - - for name, cluster in zip(server_name, clusters): - with patch('smartdispatch.utils.Popen') as mock_communicate: - mock_communicate.return_value.communicate.return_value = (command_output.format(name),) - self.assertEquals(utils.detect_cluster(), cluster) - - # def test_detect_mila_cluster(self): - # with patch('smartdispatch.utils.Popen') as mock_communicate: - # mock_communicate.return_value.communicate.side_effect = OSError - # self.assertIsNone(utils.detect_cluster()) - - def test_get_slurm_cluster_name(self): - clusters = ["graham", "cedar", "mila"] - - for cluster in clusters: - with patch('smartdispatch.utils.Popen') as mock_communicate: - mock_communicate.return_value.communicate.return_value = (slurm_command.format(cluster),) - self.assertEquals(utils.get_slurm_cluster_name(), cluster) + + with patch('smartdispatch.utils.Popen') as MockPopen: + mock_process = MockPopen.return_value + for name, cluster in zip(self.server_names, self.clusters): + mock_process.communicate.return_value = ( + self.command_output.format(name), "") + self.assertEquals(self.detect_cluster(), cluster) + + +class SlurmClusterIdentificationTest(ClusterIdentificationTest): + server_names = clusters = ["graham", "cedar", "mila"] + command_output = slurm_command + + def __init__(self, *args, **kwargs): + super(SlurmClusterIdentificationTest, self).__init__(*args, **kwargs) + self.detect_cluster = utils.get_slurm_cluster_name From 0ff4776d69b050a2ca9d7c58de15ddde5c9d6c10 Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Tue, 10 Oct 2017 09:31:46 -0400 Subject: [PATCH 13/36] Small changes in TestSlurmQueue --- smartdispatch/tests/test_job_generator.py | 4 ++++ smartdispatch/tests/test_utils.py | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index a82c43a..f506000 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -295,12 +295,16 @@ def setUp(self): self.dummy_pbs = dummy_generator.pbs_list def test_ppn_ncpus(self): + assert_true("ppn" in str(self.dummy_pbs[0])) + assert_true("ncpus" not in str(self.dummy_pbs[0])) assert_true("ppn" not in str(self.pbs[0])) assert_true("ncpus" in str(self.pbs[0])) assert_true("ppn" in str(self.dummy_pbs[0])) assert_true("ncpus" not in str(self.dummy_pbs[0])) def test_gpus_naccelerators(self): + assert_true("gpus" in str(self.dummy_pbs[0])) + assert_true("naccelerators" not in str(self.dummy_pbs[0])) assert_true("gpus" not in str(self.pbs[0])) assert_true("naccelerators" in str(self.pbs[0])) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 26267d9..d6e357b 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -2,10 +2,9 @@ import unittest try: from mock import patch - import mock except ImportError: from unittest.mock import patch - import unittest.mock + from nose.tools import assert_equal, assert_true from numpy.testing import assert_array_equal import subprocess From d1ad338daa4e592ed72b04e4453494f8c036a1bf Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 10:13:47 -0400 Subject: [PATCH 14/36] Fix add_sbatch_option bug Why: For each option, add_sbatch_option would add the option in both the form --[OPTION_NAME] and [OPTION_NAME]. 
--- smartdispatch/pbs.py | 6 +++--- smartdispatch/tests/test_pbs.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index df93028..1fc22af 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -75,10 +75,10 @@ def add_sbatch_options(self, **options): for option_name, option_value in options.items(): if len(option_name) == 1: - self.sbatch_options["-" + option_name] = option_value + dash = "-" else: - self.sbatch_options["--" + option_name] = option_value - self.sbatch_options[option_name] = option_value + dash = "--" + self.sbatch_options[dash + option_name] = option_value def add_resources(self, **resources): """ Adds resources to this PBS file. diff --git a/smartdispatch/tests/test_pbs.py b/smartdispatch/tests/test_pbs.py index 41f567e..e9f938f 100644 --- a/smartdispatch/tests/test_pbs.py +++ b/smartdispatch/tests/test_pbs.py @@ -40,9 +40,12 @@ def test_add_options(self): def test_add_sbatch_options(self): self.pbs.add_sbatch_options(a="value1") assert_equal(self.pbs.sbatch_options["-a"], "value1") + assert_equal(len(self.pbs.sbatch_options), 1) + self.pbs.sbatch_options.pop("-a") self.pbs.add_sbatch_options(option1="value2", option2="value3") assert_equal(self.pbs.sbatch_options["--option1"], "value2") assert_equal(self.pbs.sbatch_options["--option2"], "value3") + assert_equal(len(self.pbs.sbatch_options), 2) def test_add_resources(self): assert_equal(len(self.pbs.resources), 1) From 899167ffc3db26ab9932cca1360d77ed029b4d01 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 18:45:36 -0400 Subject: [PATCH 15/36] Refactor SlurmJobGenerator It will need many conversions, not only on resources, so better make it clean. --- smartdispatch/job_generator.py | 39 ++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index af33e9c..079e0a2 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -189,13 +189,40 @@ def _add_cluster_specific_rules(self): # Remove forbidden ppn option. Default is 2 cores per gpu. 
pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes']) + class SlurmJobGenerator(JobGenerator): - def _add_cluster_specific_rules(self): - for pbs in self.pbs_list: - gpus = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']).group(1) - ppn = re.match(".*ppn=([0-9]+)", pbs.resources['nodes']).group(1) - pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes']) - pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", pbs.resources['nodes']) + def __init__(self, *args, **kwargs): + super(SlurmJobGenerator, self).__init__(*args, **kwargs) + + def _adapt_options(self, pbs): + pass + + def _adapt_commands(self, pbs): + pass + + def _adapt_resources(self, pbs): + # Set proper option for gpus + match = re.match(".*gpus=([0-9]+)", pbs.resources['nodes']) + if match: + gpus = match.group(1) pbs.add_resources(naccelerators=gpus) + pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "", + pbs.resources['nodes']) + + # Set proper option for cpus + match = re.match(".*ppn=([0-9]+)", pbs.resources['nodes']) + if match: + ppn = match.group(1) pbs.add_resources(ncpus=ppn) + pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes']) + + def _adapt_variable_names(self, pbs): + pass + + def _add_cluster_specific_rules(self): + for pbs in self.pbs_list: + self._adapt_options(pbs) + self._adapt_resources(pbs) + self._adapt_commands(pbs) + self._adapt_variable_names(pbs) From 581f835bf459ea1e12a4e19aac5ab3fdfe9b44de Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 18:54:10 -0400 Subject: [PATCH 16/36] Remove queue name for Slurm clusters Slurm has no queues, so PBS option -q is invalid and non-convertible. --- smartdispatch/job_generator.py | 4 +++- smartdispatch/tests/test_job_generator.py | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index 079e0a2..a25b4b0 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -196,7 +196,9 @@ def __init__(self, *args, **kwargs): super(SlurmJobGenerator, self).__init__(*args, **kwargs) def _adapt_options(self, pbs): - pass + # Remove queue, there is no queue in Slurm + if "-q" in pbs.options: + del pbs.options["-q"] def _adapt_commands(self, pbs): pass diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index f506000..8d053e5 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -276,7 +276,8 @@ def test_pbs_split_2_job_nb_commands(self): assert_true("ppn=6" in str(self.pbs8[0])) assert_true("ppn=2" in str(self.pbs8[1])) -class TestSlurmQueue(object): + +class TestSlurmQueue(unittest.TestCase): def setUp(self): self.walltime = "10:00" @@ -299,8 +300,6 @@ def test_ppn_ncpus(self): assert_true("ncpus" not in str(self.dummy_pbs[0])) assert_true("ppn" not in str(self.pbs[0])) assert_true("ncpus" in str(self.pbs[0])) - assert_true("ppn" in str(self.dummy_pbs[0])) - assert_true("ncpus" not in str(self.dummy_pbs[0])) def test_gpus_naccelerators(self): assert_true("gpus" in str(self.dummy_pbs[0])) @@ -308,6 +307,11 @@ def test_gpus_naccelerators(self): assert_true("gpus" not in str(self.pbs[0])) assert_true("naccelerators" in str(self.pbs[0])) + def test_queue(self): + assert_true("PBS -q" in str(self.dummy_pbs[0])) + assert_true("PBS -q" not in str(self.pbs[0])) + + class TestJobGeneratorFactory(object): def setUp(self): From 1d74e1ae802d68ee1a3cb4bceb3b683a48a9ec68 Mon Sep 17 00:00:00 2001 
From: Xavier Bouthillier Date: Fri, 13 Oct 2017 19:03:52 -0400 Subject: [PATCH 17/36] Replace PBS_JOBID with SLURM_JOB_ID $PBS_JOBID was used to set the stdout/err of the job as well as in the commands. Replace them with $SLURM_JOB_ID. Also, workers were accessing os.environ[PBS_JOBID] so we added a second fetch on SLURM_JOB_ID in case os.environ[PBS_JOBID] gave undefined. --- smartdispatch/job_generator.py | 10 ++++++++- smartdispatch/tests/test_job_generator.py | 27 ++++++++++++++++++++++- smartdispatch/workers/base_worker.py | 5 +++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index a25b4b0..2cf7363 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -200,6 +200,12 @@ def _adapt_options(self, pbs): if "-q" in pbs.options: del pbs.options["-q"] + # SBATCH does not interpret options, they can only contain %A if we + # want to include job's name and %a to include job array's index + for option in ['-o', '-e']: + pbs.options[option] = re.sub('"\$PBS_JOBID"', '%A', + pbs.options[option]) + def _adapt_commands(self, pbs): pass @@ -220,7 +226,9 @@ def _adapt_resources(self, pbs): pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes']) def _adapt_variable_names(self, pbs): - pass + for command_id, command in enumerate(pbs.commands): + pbs.commands[command_id] = command = re.sub( + "\$PBS_JOBID", "$SLURM_JOB_ID", command) def _add_cluster_specific_rules(self): for pbs in self.pbs_list: diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 8d053e5..7287056 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -287,7 +287,10 @@ def setUp(self): self.nb_gpus_per_node = 2 self.queue = Queue("slurm", "mila", self.walltime, self.nb_cores_per_node, self.nb_gpus_per_node, self.mem_per_node) - self.commands = ["echo 1", "echo 2", "echo 3", "echo 4"] + self.nb_of_commands = 4 + self.commands = ["echo %d; echo $PBS_JOBID; echo $PBS_WALLTIME" % i + for i in range(self.nb_of_commands)] + job_generator = SlurmJobGenerator(self.queue, self.commands) self.pbs = job_generator.pbs_list @@ -311,6 +314,28 @@ def test_queue(self): assert_true("PBS -q" in str(self.dummy_pbs[0])) assert_true("PBS -q" not in str(self.pbs[0])) + def test_outputs(self): + for std in ['-e', '-o']: + value = self.dummy_pbs[0].options[std] + assert_true("$PBS_JOBID" in value, + "$PBS_JOBID should be present in option %s: %s" % + (std, value)) + + value = self.pbs[0].options[std] + assert_true("$PBS_JOBID" not in value, + "$PBS_JOBID not should be present in option %s: %s" % + (std, value)) + assert_true("%A" in value, + "%%A should be present in option %s: %s" % + (std, value)) + + def test_job_id_env_var(self): + self.assertIn("$PBS_JOBID", str(self.dummy_pbs[0])) + self.assertNotIn("$SLURM_JOB_ID", str(self.dummy_pbs[0])) + + self.assertNotIn("$PBS_JOBID", str(self.pbs[0])) + self.assertIn("$SLURM_JOB_ID", str(self.pbs[0])) + class TestJobGeneratorFactory(object): diff --git a/smartdispatch/workers/base_worker.py b/smartdispatch/workers/base_worker.py index b44f5d5..0fec970 100755 --- a/smartdispatch/workers/base_worker.py +++ b/smartdispatch/workers/base_worker.py @@ -76,6 +76,11 @@ def sigterm_handler(signal, frame): # Get job and node ID job_id = os.environ.get('PBS_JOBID', 'undefined') + + # It might be a slurm scheduler + if job_id == 'undefined': + job_id = os.environ.get('SLURM_JOB_ID', 'undefined') 
+ node_name = os.environ.get('HOSTNAME', 'undefined') with open(stdout_filename, 'a') as stdout_file: From f00e877b8c5810891a2a922efe5bfa7fd69087ba Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 20:56:23 -0400 Subject: [PATCH 18/36] Add PBS_FILENAME definition to pbs.prolog Slurm cannot be passed environment variables defined locally on command-line like PBS_FILENAME is. To bypass this, we add a definition in the prolog, making PBS_FILENAME available to all commands and epilog. NOTE: We leave PBS_FILENAME definition in command-line too such that any user using $PBS_FILENAME inside a custom pbsFlag can still do so. --- scripts/smart-dispatch | 1 + smartdispatch/pbs.py | 2 ++ smartdispatch/tests/test_job_generator.py | 8 ++++++-- smartdispatch/tests/test_pbs.py | 1 + 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 8a15b8f..48e5c62 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -48,6 +48,7 @@ fi def main(): # Necessary if we want 'logging.info' to appear in stderr. + # TODO: Could we avoid this, can -v (verbose) be sufficiant? logging.root.setLevel(logging.INFO) args = parse_arguments() diff --git a/smartdispatch/pbs.py b/smartdispatch/pbs.py index 1fc22af..a5a2f58 100644 --- a/smartdispatch/pbs.py +++ b/smartdispatch/pbs.py @@ -162,7 +162,9 @@ def save(self, filename): specified where to save this PBS file """ with open(filename, 'w') as pbs_file: + self.prolog.insert(0, "PBS_FILENAME=%s" % filename) pbs_file.write(str(self)) + self.prolog.pop(0) def __str__(self): pbs = [] diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 7287056..f43adc4 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -291,11 +291,15 @@ def setUp(self): self.commands = ["echo %d; echo $PBS_JOBID; echo $PBS_WALLTIME" % i for i in range(self.nb_of_commands)] - job_generator = SlurmJobGenerator(self.queue, self.commands) + self.prolog = ["echo prolog"] + self.epilog = ["echo $PBS_FILENAME"] + job_generator = SlurmJobGenerator( + self.queue, self.commands, prolog=self.prolog, epilog=self.epilog) self.pbs = job_generator.pbs_list with patch.object(SlurmJobGenerator,'_add_cluster_specific_rules', side_effect=lambda: None): - dummy_generator = SlurmJobGenerator(self.queue, self.commands) + dummy_generator = SlurmJobGenerator( + self.queue, self.commands, prolog=self.prolog, epilog=self.epilog) self.dummy_pbs = dummy_generator.pbs_list def test_ppn_ncpus(self): diff --git a/smartdispatch/tests/test_pbs.py b/smartdispatch/tests/test_pbs.py index e9f938f..fcf8b59 100644 --- a/smartdispatch/tests/test_pbs.py +++ b/smartdispatch/tests/test_pbs.py @@ -144,4 +144,5 @@ def test_str(self): def test_save(self): pbs_filename = os.path.join(self.testing_dir, "pbs.sh") self.pbs.save(pbs_filename) + self.pbs.prolog.insert(0, "PBS_FILENAME=%s" % pbs_filename) assert_equal(str(self.pbs), open(pbs_filename).read()) From 6b2d5305e662687eb14136731064baaf7e2148ee Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 21:13:28 -0400 Subject: [PATCH 19/36] Fix env var export option for Slurm PBS options -V is not converted properly to SBATCH --export ALL. We remove it and replace it with --export=ALL is the sbatch options. 
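In short, the conversion boils down to the following (standalone sketch; convert_export is an illustrative name, the real change is the few lines added to SlurmJobGenerator._adapt_options in the hunk below):

def convert_export(pbs):
    # PBS -V means "export the submitting environment"; the closest
    # sbatch equivalent is an explicit --export=ALL flag (stored under
    # the '--export' key thanks to the add_sbatch_options fix from
    # PATCH 14).
    if pbs.options.pop('-V', None) is not None:
        pbs.add_sbatch_options(export='ALL')

Applied to a PBS file that contained '-V', the generated script ends up with '#SBATCH --export=ALL' instead of '#PBS -V', which is exactly what the new test_export below checks.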
--- smartdispatch/job_generator.py | 9 +++++++++ smartdispatch/tests/test_job_generator.py | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index 2cf7363..7b53260 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -206,6 +206,15 @@ def _adapt_options(self, pbs): pbs.options[option] = re.sub('"\$PBS_JOBID"', '%A', pbs.options[option]) + # Convert to Slurm's --export + # + # Warning: Slurm does **not** export variables defined locally such as + # variables defined along the command line. For ex: + # PBS_FILENAME=something sbatch --export=ALL somefile.sh + # would *not* export PBS_FILENAME to the script. + if pbs.options.pop('-V', None) is not None: + pbs.add_sbatch_options(export='ALL') + def _adapt_commands(self, pbs): pass diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index f43adc4..0b6975e 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -318,6 +318,11 @@ def test_queue(self): assert_true("PBS -q" in str(self.dummy_pbs[0])) assert_true("PBS -q" not in str(self.pbs[0])) + def test_export(self): + assert_true("#PBS -V" in str(self.dummy_pbs[0])) + assert_true("#PBS -V" not in str(self.pbs[0])) + assert_true("#SBATCH --export=ALL" in str(self.pbs[0])) + def test_outputs(self): for std in ['-e', '-o']: value = self.dummy_pbs[0].options[std] From 21df3dd449fd3e9a8ad2df80e2d75fadf9f5b2ff Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 21:54:02 -0400 Subject: [PATCH 20/36] Adapt PBS_WALLTIME for slurm Slurm does not have a equivalent environment variable set like PBS_WALLTIME. To avoid confusion, all variables PBS_WALLTIME are renamed to SBATCH_TIMELIMIT (the environment variable one would use to set --time with sbatch). As SBATCH_TIMELIMIT is not set automatically, we add it to the prolog to make it available to all commands and epilog. NOTE: PBS_WALLTIME is set in seconds, but we only have HH:MM:SS-like strings at the time of building the PBS file. We needed to add a walltime_to_seconds helper function to convert HH:MM:SS like strings into seconds, so that SBATCH_TIMELIMIT is set with seconds like PBS_WALLTIME. --- smartdispatch/job_generator.py | 14 +++++- smartdispatch/tests/test_job_generator.py | 12 +++++ smartdispatch/tests/test_utils.py | 58 +++++++++++++++++++++++ smartdispatch/utils.py | 26 +++++++++- 4 files changed, 107 insertions(+), 3 deletions(-) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index 7b53260..a65424a 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -238,10 +238,22 @@ def _adapt_variable_names(self, pbs): for command_id, command in enumerate(pbs.commands): pbs.commands[command_id] = command = re.sub( "\$PBS_JOBID", "$SLURM_JOB_ID", command) + # NOTE: SBATCH_TIMELIMIT is **not** an official slurm environment + # variable, it needs to be set in the script. + pbs.commands[command_id] = command = re.sub( + "\$PBS_WALLTIME", "$SBATCH_TIMELIMIT", command) + + def _adapt_prolog(self, pbs): + # Set SBATCH_TIMELIMIT in the prolog, hence, before any code from + # commands and epilog. 
+ pbs.add_to_prolog( + "SBATCH_TIMELIMIT=%s" % + utils.walltime_to_seconds(pbs.resources["walltime"])) def _add_cluster_specific_rules(self): for pbs in self.pbs_list: self._adapt_options(pbs) self._adapt_resources(pbs) - self._adapt_commands(pbs) self._adapt_variable_names(pbs) + self._adapt_prolog(pbs) + self._adapt_commands(pbs) diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py index 0b6975e..d735c23 100644 --- a/smartdispatch/tests/test_job_generator.py +++ b/smartdispatch/tests/test_job_generator.py @@ -345,6 +345,18 @@ def test_job_id_env_var(self): self.assertNotIn("$PBS_JOBID", str(self.pbs[0])) self.assertIn("$SLURM_JOB_ID", str(self.pbs[0])) + def test_walltime_env_var(self): + self.assertIn("$PBS_WALLTIME", str(self.dummy_pbs[0])) + self.assertNotIn("$SBATCH_TIMELIMIT", str(self.dummy_pbs[0])) + + self.assertNotIn("$PBS_WALLTIME", str(self.pbs[0])) + self.assertIn("$SBATCH_TIMELIMIT", str(self.pbs[0])) + + self.assertNotIn("SBATCH_TIMELIMIT=", + "\n".join(self.dummy_pbs[0].prolog)) + self.assertIn("SBATCH_TIMELIMIT=", + "\n".join(self.pbs[0].prolog)) + class TestJobGeneratorFactory(object): diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index d6e357b..5acba68 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import random + import unittest try: from mock import patch @@ -6,11 +8,67 @@ from unittest.mock import patch from nose.tools import assert_equal, assert_true + from numpy.testing import assert_array_equal import subprocess from smartdispatch import utils + +class TestWalltimeToSeconds(unittest.TestCase): + def setUp(self): + self.format = dict( + days=random.randint(0, 10), + hours=random.randint(0, 23), + minutes=random.randint(0, 59), + seconds=random.randint(0, 59)) + + def _compute_seconds(self, days=0, hours=0, minutes=0, seconds=0): + return ((((days * 24) + hours) * 60) + minutes * 60) + seconds + + def test_ddhhmmss(self): + seconds = utils.walltime_to_seconds( + "{days}:{hours}:{minutes}:{seconds}".format(**self.format)) + self.assertEqual(seconds, self._compute_seconds(**self.format)) + + def test_hhmmss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + + seconds = utils.walltime_to_seconds( + "{hours}:{minutes}:{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_mmss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + truncated_format.pop("hours") + + seconds = utils.walltime_to_seconds( + "{minutes}:{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_ss(self): + truncated_format = self.format.copy() + truncated_format.pop("days") + truncated_format.pop("hours") + truncated_format.pop("minutes") + + seconds = utils.walltime_to_seconds( + "{seconds}".format(**truncated_format)) + self.assertEqual(seconds, self._compute_seconds(**truncated_format)) + + def test_too_much_columns(self): + with self.assertRaises(ValueError): + seconds = utils.walltime_to_seconds( + "1:{days}:{hours}:{minutes}:{seconds}".format(**self.format)) + + def test_with_text(self): + with self.assertRaises(ValueError): + seconds = utils.walltime_to_seconds( + "{days}hoho:{hours}:{minutes}:{seconds}".format(**self.format)) + + class PrintBoxedTests(unittest.TestCase): def setUp(self): diff --git a/smartdispatch/utils.py 
b/smartdispatch/utils.py index 4839c00..633682e 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -1,11 +1,33 @@ -import re import hashlib -import unicodedata import json +import re +import unicodedata from distutils.util import strtobool from subprocess import Popen, PIPE + +TIME_REGEX = re.compile( + "^(?:(?:(?:(\d*):)?(\d*):)?(\d*):)?(\d*)$") + + +def walltime_to_seconds(walltime): + if not TIME_REGEX.match(walltime): + raise ValueError( + "Invalid walltime format: %s\n" + "It must be either DD:HH:MM:SS, HH:MM:SS, MM:SS or S" % + walltime) + + split = walltime.split(":") + + while len(split) < 4: + split = [0] + split + + days, hours, minutes, seconds = map(int, split) + + return ((((days * 24) + hours) * 60) + minutes * 60) + seconds + + def jobname_generator(jobname, job_id): '''Crop the jobname to a maximum of 64 characters. Parameters From ea1d5b3e37fa03ea88c885fd6bcea8f708fe9677 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 22:37:23 -0400 Subject: [PATCH 21/36] Add sbatch to command-line launcher options --- scripts/sd-launch-pbs | 2 +- scripts/smart-dispatch | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/sd-launch-pbs b/scripts/sd-launch-pbs index 0b38733..c575e28 100644 --- a/scripts/sd-launch-pbs +++ b/scripts/sd-launch-pbs @@ -23,7 +23,7 @@ def main(): def parse_arguments(): parser = argparse.ArgumentParser() - parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub') + parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub') parser.add_argument('pbs', type=str, help='PBS filename to launch.') parser.add_argument('path_job', type=str, help='Path to the job folder.') diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 48e5c62..109b5ea 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -194,7 +194,7 @@ def parse_arguments(): parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)') parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: The commands launched.') parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.') - parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub') + parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub') parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.') parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.') # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).') From adb8cba0043c7cabbc036c0820b6f55a373a2af0 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 22:41:36 -0400 Subject: [PATCH 22/36] Make get_launcher more flexible It is possible to query the system to see if some commands are available using distutils.spawn.find_executable(command_name). Clusters where more than one launcher are available will still get launchers selected based on string matching. 
For instance, get_launcher("helios") would always return msub no matter what is available on the system. --- smartdispatch/tests/test_utils.py | 69 ++++++++++++++++++++++++++++++- smartdispatch/utils.py | 33 +++++++++++---- 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 5acba68..39dbac0 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- import random +import string +import subprocess import unittest try: @@ -10,7 +12,6 @@ from nose.tools import assert_equal, assert_true from numpy.testing import assert_array_equal -import subprocess from smartdispatch import utils @@ -152,3 +153,69 @@ class SlurmClusterIdentificationTest(ClusterIdentificationTest): def __init__(self, *args, **kwargs): super(SlurmClusterIdentificationTest, self).__init__(*args, **kwargs) self.detect_cluster = utils.get_slurm_cluster_name + + +class TestGetLauncher(unittest.TestCase): + longMessage = True + + N_RANDOM = 10 + RANDOM_SIZE = (2, 10) + + CLUSTER_NAMES = ["hades", "mammouth", "guillimin", "helios"] + + def _get_random_string(self): + return ''.join([random.choice(string.lowercase) + for i in xrange(random.randint(*self.RANDOM_SIZE))]) + + def _assert_launcher(self, desired, cluster_name): + if cluster_name in utils.MSUB_CLUSTERS: + desired = "msub" + + self.assertEqual( + desired, utils.get_launcher(cluster_name), + msg="for cluster %s" % cluster_name) + + def test_get_launcher(self): + self.assertEqual("msub", utils.get_launcher("helios")) + + # For supported launcher and random ones... + with patch('smartdispatch.utils.distutils') as mock_distutils: + + for launcher in utils.SUPPORTED_LAUNCHERS: + + mock_distutils.spawn.find_executable.side_effect = ( + lambda command: launcher if launcher == command else None) + + for cluster_name in self.CLUSTER_NAMES: + self._assert_launcher(launcher, cluster_name) + + for idx in range(self.N_RANDOM): + self._assert_launcher(launcher, self._get_random_string()) + + # Test if there was no *supported* launcher on the system + launcher = self._get_random_string() + mock_distutils.spawn.find_executable.side_effect = ( + lambda command: launcher if launcher == command else None) + + for cluster_name in self.CLUSTER_NAMES: + if cluster_name in utils.MSUB_CLUSTERS: + continue + with self.assertRaises(RuntimeError): + utils.get_launcher(cluster_name) + + for idx in range(self.N_RANDOM): + with self.assertRaises(RuntimeError): + utils.get_launcher(self._get_random_string()) + + # Test if command_is_available only returns None + mock_distutils.spawn.find_executable.return_value = None + + for cluster_name in self.CLUSTER_NAMES: + if cluster_name in utils.MSUB_CLUSTERS: + continue + with self.assertRaises(RuntimeError): + utils.get_launcher(cluster_name) + + for idx in range(self.N_RANDOM): + with self.assertRaises(RuntimeError): + utils.get_launcher(self._get_random_string()) diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 633682e..7f1db5e 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -1,9 +1,10 @@ +import distutils +import distutils.spawn import hashlib import json import re import unicodedata -from distutils.util import strtobool from subprocess import Popen, PIPE @@ -70,10 +71,10 @@ def yes_no_prompt(query, default=None): while True: try: answer = raw_input("{0}{1}".format(query, available_prompts[default])) - return strtobool(answer) + return distutils.strtobool(answer) 
except ValueError: if answer == '' and default is not None: - return strtobool(default) + return distutils.strtobool(default) def chunks(sequence, n): @@ -136,10 +137,10 @@ def detect_cluster(): try: output = Popen(["qstat", "-B"], stdout=PIPE).communicate()[0] except OSError: - # If qstat is not available we assume that the cluster is unknown. - # TODO: handle MILA + CEDAR + GRAHAM + # If qstat is not available we assume that the cluster uses slurm. + # (Otherwise return None) cluster_name = get_slurm_cluster_name() - return None + return cluster_name # Get server name from status server_name = output.split('\n')[2].split(' ')[0] # Cleanup the name and return it @@ -160,8 +161,22 @@ def get_slurm_cluster_name(): cluster_name = stdout.splitlines()[2].strip().split(' ')[0] return cluster_name + +MSUB_CLUSTERS = ["helios"] +SUPPORTED_LAUNCHERS = ["qsub", "msub", "sbatch"] + + +def command_is_available(command): + return distutils.spawn.find_executable(command) is not None + + def get_launcher(cluster_name): - if cluster_name == "helios": + # Gives priority to msub if qsub is also present + if cluster_name in MSUB_CLUSTERS: return "msub" - else: - return "qsub" + + for launcher in SUPPORTED_LAUNCHERS: + if command_is_available(launcher): + return launcher + + raise RuntimeError("No compatible launcher found on the system") From f3661ba19b6e6babdec9f403fafa921f575d6000 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Fri, 13 Oct 2017 22:54:49 -0400 Subject: [PATCH 23/36] Add verbosity to smart-dispatch It is difficult to debug resuming while important process are taking place in the pbs script automatically built by SmartDispatch. We add verbose to smart-dispatch script and add debugging prints in epilog. --- scripts/sd-launch-pbs | 17 ++++++++++++ scripts/smart-dispatch | 60 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/scripts/sd-launch-pbs b/scripts/sd-launch-pbs index c575e28..f0190bc 100644 --- a/scripts/sd-launch-pbs +++ b/scripts/sd-launch-pbs @@ -7,6 +7,10 @@ import logging from smartdispatch import launch_jobs from smartdispatch import utils + +logger = logging.getLogger() + + LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS" CLUSTER_NAME = utils.detect_cluster() LAUNCHER = utils.get_launcher(CLUSTER_NAME) @@ -27,8 +31,21 @@ def parse_arguments(): parser.add_argument('pbs', type=str, help='PBS filename to launch.') parser.add_argument('path_job', type=str, help='Path to the job folder.') + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print informations about the process.\n" + " -v: INFO\n" + " -vv: DEBUG") + args = parser.parse_args() + if args.verbose == 0: + logging.basicConfig(level=logging.WARNING) + elif args.verbose == 1: + logging.basicConfig(level=logging.INFO) + elif args.verbose >= 2: + logging.basicConfig(level=logging.DEBUG) + return args diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch index 109b5ea..79768a0 100755 --- a/scripts/smart-dispatch +++ b/scripts/smart-dispatch @@ -1,9 +1,10 @@ #!/usr/bin/env python2 # -*- coding: utf-8 -*- +import argparse +import logging import os import sys -import argparse import time as t from os.path import join as pjoin from textwrap import dedent @@ -16,9 +17,12 @@ from smartdispatch import get_available_queues from smartdispatch import launch_jobs from smartdispatch import utils -import logging import smartdispatch + +logger = logging.getLogger() + + LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS" CLUSTER_NAME = utils.detect_cluster() 
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME) @@ -29,19 +33,45 @@ TIMEOUT_EXIT_CODE = 124 AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))' # By default, 60s before the maximum walltime. AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER) AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"' -AUTORESUME_PROLOG = 'WORKER_PIDS=""' +AUTORESUME_PROLOG = """ +WORKER_PIDS="" +VERBOSE={verbose} +""" AUTORESUME_EPILOG = """\ NEED_TO_RESUME=false +if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME + echo WORKER_PIDS=$WORKER_PIDS +fi for WORKER_PID in $WORKER_PIDS; do + if [ $VERBOSE = true ]; then + echo WORKER_PID=$WORKER_PID + fi wait "$WORKER_PID" RETURN_CODE=$? + if [ $VERBOSE = true ]; then + echo "RETURN_CODE is $RETURN_CODE while " \ + "timeout_exit_code is {timeout_exit_code}" + fi if [ $RETURN_CODE -eq {timeout_exit_code} ]; then NEED_TO_RESUME=true fi + if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME + fi done +if [ $VERBOSE = true ]; then + echo NEED_TO_RESUME=$NEED_TO_RESUME +fi if [ "$NEED_TO_RESUME" = true ]; then echo "Autoresuming using: {{launcher}} $PBS_FILENAME" - sd-launch-pbs --launcher {{launcher}} $PBS_FILENAME {{path_job}} + if [ $VERBOSE = true]; then + VERBOSE_OPTION="-vv" + else + VERBOSE_OPTION="" + fi + + sd-launch-pbs $VERBOSE_OPTION --launcher {{launcher}} $PBS_FILENAME {{path_job}} fi """.format(timeout_exit_code=TIMEOUT_EXIT_CODE) @@ -164,8 +194,12 @@ def main(): prolog = [] epilog = ['wait'] if args.autoresume: - prolog = [AUTORESUME_PROLOG] - epilog = [AUTORESUME_EPILOG.format(launcher=LAUNCHER if args.launcher is None else args.launcher, path_job=path_job)] + prolog = [ + AUTORESUME_PROLOG.format(verbose=str(args.verbose >= 2).lower())] + epilog = [ + AUTORESUME_EPILOG.format( + launcher=LAUNCHER if args.launcher is None else args.launcher, + path_job=path_job)] job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job) @@ -191,6 +225,13 @@ def main(): def parse_arguments(): parser = argparse.ArgumentParser() + + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print informations about the process.\n" + " -v: INFO\n" + " -vv: DEBUG") + parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)') parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: The commands launched.') parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. 
Note that they will be killed when this time limit is reached.') @@ -231,6 +272,13 @@ def parse_arguments(): if args.coresPerCommand < 1: parser.error("coresPerNode must be at least 1") + if args.verbose == 0: + logging.basicConfig(level=logging.WARNING) + elif args.verbose == 1: + logging.basicConfig(level=logging.INFO) + elif args.verbose >= 2: + logging.basicConfig(level=logging.DEBUG) + return args From 972a1ab70d5d1f8ab7cd53dac1f83ba844e2be2f Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Sun, 15 Oct 2017 17:38:00 -0400 Subject: [PATCH 24/36] Updated documentation for slurm clusters --- README.md | 8 ++++++++ smartdispatch/utils.py | 9 ++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5dea792..5b15432 100644 --- a/README.md +++ b/README.md @@ -74,3 +74,11 @@ python my_script.py 9 Jobs that did not terminate properly, for example, it exceeded the walltime, can be resumed using the {batch_id} given to you upon launch. Of course, all this assuming your script is resumable. *Note: Jobs are always in a batch, even if it's a batch of one.* + +### SLURM clusters + +Smartdispatch can also run on SLURM clusters +All features like `--gpusPerNode` or `--coresPerNode` are supported +However you need to pass SLURM specific features like `--qos` or `--output`as sbatch flach with: +`smart-dispatch -q qtest@mp2 launch python my_script.py [1:4] --sbatchFlags="--qos=high -ofile.out` +If you use the full name of a feature you have to separate the feature and its value by a = like `--qos=high` if not you have to append the features and its value ex: `-ofile.out` is valid, not `-o file.out` diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 7f1db5e..58159f9 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -156,9 +156,12 @@ def detect_cluster(): return cluster_name def get_slurm_cluster_name(): - stdout = Popen("sacctmgr list cluster", stdout=PIPE, shell=True).communicate()[0] - stdout = stdout.decode() - cluster_name = stdout.splitlines()[2].strip().split(' ')[0] + try: + stdout = Popen("sacctmgr list cluster", stdout=PIPE, shell=True).communicate()[0] + stdout = stdout.decode() + cluster_name = stdout.splitlines()[2].strip().split(' ')[0] + except IndexError, OSError: + return None return cluster_name From 29973b0c1b72c984607be97262b35a643bde737a Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:13:50 -0400 Subject: [PATCH 25/36] Add support for SlurmJobGenerator JobGenerators are selected by job_generator_factory based on the cluster's name. We use a more flexible, duck typing approach for Slurm clusters. If cluster name is not known, or not any of the if-case clauses in the factory, then we look at which launchers are available in the system. If it is sbatch, then a SlurmJobGenerator is built, a JobGenerator otherwise. 
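Condensed, the selection rule described above looks like this (sketch only; choose_generator_class is an illustrative name, and the actual change is the single elif added to job_generator_factory in the hunk below):

from smartdispatch import utils
from smartdispatch.job_generator import (JobGenerator, SlurmJobGenerator,
                                         HadesJobGenerator, HeliosJobGenerator)

def choose_generator_class(cluster_name):
    # Named clusters keep their dedicated generators; any other cluster
    # is probed for an installed launcher, and sbatch implies Slurm.
    if cluster_name == "helios":
        return HeliosJobGenerator
    elif cluster_name == "hades":
        return HadesJobGenerator
    elif utils.get_launcher(cluster_name) == "sbatch":
        return SlurmJobGenerator
    return JobGenerator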
--- smartdispatch/job_generator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py index a65424a..9465c8b 100644 --- a/smartdispatch/job_generator.py +++ b/smartdispatch/job_generator.py @@ -15,6 +15,8 @@ def job_generator_factory(queue, commands, prolog=[], epilog=[], command_params= return HeliosJobGenerator(queue, commands, prolog, epilog, command_params, base_path) elif cluster_name == "hades": return HadesJobGenerator(queue, commands, prolog, epilog, command_params, base_path) + elif utils.get_launcher(cluster_name) == "sbatch": + return SlurmJobGenerator(queue, commands, prolog, epilog, command_params, base_path) return JobGenerator(queue, commands, prolog, epilog, command_params, base_path) From f734fb36c08280689f9b5ceadaf7bdf7f7796d9f Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:19:20 -0400 Subject: [PATCH 26/36] Print stderr when both qsub and sacctmgr fails The command `sacctmgr` fails on some computers (mila01 namely), but the current behavior gives the impression sbatch is simply not available. Printing the stderr makes it more obvious that sbatch should be available, but something is broken behind sacctmgr. It only appears when using -vv options nevertheless. --- smartdispatch/utils.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index 58159f9..fc86eb6 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -2,12 +2,16 @@ import distutils.spawn import hashlib import json +import logging import re import unicodedata from subprocess import Popen, PIPE +logger = logging.getLogger(__name__) + + TIME_REGEX = re.compile( "^(?:(?:(?:(\d*):)?(\d*):)?(\d*):)?(\d*)$") @@ -157,11 +161,18 @@ def detect_cluster(): def get_slurm_cluster_name(): try: - stdout = Popen("sacctmgr list cluster", stdout=PIPE, shell=True).communicate()[0] + popen = Popen("sacctmgr list cluster", stdout=PIPE, shell=True) + stdout, stderr = popen.communicate() + except OSError: + return None + + try: stdout = stdout.decode() cluster_name = stdout.splitlines()[2].strip().split(' ')[0] - except IndexError, OSError: + except IndexError: + logger.debug(stderr) return None + return cluster_name From 45068870e8b572e910659ec1b4440267dd63a0d5 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:25:09 -0400 Subject: [PATCH 27/36] Add automatic script for cluster verification Adding a script to do automatic verifications to assert validity of the current code. The verifications are not automatic unit-tests, they need automatically checks that the process executed successfully, but the administrator still needs to verify manually, reading the logs, that the requested resources were provided. Verifications can easily be combined, building on top of each others, from complex ones to simpler ones. Here is a list of all the verification currently implemented for slurm clusters: 1. very_simple_task (1 CPU) 2. verify_simple_task_with_one_gpu (1 CPU 1 GPU) 3. verify_simple_task_with_many_gpus (1 CPU X GPU) 4. verify_many_task (X CPU) 5. verify_many_task_with_many_cores (XY CPU) 6. verify_many_task_with_one_gpu (X CPU X GPU) 7. verify_many_task_with_many_gpus (X CPU Y GPU) 8. verify_simple_task_with_autoresume_unneeded (1 CPU) 9. verify_simple_task_with_autoresume_needed (1 CPU) 10. 
verify_many_task_with_autoresume_needed (X CPU) --- smartdispatch/tests/verify_slurm_cluster.py | 477 ++++++++++++++++++++ 1 file changed, 477 insertions(+) create mode 100644 smartdispatch/tests/verify_slurm_cluster.py diff --git a/smartdispatch/tests/verify_slurm_cluster.py b/smartdispatch/tests/verify_slurm_cluster.py new file mode 100644 index 0000000..a3c1aa6 --- /dev/null +++ b/smartdispatch/tests/verify_slurm_cluster.py @@ -0,0 +1,477 @@ +import datetime +import inspect +import functools +import getpass +import glob +import os +import pdb +import subprocess +import sys +import time +import traceback + +WALLTIME = 60 # seconds + +command_string = """\ +#!/usr/bin/env /bin/bash + +###################### +# Begin work section # +###################### + +echo "My SLURM_JOB_ID:" $SLURM_JOB_ID +echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID +echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID + +echo called with option "$1" + +export HOME=`getent passwd $USER | cut -d':' -f6` +source ~/.bashrc +export THEANO_FLAGS=... +export PYTHONUNBUFFERED=1 +echo Running on $HOSTNAME + +if [ -e "paused$1.log" ] +then + echo "resuming $1" + touch resumed$1.log +else + echo "running $1 from scratch" + touch running$1.log +fi + + +# Test GPUs +echo "echo CUDA_VISIBLE_DEVICES" +echo $CUDA_VISIBLE_DEVICES +echo + +nvidia-smi + +# Test CPUs +# How? + +# Test resume +if [ ! -e "paused$1.log" ] +then + touch paused$1.log + echo "sleeping $1 %(sleep)s seconds" + sleep %(sleep)ss +fi + +echo completed $1 +mv paused$1.log completed$1.log +""" + + +def set_defaults(dictionary, **kwargs): + + for item, value in kwargs.iteritems(): + dictionary.setdefault(item, value) + + +def strfdelta(tdelta, fmt): + """ + From https://stackoverflow.com/a/8907269 + """ + + d = {} + d["hours"], rem = divmod(tdelta.seconds, 3600) + d["hours"] += tdelta.days * 24 + d["minutes"], d["seconds"] = divmod(rem, 60) + return fmt % d + + +def infer_verification_name(): + + for stack in inspect.stack(): + if stack[3].startswith("verify_"): + return stack[3] + + raise RuntimeError("Cannot infer verification name:\n %s" % + "\n".join(str(t) for t in traceback.format_stack())) + + +def build_argv(coresPerCommand, gpusPerCommand, walltime, coresPerNode, + gpusPerNode, batchName=None, commandsFile=None, + doNotLaunch=False, autoresume=False, pool=None, + sbatchFlags=None): + + if batchName is None: + batchName = infer_verification_name() + + argv = """ +-vv +--queueName dummy +--batchName %(batchName)s --walltime %(walltime)s +--coresPerCommand %(coresPerCommand)s +--gpusPerCommand %(gpusPerCommand)s +--coresPerNode %(coresPerNode)s +--gpusPerNode %(gpusPerNode)s + """ % dict(batchName=batchName, + walltime=strfdelta( + datetime.timedelta(seconds=walltime), + "%(hours)02d:%(minutes)02d:%(seconds)02d"), + coresPerCommand=coresPerCommand, + gpusPerCommand=gpusPerCommand, + coresPerNode=coresPerNode, + gpusPerNode=gpusPerNode) + + # File containing commands to launch. Each command must + # be on a seperate line. (Replaces commandAndOptions) + if commandsFile: + argv += " --commandsFile " + commandsFile + + # Generate all the files without launching the job. + if doNotLaunch: + argv += " --doNotLaunch" + + # Requeue the job when the running time hits the maximum + # walltime allowed on the cluster. Assumes that commands + # are resumable. + if autoresume: + argv += " --autoresume" + + # Number of workers that will be consuming commands. 
+ # Default: Nb commands + if pool: + argv += " --pool " + pool + + # ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags. + # Ex:--sbatchFlags="--qos=high --ofile.out" + if sbatchFlags: + argv += " --sbatchFlags=" + sbatchFlags + + return argv.replace("\n", " ") + + +def get_squeue(): + command = ("squeue -u %(username)s" % + dict(username=getpass.getuser())) + process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate() + return stdout + + +def try_to_remove_file(filename_template, expected_number): + file_names = glob.glob(filename_template) + try: + i = 0 + for file_name in file_names: + i += 1 + os.remove(file_name) + except OSError as e: + print str(e) + + if i != expected_number: + print "Error: Expected %d files, found %d" % (expected_number, i) + else: + print "OK: All %d files %s were found:\n%s" % ( + expected_number, filename_template, + "\n".join(sorted(file_names))) + + +def minimum_requirement(attribute_name, minimum_value): + + def decorator(method): + + @functools.wraps(method) + def call(self, *args, **kwargs): + + # Method was called from another verification + try: + verification_name = infer_verification_name() + # Method was called directly + except RuntimeError: + verification_name = method.__name__ + + if not hasattr(self, attribute_name): + raise ValueError("Invalid requirement, object %s does not " + "have attribute %s" % + (self.__class__.__name__, attribute_name)) + + if getattr(self, attribute_name) >= minimum_value: + return method(self, *args, **kwargs) + else: + print ("%s does not have enough %s: %d." + "Skipping %s." % + (self.__class__.__name__, attribute_name, minimum_value, + verification_name)) + return None + + return call + + return decorator + + +class VerifySlurmCluster(object): + + WALLTIME = 60 + CORES_PER_NODE = 8 + GPUS_PER_NODE = 0 + + def __init__(self, debug=False, no_fork=False): + self.debug = debug + self.no_fork = no_fork + + def get_verification_methods(self, filtered_by=None): + methods = inspect.getmembers(self, predicate=inspect.ismethod) + + def filtering(item): + key = item[0] + + if not key.startswith("verify_"): + return False + elif filtered_by is not None and key not in filtered_by: + return False + + return True + + return dict(filter(filtering, methods)) + + def run_verifications(self, filtered_by=None): + if filtered_by is not None and len(filtered_by) == 0: + filtered_by = None + + verification_methods = self.get_verification_methods(filtered_by) + processes = [] + for verification_name, verification_fct in \ + verification_methods.iteritems(): + print "========%s" % ("=" * len(verification_name)) + print "Running %s" % verification_name + print "========%s\n\n" % ("=" * len(verification_name)) + + if self.debug or self.no_fork: + verification_fct() + else: + # fork the process in a new dir and new stdout, stderr + verification_dir = os.path.join( + os.getcwd(), self.__class__.__name__, verification_name) + + if not os.path.isdir(verification_dir): + os.makedirs(verification_dir) + + stdout = open(os.path.join(verification_dir, + "validation.out"), 'w') + stderr = open(os.path.join(verification_dir, + "validation.err"), 'w') + + popen = subprocess.Popen( + "/bin/bash", + shell=True, + stdin=subprocess.PIPE, + stdout=stdout, + stderr=stderr) + + popen.stdin.write("cd %s;" % verification_dir) + + script_path = os.path.join( + os.getcwd(), inspect.getfile(self.__class__)) + popen.stdin.write( + "python %s --no-fork %s;" % ( + 
script_path, verification_name)) + print "python %s --no-fork %s;" % ( + script_path, verification_name) + + processes.append(popen) + + for popen in processes: + # popen.communicate() + popen.terminate() + + def run_test(self, argv, command_string, command_arguments=""): + FILE_NAME = "test.sh" + + with open("test.sh", "w") as text_file: + text_file.write(command_string) + + command = ("smart-dispatch %s launch bash %s %s" % + (argv, FILE_NAME, command_arguments)) + print "running test with command: " + print command + + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + stdout, stderr = process.communicate() + + print "\nstdout:" + print stdout.decode() + + print "\nstderr:" + print stderr.decode() + return stdout.split("\n")[-2].strip() + + def validate(self, root_dir, argv, squeue_wait, nb_of_commands=1, + resume=False): + + print "\nValidating arguments:" + print argv + + stdout = get_squeue() + number_of_process = stdout.count("\n") - 1 + + while number_of_process > 0: + root = os.path.join(root_dir, "commands") + for file_path in os.listdir(root): + if file_path.endswith(".sh"): + print file_path + print open(os.path.join(root, file_path), 'r').read() + + print stdout + print "Waiting %d seconds" % squeue_wait + time.sleep(squeue_wait) + stdout = get_squeue() + number_of_process = stdout.count("\n") - 1 + print stdout + print number_of_process + + try_to_remove_file("running*.log", expected_number=nb_of_commands) + try_to_remove_file("resumed*.log", + expected_number=nb_of_commands * int(resume)) + try_to_remove_file("completed*.log", expected_number=nb_of_commands) + + root = os.path.join(root_dir, "logs") + for file_path in reversed(sorted(os.listdir(root))): + if file_path.endswith(".err") or file_path.endswith(".out"): + print file_path + print open(os.path.join(root, file_path), 'r').read() + if self.debug: + pdb.set_trace() + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE) + + return kwargs + + def base_verification(self, sleep_time=0, command_arguments="", + resume=False, squeue_wait=None, nb_of_commands=1, + **kwargs): + + if squeue_wait is None and self.debug: + squeue_wait = sleep_time + 5 + elif squeue_wait is None: + squeue_wait = self.WALLTIME * 2 + + arguments = self.get_arguments(**kwargs) + argv = build_argv(**arguments) + + root_dir = self.run_test(argv, command_string % dict(sleep=sleep_time), + command_arguments=command_arguments) + self.validate(root_dir, argv, squeue_wait, nb_of_commands, + resume=resume) + + def verify_simple_task(self, **kwargs): + self.base_verification(**kwargs) + + def verify_simple_task_with_one_gpu(self, **kwargs): + set_defaults( + kwargs, + gpusPerCommand=1, + gpusPerNode=1) + + self.verify_simple_task(**kwargs) + + @minimum_requirement("GPUS_PER_NODE", 2) + def verify_simple_task_with_many_gpus(self, **kwargs): + + for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1): + arguments = kwargs.copy() + arguments["gpusPerCommand"] = gpus_per_command + + self.verify_simple_task(**arguments) + + @minimum_requirement("CORES_PER_NODE", 2) + def verify_many_task(self, **kwargs): + set_defaults( + kwargs, + nb_of_commands=self.CORES_PER_NODE) + + command_arguments = ( + "[%s]" % " ".join(str(i) for i in range(kwargs["nb_of_commands"]))) + + set_defaults( + kwargs, + command_arguments=command_arguments) + + self.verify_simple_task(**kwargs) 
+ + @minimum_requirement("CORES_PER_NODE", 4) + def verify_many_task_with_many_cores(self, **kwargs): + for cores_per_command in xrange(2, self.CORES_PER_NODE): + if cores_per_command // self.CORES_PER_NODE <= 1: + break + + arguments = kwargs.copy() + arguments["cores_per_command"] = cores_per_command + arguments["nb_of_commands"] = ( + cores_per_command // + self.CORES_PER_NODE) + + self.many_task(**arguments) + + @minimum_requirement("GPUS_PER_NODE", 2) + def verify_many_task_with_one_gpu(self, **kwargs): + set_defaults( + kwargs, + nb_of_commands=self.GPUS_PER_NODE, + gpusPerCommand=1) + + self.verify_many_task(**kwargs) + + @minimum_requirement("GPUS_PER_NODE", 4) + def verify_many_task_with_many_gpus(self, **kwargs): + for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1): + if gpus_per_command // self.GPUS_PER_NODE <= 1: + break + + arguments = kwargs.copy() + arguments["gpusPerCommand"] = gpus_per_command + arguments["nb_of_commands"] = ( + gpus_per_command // + self.GPUS_PER_NODE) + + self.verify_many_task_with_one_gpu(**arguments) + + def verify_simple_task_with_autoresume_unneeded(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + walltime=walltime, + resume=False, + autoresume=True) + + self.verify_simple_task(**kwargs) + + def verify_simple_task_with_autoresume_needed(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + sleep_time=walltime, + walltime=walltime, + resume=True, + autoresume=True) + + self.verify_simple_task(**kwargs) + + def verify_many_task_with_autoresume_needed(self, **kwargs): + walltime = 2 * 60 + set_defaults( + kwargs, + sleep_time=walltime, + walltime=walltime, + resume=True, + autoresume=True) + + self.verify_many_task(**kwargs) + + # def verify_pool(self, **kwargs): + # pass From 02845e00d599229b9dca729c9d108ae8abc46053 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:36:48 -0400 Subject: [PATCH 28/36] Add verification script for cedar --- smartdispatch/tests/verify_cedar.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 smartdispatch/tests/verify_cedar.py diff --git a/smartdispatch/tests/verify_cedar.py b/smartdispatch/tests/verify_cedar.py new file mode 100644 index 0000000..5fc3fba --- /dev/null +++ b/smartdispatch/tests/verify_cedar.py @@ -0,0 +1,29 @@ +import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyCedarCluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 24 + GPUS_PER_NODE = 4 + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE) + + return kwargs + + +if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyCedarCluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) From 2d6e6fd55da5e06940b9ff35128140111164e6aa Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:36:55 -0400 Subject: [PATCH 29/36] Add verification script for graham --- smartdispatch/tests/verify_graham.py | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 smartdispatch/tests/verify_graham.py diff --git a/smartdispatch/tests/verify_graham.py b/smartdispatch/tests/verify_graham.py new file mode 100644 index 0000000..7f79731 --- /dev/null +++ b/smartdispatch/tests/verify_graham.py @@ -0,0 +1,30 @@ 
+import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyGrahamCluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 32 + GPUS_PER_NODE = 2 + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE, + sbatchFlags="--account=rpp-bengioy") + + return kwargs + + +if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyGrahamCluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) From f967180222ac8788598d6f7b0a7311f05be7e7c4 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 09:37:22 -0400 Subject: [PATCH 30/36] Add verification script for mila --- smartdispatch/tests/verify_mila.py | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 smartdispatch/tests/verify_mila.py diff --git a/smartdispatch/tests/verify_mila.py b/smartdispatch/tests/verify_mila.py new file mode 100644 index 0000000..3d14815 --- /dev/null +++ b/smartdispatch/tests/verify_mila.py @@ -0,0 +1,38 @@ +import sys + +from verify_slurm_cluster import VerifySlurmCluster, set_defaults + + +class VerifyMILACluster(VerifySlurmCluster): + + WALLTIME = 60 + CORES_PER_NODE = 8 + GPUS_PER_NODE = 2 + + def get_arguments(self, **kwargs): + + set_defaults( + kwargs, + coresPerCommand=1, + gpusPerCommand=0, + walltime=self.WALLTIME, + coresPerNode=self.CORES_PER_NODE, + gpusPerNode=self.GPUS_PER_NODE) + + return kwargs + + def verify_simple_task_with_constraints(self, **kwargs): + + set_defaults( + kwargs, + gpusPerCommand=1, + sbatchFlags='"-C\"gpu12gb\""') + + self.base_verification(**kwargs) + + +if __name__ == "__main__": + verifications = filter(lambda o: not o.startswith("--"), sys.argv[1:]) + VerifyMILACluster(debug="--debug" in sys.argv[1:], + no_fork="--no-fork" in sys.argv[1:]).run_verifications( + filtered_by=verifications) From 8c655b43eb059f08db757ce63552d10289e8a4b2 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 10:08:34 -0400 Subject: [PATCH 31/36] Make get_launcher return None when no launcher My initial though was that get_launcher should raise an error when no launcher is found on the system since there cannot be any job launcher. I realized that this would break the --doNotLaunch option that users may want to use on system with no launcher, just to create the files. 
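A hypothetical caller-side illustration (not part of the patch) of why returning None is preferable here: generating the files needs no launcher, so only an actual submission has to guard against the missing executable. The do_not_launch variable stands in for the --doNotLaunch command-line option.

from smartdispatch import utils

do_not_launch = False  # stand-in for the --doNotLaunch flag
launcher = utils.get_launcher(utils.detect_cluster())
if launcher is None and not do_not_launch:
    raise SystemExit("No supported launcher (qsub, msub or sbatch) found on this system.")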
--- smartdispatch/tests/test_utils.py | 16 ++++------------ smartdispatch/utils.py | 2 +- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index 39dbac0..d4303a5 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -198,24 +198,16 @@ def test_get_launcher(self): lambda command: launcher if launcher == command else None) for cluster_name in self.CLUSTER_NAMES: - if cluster_name in utils.MSUB_CLUSTERS: - continue - with self.assertRaises(RuntimeError): - utils.get_launcher(cluster_name) + self._assert_launcher(None, cluster_name) for idx in range(self.N_RANDOM): - with self.assertRaises(RuntimeError): - utils.get_launcher(self._get_random_string()) + self._assert_launcher(None, self._get_random_string()) # Test if command_is_available only returns None mock_distutils.spawn.find_executable.return_value = None for cluster_name in self.CLUSTER_NAMES: - if cluster_name in utils.MSUB_CLUSTERS: - continue - with self.assertRaises(RuntimeError): - utils.get_launcher(cluster_name) + self._assert_launcher(None, cluster_name) for idx in range(self.N_RANDOM): - with self.assertRaises(RuntimeError): - utils.get_launcher(self._get_random_string()) + self._assert_launcher(None, self._get_random_string()) diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index fc86eb6..f3abb34 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -193,4 +193,4 @@ def get_launcher(cluster_name): if command_is_available(launcher): return launcher - raise RuntimeError("No compatible launcher found on the system") + return None From 998f3ba6efe986f9cd641c2c807266cac28fa13c Mon Sep 17 00:00:00 2001 From: Adrien Ali Taiga Date: Mon, 16 Oct 2017 10:18:21 -0400 Subject: [PATCH 32/36] Updated README --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 5b15432..c0379e8 100644 --- a/README.md +++ b/README.md @@ -77,8 +77,9 @@ Jobs that did not terminate properly, for example, it exceeded the walltime, can ### SLURM clusters -Smartdispatch can also run on SLURM clusters -All features like `--gpusPerNode` or `--coresPerNode` are supported -However you need to pass SLURM specific features like `--qos` or `--output`as sbatch flach with: -`smart-dispatch -q qtest@mp2 launch python my_script.py [1:4] --sbatchFlags="--qos=high -ofile.out` -If you use the full name of a feature you have to separate the feature and its value by a = like `--qos=high` if not you have to append the features and its value ex: `-ofile.out` is valid, not `-o file.out` +Smartdispatch can also run on SLURM clusters. +All features like `--gpusPerNode` or `--coresPerNode` are supported. +However you need to pass SLURM specific features using --sbatchFlags. For simplicity, --sbatchFlags supports short and long option definitions only with the following syntax: +-Cgpu6gb or --constraint=gpu6gb +For comparison, this would be invalid: +-C gpu6gb or --constraint gpu6gb. From a3c08c867b532969a7639115d13903bdfc3007c6 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 13:37:28 -0400 Subject: [PATCH 33/36] Set properly account in verify_graham The tests were failing because the account was not specified. 
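A condensed sketch of the logic added in the hunk below (account_flag is an illustrative helper; it assumes whoever runs the verification has exported CPU_SLURM_ACCOUNT and GPU_SLURM_ACCOUNT beforehand):

import os

def account_flag(gpus_per_command, sbatch_flags=""):
    # CPU-only jobs and GPU jobs may be billed to different accounts.
    var = "CPU_SLURM_ACCOUNT" if gpus_per_command == 0 else "GPU_SLURM_ACCOUNT"
    flag = "--account=" + os.environ.get(var, "")
    return flag if not sbatch_flags else sbatch_flags + " " + flag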
--- smartdispatch/tests/verify_graham.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/smartdispatch/tests/verify_graham.py b/smartdispatch/tests/verify_graham.py index 7f79731..c95e7c0 100644 --- a/smartdispatch/tests/verify_graham.py +++ b/smartdispatch/tests/verify_graham.py @@ -11,14 +11,17 @@ class VerifyGrahamCluster(VerifySlurmCluster): def get_arguments(self, **kwargs): - set_defaults( - kwargs, - coresPerCommand=1, - gpusPerCommand=0, - walltime=self.WALLTIME, - coresPerNode=self.CORES_PER_NODE, - gpusPerNode=self.GPUS_PER_NODE, - sbatchFlags="--account=rpp-bengioy") + kwargs = super(VerifyGrahamCluster, self).get_arguments(**kwargs) + + if kwargs["gpusPerCommand"] == 0: + account = os.environ.get("CPU_SLURM_ACCOUNT") + else: + account = os.environ.get("GPU_SLURM_ACCOUNT") + + if "sbatchFlags" not in kwargs or len(kwargs["sbatchFlags"]) == 0: + kwargs["sbatchFlags"] = "--account=" + account + else: + kwargs["sbatchFlags"] += " --account=" + account return kwargs From 9fb5ab66a7c4ff6c3df067b92ff00d4f8b27fcac Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 13:41:48 -0400 Subject: [PATCH 34/36] Set properly account in verify_cedar The tests were failing because the account was not specified --- smartdispatch/tests/verify_cedar.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/smartdispatch/tests/verify_cedar.py b/smartdispatch/tests/verify_cedar.py index 5fc3fba..94e4f7f 100644 --- a/smartdispatch/tests/verify_cedar.py +++ b/smartdispatch/tests/verify_cedar.py @@ -1,3 +1,4 @@ +import os import sys from verify_slurm_cluster import VerifySlurmCluster, set_defaults @@ -11,13 +12,17 @@ class VerifyCedarCluster(VerifySlurmCluster): def get_arguments(self, **kwargs): - set_defaults( - kwargs, - coresPerCommand=1, - gpusPerCommand=0, - walltime=self.WALLTIME, - coresPerNode=self.CORES_PER_NODE, - gpusPerNode=self.GPUS_PER_NODE) + kwargs = super(VerifyCedarCluster, self).get_arguments(**kwargs) + + if kwargs["gpusPerCommand"] == 0: + account = os.environ.get("CPU_SLURM_ACCOUNT") + else: + account = os.environ.get("GPU_SLURM_ACCOUNT") + + if "sbatchFlags" not in kwargs or len(kwargs["sbatchFlags"]) == 0: + kwargs["sbatchFlags"] = "--account=" + account + else: + kwargs["sbatchFlags"] += " --account=" + account return kwargs From 1dea0d8288d898f8c8997379f2fa3606d1457aad Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Tue, 17 Oct 2017 16:21:00 -0400 Subject: [PATCH 35/36] Fix walltime_to_seconds convertion There was a missing parentheses which was causing a bad conversion of "DD:HH:MM:SS" to seconds. The unit-test was also missing the same parentheses. I added a unit-test to make sure such error could not occur again. 
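A quick numeric check of the corrected nesting, using the same values as the new unit-test below:

# "DD:HH:MM:SS" -> seconds with the fixed parenthesisation:
# (((days * 24 + hours) * 60) + minutes) * 60 + seconds
days, hours, minutes, seconds = 2, 3, 5, 7
total = (((days * 24 + hours) * 60) + minutes) * 60 + seconds
assert total == 2 * 86400 + 3 * 3600 + 5 * 60 + 7 == 183907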
--- smartdispatch/tests/test_utils.py | 17 ++++++++++++++++- smartdispatch/utils.py | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/smartdispatch/tests/test_utils.py b/smartdispatch/tests/test_utils.py index d4303a5..9306faa 100644 --- a/smartdispatch/tests/test_utils.py +++ b/smartdispatch/tests/test_utils.py @@ -25,7 +25,22 @@ def setUp(self): seconds=random.randint(0, 59)) def _compute_seconds(self, days=0, hours=0, minutes=0, seconds=0): - return ((((days * 24) + hours) * 60) + minutes * 60) + seconds + return (((((days * 24) + hours) * 60) + minutes) * 60) + seconds + + def test_compute_seconds(self): + + date_format = dict( + days=2, + hours=3, + minutes=5, + seconds=7) + + total_seconds = 183907 + + self.assertEqual(self._compute_seconds(**date_format), total_seconds) + self.assertEqual(utils.walltime_to_seconds( + "{days}:{hours}:{minutes}:{seconds}".format(**date_format)), + total_seconds) def test_ddhhmmss(self): seconds = utils.walltime_to_seconds( diff --git a/smartdispatch/utils.py b/smartdispatch/utils.py index f3abb34..a0598d3 100644 --- a/smartdispatch/utils.py +++ b/smartdispatch/utils.py @@ -30,7 +30,7 @@ def walltime_to_seconds(walltime): days, hours, minutes, seconds = map(int, split) - return ((((days * 24) + hours) * 60) + minutes * 60) + seconds + return (((((days * 24) + hours) * 60) + minutes) * 60) + seconds def jobname_generator(jobname, job_id): From cac2f0891ab904a7f979ac6306100ac5a9e4f3f6 Mon Sep 17 00:00:00 2001 From: Xavier Bouthillier Date: Mon, 16 Oct 2017 13:45:42 -0400 Subject: [PATCH 36/36] Add configuration file for Graham --- smartdispatch/config/graham.json | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 smartdispatch/config/graham.json diff --git a/smartdispatch/config/graham.json b/smartdispatch/config/graham.json new file mode 100644 index 0000000..edf86f4 --- /dev/null +++ b/smartdispatch/config/graham.json @@ -0,0 +1,46 @@ +{ + "cpu": { + "ram": 126, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 800 + }, + "large_mem": { + "ram": 252, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 56 + }, + "large_mem_500": { + "ram": 504, + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 0, + "nodes": 24 + }, + "large_mem_3000": { + "ram": 3024, + "cores": 64, + "max_walltime": "?", + "gpus": 0, + "nodes": 3 + }, + "gpu_1": { + "ram": 63, + "modules": ["cuda/8.0.44"], + "cores": 16, + "max_walltime": "648:00:00", + "gpus": 1, + "nodes": 320 + }, + "gpu_2": { + "ram": 126, + "modules": ["cuda/8.0.44"], + "cores": 32, + "max_walltime": "648:00:00", + "gpus": 2, + "nodes": 160 + } +}
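A quick way to sanity-check one of the new entries is to read the raw JSON (path assumed relative to the repository root; the real consumer is get_available_queues):

import json

with open("smartdispatch/config/graham.json") as config_file:
    graham = json.load(config_file)

# e.g. the two-GPU queue defined above
print(graham["gpu_2"]["cores"], graham["gpu_2"]["gpus"], graham["gpu_2"]["max_walltime"])
# -> 32 2 648:00:00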