From d22a4bdcfd524d3d389771a512e52726e931e594 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Mon, 14 Oct 2024 13:37:51 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20add=20SEFF=20job=20to=20print=20?= =?UTF-8?q?slurm=20metrics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- isabl_cli/batch_systems/slurm.py | 35 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/isabl_cli/batch_systems/slurm.py b/isabl_cli/batch_systems/slurm.py index d0e1615..044df06 100644 --- a/isabl_cli/batch_systems/slurm.py +++ b/isabl_cli/batch_systems/slurm.py @@ -75,14 +75,15 @@ def submit_slurm(app, command_tuples): # pragma: no cover def submit_slurm_array( - commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False + commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False, unbuffer=True, ): # pragma: no cover """ Submit an array of bash scripts. - Two other jobs will also be submitted: + Three other jobs will also be submitted: EXIT: run exit command if failure. + SEFF: run slurm utility to printout job metrics. CLEAN: clean temporary files and directories after completion. Arguments: @@ -92,6 +93,7 @@ def submit_slurm_array( extra_args (str): extra SLURM args. throttle_by (int): max number of jobs running at same time. wait (bool): if true, wait until clean command finishes. + unbuffer (bool): if true, will unbuffer the stdout/stderr. Returns: str: jobid of clean up job. @@ -106,7 +108,8 @@ def submit_slurm_array( datetime.now(system_settings.TIME_ZONE).isoformat(), ) - wait = "-W" if wait else "" + wait_flag = "-W" if wait else "" + unbuffer = "unbuffer" if unbuffer else "" os.makedirs(root, exist_ok=True) jobname += "-rundir: {}".format(root) jobname = slugify(jobname) @@ -118,24 +121,24 @@ def submit_slurm_array( index += 1 rundir = abspath(dirname(command)) - with open(join(root, "in.%s" % index), "w") as f: + with open(join(root, f"in.{index}"), "w") as f: # submit a dependency job on failure # important when the scheduler kills the head job dependency = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" after_not_ok_job = ( - f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep yes " - f'--export=TMP,TMPDIR,TMP_DIR -o {join(rundir, "head_job.exit")} -J "EXIT: {dependency}" ' + f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep=yes " + f'--export=ALL -o {join(rundir, "head_job.exit")} -J "EXIT: {jobname}" ' f"<< EOF\n#!/bin/bash\n{exit_command}\nEOF\n" ) # use random sleep to avoid parallel API hits f.write( f"#!/bin/bash\n\n" - f"sleep {random.uniform(0, 10):.3} && " - f"({after_not_ok_job}) && bash {command}" + f"sleep {random.uniform(0, 10):.3f} && " + f"({after_not_ok_job}) && {unbuffer} bash {command}" ) - for j in "log", "err", "exit", "slurm": + for j in {"log", "err", "exit", "slurm"}: src = join(rundir, f"head_job.{j}") dst = join(root, f"{j}.{index}") open(src, "w").close() @@ -144,9 +147,6 @@ def submit_slurm_array( with open(join(root, "in.sh"), "w") as f: f.write(f"#!/bin/bash\nbash {root}/in.$SLURM_ARRAY_TASK_ID") - with open(join(root, "clean.sh"), "w") as f: - f.write(f"#!/bin/bash\nrm -rf {root}") - # Main job array cmd = ( f"sbatch {requirements} {extra_args} --array 1-{total}%{throttle_by} " @@ -159,9 +159,9 @@ def submit_slurm_array( seff_jobids = [] for i in range(1, total + 1): seff_cmd = ( - f"sbatch {extra_args} --kill-on-invalid-dep=yes " - f"--dependency=afterany:{jobid}_{i} -o '{root}/slurm.{i}' -J 'SEFF: {jobname}' " - f"--wrap='seff {jobid}_{i}'" + f"sbatch {extra_args} -o /dev/null -e /dev/null " + f"--dependency=afterany:{jobid}_{i} -J 'SEFF: {jobname}' " + f"--wrap='seff {jobid}_{i} >> {root}/slurm.{i}'" ) seff_jobid = subprocess.check_output(seff_cmd, shell=True).decode("utf-8").strip() seff_jobids.append(seff_jobid.split()[-1]) @@ -169,9 +169,10 @@ def submit_slurm_array( # Job to clean job array rundir with open(join(root, "clean.sh"), "w") as f: f.write(f"#!/bin/bash\nrm -rf {root}") + cmd = ( - f"sbatch {extra_args} -J 'CLEAN: {jobname}' {wait} --kill-on-invalid-dep yes " - f"-o /dev/null -e /dev/null --depend=afterany:{':'.join(seff_jobids)} --parsable {root}/clean.sh" + f"sbatch {extra_args} -J 'CLEAN: {jobname}' {wait_flag} -o /dev/null " + f"-e /dev/null --dependency=afterany:{':'.join(seff_jobids)} {root}/clean.sh" ) return subprocess.check_output(cmd, shell=True).decode("utf-8").strip()