Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cross-night dependencies for cumulative redshift jobs #2321

Merged
merged 13 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/desi_job_graph
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fx.write(f"""<!DOCTYPE html>
classDef OUT_OF_MEMORY fill:#d95f0e;
classDef TIMEOUT fill:#d95f0e;
classDef CANCELLED fill:#fed98e;
classDef NOTSUBMITTED fill:#fcae1e;
classDef UNKNOWN fill:#ffffcc;
""")

Expand All @@ -106,6 +107,8 @@ for row in proctable:

if qid in jobinfo:
state = jobinfo[qid]['STATE'].split()[0]
elif qid == 1:
state = 'NOTSUBMITTED'
else:
state = 'UNKNOWN'

Expand Down
22 changes: 14 additions & 8 deletions bin/desi_resubmit_queue_failures
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ from astropy.table import Table
import glob

## Import some helper functions, you can see their definitions by uncommenting the bash shell command
from desiutil.log import get_logger
from desispec.workflow.tableio import load_table, write_table
from desispec.workflow.proctable import get_processing_table_pathname
from desispec.workflow.processing import update_and_recurvsively_submit
from desispec.workflow.processing import update_and_recursively_submit
from desispec.workflow.queue import get_resubmission_states

def parse_args(): # options=None):
Expand Down Expand Up @@ -49,6 +50,7 @@ def parse_args(): # options=None):

if __name__ == '__main__':
args = parse_args()
log = get_logger()
ptable_pathname = args.proc_table_pathname
if ptable_pathname is None:
if args.night is None:
Expand All @@ -66,21 +68,25 @@ if __name__ == '__main__':
if not args.dont_resub_failed:
resub_states.append('FAILED')

print(f"Resubmitting the following Slurm states: {resub_states}")
log.info(f"Resubmitting the following Slurm states: {resub_states}")

if args.dry_run > 0 and args.dry_run < 3:
log.warning(f"{args.dry_run=} will be run with limited simulation "
f"because we don't want to write out incorrect queue information.")

## Combine the table names and types for easier passing to io functions
table_type = 'proctable'

## Load in the files defined above
ptable = load_table(tablename=ptable_pathname, tabletype=table_type)
print(f"Identified ptable with {len(ptable)} entries.")
ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=0,
resubmission_states=resub_states,
ptab_name=ptable_pathname, dry_run=args.dry_run,
reservation=args.reservation)
log.info(f"Identified ptable with {len(ptable)} entries.")
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
resubmission_states=resub_states,
ptab_name=ptable_pathname, dry_run=args.dry_run,
reservation=args.reservation)

if not args.dry_run:
write_table(ptable, tablename=ptable_pathname)

