Skip to content

Commit

Permalink
move prod night parsing to a func
Browse files Browse the repository at this point in the history
  • Loading branch information
akremin committed Aug 14, 2024
1 parent 59f95e6 commit c314650
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 43 deletions.
5 changes: 4 additions & 1 deletion bin/desi_submit_prod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ def parse_args(): # options=None):
"""
parser = argparse.ArgumentParser(description="Submit a full production run of the DESI data pipeline for processing.")

parser.add_argument("--production-yaml", type=str, required=True,
parser.add_argument("-p", "--production-yaml", type=str, required=True,
help="Relative or absolute pathname to the yaml file summarizing the production.")
parser.add_argument("-p", "--queue-threshold", type=int, default=4800,
help="The number of jobs for the current user in the queue at which the"
+ " at which the script stops submitting new jobs.")

# Code Flags
parser.add_argument("--dry-run-level", type=int, default=0,
Expand Down
119 changes: 77 additions & 42 deletions py/desispec/scripts/submit_prod.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
from desispec.workflow.queue import check_queue_count


def get_all_nights(first_night, last_night):
def get_nights_in_date_range(first_night, last_night):
"""
Returns a full list of all nights that have an exposure table
exposure
Inputs:
Args:
first_night, int. First night to include (inclusive).
last_night, int. Last night to include (inclusive).
Expand All @@ -52,7 +52,7 @@ def get_all_valid_nights(first_night, last_night):
Returns a full list of all nights that have at least one valid science
exposure
Inputs:
Args:
first_night, int. First night to include (inclusive).
last_night, int. Last night to include (inclusive).
Expand All @@ -65,14 +65,80 @@ def get_all_valid_nights(first_night, last_night):
nights = nights[((nights>=first_night)&(nights<=last_night))]
return nights

def get_nights_to_process(production_yaml, verbose=False):
"""
Derives the nights to be processed based on a production yaml file and
returns a list of int nights.
Args:
production_yaml (str or dict): Production yaml or pathname of the
yaml file that defines the production.
verbose (bool): Whether to be verbose in log outputs.
Returns:
nights, list. A list of nights on or after Jan 1 2020 in which data exists at NERSC.
"""
log = get_logger()
## If production_yaml not loaded, load the file
if isinstance(production_yaml, str):
if not os.path.exists(production_yaml):
raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.")
with open(production_yaml, 'rb') as yamlfile:
config = yaml.safe_load(yamlfile)
else:
config = production_yaml

all_nights, first_night = None, None
if 'NIGHTS' in config and 'LAST_NIGHT' in config:
log.error(f"Both NIGHTS and LAST_NIGHT specified. Using NIGHTS "
+ f"and ignoring LAST_NIGHT.")
if 'NIGHTS' in config:
all_nights = np.array(list(config['NIGHTS'])).astype(int)
if verbose:
log.info(f"Setting all_nights to NIGHTS: {all_nights}")
log.info("Setting first_night to earliest night in NIGHTS:"
+ f" {np.min(all_nights)}")
first_night = np.min(all_nights)
if verbose:
log.info("Setting last_night to latest night in NIGHTS: "
+ f"{np.max(all_nights)}")
last_night = np.max(all_nights)
elif 'LAST_NIGHT' in config:
last_night = int(config['LAST_NIGHT'])
if verbose:
log.info(f"Setting last_night to LATEST_NIGHT: {last_night}")
else:
raise ValueError("Either NIGHT or LAST_NIGHT required in yaml "
+ f"file {production_yaml}")

if first_night is None:
if 'FIRST_NIGHT' in config:
first_night = int(config['FIRST_NIGHT'])
if verbose:
log.info(f"Setting first_night to FIRST_NIGHT: {first_night}")
else:
if verbose:
log.info("Setting first_night to earliest in a normal prod: 20201214")
first_night = 20201214

if all_nights is None:
# all_nights = get_nights_in_date_range(first_night, last_night)
if verbose:
log.info("Populating all_nights with all of the nights with valid science "
+ f"exposures between {first_night} and {last_night} inclusive")
all_nights = get_all_valid_nights(first_night, last_night)
return sorted(all_nights)


