From c314650cd3d1ceddc785b7856816245cc89d8521 Mon Sep 17 00:00:00 2001 From: kremin Date: Wed, 14 Aug 2024 16:04:15 -0700 Subject: [PATCH] move prod night parsing to a func --- bin/desi_submit_prod | 5 +- py/desispec/scripts/submit_prod.py | 119 +++++++++++++++++++---------- py/desispec/workflow/queue.py | 5 ++ 3 files changed, 86 insertions(+), 43 deletions(-) diff --git a/bin/desi_submit_prod b/bin/desi_submit_prod index 460d3dd56..9c3102a92 100755 --- a/bin/desi_submit_prod +++ b/bin/desi_submit_prod @@ -12,8 +12,11 @@ def parse_args(): # options=None): """ parser = argparse.ArgumentParser(description="Submit a full production run of the DESI data pipeline for processing.") - parser.add_argument("--production-yaml", type=str, required=True, + parser.add_argument("-p", "--production-yaml", type=str, required=True, help="Relative or absolute pathname to the yaml file summarizing the production.") + parser.add_argument("-p", "--queue-threshold", type=int, default=4800, + help="The number of jobs for the current user in the queue at which the" + + " at which the script stops submitting new jobs.") # Code Flags parser.add_argument("--dry-run-level", type=int, default=0, diff --git a/py/desispec/scripts/submit_prod.py b/py/desispec/scripts/submit_prod.py index 253aefcdd..eef83700e 100644 --- a/py/desispec/scripts/submit_prod.py +++ b/py/desispec/scripts/submit_prod.py @@ -22,12 +22,12 @@ from desispec.workflow.queue import check_queue_count -def get_all_nights(first_night, last_night): +def get_nights_in_date_range(first_night, last_night): """ Returns a full list of all nights that have an exposure table exposure - Inputs: + Args: first_night, int. First night to include (inclusive). last_night, int. Last night to include (inclusive). @@ -52,7 +52,7 @@ def get_all_valid_nights(first_night, last_night): Returns a full list of all nights that have at least one valid science exposure - Inputs: + Args: first_night, int. First night to include (inclusive). last_night, int. Last night to include (inclusive). @@ -65,14 +65,80 @@ def get_all_valid_nights(first_night, last_night): nights = nights[((nights>=first_night)&(nights<=last_night))] return nights +def get_nights_to_process(production_yaml, verbose=False): + """ + Derives the nights to be processed based on a production yaml file and + returns a list of int nights. + + Args: + production_yaml (str or dict): Production yaml or pathname of the + yaml file that defines the production. + verbose (bool): Whether to be verbose in log outputs. + + Returns: + nights, list. A list of nights on or after Jan 1 2020 in which data exists at NERSC. + """ + log = get_logger() + ## If production_yaml not loaded, load the file + if isinstance(production_yaml, str): + if not os.path.exists(production_yaml): + raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.") + with open(production_yaml, 'rb') as yamlfile: + config = yaml.safe_load(yamlfile) + else: + config = production_yaml + + all_nights, first_night = None, None + if 'NIGHTS' in config and 'LAST_NIGHT' in config: + log.error(f"Both NIGHTS and LAST_NIGHT specified. Using NIGHTS " + + f"and ignoring LAST_NIGHT.") + if 'NIGHTS' in config: + all_nights = np.array(list(config['NIGHTS'])).astype(int) + if verbose: + log.info(f"Setting all_nights to NIGHTS: {all_nights}") + log.info("Setting first_night to earliest night in NIGHTS:" + + f" {np.min(all_nights)}") + first_night = np.min(all_nights) + if verbose: + log.info("Setting last_night to latest night in NIGHTS: " + + f"{np.max(all_nights)}") + last_night = np.max(all_nights) + elif 'LAST_NIGHT' in config: + last_night = int(config['LAST_NIGHT']) + if verbose: + log.info(f"Setting last_night to LATEST_NIGHT: {last_night}") + else: + raise ValueError("Either NIGHT or LAST_NIGHT required in yaml " + + f"file {production_yaml}") + + if first_night is None: + if 'FIRST_NIGHT' in config: + first_night = int(config['FIRST_NIGHT']) + if verbose: + log.info(f"Setting first_night to FIRST_NIGHT: {first_night}") + else: + if verbose: + log.info("Setting first_night to earliest in a normal prod: 20201214") + first_night = 20201214 + + if all_nights is None: + # all_nights = get_nights_in_date_range(first_night, last_night) + if verbose: + log.info("Populating all_nights with all of the nights with valid science " + + f"exposures between {first_night} and {last_night} inclusive") + all_nights = get_all_valid_nights(first_night, last_night) + return sorted(all_nights) + -def submit_production(production_yaml, dry_run_level=False): +def submit_production(production_yaml, queue_threshold=4800, dry_run_level=False): """ Interprets a production_yaml file and submits the respective nights for processing within the defined production. Args: production_yaml (str): Pathname of the yaml file that defines the production. + queue_threshold (int): The number of jobs for the current user in the queue + at which the script stops submitting new jobs. dry_run_level (int, optional): Default is 0. Should the jobs written to the processing table actually be submitted for processing. This is passed directly to desi_proc_night. @@ -103,47 +169,15 @@ def submit_production(production_yaml, dry_run_level=False): return 0 ## Load the nights to process - all_nights, first_night = None, None - if 'NIGHTS' in conf and 'LAST_NIGHT' in conf: - log.error(f"Both NIGHTS and LAST_NIGHT specified. Using NIGHTS " - + f"and ignoring LAST_NIGHT.") - if 'NIGHTS' in conf: - all_nights = np.array(list(conf['NIGHTS'])).astype(int) - log.info(f"Setting all_nights to NIGHTS: {all_nights}") - log.info("Setting first_night to earliest night in NIGHTS:" - + f" {np.min(all_nights)}") - first_night = np.min(all_nights) - log.info("Setting last_night to latest night in NIGHTS: " - + f"{np.max(all_nights)}") - last_night = np.max(all_nights) - elif 'LAST_NIGHT' in conf: - last_night = int(conf['LAST_NIGHT']) - log.info(f"Setting last_night to LATEST_NIGHT: {last_night}") - else: - raise ValueError("Either NIGHT or LAST_NIGHT required in yaml " - + f"file {production_yaml}") - - if first_night is None: - if 'FIRST_NIGHT' in conf: - first_night = int(conf['FIRST_NIGHT']) - log.info(f"Setting first_night to FIRST_NIGHT: {first_night}") - else: - log.info("Setting first_night to earliest in a normal prod: 20201214") - first_night = 20201214 - - if all_nights is None: - # all_nights = get_all_nights(first_night, last_night) - log.info("Populating all_nights with all of the nights with valid science " - + f"exposures between {first_night} and {last_night} inclusive") - all_nights = get_all_valid_nights(first_night, last_night) + all_nights = get_nights_to_process(production_yaml=conf, verbose=True) ## Load the other parameters for running desi_proc_night if 'THRU_NIGHT' in conf: thru_night = int(conf['THRU_NIGHT']) log.info(f"Setting thru_night to THRU_NIGHT: {thru_night}") else: - thru_night = last_night - log.warning(f"Setting thru_night to last_night: {thru_night}") + thru_night = np.max(all_nights) + log.warning(f"Setting thru_night to last night: {thru_night}") ## If not specified, run "cumulative" redshifts, otherwise do ## as directed @@ -207,9 +241,10 @@ def submit_production(production_yaml, dry_run_level=False): for night in sorted(all_nights): num_in_queue = check_queue_count(user=user, include_scron=False, dry_run_level=dry_run_level) - ## In Jura the largest night had 115 jobs, to be conservative say 200 - if num_in_queue > 4800: - log.info(f"{num_in_queue} jobs in the queue, so stopping the job submissions.") + ## In Jura the largest night had 115 jobs, to be conservative say 200 by default + if num_in_queue > queue_threshold: + log.info(f"{num_in_queue} jobs in the queue > {queue_threshold}," + + " so stopping the job submissions.") break if os.path.exists(findfile('proctable', night=night, readonly=True)): skipped_nights.append(night) diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 21864b5d5..4f4a2d41d 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -539,6 +539,11 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): for col in queue_info_table.colnames: queue_info_table.rename_column(col, col.upper()) + ## If the table is empty, return it immediately, otherwise perform + ## sanity check and cuts + if len(queue_info_table) == 0: + return queue_info_table + if np.any(queue_info_table['USER']!=user): msg = f"Warning {np.sum(queue_info_table['USER']!=user)} " \ + f"jobs returned were not {user=}\n" \