Merge branch 'refs/heads/queue_deps_handling' into dashboard_status
akremin committed Aug 30, 2024
2 parents 10c8f48 + 75a2b18 commit d411fa1
Showing 5 changed files with 126 additions and 38 deletions.
3 changes: 3 additions & 0 deletions bin/desi_proc_night
@@ -102,6 +102,9 @@ def parse_args():
help="If set then the code will NOT raise an error "
+ "if asked to link psfnight calibrations "
+ "without fiberflatnight calibrations.")
parser.add_argument("--no-resub-failed", action="store_true", required=False,
help="Give this flag if you do NOT want to resubmit " +
"jobs with Slurm status 'FAILED' by default.")
parser.add_argument("--still-acquiring", action='store_true',
help="for testing --daily mode, assume more data is still coming even if "
+ "outside of normal observing hours.")
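For reference, a minimal, self-contained sketch of the new option in isolation (plain argparse; the rest of the desi_proc_night interface is omitted). Leaving the flag off keeps the previous behavior of resubmitting FAILED jobs.

import argparse

# Minimal sketch: the new flag is a plain store_true switch, so omitting it
# (the default) preserves the old behavior of resubmitting FAILED jobs.
parser = argparse.ArgumentParser()
parser.add_argument("--no-resub-failed", action="store_true", required=False)
print(parser.parse_args([]).no_resub_failed)                     # False
print(parser.parse_args(["--no-resub-failed"]).no_resub_failed)  # True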
27 changes: 16 additions & 11 deletions bin/desi_resubmit_queue_failures
@@ -38,16 +38,28 @@ def parse_args(): # options=None):
help="The SLURM queue states that should be resubmitted. " +
"E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " +
"OUT_OF_MEMORY, PREEMPTED, TIMEOUT, CANCELLED, FAILED.")
parser.add_argument("--dont-resub-failed", action="store_true", required=False,
parser.add_argument("--no-resub-failed", action="store_true", required=False,
help="Give this flag if you do NOT want to resubmit " +
"jobs with Slurm status 'FAILED' by default.")
"jobs with Slurm status 'FAILED' by default. " +
"This should not be used if defining " +
"--resub-states explicitly.")

args = parser.parse_args()

if args.resub_states is not None:
## User should never provide custom list of states and request to remove FAILED
if args.no_resub_failed:
log = get_logger()
msg = f"Provided user-defined resubmision states {args.resub_states} but " \
+ f"also requested args.no_resub_failed. Please choose one or the other."
log.critical(msg)
raise ValueError(msg)
## Clean up the input string into a list of strings
args.resub_states = [state.strip().upper() for state in args.resub_states.split(',')]

return args


if __name__ == '__main__':
args = parse_args()
log = get_logger()
@@ -62,14 +74,6 @@ if __name__ == '__main__':
if not os.path.exists(ptable_pathname):
ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

resub_states = args.resub_states
if resub_states is None:
resub_states = get_resubmission_states()
if not args.dont_resub_failed:
resub_states.append('FAILED')

log.info(f"Resubmitting the following Slurm states: {resub_states}")

if args.dry_run > 0 and args.dry_run < 3:
log.warning(f"{args.dry_run=} will be run with limited simulation "
f"because we don't want to write out incorrect queue information.")
@@ -81,7 +85,8 @@ if __name__ == '__main__':
ptable = load_table(tablename=ptable_pathname, tabletype=table_type)
log.info(f"Identified ptable with {len(ptable)} entries.")
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
resubmission_states=resub_states,
resubmission_states=args.resub_states,
no_resub_failed=args.no_resub_failed,
ptab_name=ptable_pathname, dry_run=args.dry_run,
reservation=args.reservation)

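The net effect of this hunk, restated as a small self-contained sketch (the variable values are illustrative stand-ins; the real script reads them from the command line):

# Illustrative stand-ins for args.resub_states and args.no_resub_failed:
resub_states = "TIMEOUT, node_fail"
no_resub_failed = True

# The new guard: a user-supplied state list plus --no-resub-failed is rejected.
if resub_states is not None and no_resub_failed:
    print("ValueError: choose either --resub-states or --no-resub-failed, not both")
# Otherwise the comma-separated string is normalized exactly as in the script above:
print([state.strip().upper() for state in resub_states.split(',')])
# -> ['TIMEOUT', 'NODE_FAIL']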
9 changes: 7 additions & 2 deletions py/desispec/scripts/proc_night.py
@@ -36,7 +36,8 @@
set_calibrator_flag, make_exposure_prow, \
all_calibs_submitted, \
update_and_recursively_submit, update_accounted_for_with_linking
from desispec.workflow.queue import update_from_queue, any_jobs_failed
from desispec.workflow.queue import update_from_queue, any_jobs_failed, \
get_resubmission_states
from desispec.io.util import decode_camword, difference_camwords, \
create_camword, replace_prefix, erow_to_goodcamword, camword_union

@@ -55,7 +56,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
path_to_data=None, exp_obstypes=None, camword=None,
badcamword=None, badamps=None, exps_to_ignore=None,
sub_wait_time=0.1, verbose=False, dont_require_cals=False,
psf_linking_without_fflat=False,
psf_linking_without_fflat=False, no_resub_failed=False,
still_acquiring=False):
"""
Process some or all exposures on a night. Can be used to process an entire
@@ -163,6 +164,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
psf_linking_without_fflat: bool. Default False. If set then the code
will NOT raise an error if asked to link psfnight calibrations
without fiberflatnight calibrations.
no_resub_failed: bool. Set to True if you do NOT want to resubmit
jobs with Slurm status 'FAILED' by default. Default is False.
still_acquiring: bool. If True, assume more data might be coming, e.g.
wait for additional exposures of latest tile. If False, auto-derive
True/False based upon night and current time. Primarily for testing.
@@ -366,7 +369,9 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3:
log.info("Job failures were detected. Resubmitting those jobs "
+ "before continuing with new submissions.")

ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
ptab_name=proc_table_pathname,
dry_run=dry_run,
reservation=reservation)
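A hedged sketch of calling the library entry point directly with the new keyword; the night value is illustrative and all other keywords keep their defaults, so this assumes a fully configured DESI environment.

from desispec.scripts.proc_night import proc_night

# Process one night without automatically resubmitting FAILED jobs
# (night value is illustrative; requires the usual DESI environment).
proc_night(night=20240830, no_resub_failed=True)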
38 changes: 30 additions & 8 deletions py/desispec/workflow/processing.py
@@ -20,7 +20,8 @@
get_ztile_script_suffix
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache
queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \
get_non_final_states
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
create_desi_proc_batch_script, \
@@ -632,12 +633,23 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
## should no longer be necessary in the normal workflow.
# workaround for sbatch --dependency bug not tracking jobs correctly
# see NERSC TICKET INC0203024
failed_dependency = False
if len(dep_qids) > 0 and not dry_run:
non_final_states = get_non_final_states()
state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True)
still_depids = []
for depid in dep_qids:
if depid in state_dict.keys() and state_dict[int(depid)] == 'COMPLETED':
log.info(f"removing completed jobid {depid}")
if depid in state_dict.keys():
if state_dict[int(depid)] == 'COMPLETED':
log.info(f"removing completed jobid {depid}")
elif state_dict[int(depid)] not in non_final_states:
failed_dependency = True
log.info("Found a dependency in a bad final state="
+ f"{state_dict[int(depid)]} for depjobid={depid},"
+ " not submitting this job.")
still_depids.append(depid)
else:
still_depids.append(depid)
else:
still_depids.append(depid)
dep_qids = np.array(still_depids)
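The new dependency handling can be summarized with a standalone sketch (toy job IDs and states; the real code uses get_queue_states_from_qids and get_non_final_states as shown above):

# Toy states mirroring get_non_final_states() and a fake Slurm query result.
non_final_states = {'PENDING', 'RUNNING', 'REQUEUED', 'RESIZING'}
state_dict = {101: 'COMPLETED', 102: 'FAILED', 103: 'RUNNING'}

still_depids, failed_dependency = [], False
for depid in (101, 102, 103):
    state = state_dict.get(depid)
    if state == 'COMPLETED':
        continue                      # satisfied dependency: drop it
    if state is not None and state not in non_final_states:
        failed_dependency = True      # bad final state: job will not be submitted
    still_depids.append(depid)

print(still_depids, failed_dependency)   # [102, 103] True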
@@ -691,9 +703,12 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F

batch_params.append(f'{script_path}')
submitted = True
## If dry_run, give it a fake QID.
## If a dependency has failed, don't even try to submit the job because
## Slurm will refuse; instead just mark it as unsubmitted.
if dry_run:
current_qid = _get_fake_qid()
else:
elif not failed_dependency:
#- sbatch sometimes fails; try several times before giving up
max_attempts = 3
for attempt in range(max_attempts):
@@ -714,14 +729,16 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
log.error(msg)
current_qid = get_default_qid()
submitted = False

log.info(batch_params)
else:
current_qid = get_default_qid()
submitted = False

## Update prow with new information
prow['LATEST_QID'] = current_qid

## If we didn't submit, don't say we did and don't add to ALL_QIDS
if submitted:
log.info(batch_params)
log.info(f'Submitted {jobname} with dependencies {dep_str} and '
+ f'reservation={reservation}. Returned qid: {current_qid}')

@@ -730,6 +747,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
prow['STATUS'] = 'SUBMITTED'
prow['SUBMIT_DATE'] = int(time.time())
else:
log.info(f"Would have submitted: {batch_params}")
prow['STATUS'] = 'UNSUBMITTED'

## Update the Slurm jobid cache of job states
@@ -1137,7 +1155,8 @@ def all_calibs_submitted(accounted_for, do_cte_flats):
return np.all(list(test_dict.values()))

def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
ptab_name=None, dry_run=0, reservation=None):
no_resub_failed=False, ptab_name=None,
dry_run=0, reservation=None):
"""
Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
@@ -1150,6 +1169,8 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
possible Slurm scheduler state, where you wish for jobs with that
outcome to be resubmitted
no_resub_failed: bool. Set to True if you do NOT want to resubmit
jobs with Slurm status 'FAILED' by default. Default is False.
ptab_name, str, the full pathname where the processing table should be saved.
dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be written or submitted. Logging will remain the same
@@ -1169,7 +1190,8 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
"""
log = get_logger()
if resubmission_states is None:
resubmission_states = get_resubmission_states()
resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)

log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
proc_table = update_from_queue(proc_table, dry_run=dry_run)
log.info("Updated processing table queue information:")
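A hedged usage sketch of the updated signature; ptable and ptable_pathname are assumed to come from load_table() as in desi_resubmit_queue_failures above, and dry_run=1 keeps this a simulation.

from desispec.workflow.processing import update_and_recursively_submit

# Resubmit eligible jobs but leave FAILED ones alone; resubmission_states=None
# defers to get_resubmission_states(no_resub_failed=...). The table and path
# are assumed to be loaded earlier, as in the script above.
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
                                                 resubmission_states=None,
                                                 no_resub_failed=True,
                                                 ptab_name=ptable_pathname,
                                                 dry_run=1, reservation=None)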
87 changes: 70 additions & 17 deletions py/desispec/workflow/queue.py
@@ -16,7 +16,7 @@
global _cached_slurm_states
_cached_slurm_states = dict()

def get_resubmission_states():
def get_resubmission_states(no_resub_failed=False):
"""
Defines what Slurm job failure modes should be resubmitted in the hopes of the job succeeding the next time.
@@ -39,14 +39,21 @@ def get_resubmission_states():
S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
TO TIMEOUT Job terminated upon reaching its time limit.
Args:
no_resub_failed: bool. Set to True if you do NOT want to resubmit
jobs with Slurm status 'FAILED' by default. Default is False.
Returns:
list. A list of strings outlining the job states that should be resubmitted.
"""
## 'UNSUBMITTED' is default pipeline state for things not yet submitted
## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
## dependency has failed
return ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
if not no_resub_failed:
resub_states.append('FAILED')
return resub_states


def get_termination_states():
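A quick check of the changed default (assumes desispec is importable): FAILED is now included unless the new keyword is set.

from desispec.workflow.queue import get_resubmission_states

print('FAILED' in get_resubmission_states())                      # True
print('FAILED' in get_resubmission_states(no_resub_failed=True))  # False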
@@ -115,6 +122,35 @@ def get_failed_states():
'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT']


def get_non_final_states():
"""
Defines which Slurm job states are not final and therefore indicate the
job hasn't finished running.
Possible values that Slurm returns are:
CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given
BF BOOT_FAIL Job terminated due to launch failure
CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated.
CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero.
DL DEADLINE Job terminated on deadline.
F FAILED Job terminated with non-zero exit code or other failure condition.
NF NODE_FAIL Job terminated due to failure of one or more allocated nodes.
OOM OUT_OF_MEMORY Job experienced out of memory error.
PD PENDING Job is awaiting resource allocation.
PR PREEMPTED Job terminated due to preemption.
R RUNNING Job currently has an allocation.
RQ REQUEUED Job was requeued.
RS RESIZING Job is about to change size.
RV REVOKED Sibling was removed from cluster due to other cluster starting the job.
S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs.
TO TIMEOUT Job terminated upon reaching its time limit.
Returns:
list. A list of strings outlining the job states that are not yet final, i.e. the job is still pending or running.
"""
return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING']
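And a minimal sanity check of the new helper:

from desispec.workflow.queue import get_non_final_states

states = get_non_final_states()
print('RUNNING' in states, 'COMPLETED' in states)   # True False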

def queue_info_from_time_window(start_time=None, end_time=None, user=None, \
columns='jobid,jobname,partition,submit,eligible,'+
'start,end,elapsed,state,exitcode',
Expand Down Expand Up @@ -232,6 +268,8 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, dry_run=dry_run))
results = vstack(results)
return results
elif len(qids) == 0:
return Table(names=columns.upper().split(','))

## Turn the queue id's into a list
## this should work with str or int type also, though not officially supported
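The guard above matters because sacct reports the most recent jobs when given no job IDs; a hedged illustration (this assumes nothing earlier in the function needs Slurm access on the empty-input branch):

from desispec.workflow.queue import queue_info_from_qids

# An empty qid list now returns an empty table whose columns come from the
# `columns` argument, rather than falling through to an unconstrained query.
tbl = queue_info_from_qids([])
print(len(tbl))   # 0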
@@ -377,7 +415,8 @@ def clear_queue_state_cache():
_cached_slurm_states.clear()


def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False,
check_complete_jobs=False):
"""
Given an input processing table (ptable) and query table from the Slurm queue (qtable) it cross-matches the
Slurm job ID's and updates the 'state' in the table using the current state in the Slurm scheduler system.
@@ -390,50 +429,64 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
ignore_scriptnames, bool. Default is False. Set to true if you do not
want to check whether the scriptname matches the jobname
returned by the Slurm scheduler.
check_complete_jobs, bool. Default is False. Set to True if you want to
also check QID's that currently have a STATUS of "COMPLETED"
in the ptable.
The following are only used if qtable is not provided:
dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
table that doesn't query the Slurm scheduler.
Returns:
ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is
ptab, Table. A copy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is
updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable
and "JOBID" in the qtable).
"""
log = get_logger()
ptab = ptable.copy()
if qtable is None:
log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set")
qids = np.array(ptable['LATEST_QID'])
## Avoid null valued QID's (set to -99)
qids = qids[qids > 0]
log.info("qtable not provided, querying Slurm using ptab's LATEST_QID set")
## Avoid null valued QID's (set to 2)
sel = ptab['LATEST_QID'] > 2
## Only submit incomplete jobs unless explicitly told to check them
## completed jobs shouldn't change status
if not check_complete_jobs:
sel &= (ptab['STATUS'] != 'COMPLETED')
log.info(f"Querying Slurm for {np.sum(sel)} QIDs from table of length {len(ptab)}.")
qids = np.array(ptab['LATEST_QID'][sel])
## If you provide empty jobids Slurm gives you the three most recent jobs,
## which we don't want here
if len(qids) == 0:
log.info(f"No QIDs left to query. Returning the original table.")
return ptab
qtable = queue_info_from_qids(qids, dry_run=dry_run)

log.info(f"Slurm returned information on {len(qtable)} jobs out of "
+f"{len(ptable)} jobs in the ptable. Updating those now.")
+f"{len(ptab)} jobs in the ptab. Updating those now.")

check_scriptname = ('JOBNAME' in qtable.colnames
and 'SCRIPTNAME' in ptable.colnames
and 'SCRIPTNAME' in ptab.colnames
and not ignore_scriptnames)
if check_scriptname:
log.info("Will be verifying that the file names are consistent")

for row in qtable:
if int(row['JOBID']) == get_default_qid():
continue
match = (int(row['JOBID']) == ptable['LATEST_QID'])
match = (int(row['JOBID']) == ptab['LATEST_QID'])
if np.any(match):
ind = np.where(match)[0][0]
if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']:
log.warning(f"For job with expids:{ptable['EXPID'][ind]}"
+ f" the scriptname is {ptable['SCRIPTNAME'][ind]}"
if check_scriptname and ptab['SCRIPTNAME'][ind] not in row['JOBNAME']:
log.warning(f"For job with expids:{ptab['EXPID'][ind]}"
+ f" the scriptname is {ptab['SCRIPTNAME'][ind]}"
+ f" but the jobname in the queue was "
+ f"{row['JOBNAME']}.")
state = str(row['STATE']).split(' ')[0]
## Since dry run 1 and 2 save proc tables, don't alter the
## states for these when simulating
if dry_run > 2 or dry_run < 1:
ptable['STATUS'][ind] = state
ptab['STATUS'][ind] = state

return ptable
return ptab

def any_jobs_not_complete(statuses, termination_states=None):
"""
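Finally, a hedged usage sketch of the new keyword; ptable is assumed to be an already-loaded processing table, and note that the function now returns a copy rather than editing in place, so the result must be reassigned.

from desispec.workflow.queue import update_from_queue

# Also re-query rows already marked COMPLETED (skipped by default);
# ptable is assumed to be loaded earlier via load_table().
ptable = update_from_queue(ptable, check_complete_jobs=True)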
