Skip to content

Commit

Permalink
🔧 add SEFF job to print slurm metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
juanesarango committed Oct 14, 2024
1 parent e7d47bd commit d22a4bd
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions isabl_cli/batch_systems/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ def submit_slurm(app, command_tuples): # pragma: no cover


def submit_slurm_array(
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False, unbuffer=True,
): # pragma: no cover
"""
Submit an array of bash scripts.
Two other jobs will also be submitted:
Three other jobs will also be submitted:
EXIT: run exit command if failure.
SEFF: run slurm utility to print out job metrics.
CLEAN: clean temporary files and directories after completion.
Arguments:
Expand All @@ -92,6 +93,7 @@ def submit_slurm_array(
extra_args (str): extra SLURM args.
throttle_by (int): max number of jobs running at same time.
wait (bool): if true, wait until clean command finishes.
unbuffer (bool): if true, prefix each job command with `unbuffer` so stdout/stderr are not buffered.
Returns:
str: jobid of clean up job.
Expand All @@ -106,7 +108,8 @@ def submit_slurm_array(
datetime.now(system_settings.TIME_ZONE).isoformat(),
)

wait = "-W" if wait else ""
wait_flag = "-W" if wait else ""
unbuffer = "unbuffer" if unbuffer else ""
os.makedirs(root, exist_ok=True)
jobname += "-rundir: {}".format(root)
jobname = slugify(jobname)
Expand All @@ -118,24 +121,24 @@ def submit_slurm_array(
index += 1
rundir = abspath(dirname(command))

with open(join(root, "in.%s" % index), "w") as f:
with open(join(root, f"in.{index}"), "w") as f:
# submit a dependency job on failure
# important when the scheduler kills the head job
dependency = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}"
after_not_ok_job = (
f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep yes "
f'--export=TMP,TMPDIR,TMP_DIR -o {join(rundir, "head_job.exit")} -J "EXIT: {dependency}" '
f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep=yes "
f'--export=ALL -o {join(rundir, "head_job.exit")} -J "EXIT: {jobname}" '
f"<< EOF\n#!/bin/bash\n{exit_command}\nEOF\n"
)

# use random sleep to avoid parallel API hits
f.write(
f"#!/bin/bash\n\n"
f"sleep {random.uniform(0, 10):.3} && "
f"({after_not_ok_job}) && bash {command}"
f"sleep {random.uniform(0, 10):.3f} && "
f"({after_not_ok_job}) && {unbuffer} bash {command}"
)

for j in "log", "err", "exit", "slurm":
for j in {"log", "err", "exit", "slurm"}:
src = join(rundir, f"head_job.{j}")
dst = join(root, f"{j}.{index}")
open(src, "w").close()
Expand All @@ -144,9 +147,6 @@ def submit_slurm_array(
with open(join(root, "in.sh"), "w") as f:
f.write(f"#!/bin/bash\nbash {root}/in.$SLURM_ARRAY_TASK_ID")

with open(join(root, "clean.sh"), "w") as f:
f.write(f"#!/bin/bash\nrm -rf {root}")

# Main job array
cmd = (
f"sbatch {requirements} {extra_args} --array 1-{total}%{throttle_by} "
Expand All @@ -159,19 +159,20 @@ def submit_slurm_array(
seff_jobids = []
for i in range(1, total + 1):
seff_cmd = (
f"sbatch {extra_args} --kill-on-invalid-dep=yes "
f"--dependency=afterany:{jobid}_{i} -o '{root}/slurm.{i}' -J 'SEFF: {jobname}' "
f"--wrap='seff {jobid}_{i}'"
f"sbatch {extra_args} -o /dev/null -e /dev/null "
f"--dependency=afterany:{jobid}_{i} -J 'SEFF: {jobname}' "
f"--wrap='seff {jobid}_{i} >> {root}/slurm.{i}'"
)
seff_jobid = subprocess.check_output(seff_cmd, shell=True).decode("utf-8").strip()
seff_jobids.append(seff_jobid.split()[-1])

# Job to clean job array rundir
with open(join(root, "clean.sh"), "w") as f:
f.write(f"#!/bin/bash\nrm -rf {root}")

cmd = (
f"sbatch {extra_args} -J 'CLEAN: {jobname}' {wait} --kill-on-invalid-dep yes "
f"-o /dev/null -e /dev/null --depend=afterany:{':'.join(seff_jobids)} --parsable {root}/clean.sh"
f"sbatch {extra_args} -J 'CLEAN: {jobname}' {wait_flag} -o /dev/null "
f"-e /dev/null --dependency=afterany:{':'.join(seff_jobids)} {root}/clean.sh"
)

return subprocess.check_output(cmd, shell=True).decode("utf-8").strip()

0 comments on commit d22a4bd

Please sign in to comment.