def submit_production(production_yaml, dry_run_level=False):
def submit_production(production_yaml, queue_threshold=4800, dry_run_level=False):
"""
Interprets a production_yaml file and submits the respective nights for processing
within the defined production.
Args:
production_yaml (str): Pathname of the yaml file that defines the production.
queue_threshold (int): The number of jobs for the current user in the queue
at which the script stops submitting new jobs.
dry_run_level (int, optional): Default is 0. Should the jobs written to the processing table actually be submitted
for processing. This is passed directly to desi_proc_night.
Expand Down Expand Up @@ -103,47 +169,15 @@ def submit_production(production_yaml, dry_run_level=False):
return 0

## Load the nights to process
all_nights, first_night = None, None
if 'NIGHTS' in conf and 'LAST_NIGHT' in conf:
log.error(f"Both NIGHTS and LAST_NIGHT specified. Using NIGHTS "
+ f"and ignoring LAST_NIGHT.")
if 'NIGHTS' in conf:
all_nights = np.array(list(conf['NIGHTS'])).astype(int)
log.info(f"Setting all_nights to NIGHTS: {all_nights}")
log.info("Setting first_night to earliest night in NIGHTS:"
+ f" {np.min(all_nights)}")
first_night = np.min(all_nights)
log.info("Setting last_night to latest night in NIGHTS: "
+ f"{np.max(all_nights)}")
last_night = np.max(all_nights)
elif 'LAST_NIGHT' in conf:
last_night = int(conf['LAST_NIGHT'])
log.info(f"Setting last_night to LATEST_NIGHT: {last_night}")
else:
raise ValueError("Either NIGHT or LAST_NIGHT required in yaml "
+ f"file {production_yaml}")

if first_night is None:
if 'FIRST_NIGHT' in conf:
first_night = int(conf['FIRST_NIGHT'])
log.info(f"Setting first_night to FIRST_NIGHT: {first_night}")
else:
log.info("Setting first_night to earliest in a normal prod: 20201214")
first_night = 20201214

if all_nights is None:
# all_nights = get_all_nights(first_night, last_night)
log.info("Populating all_nights with all of the nights with valid science "
+ f"exposures between {first_night} and {last_night} inclusive")
all_nights = get_all_valid_nights(first_night, last_night)
all_nights = get_nights_to_process(production_yaml=conf, verbose=True)

## Load the other parameters for running desi_proc_night
if 'THRU_NIGHT' in conf:
thru_night = int(conf['THRU_NIGHT'])
log.info(f"Setting thru_night to THRU_NIGHT: {thru_night}")
else:
thru_night = last_night
log.warning(f"Setting thru_night to last_night: {thru_night}")
thru_night = np.max(all_nights)
log.warning(f"Setting thru_night to last night: {thru_night}")

## If not specified, run "cumulative" redshifts, otherwise do
## as directed
Expand Down Expand Up @@ -207,9 +241,10 @@ def submit_production(production_yaml, dry_run_level=False):
for night in sorted(all_nights):
num_in_queue = check_queue_count(user=user, include_scron=False,
dry_run_level=dry_run_level)
## In Jura the largest night had 115 jobs, to be conservative say 200
if num_in_queue > 4800:
log.info(f"{num_in_queue} jobs in the queue, so stopping the job submissions.")
## In Jura the largest night had 115 jobs, to be conservative say 200 by default
if num_in_queue > queue_threshold:
log.info(f"{num_in_queue} jobs in the queue > {queue_threshold},"
+ " so stopping the job submissions.")
break
if os.path.exists(findfile('proctable', night=night, readonly=True)):
skipped_nights.append(night)
Expand Down
5 changes: 5 additions & 0 deletions py/desispec/workflow/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,11 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
for col in queue_info_table.colnames:
queue_info_table.rename_column(col, col.upper())

## If the table is empty, return it immediately, otherwise perform
## sanity check and cuts
if len(queue_info_table) == 0:
return queue_info_table

if np.any(queue_info_table['USER']!=user):
msg = f"Warning {np.sum(queue_info_table['USER']!=user)} " \
+ f"jobs returned were not {user=}\n" \
Expand Down

0 comments on commit c314650

Please sign in to comment.