From 301d390e92367386c9c7f4661dd60fe5ab5ec2e9 Mon Sep 17 00:00:00 2001
From: akremin
Date: Thu, 15 Aug 2024 18:04:04 -0700
Subject: [PATCH] don't exit on submission failure, just mark unsubmitted

---
 py/desispec/workflow/processing.py | 38 +++++++++++++++++++-----------
 py/desispec/workflow/proctable.py  | 15 ++++++++----
 py/desispec/workflow/queue.py      | 17 ++++++++++---
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py
index 5868dd4d1..c566054db 100644
--- a/py/desispec/workflow/processing.py
+++ b/py/desispec/workflow/processing.py
@@ -33,7 +33,7 @@
 from desispec.workflow.tableio import write_table, load_table
 from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
     read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
-    update_full_ptab_cache
+    update_full_ptab_cache, default_prow, get_default_qid
 from desiutil.log import get_logger
 from desispec.io import findfile, specprod_root
@@ -690,7 +690,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
         batch_params.append(f'--reservation={reservation}')
     batch_params.append(f'{script_path}')
-
+    submitted = True
     if dry_run:
         current_qid = _get_fake_qid()
     else:
@@ -709,21 +709,31 @@
                 log.info('Sleeping 60 seconds then retrying')
                 time.sleep(60)
         else: #- for/else happens if loop doesn't succeed
-            msg = f'{jobname} submission failed {max_attempts} times; exiting'
-            log.critical(msg)
-            raise RuntimeError(msg)
+            msg = f'{jobname} submission failed {max_attempts} times; ' \
+                  + 'setting as unsubmitted and moving on'
+            log.error(msg)
+            current_qid = get_default_qid()
+            submitted = False
     log.info(batch_params)
-    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

     ## Update prow with new information
     prow['LATEST_QID'] = current_qid
-    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
-    prow['STATUS'] = 'SUBMITTED'
-    prow['SUBMIT_DATE'] = int(time.time())
-    ## Update the Slurm jobid cache of job states
-    update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
+    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
+    if submitted:
+        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
+                 + f'reservation={reservation}. Returned qid: {current_qid}')
+
+        ## Update prow with new information
+        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
+        prow['STATUS'] = 'SUBMITTED'
+        prow['SUBMIT_DATE'] = int(time.time())
+    else:
+        prow['STATUS'] = 'UNSUBMITTED'
+
+    ## Update the Slurm jobid cache of job states
+    update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])

     return prow

@@ -1784,7 +1794,7 @@ def make_joint_prow(prows, descriptor, internal_id):
     joint_prow['LATEST_QID'] = -99
     joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     joint_prow['SUBMIT_DATE'] = -99
-    joint_prow['STATUS'] = 'U'
+    joint_prow['STATUS'] = 'UNSUBMITTED'
     joint_prow['SCRIPTNAME'] = ''
     joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)
@@ -1861,7 +1871,7 @@ def make_tnight_prow(prows, calibjobs, internal_id):
     joint_prow['LATEST_QID'] = -99
     joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     joint_prow['SUBMIT_DATE'] = -99
-    joint_prow['STATUS'] = 'U'
+    joint_prow['STATUS'] = 'UNSUBMITTED'
     joint_prow['SCRIPTNAME'] = ''
     joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
@@ -1892,7 +1902,7 @@ def make_redshift_prow(prows, tnights, descriptor, internal_id):
     redshift_prow['LATEST_QID'] = -99
     redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     redshift_prow['SUBMIT_DATE'] = -99
-    redshift_prow['STATUS'] = 'U'
+    redshift_prow['STATUS'] = 'UNSUBMITTED'
     redshift_prow['SCRIPTNAME'] = ''
     redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

diff --git a/py/desispec/workflow/proctable.py b/py/desispec/workflow/proctable.py
index 3d995c66f..4b37253d4 100644
--- a/py/desispec/workflow/proctable.py
+++ b/py/desispec/workflow/proctable.py
@@ -77,7 +77,7 @@ def get_processing_table_column_defs(return_default_values=False,
     """
     ## Define the column names for the internal production table and their respective datatypes, split in two
     ## only for readability's sake
-
+    defqid = get_default_qid()
     colnames1 = ['EXPID'                        , 'OBSTYPE', 'TILEID', 'NIGHT' ]
     coltypes1 = [np.ndarray                     , 'S10'    , int     , int     ]
     coldeflt1 = [np.ndarray(shape=0).astype(int), 'unknown', -99     , 20000101]
@@ -88,11 +88,11 @@

     colnames2 = [ 'PROCCAMWORD'  ,'CALIBRATOR', 'INTID', 'OBSDESC', 'JOBDESC', 'LATEST_QID']
     coltypes2 = [ 'S40'          , np.int8    , int    , 'S16'    , 'S12'    , int         ]
-    coldeflt2 = [ 'a0123456789'  , 0          , -99    , ''       , 'unknown', -99         ]
+    coldeflt2 = [ 'a0123456789'  , 0          , -99    , ''       , 'unknown', defqid      ]

-    colnames2 += [ 'SUBMIT_DATE', 'STATUS', 'SCRIPTNAME']
-    coltypes2 += [ int          , 'S14'   , 'S40'       ]
-    coldeflt2 += [ -99          , 'U'     , ''          ]
+    colnames2 += [ 'SUBMIT_DATE', 'STATUS'     , 'SCRIPTNAME']
+    coltypes2 += [ int          , 'S14'        , 'S40'       ]
+    coldeflt2 += [ -99          , 'UNSUBMITTED', ''          ]

     colnames2 += ['INT_DEP_IDS'  , 'LATEST_DEP_QID' , 'ALL_QIDS'  ]
     coltypes2 += [np.ndarray     , np.ndarray       , np.ndarray  ]
@@ -111,6 +111,11 @@
         return colnames, coldtypes, coldeflts
     else:
         return colnames, coldtypes
+def get_default_qid():
+    """
+    Return the default Slurm job ID (QID) for the pipeline.
+    """
+    return 1 #99999999

 def default_obstypes_for_proctable():
     """
diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py
index ccff9c704..0d6c04320 100644
--- a/py/desispec/workflow/queue.py
+++ b/py/desispec/workflow/queue.py
@@ -7,6 +7,8 @@
 import numpy as np
 from astropy.table import Table, vstack
 import subprocess
+
+from desispec.workflow.proctable import get_default_qid
 from desiutil.log import get_logger

 import time, datetime
@@ -39,7 +41,11 @@ def get_resubmission_states():
     Returns:
         list. A list of strings outlining the job states that should be resubmitted.
     """
-    return ['UNSUBMITTED', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
+    ## 'UNSUBMITTED' is the default pipeline state for jobs not yet submitted
+    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
+    ## dependency has failed
+    return ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
+            'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']


 def get_termination_states():
@@ -301,6 +307,7 @@ def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
         Dict
             Dictionary with the keys as jobids and values as the slurm state of the job.
     """
+    def_qid = get_default_qid()
     global _cached_slurm_states
     qids = np.atleast_1d(qids).astype(int)
     log = get_logger()
@@ -317,7 +324,8 @@
         if dry_run > 2 or dry_run < 1:
             outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run)
             for row in outtable:
-                outdict[int(row['JOBID'])] = row['STATE']
+                if int(row['JOBID']) != def_qid:
+                    outdict[int(row['JOBID'])] = row['STATE']
     return outdict

 def update_queue_state_cache_from_table(queue_info_table):
@@ -357,7 +365,8 @@
     """
     global _cached_slurm_states

-    _cached_slurm_states[int(qid)] = state
+    if int(qid) != get_default_qid():
+        _cached_slurm_states[int(qid)] = state

 def clear_queue_state_cache():
     """
@@ -407,6 +416,8 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
             log.info("Will be verifying that the file names are consistent")

     for row in qtable:
+        if int(row['JOBID']) == get_default_qid():
+            continue
         match = (int(row['JOBID']) == ptable['LATEST_QID'])
         if np.any(match):
             ind = np.where(match)[0][0]
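
The processing.py change above replaces a hard RuntimeError with a fail-soft path: after max_attempts failed submissions, the row receives the sentinel QID from get_default_qid() and an 'UNSUBMITTED' status, and the daemon moves on to the next job. Below is a minimal standalone sketch of that retry-then-mark-unsubmitted pattern; try_sbatch() and the plain-dict prow are hypothetical stand-ins for illustration, not the desispec API.

import time
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("submit-sketch")

DEFAULT_QID = 1  # sentinel QID; mirrors get_default_qid() in the patch (assumption)

def try_sbatch(script_path):
    """Hypothetical stand-in for one sbatch call; returns a Slurm QID or raises."""
    raise OSError(f"sbatch failed for {script_path}")

def submit_batch_script(prow, script_path, max_attempts=3, sleep_seconds=0):
    """Try to submit; on repeated failure, mark the row unsubmitted instead of raising."""
    for attempt in range(max_attempts):
        try:
            current_qid = try_sbatch(script_path)
            break
        except OSError as err:
            log.info("attempt %d failed (%s); retrying", attempt + 1, err)
            time.sleep(sleep_seconds)
    else:  # for/else: runs only if no attempt succeeded
        log.error("submission failed %d times; setting as unsubmitted", max_attempts)
        prow['LATEST_QID'] = DEFAULT_QID
        prow['STATUS'] = 'UNSUBMITTED'
        return prow
    # Success path: record the real QID and submission metadata
    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'].append(current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())
    return prow

prow = {'LATEST_QID': DEFAULT_QID, 'ALL_QIDS': [], 'STATUS': 'UNSUBMITTED'}
print(submit_batch_script(prow, 'job.slurm'))  # STATUS stays 'UNSUBMITTED'

Note that the failed row keeps an empty ALL_QIDS, exactly as in the patch: only real submissions are appended, so later accounting never mistakes the sentinel for a queued job.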
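
The queue.py guards all enforce one invariant: the sentinel QID never enters Slurm bookkeeping, so an unsubmitted job cannot shadow a real job id when states are cached or matched against sacct output. A toy sketch of that filtering follows, again assuming the sentinel value of 1; _state_cache stands in for queue.py's _cached_slurm_states and the row dicts mimic sacct-style output.

DEFAULT_QID = 1  # must agree with get_default_qid(); assumption carried over from above

_state_cache = {}  # toy stand-in for queue.py's _cached_slurm_states

def update_queue_state_cache(qid, state):
    """Cache a job state, silently dropping the sentinel QID."""
    if int(qid) != DEFAULT_QID:
        _state_cache[int(qid)] = state

def queue_states_from_rows(rows):
    """Build {jobid: state} from sacct-like rows, skipping sentinel entries."""
    return {int(row['JOBID']): row['STATE']
            for row in rows if int(row['JOBID']) != DEFAULT_QID}

update_queue_state_cache(12345678, 'RUNNING')
update_queue_state_cache(DEFAULT_QID, 'COMPLETED')  # ignored: not a real job
assert DEFAULT_QID not in _state_cache

rows = [{'JOBID': DEFAULT_QID, 'STATE': 'FAILED'},
        {'JOBID': 12345678, 'STATE': 'RUNNING'}]
print(queue_states_from_rows(rows))  # -> {12345678: 'RUNNING'}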