From a27a5fd794f769a478a81d855968c4ea5db01c08 Mon Sep 17 00:00:00 2001
From: akremin
Date: Sun, 28 Jul 2024 23:40:02 -0700
Subject: [PATCH] add proctable concatenation cache, update exptable cache, and
 add cross-night redshift dependencies

---
 py/desispec/scripts/proc_night.py          |  19 +-
 py/desispec/scripts/submit_night.py        |   4 +-
 py/desispec/scripts/tile_redshifts.py      |  60 +++--
 py/desispec/scripts/tile_redshifts_bash.py |   6 +-
 py/desispec/scripts/update_exptable.py     |   2 +-
 py/desispec/scripts/zproc.py               |   6 +-
 py/desispec/scripts/zprocdashboard.py      |   6 +-
 py/desispec/test/test_proc_night.py        |   2 +-
 py/desispec/workflow/processing.py         | 123 ++++++----
 py/desispec/workflow/redshifts.py          | 251 +++++++++++++++++++--
 py/desispec/workflow/science_selection.py  |   4 +-
 py/desispec/workflow/tableio.py            |   9 +-
 12 files changed, 392 insertions(+), 100 deletions(-)

diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py
index 36f514cc2..1f1928f5e 100644
--- a/py/desispec/scripts/proc_night.py
+++ b/py/desispec/scripts/proc_night.py
@@ -7,6 +7,8 @@
 from desispec.scripts.link_calibnight import derive_include_exclude
 from desispec.workflow.calibration_selection import \
     determine_calibrations_to_proc
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
+    read_minimal_tilenight_proctab_cols
 from desispec.workflow.science_selection import determine_science_to_proc, \
     get_tiles_cumulative
 from desiutil.log import get_logger
@@ -336,6 +338,15 @@
     etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
     full_etable = etable.copy()
 
+    ## Pre-populate the exposure table and processing table caches for all
+    ## nights if doing cross-night redshifts
+    if z_submit_types is not None and 'cumulative' in z_submit_types:
+        ## this shouldn't need to change, since we've already updated the exptab
+        read_minimal_science_exptab_cols()
+        ## this would go stale for the current night, except that
+        ## write_table keeps it up to date
+        read_minimal_tilenight_proctab_cols()
+
     ## Cut on OBSTYPES
     log.info(f"Processing the following obstypes: {proc_obstypes}")
     good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
@@ -346,7 +357,7 @@
     if tableng > 0:
         ptable = update_from_queue(ptable, dry_run=dry_run_level)
         if dry_run_level < 3:
-            write_table(ptable, tablename=proc_table_pathname)
+            write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
         if any_jobs_failed(ptable['STATUS']):
             ## Try up to two times to resubmit failures, afterwards give up
             ## unless explicitly told to proceed with the failures
@@ -472,7 +483,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
         ## Add the processing row to the processing table
         proctable.add_row(prow)
         if len(proctable) > 0 and dry_run_level < 3:
-            write_table(proctable, tablename=proc_table_pathname)
+            write_table(proctable, tablename=proc_table_pathname, tabletype='proctable')
         sleep_and_report(sub_wait_time,
                          message_suffix=f"to slow down the queue submission rate",
                          dry_run=dry_run, logfunc=log.info)
@@ -559,7 +570,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
                                 extra_job_args=extra_job_args)
 
     if len(ptable) > 0 and dry_run_level < 3:
-        write_table(ptable, tablename=proc_table_pathname)
+        write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
 
     sleep_and_report(sub_wait_time,
                      message_suffix=f"to slow down the queue submission rate",
                     dry_run=dry_run, logfunc=log.info)
@@ -575,7 +586,7 @@
     ## All jobs now submitted, update information from job queue and save
     ptable = update_from_queue(ptable, dry_run=dry_run_level)
     if dry_run_level < 3:
-        write_table(ptable, tablename=proc_table_pathname)
+        write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
     ## Now that processing is complete, lets identify what we didn't process
     if len(ptable) > 0:
         processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
diff --git a/py/desispec/scripts/submit_night.py b/py/desispec/scripts/submit_night.py
index 2718e3eb0..9695a2a07 100644
--- a/py/desispec/scripts/submit_night.py
+++ b/py/desispec/scripts/submit_night.py
@@ -25,7 +25,7 @@
     checkfor_and_submit_joint_job, submit_tilenight_and_redshifts
 from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
 from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_path
-from desispec.workflow.redshifts import read_minimal_exptables_columns
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols
 from desispec.io.util import decode_camword, difference_camwords, create_camword
 
 def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime',
@@ -245,7 +245,7 @@ def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime
         tiles_cumulative = list(tiles_this_night)
         log.info(f'Submitting cumulative redshifts for all tiles: {tiles_cumulative}')
     else:
-        allexp = read_minimal_exptables_columns(tileids=tiles_this_night)
+        allexp = read_minimal_science_exptab_cols(tileids=tiles_this_night)
         for tileid in tiles_this_night:
             nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
             if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
diff --git a/py/desispec/scripts/tile_redshifts.py b/py/desispec/scripts/tile_redshifts.py
index 8ddb54e1e..fbb203b60 100644
--- a/py/desispec/scripts/tile_redshifts.py
+++ b/py/desispec/scripts/tile_redshifts.py
@@ -11,7 +11,7 @@
 from astropy.table import Table, vstack
 
 from desispec.io.util import parse_cameras
-from desispec.workflow.redshifts import read_minimal_exptables_columns, \
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
     create_desi_zproc_batch_script
 from desiutil.log import get_logger
 
@@ -60,17 +60,6 @@ def main(args=None):
         num_error = len(failed_jobs)
     sys.exit(num_error)
 
-# _allexp is cache of all exposure tables stacked so that we don't have to read all
-# of them every time we call generate_tile_redshift_scripts()
-_allexp = None
-
-def reset_allexp_cache():
-    """
-    Utility script to reset the _allexp cache to ensure it is re-read from disk
-    """
-    global _allexp
-    _allexp = None
-
 def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
                                    explist=None, camword=None,
                                    max_gpuprocs=None, no_gpu=False,
                                    run_zmtl=False, no_afterburners=False,
@@ -126,7 +115,7 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
         else:
             log.info(f'Loading production exposure tables for all nights')
-            exptable = read_minimal_exptables_columns(nights)
+            exptable = read_minimal_science_exptab_cols(nights)
     else:
         log.info(f'Loading exposure list from {explist}')
 
@@ -190,28 +179,49 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
     # - NOTE: depending upon options, this might re-read all the exptables again
     # - NOTE: this may not scale well several years into the survey
     if group == 'cumulative':
+        if nights is not None:
+            lastnight = int(np.max(nights))
+        elif exptable is not None:
+            lastnight = int(np.max(exptable['NIGHT']))
+        else:
+            lastnight = None
         log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
-        global _allexp
-        if _allexp is None:
-            log.info(f'Reading all exposure_tables from all nights')
-            _allexp = read_minimal_exptables_columns()
-        keep = np.in1d(_allexp['TILEID'], tileids)
-        newexptable = _allexp[keep]
+        log.info(f'Reading all exposure_tables from all nights')
+        newexptable = read_minimal_science_exptab_cols(tileids=tileids)
+        newexptable = newexptable[['EXPID', 'NIGHT', 'TILEID']]
+
         if exptable is not None:
             expids = exptable['EXPID']
             missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
             if np.any(missing_exps):
-                latest_exptable = read_minimal_exptables_columns(nights=np.unique(exptable['NIGHT'][missing_exps]))
-                keep = np.in1d(latest_exptable['EXPID'], expids[missing_exps])
-                latest_exptable = latest_exptable[keep]
-                newexptable = vstack([newexptable, latest_exptable])
+                log.warning(f'Identified {np.sum(missing_exps)} missing exposures ' +
+                            f'in the exposure cache. Resetting the cache to acquire' +
+                            f' them from all nights')
+                ## reset_cache will remove the cache, but it won't be repopulated
+                ## unless we request all nights. So request all nights and then
+                ## subselect to the tiles we want
+                latest_exptable = read_minimal_science_exptab_cols(tileids=tileids,
+                                                                   reset_cache=True)
+                latest_exptable = latest_exptable[['EXPID', 'NIGHT', 'TILEID']]
+                missing_exps = np.in1d(expids, latest_exptable['EXPID'], invert=True)
+                if np.any(missing_exps):
+                    log.error(f'Identified {np.sum(missing_exps)} missing exposures ' +
+                              f'in the exposure cache even after updating. ' +
+                              f'Appending the user-provided exposures, but this ' +
+                              f'may indicate a problem.')
+                    newexptable = vstack([latest_exptable, exptable[missing_exps]])
+                else:
+                    newexptable = latest_exptable
 
         newexptable.sort(['EXPID'])
         exptable = newexptable
+
         ## Ensure we only include data for nights up to and including specified nights
-        if nights is not None:
-            lastnight = int(np.max(nights))
+        if lastnight is not None:
+            log.info(f'Selecting only those exposures on nights before or ' +
+                     f'during the latest night provided: {lastnight}')
             exptable = exptable[exptable['NIGHT'] <= lastnight]
 
     #expids = np.array(exptable['EXPID'])
     tileids = np.unique(np.array(exptable['TILEID']))
 
diff --git a/py/desispec/scripts/tile_redshifts_bash.py b/py/desispec/scripts/tile_redshifts_bash.py
index 4561dc5ef..41ae5f160 100644
--- a/py/desispec/scripts/tile_redshifts_bash.py
+++ b/py/desispec/scripts/tile_redshifts_bash.py
@@ -10,7 +10,7 @@
 import numpy as np
 from astropy.table import Table, vstack
 
-from desispec.workflow.redshifts import read_minimal_exptables_columns, \
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
     get_ztile_script_pathname, get_ztile_relpath, \
     get_ztile_script_suffix
 from desiutil.log import get_logger
@@ -599,7 +599,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
         else:
             log.info(f'Loading production exposure tables for all nights')
-            exptable = read_minimal_exptables_columns(night)
+            exptable = read_minimal_science_exptab_cols(night)
     else:
         log.info(f'Loading exposure list from {explist}')
 
@@ -656,7 +656,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
     # - NOTE: this may not scale well several years into the survey
     if group == 'cumulative':
         log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
-        allexp = read_minimal_exptables_columns()
+        allexp = read_minimal_science_exptab_cols()
         keep = np.in1d(allexp['TILEID'], tileids)
         exptable = allexp[keep]
         ## Ensure we only include data for nights up to and including specified nights
diff --git a/py/desispec/scripts/update_exptable.py b/py/desispec/scripts/update_exptable.py
index 56d9d5224..961a9512a 100644
--- a/py/desispec/scripts/update_exptable.py
+++ b/py/desispec/scripts/update_exptable.py
@@ -180,7 +180,7 @@ def update_exposure_table(night=None, specprod=None, exp_table_pathname=None,
 
     ## Only write out the table at the end and only if dry_run_level dictates
     if dry_run_level < 3:
-        write_table(etable, tablename=exp_table_pathname)
+        write_table(etable, tablename=exp_table_pathname, tabletype='exptable')
     else:
         log.info(f"{dry_run_level=}, so not saving exposure table.\n{etable=}")
 
diff --git a/py/desispec/scripts/zproc.py b/py/desispec/scripts/zproc.py
index 8311ffcd9..f23760b70 100644
--- a/py/desispec/scripts/zproc.py
+++ b/py/desispec/scripts/zproc.py
@@ -29,7 +29,7 @@
 import desiutil.iers
 
 from desispec.io.meta import get_nights_up_to_date
-from desispec.workflow.redshifts import read_minimal_exptables_columns, \
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
     create_desi_zproc_batch_script
 
 #- internal desispec imports
@@ -357,8 +357,8 @@ def main(args=None, comm=None):
             ## Get list of only nights up to date of thrunight
             nights = get_nights_up_to_date(args.thrunight)
 
-        exposure_table = read_minimal_exptables_columns(nights=nights,
-                                                        tileids=[tileid])
+        exposure_table = read_minimal_science_exptab_cols(nights=nights,
+                                                          tileids=[tileid])
 
         if args.expids is not None:
             exposure_table = exposure_table[np.isin(exposure_table['EXPID'],
                                                     args.expids)]
diff --git a/py/desispec/scripts/zprocdashboard.py b/py/desispec/scripts/zprocdashboard.py
index 13fd83f77..6f00ff398 100644
--- a/py/desispec/scripts/zprocdashboard.py
+++ b/py/desispec/scripts/zprocdashboard.py
@@ -27,7 +27,7 @@
 from desispec.workflow.proctable import get_processing_table_pathname, \
     erow_to_prow, instantiate_processing_table
 from desispec.workflow.tableio import load_table
-from desispec.workflow.redshifts import read_minimal_exptables_columns
+from desispec.workflow.redshifts import read_minimal_science_exptab_cols
 from desispec.io.meta import specprod_root, rawdata_root, findfile
 from desispec.io.util import decode_camword, camword_to_spectros, \
     difference_camwords, parse_badamps, create_camword, camword_union, \
@@ -127,7 +127,7 @@ def main(args=None):
     log.info(f'Searching {prod_dir} for: {nights}')
 
     ## Get all the exposure tables for cross-night dependencies
-    all_exptabs = read_minimal_exptables_columns(nights=None)
+    all_exptabs = read_minimal_science_exptab_cols(nights=None)
     ## We don't want future days mixing in
     all_exptabs = all_exptabs[all_exptabs['NIGHT'] <= np.max(nights)]
     ## Restrict to only the exptabs relevant to the current dashboard
@@ -199,7 +199,7 @@ def populate_night_zinfo(night, doem=True, doqso=True, dotileqa=True,
         skipd_tileids (list): List of tileids that should be skipped and not
             listed in the output dashboard.
         all_exptabs (astropy.table.Table): A stacked exposure table with minimal
-            columns returned from read_minimal_exptables_columns(). Used for
+            columns returned from read_minimal_science_exptab_cols(). Used for
             cumulative redshifts jobs to identify tile data from previous nights.
 
     Returns dict:
diff --git a/py/desispec/test/test_proc_night.py b/py/desispec/test/test_proc_night.py
index 1c0ad4a51..1ec6a638e 100644
--- a/py/desispec/test/test_proc_night.py
+++ b/py/desispec/test/test_proc_night.py
@@ -403,7 +403,7 @@ def test_proc_night_daily(self):
         while True:
             num_newlinks = link_rawdata(self.real_rawnight_dir, self.test_rawnight_dir, numexp=10)
-            desispec.scripts.tile_redshifts.reset_allexp_cache()
+            desispec.workflow.redshifts.reset_science_etab_cache()
             if num_newlinks == 0:
                 break
             else:
diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py
index 67c6372c7..bbd302f35 100644
--- a/py/desispec/workflow/processing.py
+++ b/py/desispec/workflow/processing.py
@@ -16,8 +16,9 @@
 from desispec.scripts.link_calibnight import derive_include_exclude
 from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
 from desispec.workflow.redshifts import get_ztile_script_pathname, \
-    get_ztile_relpath, \
-    get_ztile_script_suffix
+    get_ztile_relpath, \
+    get_ztile_script_suffix, read_minimal_tilenight_proctab_cols, \
+    read_minimal_science_exptab_cols
 from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
     queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache
 from desispec.workflow.timing import what_night_is_it
@@ -1546,23 +1547,59 @@ def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
 
     if len(zprows) > 0:
         for zsubtype in z_submit_types:
+            log.info(" ")
+            log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
             if zsubtype == 'perexp':
                 for zprow in zprows:
-                    log.info(" ")
-                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
+                    log.info(f"EXPID: {zprow['EXPID']}.\n")
                     redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                     internal_id += 1
                     redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                       strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                       resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                     ptable.add_row(redshift_prow)
-            else:
-                log.info(" ")
-                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
+            elif zsubtype == 'cumulative':
+                tileids = np.unique([prow['TILEID'] for prow in zprows])
+                if len(tileids) > 1:
+                    msg = f"Error, more than one tileid provided for cumulative redshift job: {tileids}"
+                    log.critical(msg)
+                    raise ValueError(msg)
+                nights = np.unique([prow['NIGHT'] for prow in zprows])
+                if len(nights) > 1:
+                    msg = f"Error, more than one night provided for cumulative redshift job: {nights}"
+                    log.critical(msg)
+                    raise ValueError(msg)
+                tileid, night = tileids[0], nights[0]
+                ## For cumulative redshifts, get any existing processing rows for tile
+                matched_prows = read_minimal_tilenight_proctab_cols(tileids=tileids)
+                matched_prows = matched_prows[matched_prows['NIGHT']<=night]
+                ## Identify the processing rows that should be assigned as dependencies
+                ## tnight should be first such that the new job inherits the other metadata from it
+                tnights = [tnight]
+                for prow in matched_prows:
+                    if prow['INTID'] != tnight['INTID']:
+                        tnights.append(prow)
+                log.info(f"Internal Processing IDs: {[prow['INTID'] for prow in tnights]}.\n")
+                ## Identify all exposures that should go into the fit
                 expids = [prow['EXPID'][0] for prow in zprows]
+                ## note we can actually get the full list of exposures, but for now
+                ## we'll stay consistent with old processing where we only list exposures
+                ## from the current night
+                ## For cumulative redshifts, get valid expids from exptables
+                #matched_erows = read_minimal_science_exptab_cols(tileids=tileids)
+                #matched_erows = matched_erows[matched_erows['NIGHT']<=night]
+                #expids = sorted(set([prow['EXPID'][0] for prow in zprows]) | set(matched_erows['EXPID']))
                 log.info(f"Expids: {expids}.\n")
-                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
-                internal_id += 1
+                redshift_prow, internal_id = make_joint_prow(tnights, descriptor=zsubtype, internal_id=internal_id)
+                redshift_prow['EXPID'] = expids
+                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
+                                                  strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
+                                                  resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
+                ptable.add_row(redshift_prow)
+            else: # pernight
+                expids = [prow['EXPID'][0] for prow in zprows]
+                log.info(f"Expids: {expids}.\n")
+                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
+                internal_id += 1
                 redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
@@ -1700,6 +1737,7 @@ def make_joint_prow(prows, descriptor, internal_id):
         dict: Row of a processing table corresponding to the joint fit job.
         internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
""" + log = get_logger() first_row = table_row_to_dict(prows[0]) joint_prow = first_row.copy() @@ -1711,38 +1749,41 @@ def make_joint_prow(prows, descriptor, internal_id): joint_prow['SUBMIT_DATE'] = -99 joint_prow['STATUS'] = 'U' joint_prow['SCRIPTNAME'] = '' - joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int) + joint_prow['EXPID'] = np.unique(np.concatenate([currow['EXPID'] for currow in prows])).astype(int) ## Assign the PROCCAMWORD based on the descriptor and the input exposures - if descriptor == 'stdstarfit': - pcamwords = [prow['PROCCAMWORD'] for prow in prows] + ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits, + ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras + ## For flats we want any camera that exists in all 12 exposures + ## For arcs we want any camera that exists in at least 3 exposures + pcamwords = [prow['PROCCAMWORD'] for prow in prows] + if descriptor in 'stdstarfit': joint_prow['PROCCAMWORD'] = camword_union(pcamwords, full_spectros_only=True) + elif descriptor in ['pernight', 'cumulative']: + joint_prow['PROCCAMWORD'] = camword_union(pcamwords, + full_spectros_only=False) + elif descriptor == 'nightlyflat': + joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords, + full_spectros_only=False) + elif descriptor == 'psfnight': + ## Count number of exposures each camera is present for + camcheck = {} + for camword in pcamwords: + for cam in decode_camword(camword): + if cam in camcheck: + camcheck[cam] += 1 + else: + camcheck[cam] = 1 + ## if exists in 3 or more exposures, then include it + goodcams = [] + for cam,camcount in camcheck.items(): + if camcount >= 3: + goodcams.append(cam) + joint_prow['PROCCAMWORD'] = create_camword(goodcams) else: - ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits, - ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras - pcamwords = [prow['PROCCAMWORD'] for prow in prows] - - ## For flats we want any camera that exists in all 12 exposures - ## For arcs we want any camera that exists in at least 3 exposures - if descriptor == 'nightlyflat': - joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords, - full_spectros_only=False) - elif descriptor == 'psfnight': - ## Count number of exposures each camera is present for - camcheck = {} - for camword in pcamwords: - for cam in decode_camword(camword): - if cam in camcheck: - camcheck[cam] += 1 - else: - camcheck[cam] = 1 - ## if exists in 3 or more exposures, then include it - goodcams = [] - for cam,camcount in camcheck.items(): - if camcount >= 3: - goodcams.append(cam) - joint_prow['PROCCAMWORD'] = create_camword(goodcams) + log.warning("Warning asked to produce joint proc table row for unknown" + + f" job description {descriptor}") joint_prow = assign_dependency(joint_prow, dependency=prows) return joint_prow, internal_id @@ -1787,11 +1828,11 @@ def make_tnight_prow(prows, calibjobs, internal_id): joint_prow['SCRIPTNAME'] = '' joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int) - joint_prow = define_and_assign_dependency(joint_prow,calibjobs,use_tilenight=True) + joint_prow = define_and_assign_dependency(joint_prow, calibjobs, use_tilenight=True) return joint_prow -def make_redshift_prow(prows, tnight, descriptor, internal_id): +def make_redshift_prow(prows, tnights, descriptor, internal_id): """ Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row. 
     It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
@@ -1800,11 +1841,11 @@ def make_redshift_prow(prows, tnight, descriptor, internal_id):
 
     Args:
         prows, list or array of dicts. Unsubmitted rows corresponding to the individual prestdstar jobs that are
            the first steps of tilenight.
-        tnight, Table.Row object. Row corresponding to the tilenight job on which the redshift job depends.
+        tnights, list or array of Table.Row objects. Rows corresponding to the tilenight jobs on which the redshift job depends.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
 
     Returns:
-        dict: Row of a processing table corresponding to the tilenight job.
+        dict: Row of a processing table corresponding to the redshift job.
     """
     first_row = table_row_to_dict(prows[0])
     redshift_prow = first_row.copy()
@@ -1818,7 +1859,7 @@ def make_redshift_prow(prows, tnights, descriptor, internal_id):
     redshift_prow['SCRIPTNAME'] = ''
     redshift_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)
 
-    redshift_prow = assign_dependency(redshift_prow, dependency=tnight)
+    redshift_prow = assign_dependency(redshift_prow, dependency=tnights)
 
     return redshift_prow
 
diff --git a/py/desispec/workflow/redshifts.py b/py/desispec/workflow/redshifts.py
index 0bae1f731..87cdba43d 100644
--- a/py/desispec/workflow/redshifts.py
+++ b/py/desispec/workflow/redshifts.py
@@ -10,6 +10,7 @@
 import numpy as np
 from astropy.table import Table, vstack, Column
 
+from desispec.io import findfile
 from desispec.io.util import parse_cameras, decode_camword
 from desispec.workflow.desi_proc_funcs import determine_resources
 from desiutil.log import get_logger
@@ -21,7 +22,10 @@
 from desispec.workflow import batch
 from desispec.util import parse_int_args
 
-
+# processing table row cache for tilenight selection
+_tilenight_ptab_cache = None
+# exposure table row cache for science exposure selection
+_science_etab_cache = None
 
 def get_ztile_relpath(tileid,group,night=None,expid=None):
     """
@@ -360,13 +364,14 @@ def create_desi_zproc_batch_script(group,
 
     return scriptfile
 
-def read_minimal_exptables_columns(nights=None, tileids=None):
+def read_minimal_science_exptab_cols(nights=None, tileids=None, reset_cache=False):
     """
     Read exposure tables while handling evolving formats
 
     Args:
         nights (list of int): nights to include (default all nights found)
         tileids (list of int): tileids to include (default all tiles found)
+        reset_cache (bool): If true, global cache is cleared
 
     Returns exptable with just columns TILEID, NIGHT, EXPID, CAMWORD, BADCAMWORD,
    filtered by science exposures
 
     Note: the returned table is the full pipeline exposures table.
     It is trimmed to science exposures that have LASTSTEP=='all'
     """
+    global _science_etab_cache
     log = get_logger()
+
+    ## If requested, reset the science exposure table cache
+    if reset_cache:
+        reset_science_etab_cache()
+
+    ## If the cache exists, use it to speed up the search over tiles and nights
+    if _science_etab_cache is not None:
+        log.info(f'Using cached exposure table rows for science selection')
+        t = _science_etab_cache.copy()
+        if nights is not None:
+            t = t[np.isin(t['NIGHT'], nights)]
+        if tileids is not None:
+            t = t[np.isin(t['TILEID'], tileids)]
+        return t
+
+    ## If not cached, then find all the relevant exposure tables and load them
     if nights is None:
-        exptab_path = get_exposure_table_path(night=None)
-        monthglob = '202???'
-        globname = get_exposure_table_name(night='202?????')
-        etab_files = glob.glob(os.path.join(exptab_path, monthglob, globname))
+        etab_path = findfile('exptable', night='99999999', readonly=True)
+        etab_files = glob.glob(etab_path.replace('99999999', '202?????'))
     else:
         etab_files = list()
         for night in nights:
@@ -393,23 +413,23 @@
             # - these are expected for the daily run, ok
             log.debug(f"Exposure table missing for night {night}")
 
+    ## Load each relevant exposure table file, subselect valid science
+    ## exposures, and append to the full set
     etab_files = sorted(etab_files)
     exptables = list()
     for etab_file in etab_files:
         ## correct way but slower and we don't need multivalue columns
         #t = load_table(etab_file, tabletype='etable')
         t = Table.read(etab_file, format='ascii.csv')
+
+        ## Subselect only valid science exposures
+        t = _select_sciences_from_etab(t)
+
         ## For backwards compatibility if BADCAMWORD column does not
         ## exist then add a blank one
         if 'BADCAMWORD' not in t.colnames:
             t.add_column(Table.Column(['' for i in range(len(t))], dtype='S36',
                                       name='BADCAMWORD'))
-        keep = (t['OBSTYPE'] == 'science') & (t['TILEID'] >= 0)
-        if 'LASTSTEP' in t.colnames:
-            keep &= (t['LASTSTEP'] == 'all')
-        if tileids is not None:
-            # Default false
-            keep &= np.isin(t['TILEID'], tileids)
-        t = t[keep]
+
         ## Need to ensure that the string columns are consistent
         for col in ['CAMWORD', 'BADCAMWORD']:
             ## Masked arrays need special handling
             ## else just reassign with consistent dtype
             if isinstance(t[col], Table.MaskedColumn):
                 ## If completely empty it's loaded as type int
                 ## otherwise fill masked with ''
                 if t[col].dtype == int:
                     t[col] = Table.Column(['' for i in range(len(t))], dtype='S36', name=col)
                 else:
                     t[col] = Table.Column(t[col].filled(fill_value=''), dtype='S36', name=col)
             else:
                 t[col] = Table.Column(t[col], dtype='S36', name=col)
         exptables.append(t['TILEID', 'NIGHT', 'EXPID', 'CAMWORD', 'BADCAMWORD'])
 
-    return vstack(exptables)
+    outtable = vstack(exptables)
+
+    ## If we've loaded all nights, then cache the result
+    if nights is None:
+        log.info(f'Caching exposure table rows for science selection')
+        set_science_etab_cache(outtable.copy())
+
+    ## If requested specific tileids, then subselect them
+    if tileids is not None:
+        outtable = outtable[np.isin(outtable['TILEID'], tileids)]
+
+    return outtable
+
+def _select_sciences_from_etab(etab):
+    """
+    takes an exposure table or combination of exposure tables and subselects
+    valid science exposures. Those that pass selection are returned as a table.
+ """ + t = etab.copy() + t = t[((t['OBSTYPE'] == 'science') & (t['TILEID'] >= 0))] + if 'LASTSTEP' in t.colnames: + t = t[t['LASTSTEP'] == 'all'] + return t + +def reset_science_etab_cache(): + """ + reset the global cache of science exposure tables stored in var _science_etab_cache + """ + global _science_etab_cache + log = get_logger() + log.info(f'Resetting science exposure table row cache') + _science_etab_cache = None + +def set_science_etab_cache(etab): + """ + sets the global cache of science exposure tables stored in var _science_etab_cache + """ + global _science_etab_cache + log = get_logger() + log.info(f'Assigning science exposure table row cache to new table') + _science_etab_cache = _select_sciences_from_etab(etab) + +def update_science_etab_cache(etab): + """ + updates the global cache of science exposure tables stored in var + _science_etab_cache. + + Notes: this will remove all current entries for any night in the input + """ + global _science_etab_cache + log = get_logger() + ## If the cache doesn't exist, don't update it. + if _science_etab_cache is None: + log.debug(f'Science exptab cache does not exist, so not updating') + return + cleaned_etab = _select_sciences_from_etab(etab) + new_nights = np.unique(cleaned_etab['NIGHT']) + log.info(f'Removing all current entries in science exposure ' + + f'table row cache for nights {new_nights}') + conflicting_entries = np.isin(_science_etab_cache['NIGHT'], new_nights) + log.info(f"Removing {len(conflicting_entries)} rows and adding {len(cleaned_etab)} rows " + + f"to science exposure table row cache.") + keep = np.bitwise_not(conflicting_entries) + _science_etab_cache = _science_etab_cache[keep] + _science_etab_cache = vstack([_science_etab_cache, cleaned_etab]) + + +def read_minimal_tilenight_proctab_cols(nights=None, tileids=None, reset_cache=False): + """ + Read processing tables while handling evolving formats + + Args: + nights (list of int): nights to include (default all nights found) + tileids (list of int): tileids to include (default all tiles found) + reset_cache (bool): If true, global cache is cleared + + Returns exptable with just columns EXPID, TILEID, NIGHT, PROCCAMWORD, + INTID, LATEST_QID + """ + global _tilenight_ptab_cache + log = get_logger() + + ## If requested reset the tilenight processing table cache + if reset_cache: + reset_tilenight_ptab_cache() + + ## If the cache exists, use it speed up the search over tiles and nights + if _tilenight_ptab_cache is not None: + log.info(f'Using cached processing table rows for tilenight selection') + t = _tilenight_ptab_cache.copy() + if nights is not None: + t = t[np.isin(t['NIGHT'], nights)] + if tileids is not None: + t = t[np.isin(t['TILEID'], tileids)] + return t + + ## If not cached, then find all the relevant processing tables and load them + if nights is None: + ptab_path = findfile('proctable', night='99999999', readonly=True) + ptab_files = glob.glob(ptab_path.replace('99999999', '202?????')) + else: + ptab_files = list() + for night in nights: + ptab_file = findfile('proctable', night=night) + if os.path.exists(ptab_file): + ptab_files.append(ptab_file) + elif night >= 20201201: + log.error(f"Processing table missing for night {night}") + else: + # - these are expected for the daily run, ok + log.debug(f"Processing table missing for night {night}") + + ## Load each relevant processing table file, subselect valid tilenight's and + ## append to the full set + ptab_files = sorted(ptab_files) + ptables = list() + for ptab_file in ptab_files: + ## correct way 
+        t = load_table(tablename=ptab_file, tabletype='proctable')
+        t = _select_tilenights_from_ptab(t)
+
+        ## Need to ensure that the string columns are consistent
+        for col in ['PROCCAMWORD']:
+            ## Masked arrays need special handling
+            ## else just reassign with consistent dtype
+            if isinstance(t[col], Table.MaskedColumn):
+                ## If completely empty it's loaded as type int
+                ## otherwise fill masked with ''
+                if t[col].dtype == int:
+                    t[col] = Table.Column(['' for i in range(len(t))], dtype='S36', name=col)
+                else:
+                    t[col] = Table.Column(t[col].filled(fill_value=''), dtype='S36', name=col)
+            else:
+                t[col] = Table.Column(t[col], dtype='S36', name=col)
+        ptables.append(t['EXPID', 'TILEID', 'NIGHT', 'PROCCAMWORD',
+                         'INTID', 'LATEST_QID'])
+
+    outtable = vstack(ptables)
+
+    ## If we've loaded all nights, then cache the result
+    if nights is None:
+        log.info(f'Caching processing table rows for tilenight selection')
+        set_tilenight_ptab_cache(outtable)
+
+    ## If requested specific tileids, then subselect them
+    if tileids is not None:
+        outtable = outtable[np.isin(outtable['TILEID'], tileids)]
+
+    return outtable
+
+def _select_tilenights_from_ptab(ptab):
+    """
+    takes a processing table or combination of processing tables and subselects
+    valid tilenight jobs. Those that pass selection are returned as a table.
+    """
+    t = ptab.copy()
+    t = t[((t['OBSTYPE'] == 'science') & (t['JOBDESC'] == 'tilenight'))]
+    if 'LASTSTEP' in t.colnames:
+        t = t[t['LASTSTEP'] == 'all']
+    return t
+
+def reset_tilenight_ptab_cache():
+    """
+    reset the global cache of tilenight processing tables stored in var _tilenight_ptab_cache
+    """
+    global _tilenight_ptab_cache
+    log = get_logger()
+    log.info(f'Resetting processing table row cache for tilenight selection')
+    _tilenight_ptab_cache = None
+
+def set_tilenight_ptab_cache(ptab):
+    """
+    sets the global cache of tilenight processing tables stored in var _tilenight_ptab_cache
+    """
+    global _tilenight_ptab_cache
+    log = get_logger()
+    log.info(f'Assigning processing table row cache for tilenight selection to new table')
+    _tilenight_ptab_cache = _select_tilenights_from_ptab(ptab)
+    _tilenight_ptab_cache.sort(['INTID'])
+
+def update_tilenight_ptab_cache(ptab):
+    """
+    updates the global cache of tilenight processing tables stored in var
+    _tilenight_ptab_cache.
+
+    Notes: this will remove all current entries for any night in the input
+    """
+    global _tilenight_ptab_cache
+    log = get_logger()
+    ## If the cache doesn't exist, don't update it.
+    if _tilenight_ptab_cache is None:
+        log.debug(f'Tilenight proctab cache does not exist, so not updating')
+        return
+    cleaned_ptab = _select_tilenights_from_ptab(ptab)
+    new_nights = np.unique(cleaned_ptab['NIGHT'])
+    log.info(f'Removing all current entries in processing table tilenight ' +
+             f'selection cache for nights {new_nights}')
+    conflicting_entries = np.isin(_tilenight_ptab_cache['NIGHT'], new_nights)
+    log.info(f"Removing {np.sum(conflicting_entries)} rows and adding " +
+             f"{len(cleaned_ptab)} rows " +
+             f"to processing table tilenight cache.")
+    keep = np.bitwise_not(conflicting_entries)
+    _tilenight_ptab_cache = _tilenight_ptab_cache[keep]
+    _tilenight_ptab_cache = vstack([_tilenight_ptab_cache, cleaned_ptab])
+    _tilenight_ptab_cache.sort(['INTID'])
\ No newline at end of file
diff --git a/py/desispec/workflow/science_selection.py b/py/desispec/workflow/science_selection.py
index e16b39cc6..04a555796 100644
--- a/py/desispec/workflow/science_selection.py
+++ b/py/desispec/workflow/science_selection.py
@@ -17,7 +17,7 @@
 from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
 from desispec.workflow.redshifts import get_ztile_script_pathname, \
     get_ztile_relpath, \
-    get_ztile_script_suffix, read_minimal_exptables_columns
+    get_ztile_script_suffix, read_minimal_science_exptab_cols
 from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
 from desispec.workflow.timing import what_night_is_it
 from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
@@ -237,7 +237,7 @@ def get_tiles_cumulative(sci_etable, z_submit_types, all_cumulatives, night):
         tiles_cumulative = list(tiles_this_night)
         log.info(f'Submitting cumulative redshifts for all tiles: {tiles_cumulative}')
     else:
-        allexp = read_minimal_exptables_columns(tileids=tiles_this_night)
+        allexp = read_minimal_science_exptab_cols(tileids=tiles_this_night)
         for tileid in tiles_this_night:
             nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
             if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
diff --git a/py/desispec/workflow/tableio.py b/py/desispec/workflow/tableio.py
index 042dca835..89b6a0065 100644
--- a/py/desispec/workflow/tableio.py
+++ b/py/desispec/workflow/tableio.py
@@ -7,7 +7,8 @@
 import numpy as np
 from astropy.table import Table
 
-
+from desispec.workflow.redshifts import update_science_etab_cache, \
+    update_tilenight_ptab_cache
 
 ###################################################
 ################ Table Functions #################
 ###################################################
@@ -203,6 +204,12 @@ def write_table(origtable, tablename=None, tabletype=None, joinsymb='|', overwri
     if verbose:
         log.info("Written table: ", table.info)
 
+    if tabletype is not None:
+        if tabletype == 'exptable':
+            update_science_etab_cache(origtable)
+        elif tabletype == 'proctable':
+            update_tilenight_ptab_cache(origtable)
+
 def standardize_tabletype(tabletype):
     """
     Given the user defined type of table it returns the proper 'tabletype' expected by the pipeline
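
Reviewer note (not part of the patch): the sketch below illustrates how the new
cache API added in py/desispec/workflow/redshifts.py is intended to be used,
based only on the functions introduced above. The tileid (1000) is a
placeholder value, and a configured production ($DESI_SPECTRO_REDUX /
$SPECPROD) is assumed.

    # Minimal usage sketch of the new exposure/processing table caches.
    from desispec.workflow.redshifts import (
        read_minimal_science_exptab_cols,
        read_minimal_tilenight_proctab_cols,
        reset_science_etab_cache,
    )

    # A nights=None call reads every nightly exposure table once and
    # populates the module-level cache.
    all_sci = read_minimal_science_exptab_cols()

    # Subsequent calls subselect from the in-memory cache instead of
    # re-reading the tables from disk.
    tile_sci = read_minimal_science_exptab_cols(tileids=[1000])

    # Cross-night cumulative redshift jobs look up prior tilenight rows the
    # same way, via the processing table cache.
    tnight_rows = read_minimal_tilenight_proctab_cols(tileids=[1000])

    # write_table(..., tabletype='exptable'|'proctable') keeps the caches in
    # sync; if tables were modified on disk outside write_table, force a
    # re-read by clearing the cache first.
    reset_science_etab_cache()
    all_sci = read_minimal_science_exptab_cols()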