print("Completed all necessary queue resubmissions from processing "
log.info("Completed all necessary queue resubmissions from processing "
+ f"table: {ptable_pathname}")
6 changes: 3 additions & 3 deletions py/desispec/scripts/daily_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
erow_to_prow, default_prow
from desispec.workflow.processing import parse_previous_tables, flat_joint_fit, arc_joint_fit, get_type_and_tile, \
science_joint_fit, define_and_assign_dependency, create_and_submit, \
update_and_recurvsively_submit, checkfor_and_submit_joint_job, \
update_and_recursively_submit, checkfor_and_submit_joint_job, \
submit_tilenight_and_redshifts
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
from desispec.io.util import difference_camwords, parse_badamps, validate_badamps
Expand Down Expand Up @@ -445,7 +445,7 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path

if len(ptable) > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
# ptable, nsubmits = update_and_recurvsively_submit(ptable,
# ptable, nsubmits = update_and_recursively_submit(ptable,
# ptab_name=proc_table_pathname, dry_run=dry_run_level)

## Exposure table doesn't change in the interim, so no need to re-write it to disk
Expand Down Expand Up @@ -500,7 +500,7 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path
# ii,nsubmits = 0, 0
# while ii < 4 and any_jobs_not_complete(ptable['STATUS']):
# print(f"Starting iteration {ii} of queue updating and resubmissions of failures.")
# ptable, nsubmits = update_and_recurvsively_submit(ptable, submits=nsubmits,
# ptable, nsubmits = update_and_recursively_submit(ptable, submits=nsubmits,
# ptab_name=proc_table_pathname, dry_run=dry_run_level)
# if dry_run_level < 3:
# write_table(ptable, tablename=proc_table_pathname)
Expand Down
38 changes: 26 additions & 12 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
from desispec.workflow.utils import sleep_and_report, \
verify_variable_with_environment, load_override_file
from desispec.workflow.timing import what_night_is_it, during_operating_hours
from desispec.workflow.exptable import get_last_step_options
from desispec.workflow.exptable import get_last_step_options, \
read_minimal_science_exptab_cols
from desispec.workflow.proctable import default_obstypes_for_proctable, \
erow_to_prow, default_prow
erow_to_prow, default_prow, read_minimal_tilenight_proctab_cols
from desispec.workflow.processing import define_and_assign_dependency, \
create_and_submit, \
submit_tilenight_and_redshifts, \
generate_calibration_dict, \
night_to_starting_iid, make_joint_prow, \
set_calibrator_flag, make_exposure_prow, \
all_calibs_submitted, \
update_and_recurvsively_submit, update_accounted_for_with_linking
update_and_recursively_submit, update_accounted_for_with_linking
from desispec.workflow.queue import update_from_queue, any_jobs_failed
from desispec.io.util import decode_camword, difference_camwords, \
create_camword, replace_prefix, erow_to_goodcamword, camword_union
Expand Down Expand Up @@ -336,6 +337,16 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
full_etable = etable.copy()

## For I/O efficiency, pre-populate exposure table and processing table caches
## of all nights if doing cross-night redshifts so that future per-night "reads"
## will use the cache.
if z_submit_types is not None and 'cumulative' in z_submit_types:
## this shouldn't need to change since we've already updated the exptab
read_minimal_science_exptab_cols()
## this would become out of date for the current night except
## write_table will keep it up to date
read_minimal_tilenight_proctab_cols()
sbailey marked this conversation as resolved.
Show resolved Hide resolved

## Cut on OBSTYPES
log.info(f"Processing the following obstypes: {proc_obstypes}")
good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
Expand All @@ -346,7 +357,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
if tableng > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
## Try up to two times to resubmit failures, afterwards give up
## unless explicitly told to proceed with the failures
Expand All @@ -355,10 +366,10 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3:
log.info("Job failures were detected. Resubmitting those jobs "
+ "before continuing with new submissions.")
ptable, nsubmits = update_and_recurvsively_submit(ptable,
ptab_name=proc_table_pathname,
dry_run=dry_run,
reservation=reservation)
ptable, nsubmits = update_and_recursively_submit(ptable,
ptab_name=proc_table_pathname,
dry_run=dry_run,
reservation=reservation)
elif not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
+ "will not fix them. You should remedy those first. "
Expand Down Expand Up @@ -472,7 +483,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
## Add the processing row to the processing table
proctable.add_row(prow)
if len(proctable) > 0 and dry_run_level < 3:
write_table(proctable, tablename=proc_table_pathname)
write_table(proctable, tablename=proc_table_pathname, tabletype='proctable')
sleep_and_report(sub_wait_time,
message_suffix=f"to slow down the queue submission rate",
dry_run=dry_run, logfunc=log.info)
Expand Down Expand Up @@ -559,7 +570,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
extra_job_args=extra_job_args)

if len(ptable) > 0 and dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')

