Skip to content

Commit

Permalink
Merge pull request #2348 from desihub/kibo_pipe_tweaks
Browse files Browse the repository at this point in the history
Minor Kibo workflow fixes
  • Loading branch information
sbailey committed Aug 27, 2024
2 parents 019d6fa + c38e46f commit fdbd151
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
14 changes: 14 additions & 0 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,13 @@ def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
arc_prows = []
for arc_erow in arcs:
if arc_erow['EXPID'] in processed_cal_expids:
matches = np.where([arc_erow['EXPID'] in itterprow['EXPID']
for itterprow in ptable])[0]
if len(matches) == 1:
prow = ptable[matches[0]]
log.info("Found existing arc prow in ptable, "
+ f"including it for psfnight job: {list(prow)}")
arc_prows.append(prow)
continue
prow, int_id = make_exposure_prow(arc_erow, int_id, calibjobs)
prow, ptable = create_submit_add_and_save(prow, ptable)
Expand All @@ -775,6 +782,13 @@ def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
flat_prows = []
for flat_erow in flats:
if flat_erow['EXPID'] in processed_cal_expids:
matches = np.where([flat_erow['EXPID'] in itterprow['EXPID']
for itterprow in ptable])[0]
if len(matches) == 1:
prow = ptable[matches[0]]
log.info("Found existing flat prow in ptable, "
+ f"including it for nightlyflat job: {list(prow)}")
flat_prows.append(prow)
continue

jobdesc = 'flat'
Expand Down
15 changes: 10 additions & 5 deletions py/desispec/scripts/submit_prod.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,22 @@ def submit_production(production_yaml, queue_threshold=4500, dry_run_level=False
all_nights = sorted(all_nights)
log.info(f"Processing {all_nights=}")
for night in sorted(all_nights):
## If proctable exists, assume we've already completed that night
if os.path.exists(findfile('proctable', night=night, readonly=True)):
skipped_nights.append(night)
log.info(f"{night=} already has a proctable, skipping.")
continue

## If the queue is too full, stop submitting nights
num_in_queue = check_queue_count(user=user, include_scron=False,
dry_run_level=dry_run_level)
## In Jura the largest night had 115 jobs, to be conservative say 200 by default
## In Jura the largest night had 115 jobs, to be conservative we submit
## up to 4500 jobs (out of a 5000 limit) by default
if num_in_queue > queue_threshold:
log.info(f"{num_in_queue} jobs in the queue > {queue_threshold},"
+ " so stopping the job submissions.")
break
if os.path.exists(findfile('proctable', night=night, readonly=True)):
skipped_nights.append(night)
log.info(f"{night=} already has a proctable, skipping.")
continue

## We don't expect exposure tables to change during code execution here
## but we do expect processing tables to evolve, so clear that cache
log.info(f"Processing {night=}")
Expand Down
2 changes: 1 addition & 1 deletion py/desispec/workflow/calibration_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def determine_calibrations_to_proc(etable, do_cte_flats=True,
+ f"more information is known.")
return etable[[]]
else:
log.error(f"Only found {Counter(exptypes)} calibrations "
log.warning(f"Only found {Counter(exptypes)} calibrations "
+ "and not acquiring new data, so this may be fatal "
+ "if you aren't using an override file.")

Expand Down
26 changes: 24 additions & 2 deletions py/desispec/workflow/proctable.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,21 @@ def read_minimal_tilenight_proctab_cols(nights=None, tileids=None,
## Load each relevant processing table file, subselect valid tilenight's and
## append to the full set
ptab_files = sorted(ptab_files)
## Less intrusive logging of files we're reading in
if len(ptab_files) > 0:
dirname = os.path.dirname(ptab_files[0])
shortnames = [fil.replace(dirname+"/", '') for fil in ptab_files]
else:
dirname = ''
shortnames = []
log.info(f"Loading the following processing tables for tilenight processing"
+ f" table cache from directory: {dirname}, filenames: {shortnames}")

ptables = list()
for ptab_file in ptab_files:
## correct way but slower and we don't need multivalue columns
t = load_table(tablename=ptab_file, tabletype='proctable')
t = load_table(tablename=ptab_file, tabletype='proctable',
suppress_logging=True)
t = _select_tilenights_from_ptab(t)

## Need to ensure that the string columns are consistent
Expand Down Expand Up @@ -626,10 +637,21 @@ def read_minimal_full_proctab_cols(nights=None, tileids=None,
## Load each relevant processing table file, subselect valid tilenight's and
## append to the full set
ptab_files = sorted(ptab_files)
## Less intrusive logging of files we're reading in
if len(ptab_files) > 0:
dirname = os.path.dirname(ptab_files[0])
shortnames = [fil.replace(dirname+"/", '') for fil in ptab_files]
else:
dirname = ''
shortnames = []
log.info(f"Loading the following processing tables for full processing "
+ f"table cache from directory: {dirname}, filenames: {shortnames}")

ptables = list()
for ptab_file in ptab_files:
## correct way but slower and we don't need multivalue columns
t = load_table(tablename=ptab_file, tabletype='proctable')
t = load_table(tablename=ptab_file, tabletype='proctable',
suppress_logging=True)

## Need to ensure that the string columns are consistent
for col in ['PROCCAMWORD']:
Expand Down

0 comments on commit fdbd151

Please sign in to comment.