
Commit

dont exit on submission failure, just mark unsubmitted

akremin committed Aug 16, 2024
1 parent c3444b8 commit 301d390

Showing 3 changed files with 48 additions and 22 deletions.
38 changes: 24 additions & 14 deletions py/desispec/workflow/processing.py
@@ -33,7 +33,7 @@
 from desispec.workflow.tableio import write_table, load_table
 from desispec.workflow.proctable import table_row_to_dict, erow_to_prow, \
     read_minimal_tilenight_proctab_cols, read_minimal_full_proctab_cols, \
-    update_full_ptab_cache
+    update_full_ptab_cache, default_prow, get_default_qid
 from desiutil.log import get_logger
 
 from desispec.io import findfile, specprod_root
@@ -690,7 +690,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
         batch_params.append(f'--reservation={reservation}')
 
     batch_params.append(f'{script_path}')
-
+    submitted = True
     if dry_run:
         current_qid = _get_fake_qid()
     else:
@@ -709,21 +709,31 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
                 log.info('Sleeping 60 seconds then retrying')
                 time.sleep(60)
         else: #- for/else happens if loop doesn't succeed
-            msg = f'{jobname} submission failed {max_attempts} times; exiting'
-            log.critical(msg)
-            raise RuntimeError(msg)
+            msg = f'{jobname} submission failed {max_attempts} times.' \
+                  + ' setting as unsubmitted and moving on'
+            log.error(msg)
+            current_qid = get_default_qid()
+            submitted = False
 
     log.info(batch_params)
-    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')
 
     ## Update prow with new information
     prow['LATEST_QID'] = current_qid
-    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
-    prow['STATUS'] = 'SUBMITTED'
-    prow['SUBMIT_DATE'] = int(time.time())
 
-    ## Update the Slurm jobid cache of job states
-    update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
+    ## If we didn't submit, don't say we did and don't add to ALL_QIDS
+    if submitted:
+        log.info(f'Submitted {jobname} with dependencies {dep_str} and '
+                 + f'reservation={reservation}. Returned qid: {current_qid}')
+
+        ## Update prow with new information
+        prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid)
+        prow['STATUS'] = 'SUBMITTED'
+        prow['SUBMIT_DATE'] = int(time.time())
+    else:
+        prow['STATUS'] = 'UNSUBMITTED'
+
+    ## Update the Slurm jobid cache of job states
+    update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
 
     return prow
 
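The `#- for/else happens if loop doesn't succeed` comment relies on a Python subtlety worth spelling out: a `for` loop's `else` clause runs only when the loop finishes without hitting `break`. A minimal, self-contained sketch of the new retry-then-mark pattern; `submit_with_retries` and its arguments are hypothetical illustrations, not desispec code:

import time

def submit_with_retries(submit_func, max_attempts=3, default_qid=1):
    # Try a submission several times; on total failure return the
    # sentinel qid and submitted=False instead of raising RuntimeError.
    submitted = True
    for attempt in range(max_attempts):
        try:
            qid = submit_func()   # e.g. a wrapper around `sbatch --parsable`
            break                 # success: the for/else below is skipped
        except RuntimeError:
            time.sleep(1)         # the real code sleeps 60 seconds
    else:                         # reached only if the loop never breaks
        qid = default_qid         # sentinel, like get_default_qid()
        submitted = False
    return qid, submitted

With a submit_func that always raises RuntimeError, this returns (1, False), and the caller records STATUS='UNSUBMITTED' rather than aborting the whole run, which is exactly the behavior change in submit_batch_script above.
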
@@ -1784,7 +1794,7 @@ def make_joint_prow(prows, descriptor, internal_id):
     joint_prow['LATEST_QID'] = -99
     joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     joint_prow['SUBMIT_DATE'] = -99
-    joint_prow['STATUS'] = 'U'
+    joint_prow['STATUS'] = 'UNSUBMITTED'
     joint_prow['SCRIPTNAME'] = ''
     joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int)
 
@@ -1861,7 +1871,7 @@ def make_tnight_prow(prows, calibjobs, internal_id):
     joint_prow['LATEST_QID'] = -99
     joint_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     joint_prow['SUBMIT_DATE'] = -99
-    joint_prow['STATUS'] = 'U'
+    joint_prow['STATUS'] = 'UNSUBMITTED'
     joint_prow['SCRIPTNAME'] = ''
     joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
 
@@ -1892,7 +1902,7 @@ def make_redshift_prow(prows, tnights, descriptor, internal_id):
     redshift_prow['LATEST_QID'] = -99
     redshift_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
     redshift_prow['SUBMIT_DATE'] = -99
-    redshift_prow['STATUS'] = 'U'
+    redshift_prow['STATUS'] = 'UNSUBMITTED'
     redshift_prow['SCRIPTNAME'] = ''
     redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
 
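The three make_*_prow hunks above all change the default STATUS from 'U' to 'UNSUBMITTED'. The point of the rename: the resubmission machinery in queue.py (below) matches on literal state strings, and 'U' was not among them, so a row born as 'U' could never be picked up for resubmission. A quick hedged sanity check, assuming a desispec checkout with this commit applied:

from desispec.workflow.queue import get_resubmission_states

states = get_resubmission_states()
assert 'UNSUBMITTED' in states  # the new default is recognized for resubmission
assert 'U' not in states        # the old default never matched
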
15 changes: 10 additions & 5 deletions py/desispec/workflow/proctable.py
@@ -77,7 +77,7 @@ def get_processing_table_column_defs(return_default_values=False,
     """
     ## Define the column names for the internal production table and their respective datatypes, split in two
     ## only for readability's sake
-
+    defqid = get_default_qid()
     colnames1 = ['EXPID'                        , 'OBSTYPE', 'TILEID', 'NIGHT' ]
     coltypes1 = [np.ndarray                     , 'S10'    , int     , int     ]
     coldeflt1 = [np.ndarray(shape=0).astype(int), 'unknown', -99     , 20000101]
@@ -88,11 +88,11 @@
 
     colnames2 = [ 'PROCCAMWORD' ,'CALIBRATOR', 'INTID', 'OBSDESC', 'JOBDESC', 'LATEST_QID']
     coltypes2 = [ 'S40'         , np.int8    , int    , 'S16'    , 'S12'    , int         ]
-    coldeflt2 = [ 'a0123456789' , 0          , -99    , ''       , 'unknown', -99         ]
+    coldeflt2 = [ 'a0123456789' , 0          , -99    , ''       , 'unknown', defqid      ]
 
-    colnames2 += [ 'SUBMIT_DATE', 'STATUS', 'SCRIPTNAME']
-    coltypes2 += [ int          , 'S14'   , 'S40'      ]
-    coldeflt2 += [ -99          , 'U'     , ''         ]
+    colnames2 += [ 'SUBMIT_DATE', 'STATUS'     , 'SCRIPTNAME']
+    coltypes2 += [ int          , 'S14'        , 'S40'       ]
+    coldeflt2 += [ -99          , 'UNSUBMITTED', ''          ]
 
     colnames2 += ['INT_DEP_IDS' , 'LATEST_DEP_QID' , 'ALL_QIDS' ]
     coltypes2 += [np.ndarray    , np.ndarray       , np.ndarray ]
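For orientation, a hedged sketch of how the (name, dtype, default) triplets defined above can be materialized into a fresh processing-table row; `return_default_values=True` appears in the signature shown above, while the `dict(zip(...))` consumption is illustrative rather than desispec's own code path:

from desispec.workflow.proctable import get_processing_table_column_defs

colnames, coldtypes, coldeflts = get_processing_table_column_defs(return_default_values=True)
row = dict(zip(colnames, coldeflts))
# After this commit:
#   row['STATUS']     == 'UNSUBMITTED'      (previously 'U')
#   row['LATEST_QID'] == get_default_qid()  (previously -99)
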
@@ -111,6 +111,11 @@
         return colnames, coldtypes, coldeflts
     else:
         return colnames, coldtypes
+def get_default_qid():
+    """
+    Returns the default slurm job id (QID) for the pipeline
+    """
+    return 1 #99999999
 
 def default_obstypes_for_proctable():
     """
Expand Down
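get_default_qid() centralizes a sentinel Slurm job id (currently 1, with 99999999 left in a comment as an apparent alternative) marking rows that were never actually submitted. The rest of the commit filters this value out wherever real queue ids are expected; a minimal illustration of the idiom, with the qid values hypothetical:

from desispec.workflow.proctable import get_default_qid

qids = [12345678, get_default_qid(), 23456789]
real_qids = [q for q in qids if q != get_default_qid()]
# Only real_qids are worth passing to sacct/squeue queries or caching;
# the sentinel carries no queue state.
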
17 changes: 14 additions & 3 deletions py/desispec/workflow/queue.py
@@ -7,6 +7,8 @@
 import numpy as np
 from astropy.table import Table, vstack
 import subprocess
+
+from desispec.workflow.proctable import get_default_qid
 from desiutil.log import get_logger
 import time, datetime
 
@@ -39,7 +41,11 @@ def get_resubmission_states():
     Returns:
         list. A list of strings outlining the job states that should be resubmitted.
     """
-    return ['UNSUBMITTED', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
+    ## 'UNSUBMITTED' is default pipeline state for things not yet submitted
+    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
+    ## dependency has failed
+    return ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
+            'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
 
 
 def get_termination_states():
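A sketch of how these states can drive a resubmission pass over a processing table; the loop is illustrative (desispec's actual resubmission logic lives elsewhere in the workflow code):

from desispec.workflow.queue import get_resubmission_states

def rows_to_resubmit(ptable):
    # Select processing-table rows whose STATUS marks them as resubmittable,
    # which after this commit includes freshly created rows ('UNSUBMITTED')
    # and rows whose dependencies failed ('DEP_NOT_SUBD').
    resub = set(get_resubmission_states())
    return [row for row in ptable if row['STATUS'] in resub]
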
@@ -301,6 +307,7 @@ def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
         Dict
             Dictionary with the keys as jobids and values as the slurm state of the job.
     """
+    def_qid = get_default_qid()
     global _cached_slurm_states
     qids = np.atleast_1d(qids).astype(int)
     log = get_logger()
@@ -317,7 +324,8 @@ def get_queue_states_from_qids(qids, dry_run=0, use_cache=False):
     if dry_run > 2 or dry_run < 1:
         outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run)
         for row in outtable:
-            outdict[int(row['JOBID'])] = row['STATE']
+            if int(row['JOBID']) != def_qid:
+                outdict[int(row['JOBID'])] = row['STATE']
     return outdict
 
 def update_queue_state_cache_from_table(queue_info_table):
@@ -357,7 +365,8 @@ def update_queue_state_cache(qid, state):
     """
     global _cached_slurm_states
-    _cached_slurm_states[int(qid)] = state
+    if int(qid) != get_default_qid():
+        _cached_slurm_states[int(qid)] = state
 
 def clear_queue_state_cache():
     """
@@ -407,6 +416,8 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False):
         log.info("Will be verifying that the file names are consistent")
 
     for row in qtable:
+        if int(row['JOBID']) == get_default_qid():
+            continue
         match = (int(row['JOBID']) == ptable['LATEST_QID'])
         if np.any(match):
             ind = np.where(match)[0][0]
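Taken together, the queue.py guards ensure the sentinel qid never leaks into the Slurm state cache or gets matched against real queue output. Hedged usage of the guarded cache update shown above, with the real-looking qid hypothetical:

from desispec.workflow.queue import update_queue_state_cache, clear_queue_state_cache
from desispec.workflow.proctable import get_default_qid

clear_queue_state_cache()
update_queue_state_cache(qid=12345678, state='COMPLETED')           # cached
update_queue_state_cache(qid=get_default_qid(), state='SUBMITTED')  # silently ignored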