sleep_and_report(sub_wait_time,
message_suffix=f"to slow down the queue submission rate",
Expand All @@ -573,9 +584,12 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
unproc_table = None
if len(ptable) > 0:
## All jobs now submitted, update information from job queue and save
ptable = update_from_queue(ptable, dry_run=dry_run_level)
## But only if actually submitting or fully simulating, don't simulate
## outputs that will be written to disk (levels 1 and 2)
if dry_run_level < 1 or dry_run_level > 2:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
## Now that processing is complete, lets identify what we didn't process
if len(ptable) > 0:
processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
Expand Down
6 changes: 3 additions & 3 deletions py/desispec/scripts/submit_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from desispec.workflow.utils import pathjoin, sleep_and_report
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.exptable import get_exposure_table_path, \
get_exposure_table_name, get_last_step_options
get_exposure_table_name, get_last_step_options, \
read_minimal_science_exptab_cols
from desispec.workflow.proctable import default_obstypes_for_proctable, get_processing_table_path, \
get_processing_table_name, erow_to_prow, table_row_to_dict, \
default_prow
Expand All @@ -25,7 +26,6 @@
checkfor_and_submit_joint_job, submit_tilenight_and_redshifts
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_path
from desispec.workflow.redshifts import read_minimal_exptables_columns
from desispec.io.util import decode_camword, difference_camwords, create_camword

def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime',
Expand Down Expand Up @@ -245,7 +245,7 @@ def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime
tiles_cumulative = list(tiles_this_night)
log.info(f'Submitting cumulative redshifts for all tiles: {tiles_cumulative}')
else:
allexp = read_minimal_exptables_columns(tileids=tiles_this_night)
allexp = read_minimal_science_exptab_cols(tileids=tiles_this_night)
for tileid in tiles_this_night:
nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
Expand Down
62 changes: 36 additions & 26 deletions py/desispec/scripts/tile_redshifts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from astropy.table import Table, vstack

from desispec.io.util import parse_cameras
from desispec.workflow.redshifts import read_minimal_exptables_columns, \
create_desi_zproc_batch_script
from desispec.workflow.redshifts import create_desi_zproc_batch_script
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desiutil.log import get_logger

from desispec.workflow import batch
Expand Down Expand Up @@ -60,17 +60,6 @@ def main(args=None):
num_error = len(failed_jobs)
sys.exit(num_error)

# _allexp is cache of all exposure tables stacked so that we don't have to read all
# of them every time we call generate_tile_redshift_scripts()
_allexp = None

def reset_allexp_cache():
"""
Utility script to reset the _allexp cache to ensure it is re-read from disk
"""
global _allexp
_allexp = None

def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None, explist=None,
camword=None, max_gpuprocs=None, no_gpu=False,
run_zmtl=False, no_afterburners=False,
Expand Down Expand Up @@ -126,7 +115,7 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
else:
log.info(f'Loading production exposure tables for all nights')

exptable = read_minimal_exptables_columns(nights)
exptable = read_minimal_science_exptab_cols(nights)

else:
log.info(f'Loading exposure list from {explist}')
Expand Down Expand Up @@ -190,28 +179,49 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
# - NOTE: depending upon options, this might re-read all the exptables again
# - NOTE: this may not scale well several years into the survey
if group == 'cumulative':
if nights is not None:
lastnight = int(np.max(nights))
elif exptable is not None:
lastnight = int(np.max(exptable['NIGHT']))
else:
lastnight = None
log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
global _allexp
if _allexp is None:
log.info(f'Reading all exposure_tables from all nights')
_allexp = read_minimal_exptables_columns()
keep = np.in1d(_allexp['TILEID'], tileids)
newexptable = _allexp[keep]
log.info(f'Reading all exposure_tables from all nights')
newexptable = read_minimal_science_exptab_cols(tileids=tileids)
newexptable = newexptable[['EXPID', 'NIGHT', 'TILEID']]

if exptable is not None:
expids = exptable['EXPID']
missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
if np.any(missing_exps):
latest_exptable = read_minimal_exptables_columns(nights=np.unique(exptable['NIGHT'][missing_exps]))
keep = np.in1d(latest_exptable['EXPID'], expids[missing_exps])
latest_exptable = latest_exptable[keep]
newexptable = vstack([newexptable, latest_exptable])
log.warning(f'Identified {np.sum(missing_exps)} missing exposures '
+ f'in the exposure cache. Resetting the cache to acquire'
+ f' them from all nights')
## reset_cache will remove cache but it won't be repopulated
## unless we request all nights. So let's request all nights
## then subselect to the nights we want
latest_exptable = read_minimal_science_exptab_cols(tileids=tileids,
reset_cache=True)
sbailey marked this conversation as resolved.
Show resolved Hide resolved
latest_exptable = latest_exptable[['EXPID', 'NIGHT', 'TILEID']]
missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
if np.any(missing_exps):
                log.error(f'Identified {np.sum(missing_exps)} missing exposures '
                          + f'in the exposure cache even after updating. '
                          + f'Appending the user-provided exposures, but this may '
                          + f'indicate a problem.')
newexptable = vstack([latest_exptable, exptable[missing_exps]])
else:
newexptable = latest_exptable

newexptable.sort(['EXPID'])
exptable = newexptable

## Ensure we only include data for nights up to and including specified nights
if nights is not None:
lastnight = int(np.max(nights))
if lastnight is not None:
log.info(f'Selecting only those exposures on nights before or '
+ f'during the latest night provided: {lastnight}')
exptable = exptable[exptable['NIGHT'] <= lastnight]

#expids = np.array(exptable['EXPID'])
tileids = np.unique(np.array(exptable['TILEID']))

Expand Down
8 changes: 4 additions & 4 deletions py/desispec/scripts/tile_redshifts_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import numpy as np
from astropy.table import Table, vstack

from desispec.workflow.redshifts import read_minimal_exptables_columns, \
get_ztile_script_pathname, get_ztile_relpath, \
from desispec.workflow.redshifts import get_ztile_script_pathname, get_ztile_relpath, \
get_ztile_script_suffix
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desiutil.log import get_logger

from desispec.workflow import batch
Expand Down Expand Up @@ -599,7 +599,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
else:
log.info(f'Loading production exposure tables for all nights')

exptable = read_minimal_exptables_columns(night)
exptable = read_minimal_science_exptab_cols(night)

else:
log.info(f'Loading exposure list from {explist}')
Expand Down Expand Up @@ -656,7 +656,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
# - NOTE: this may not scale well several years into the survey
if group == 'cumulative':
log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
allexp = read_minimal_exptables_columns()
allexp = read_minimal_science_exptab_cols()
keep = np.in1d(allexp['TILEID'], tileids)
exptable = allexp[keep]
## Ensure we only include data for nights up to and including specified nights
Expand Down
2 changes: 1 addition & 1 deletion py/desispec/scripts/update_exptable.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def update_exposure_table(night=None, specprod=None, exp_table_pathname=None,

## Only write out the table at the end and only if dry_run_level dictates
if dry_run_level < 3:
write_table(etable, tablename=exp_table_pathname)
write_table(etable, tablename=exp_table_pathname, tabletype='exptable')
else:
log.info(f"{dry_run_level=}, so not saving exposure table.\n{etable=}")

Expand Down
Loading
Loading