diff --git a/bin/desi_proc_night b/bin/desi_proc_night index a6d01cdb3..145ebecf4 100755 --- a/bin/desi_proc_night +++ b/bin/desi_proc_night @@ -102,6 +102,9 @@ def parse_args(): help="If set then the code will NOT raise an error " + "if asked to link psfnight calibrations " + "without fiberflatnight calibrations.") + parser.add_argument("--no-resub-failed", action="store_true", required=False, + help="Give this flag if you do NOT want to resubmit " + + "jobs with Slurm status 'FAILED' by default.") parser.add_argument("--still-acquiring", action='store_true', help="for testing --daily mode, assume more data is still coming even if " + "outside of normal observing hours.") diff --git a/bin/desi_resubmit_queue_failures b/bin/desi_resubmit_queue_failures index 92a2b7f47..247cd995c 100755 --- a/bin/desi_resubmit_queue_failures +++ b/bin/desi_resubmit_queue_failures @@ -38,16 +38,28 @@ def parse_args(): # options=None): help="The SLURM queue states that should be resubmitted. " + "E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " + "OUT_OF_MEMORY, PREEMPTED, TIMEOUT, CANCELLED, FAILED.") - parser.add_argument("--dont-resub-failed", action="store_true", required=False, + parser.add_argument("--no-resub-failed", action="store_true", required=False, help="Give this flag if you do NOT want to resubmit " + - "jobs with Slurm status 'FAILED' by default.") + "jobs with Slurm status 'FAILED' by default. " + + "This should not be used if defining " + + "--resub-states explicitly.") args = parser.parse_args() if args.resub_states is not None: + ## User should never provide custom list of states and request to remove FAILED + if args.no_resub_failed: + log = get_logger() + msg = f"Provided user-defined resubmision states {args.resub_states} but " \ + + f"also requested args.no_resub_failed. Please choose one or the other." + log.critical(msg) + raise ValueError(msg) + ## Clean up the input string into a list of strings args.resub_states = [state.strip().upper() for state in args.resub_states.split(',')] + return args + if __name__ == '__main__': args = parse_args() log = get_logger() @@ -62,14 +74,6 @@ if __name__ == '__main__': if not os.path.exists(ptable_pathname): ValueError(f"Processing table: {ptable_pathname} doesn't exist.") - resub_states = args.resub_states - if resub_states is None: - resub_states = get_resubmission_states() - if not args.dont_resub_failed: - resub_states.append('FAILED') - - log.info(f"Resubmitting the following Slurm states: {resub_states}") - if args.dry_run > 0 and args.dry_run < 3: log.warning(f"{args.dry_run=} will be run with limited simulation " f"because we don't want to write out incorrect queue information.") @@ -81,7 +85,8 @@ if __name__ == '__main__': ptable = load_table(tablename=ptable_pathname, tabletype=table_type) log.info(f"Identified ptable with {len(ptable)} entries.") ptable, nsubmits = update_and_recursively_submit(ptable, submits=0, - resubmission_states=resub_states, + resubmission_states=args.resub_states, + no_resub_failed=args.no_resub_failed, ptab_name=ptable_pathname, dry_run=args.dry_run, reservation=args.reservation) diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py index cc73e56e0..09aefcc38 100644 --- a/py/desispec/scripts/proc_night.py +++ b/py/desispec/scripts/proc_night.py @@ -36,7 +36,8 @@ set_calibrator_flag, make_exposure_prow, \ all_calibs_submitted, \ update_and_recursively_submit, update_accounted_for_with_linking -from desispec.workflow.queue import update_from_queue, any_jobs_failed +from desispec.workflow.queue import update_from_queue, any_jobs_failed, \ + get_resubmission_states from desispec.io.util import decode_camword, difference_camwords, \ create_camword, replace_prefix, erow_to_goodcamword, camword_union @@ -55,7 +56,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, path_to_data=None, exp_obstypes=None, camword=None, badcamword=None, badamps=None, exps_to_ignore=None, sub_wait_time=0.1, verbose=False, dont_require_cals=False, - psf_linking_without_fflat=False, + psf_linking_without_fflat=False, no_resub_failed=False, still_acquiring=False): """ Process some or all exposures on a night. Can be used to process an entire @@ -163,6 +164,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, psf_linking_without_fflat: bool. Default False. If set then the code will NOT raise an error if asked to link psfnight calibrations without fiberflatnight calibrations. + no_resub_failed: bool. Set to True if you do NOT want to resubmit + jobs with Slurm status 'FAILED' by default. Default is False. still_acquiring: bool. If True, assume more data might be coming, e.g. wait for additional exposures of latest tile. If False, auto-derive True/False based upon night and current time. Primarily for testing. @@ -366,7 +369,9 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3: log.info("Job failures were detected. Resubmitting those jobs " + "before continuing with new submissions.") + ptable, nsubmits = update_and_recursively_submit(ptable, + no_resub_failed=no_resub_failed, ptab_name=proc_table_pathname, dry_run=dry_run, reservation=reservation) diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index 88200cc8c..1df250438 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -20,7 +20,8 @@ get_ztile_script_suffix from desispec.workflow.exptable import read_minimal_science_exptab_cols from desispec.workflow.queue import get_resubmission_states, update_from_queue, \ - queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache + queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache, \ + get_non_final_states from desispec.workflow.timing import what_night_is_it from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \ create_desi_proc_batch_script, \ @@ -632,12 +633,23 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F ## should no longer be necessary in the normal workflow. # workaround for sbatch --dependency bug not tracking jobs correctly # see NERSC TICKET INC0203024 + failed_dependency = False if len(dep_qids) > 0 and not dry_run: + non_final_states = get_non_final_states() state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True) still_depids = [] for depid in dep_qids: - if depid in state_dict.keys() and state_dict[int(depid)] == 'COMPLETED': - log.info(f"removing completed jobid {depid}") + if depid in state_dict.keys(): + if state_dict[int(depid)] == 'COMPLETED': + log.info(f"removing completed jobid {depid}") + elif state_dict[int(depid)] not in non_final_states: + failed_dependency = True + log.info("Found a dependency in a bad final state=" + + f"{state_dict[int(depid)]} for depjobid={depid}," + + " not submitting this job.") + still_depids.append(depid) + else: + still_depids.append(depid) else: still_depids.append(depid) dep_qids = np.array(still_depids) @@ -691,9 +703,12 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F batch_params.append(f'{script_path}') submitted = True + ## If dry_run give it a fake QID + ## if a dependency has failed don't even try to submit the job because + ## Slurm will refuse, instead just mark as unsubmitted. if dry_run: current_qid = _get_fake_qid() - else: + elif not failed_dependency: #- sbatch sometimes fails; try several times before giving up max_attempts = 3 for attempt in range(max_attempts): @@ -714,14 +729,16 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F log.error(msg) current_qid = get_default_qid() submitted = False - - log.info(batch_params) + else: + current_qid = get_default_qid() + submitted = False ## Update prow with new information prow['LATEST_QID'] = current_qid ## If we didn't submit, don't say we did and don't add to ALL_QIDS if submitted: + log.info(batch_params) log.info(f'Submitted {jobname} with dependencies {dep_str} and ' + f'reservation={reservation}. Returned qid: {current_qid}') @@ -730,6 +747,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F prow['STATUS'] = 'SUBMITTED' prow['SUBMIT_DATE'] = int(time.time()) else: + log.info(f"Would have submitted: {batch_params}") prow['STATUS'] = 'UNSUBMITTED' ## Update the Slurm jobid cache of job states @@ -1137,7 +1155,8 @@ def all_calibs_submitted(accounted_for, do_cte_flats): return np.all(list(test_dict.values())) def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None, - ptab_name=None, dry_run=0, reservation=None): + no_resub_failed=False, ptab_name=None, + dry_run=0, reservation=None): """ Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states). Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively @@ -1150,6 +1169,8 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a possible Slurm scheduler state, where you wish for jobs with that outcome to be resubmitted + no_resub_failed: bool. Set to True if you do NOT want to resubmit + jobs with Slurm status 'FAILED' by default. Default is False. ptab_name, str, the full pathname where the processing table should be saved. dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If dry_run=2, the scripts will not be writter or submitted. Logging will remain the same @@ -1169,7 +1190,8 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non """ log = get_logger() if resubmission_states is None: - resubmission_states = get_resubmission_states() + resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed) + log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}") proc_table = update_from_queue(proc_table, dry_run=dry_run) log.info("Updated processing table queue information:") diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index a51796fb0..54718eb1c 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -16,7 +16,7 @@ global _cached_slurm_states _cached_slurm_states = dict() -def get_resubmission_states(): +def get_resubmission_states(no_resub_failed=False): """ Defines what Slurm job failure modes should be resubmitted in the hopes of the job succeeding the next time. @@ -39,14 +39,21 @@ def get_resubmission_states(): S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs. TO TIMEOUT Job terminated upon reaching its time limit. + Args: + no_resub_failed: bool. Set to True if you do NOT want to resubmit + jobs with Slurm status 'FAILED' by default. Default is False. + Returns: list. A list of strings outlining the job states that should be resubmitted. """ ## 'UNSUBMITTED' is default pipeline state for things not yet submitted ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a ## dependency has failed - return ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', - 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED'] + resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', + 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED'] + if not no_resub_failed: + resub_states.append('FAILED') + return resub_states def get_termination_states(): @@ -115,6 +122,35 @@ def get_failed_states(): 'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT'] +def get_non_final_states(): + """ + Defines what Slurm job states that are not final and therefore indicate the + job hasn't finished running. + + Possible values that Slurm returns are: + + CA or ca or CANCELLED for cancelled jobs will only show currently running jobs in queue unless times are explicitly given + BF BOOT_FAIL Job terminated due to launch failure + CA CANCELLED Job was explicitly cancelled by the user or system administrator. The job may or may not have been initiated. + CD COMPLETED Job has terminated all processes on all nodes with an exit code of zero. + DL DEADLINE Job terminated on deadline. + F FAILED Job terminated with non-zero exit code or other failure condition. + NF NODE_FAIL Job terminated due to failure of one or more allocated nodes. + OOM OUT_OF_MEMORY Job experienced out of memory error. + PD PENDING Job is awaiting resource allocation. + PR PREEMPTED Job terminated due to preemption. + R RUNNING Job currently has an allocation. + RQ REQUEUED Job was requeued. + RS RESIZING Job is about to change size. + RV REVOKED Sibling was removed from cluster due to other cluster starting the job. + S SUSPENDED Job has an allocation, but execution has been suspended and CPUs have been released for other jobs. + TO TIMEOUT Job terminated upon reaching its time limit. + + Returns: + list. A list of strings outlining the job states that are considered final (without human investigation/intervention) + """ + return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING'] + def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ columns='jobid,jobname,partition,submit,eligible,'+ 'start,end,elapsed,state,exitcode', @@ -232,6 +268,8 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, dry_run=dry_run)) results = vstack(results) return results + elif len(qids) == 0: + return Table(names=columns.upper().split(',')) ## Turn the queue id's into a list ## this should work with str or int type also, though not officially supported @@ -377,7 +415,8 @@ def clear_queue_state_cache(): _cached_slurm_states.clear() -def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False): +def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False, + check_complete_jobs=False): """ Given an input prcessing table (ptable) and query table from the Slurm queue (qtable) it cross matches the Slurm job ID's and updates the 'state' in the table using the current state in the Slurm scheduler system. @@ -390,28 +429,42 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False): ignore_scriptnames, bool. Default is False. Set to true if you do not want to check whether the scriptname matches the jobname return by the slurm scheduler. + check_complete_jobs, bool. Default is False. Set to true if you want to + also check QID's that currently have a STATUS "COMPLETED". + in the ptable. The following are only used if qtable is not provided: dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default table that doesn't query the Slurm scheduler. Returns: - ptable, Table. The same processing table as the input except that the "STATUS" column in ptable for all jobs is + ptab, Table. A opy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable and "JOBID" in the qtable). """ log = get_logger() + ptab = ptable.copy() if qtable is None: - log.info("qtable not provided, querying Slurm using ptable's LATEST_QID set") - qids = np.array(ptable['LATEST_QID']) - ## Avoid null valued QID's (set to -99) - qids = qids[qids > 0] + log.info("qtable not provided, querying Slurm using ptab's LATEST_QID set") + ## Avoid null valued QID's (set to 2) + sel = ptab['LATEST_QID'] > 2 + ## Only submit incomplete jobs unless explicitly told to check them + ## completed jobs shouldn't change status + if not check_complete_jobs: + sel &= (ptab['STATUS'] != 'COMPLETED') + log.info(f"Querying Slurm for {np.sum(sel)} QIDs from table of length {len(ptab)}.") + qids = np.array(ptab['LATEST_QID'][sel]) + ## If you provide empty jobids Slurm gives you the three most recent jobs, + ## which we don't want here + if len(qids) == 0: + log.info(f"No QIDs left to query. Returning the original table.") + return ptab qtable = queue_info_from_qids(qids, dry_run=dry_run) log.info(f"Slurm returned information on {len(qtable)} jobs out of " - +f"{len(ptable)} jobs in the ptable. Updating those now.") + +f"{len(ptab)} jobs in the ptab. Updating those now.") check_scriptname = ('JOBNAME' in qtable.colnames - and 'SCRIPTNAME' in ptable.colnames + and 'SCRIPTNAME' in ptab.colnames and not ignore_scriptnames) if check_scriptname: log.info("Will be verifying that the file names are consistent") @@ -419,21 +472,21 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False): for row in qtable: if int(row['JOBID']) == get_default_qid(): continue - match = (int(row['JOBID']) == ptable['LATEST_QID']) + match = (int(row['JOBID']) == ptab['LATEST_QID']) if np.any(match): ind = np.where(match)[0][0] - if check_scriptname and ptable['SCRIPTNAME'][ind] not in row['JOBNAME']: - log.warning(f"For job with expids:{ptable['EXPID'][ind]}" - + f" the scriptname is {ptable['SCRIPTNAME'][ind]}" + if check_scriptname and ptab['SCRIPTNAME'][ind] not in row['JOBNAME']: + log.warning(f"For job with expids:{ptab['EXPID'][ind]}" + + f" the scriptname is {ptab['SCRIPTNAME'][ind]}" + f" but the jobname in the queue was " + f"{row['JOBNAME']}.") state = str(row['STATE']).split(' ')[0] ## Since dry run 1 and 2 save proc tables, don't alter the ## states for these when simulating if dry_run > 2 or dry_run < 1: - ptable['STATUS'][ind] = state + ptab['STATUS'][ind] = state - return ptable + return ptab def any_jobs_not_complete(statuses, termination_states=None): """