add proctable concatenation cache, update exptable cache, and add cross-night redshift dependencies
akremin committed Jul 29, 2024
1 parent 71dfe82 commit a27a5fd
Showing 12 changed files with 392 additions and 100 deletions.
19 changes: 15 additions & 4 deletions py/desispec/scripts/proc_night.py
@@ -7,6 +7,8 @@
from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.workflow.calibration_selection import \
determine_calibrations_to_proc
from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
read_minimal_tilenight_proctab_cols
from desispec.workflow.science_selection import determine_science_to_proc, \
get_tiles_cumulative
from desiutil.log import get_logger
@@ -336,6 +338,15 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
etable, ptable = load_tables(tablenames=table_pathnames, tabletypes=table_types)
full_etable = etable.copy()

## Pre-populate exposure table and processing table caches of all nights
## if doing cross-night redshifts
if 'cumulative' in z_submit_types:
## this shouldn't need to change since we've already updated the exptab
read_minimal_science_exptab_cols()
## this would become out of date for the current night except
## write_table will keep it up to date
read_minimal_tilenight_proctab_cols()

## Cut on OBSTYPES
log.info(f"Processing the following obstypes: {proc_obstypes}")
good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
@@ -346,7 +357,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
if tableng > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
## Try up to two times to resubmit failures, afterwards give up
## unless explicitly told to proceed with the failures
@@ -472,7 +483,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
## Add the processing row to the processing table
proctable.add_row(prow)
if len(proctable) > 0 and dry_run_level < 3:
write_table(proctable, tablename=proc_table_pathname)
write_table(proctable, tablename=proc_table_pathname, tabletype='proctable')
sleep_and_report(sub_wait_time,
message_suffix=f"to slow down the queue submission rate",
dry_run=dry_run, logfunc=log.info)
@@ -559,7 +570,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
extra_job_args=extra_job_args)

if len(ptable) > 0 and dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')

sleep_and_report(sub_wait_time,
message_suffix=f"to slow down the queue submission rate",
@@ -575,7 +586,7 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
## All jobs now submitted, update information from job queue and save
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
## Now that processing is complete, lets identify what we didn't process
if len(ptable) > 0:
processed = np.isin(full_etable['EXPID'], np.unique(np.concatenate(ptable['EXPID'])))
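
The two read_minimal_* calls added near the top of proc_night warm module-level caches: the stacked science exposure table and the stacked tilenight processing table are read from disk once and then reused when building cross-night cumulative-redshift dependencies. Below is a minimal sketch of that read-through cache pattern; the helper names and internals are illustrative assumptions, not the actual desispec implementation.

```python
# Illustrative sketch of the module-level, read-through cache pattern that
# read_minimal_science_exptab_cols() / read_minimal_tilenight_proctab_cols()
# appear to follow (names and internals here are assumptions, not desispec code).
from astropy.table import vstack

_stacked_cache = None  # module-level cache of the stacked table


def read_minimal_table_cols(load_per_night_tables, reset_cache=False):
    """Return a stacked table of minimal columns, reading from disk only once.

    load_per_night_tables: callable returning a list of per-night Tables.
    reset_cache: if True, drop the cached stack and re-read.
    """
    global _stacked_cache
    if reset_cache:
        _stacked_cache = None
    if _stacked_cache is None:
        _stacked_cache = vstack(load_per_night_tables())
    return _stacked_cache.copy()


def reset_table_cache():
    """Forget the cached stack so the next call re-reads from disk."""
    global _stacked_cache
    _stacked_cache = None
```

Because write_table is now told the tabletype it is writing, it can keep the matching cache current as the night's tables are rewritten, which is why proc_night can safely warm both caches up front.
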
4 changes: 2 additions & 2 deletions py/desispec/scripts/submit_night.py
@@ -25,7 +25,7 @@
checkfor_and_submit_joint_job, submit_tilenight_and_redshifts
from desispec.workflow.queue import update_from_queue, any_jobs_not_complete
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_path
from desispec.workflow.redshifts import read_minimal_exptables_columns
from desispec.workflow.redshifts import read_minimal_science_exptab_cols
from desispec.io.util import decode_camword, difference_camwords, create_camword

def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime',
@@ -245,7 +245,7 @@ def submit_night(night, proc_obstypes=None, z_submit_types=None, queue='realtime
tiles_cumulative = list(tiles_this_night)
log.info(f'Submitting cumulative redshifts for all tiles: {tiles_cumulative}')
else:
allexp = read_minimal_exptables_columns(tileids=tiles_this_night)
allexp = read_minimal_science_exptab_cols(tileids=tiles_this_night)
for tileid in tiles_this_night:
nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
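
With the stacked exposure table in hand, submit_night queues a cumulative redshift job for a tile only when the night being submitted is the most recent night on which that tile was observed. A toy version of that selection (the TILEID/NIGHT values are invented; allexp stands in for read_minimal_science_exptab_cols(tileids=tiles_this_night)):

```python
import numpy as np
from astropy.table import Table

# stand-in for the stacked, minimal-column exposure table
allexp = Table({'TILEID': [100, 100, 200],
                'NIGHT': [20240101, 20240102, 20240102]})
night = 20240102  # the night being submitted

tiles_cumulative = []
for tileid in np.unique(allexp['TILEID']):
    nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
    if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
        tiles_cumulative.append(int(tileid))

print(tiles_cumulative)  # [100, 200]: both tiles were last observed on `night`
```
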
60 changes: 35 additions & 25 deletions py/desispec/scripts/tile_redshifts.py
@@ -11,7 +11,7 @@
from astropy.table import Table, vstack

from desispec.io.util import parse_cameras
from desispec.workflow.redshifts import read_minimal_exptables_columns, \
from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
create_desi_zproc_batch_script
from desiutil.log import get_logger

@@ -60,17 +60,6 @@ def main(args=None):
num_error = len(failed_jobs)
sys.exit(num_error)

# _allexp is cache of all exposure tables stacked so that we don't have to read all
# of them every time we call generate_tile_redshift_scripts()
_allexp = None

def reset_allexp_cache():
"""
Utility script to reset the _allexp cache to ensure it is re-read from disk
"""
global _allexp
_allexp = None

def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None, explist=None,
camword=None, max_gpuprocs=None, no_gpu=False,
run_zmtl=False, no_afterburners=False,
@@ -126,7 +115,7 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
else:
log.info(f'Loading production exposure tables for all nights')

exptable = read_minimal_exptables_columns(nights)
exptable = read_minimal_science_exptab_cols(nights)

else:
log.info(f'Loading exposure list from {explist}')
@@ -190,28 +179,49 @@ def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None,
# - NOTE: depending upon options, this might re-read all the exptables again
# - NOTE: this may not scale well several years into the survey
if group == 'cumulative':
if nights is not None:
lastnight = int(np.max(nights))
elif exptable is not None:
lastnight = int(np.max(exptable['NIGHT']))
else:
lastnight = None
log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
global _allexp
if _allexp is None:
log.info(f'Reading all exposure_tables from all nights')
_allexp = read_minimal_exptables_columns()
keep = np.in1d(_allexp['TILEID'], tileids)
newexptable = _allexp[keep]
log.info(f'Reading all exposure_tables from all nights')
newexptable = read_minimal_science_exptab_cols(tileids=tileids)
newexptable = newexptable[['EXPID', 'NIGHT', 'TILEID']]

if exptable is not None:
expids = exptable['EXPID']
missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
if np.any(missing_exps):
latest_exptable = read_minimal_exptables_columns(nights=np.unique(exptable['NIGHT'][missing_exps]))
keep = np.in1d(latest_exptable['EXPID'], expids[missing_exps])
latest_exptable = latest_exptable[keep]
newexptable = vstack([newexptable, latest_exptable])
log.warning(f'Identified {np.sum(missing_exps)} missing exposures '
+ f'in the exposure cache. Resetting the cache to acquire'
+ f' them from all nights')
## reset_cache will remove cache but it won't be repopulated
## unless we request all nights. So let's request all nights
## then subselect to the nights we want
latest_exptable = read_minimal_science_exptab_cols(tileids=tileids,
reset_cache=True)
latest_exptable = latest_exptable[['EXPID', 'NIGHT', 'TILEID']]
missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
if np.any(missing_exps):
log.error(f'Identified {np.sum(missing_exps)} missing exposures '
+ f'in the exposure cache even after updating. Appending '
+ f'the user-provided exposures, but this may '
+ f'indicate a problem.')
newexptable = vstack([latest_exptable, exptable[missing_exps]])
else:
newexptable = latest_exptable

newexptable.sort(['EXPID'])
exptable = newexptable

## Ensure we only include data for nights up to and including specified nights
if nights is not None:
lastnight = int(np.max(nights))
if lastnight is not None:
log.info(f'Selecting only those exposures on nights before or '
+ f'during the latest night provided: {lastnight}')
exptable = exptable[exptable['NIGHT'] <= lastnight]

#expids = np.array(exptable['EXPID'])
tileids = np.unique(np.array(exptable['TILEID']))

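
The reworked cumulative branch trusts the exposure-table cache first and only forces a re-read (reset_cache=True) when exposures requested by the caller are missing from it. A simplified sketch of that check-then-reset flow; the fetch callable stands in for read_minimal_science_exptab_cols(tileids=..., reset_cache=...), and unlike the real code this version raises rather than appending the caller's rows:

```python
import numpy as np


def select_cached_exposures(expids, fetch):
    """Return cached exposure rows for expids, resetting the cache if it looks stale."""
    exptab = fetch(reset_cache=False)
    missing = np.in1d(expids, exptab['EXPID'], invert=True)
    if np.any(missing):
        # the cache may predate recently written exposure tables, so rebuild it
        exptab = fetch(reset_cache=True)
        missing = np.in1d(expids, exptab['EXPID'], invert=True)
    if np.any(missing):
        raise ValueError(f'{np.sum(missing)} requested EXPIDs not found in exposure tables')
    return exptab[np.in1d(exptab['EXPID'], expids)]
```
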
6 changes: 3 additions & 3 deletions py/desispec/scripts/tile_redshifts_bash.py
@@ -10,7 +10,7 @@
import numpy as np
from astropy.table import Table, vstack

from desispec.workflow.redshifts import read_minimal_exptables_columns, \
from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
get_ztile_script_pathname, get_ztile_relpath, \
get_ztile_script_suffix
from desiutil.log import get_logger
@@ -599,7 +599,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
else:
log.info(f'Loading production exposure tables for all nights')

exptable = read_minimal_exptables_columns(night)
exptable = read_minimal_science_exptab_cols(night)

else:
log.info(f'Loading exposure list from {explist}')
@@ -656,7 +656,7 @@ def generate_tile_redshift_scripts(group, night=None, tileid=None, expid=None, e
# - NOTE: this may not scale well several years into the survey
if group == 'cumulative':
log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
allexp = read_minimal_exptables_columns()
allexp = read_minimal_science_exptab_cols()
keep = np.in1d(allexp['TILEID'], tileids)
exptable = allexp[keep]
## Ensure we only include data for nights up to and including specified nights
2 changes: 1 addition & 1 deletion py/desispec/scripts/update_exptable.py
@@ -180,7 +180,7 @@ def update_exposure_table(night=None, specprod=None, exp_table_pathname=None,

## Only write out the table at the end and only if dry_run_level dictates
if dry_run_level < 3:
write_table(etable, tablename=exp_table_pathname)
write_table(etable, tablename=exp_table_pathname, tabletype='exptable')
else:
log.info(f"{dry_run_level=}, so not saving exposure table.\n{etable=}")

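
Passing tabletype to write_table is what lets those caches stay valid while the workflow keeps rewriting the night's tables (see the comment added in proc_night.py above). A hypothetical sketch of that idea; the real write_table signature and cache plumbing in desispec differ:

```python
# Hypothetical sketch: a writer that refreshes the matching in-memory cache
# whenever it persists a table, so pre-populated caches never go stale.
_caches = {'exptable': None, 'proctable': None}


def write_table_sketch(table, tablename, tabletype):
    """Write an astropy table to disk and refresh the cache keyed by tabletype."""
    table.write(tablename, format='ascii.ecsv', overwrite=True)
    if _caches.get(tabletype) is not None:
        # keep the cached copy in step with what is now on disk
        _caches[tabletype] = table.copy()
```
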
6 changes: 3 additions & 3 deletions py/desispec/scripts/zproc.py
@@ -29,7 +29,7 @@
import desiutil.iers

from desispec.io.meta import get_nights_up_to_date
from desispec.workflow.redshifts import read_minimal_exptables_columns, \
from desispec.workflow.redshifts import read_minimal_science_exptab_cols, \
create_desi_zproc_batch_script

#- internal desispec imports
@@ -357,8 +357,8 @@ def main(args=None, comm=None):
## Get list of only nights up to date of thrunight
nights = get_nights_up_to_date(args.thrunight)

exposure_table = read_minimal_exptables_columns(nights=nights,
tileids=[tileid])
exposure_table = read_minimal_science_exptab_cols(nights=nights,
tileids=[tileid])
if args.expids is not None:
exposure_table = exposure_table[np.isin(exposure_table['EXPID'],
args.expids)]
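
For a cumulative job, zproc now pulls the tile's exposures from the shared cache, limited to nights up to the "thru" night and, when given, to an explicit EXPID list. A toy equivalent of that narrowing (values invented; the real code passes nights= from get_nights_up_to_date and tileids=[tileid] to the cached reader):

```python
import numpy as np
from astropy.table import Table

exposure_table = Table({'EXPID': [1, 2, 3, 4],
                        'NIGHT': [20240101, 20240102, 20240103, 20240103],
                        'TILEID': [80605, 80605, 80605, 1234]})
thrunight, tileid, expids = 20240102, 80605, None

keep = (exposure_table['NIGHT'] <= thrunight) & (exposure_table['TILEID'] == tileid)
exposure_table = exposure_table[keep]
if expids is not None:
    exposure_table = exposure_table[np.isin(exposure_table['EXPID'], expids)]

print(list(exposure_table['EXPID']))  # [1, 2]
```
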
6 changes: 3 additions & 3 deletions py/desispec/scripts/zprocdashboard.py
@@ -27,7 +27,7 @@
from desispec.workflow.proctable import get_processing_table_pathname, \
erow_to_prow, instantiate_processing_table
from desispec.workflow.tableio import load_table
from desispec.workflow.redshifts import read_minimal_exptables_columns
from desispec.workflow.redshifts import read_minimal_science_exptab_cols
from desispec.io.meta import specprod_root, rawdata_root, findfile
from desispec.io.util import decode_camword, camword_to_spectros, \
difference_camwords, parse_badamps, create_camword, camword_union, \
@@ -127,7 +127,7 @@ def main(args=None):
log.info(f'Searching {prod_dir} for: {nights}')

## Get all the exposure tables for cross-night dependencies
all_exptabs = read_minimal_exptables_columns(nights=None)
all_exptabs = read_minimal_science_exptab_cols(nights=None)
## We don't want future days mixing in
all_exptabs = all_exptabs[all_exptabs['NIGHT'] <= np.max(nights)]
## Restrict to only the exptabs relevant to the current dashboard
@@ -199,7 +199,7 @@ def populate_night_zinfo(night, doem=True, doqso=True, dotileqa=True,
skipd_tileids (list): List of tileids that should be skipped and not
listed in the output dashboard.
all_exptabs (astropy.table.Table): A stacked exposure table with minimal
columns returned from read_minimal_exptables_columns(). Used for
columns returned from read_minimal_science_exptab_cols(). Used for
cumulative redshifts jobs to identify tile data from previous nights.
Returns dict:
2 changes: 1 addition & 1 deletion py/desispec/test/test_proc_night.py
@@ -403,7 +403,7 @@ def test_proc_night_daily(self):

while True:
num_newlinks = link_rawdata(self.real_rawnight_dir, self.test_rawnight_dir, numexp=10)
desispec.scripts.tile_redshifts.reset_allexp_cache()
desispec.workflow.redshifts.reset_science_etab_cache()
if num_newlinks == 0:
break
else:
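
The daily-processing test links raw data in batches, so after each batch it drops the stacked exposure-table cache; otherwise later iterations would keep seeing the stack built from the first batch. Usage sketch, assuming a desispec environment (both names appear elsewhere in this commit):

```python
import desispec.workflow.redshifts as wfred

# after linking new raw data into the test production:
wfred.reset_science_etab_cache()                    # drop the stale stack
allexp = wfred.read_minimal_science_exptab_cols()   # next call re-reads from disk
```
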
