From c0102bf2ebd0db2d355a4932fecfe3ea9d8d8bf7 Mon Sep 17 00:00:00 2001 From: David Huber Date: Thu, 19 Sep 2024 16:34:24 +0000 Subject: [PATCH] Additions to allow APP to differ by RUN --- parm/config/gfs/config.base | 31 ++++++++++---- workflow/applications/applications.py | 47 ++++++++++++++-------- workflow/applications/gefs.py | 12 +++++- workflow/applications/gfs_cycled.py | 29 ++++++++++++- workflow/applications/gfs_forecast_only.py | 12 +++++- workflow/rocoto/gfs_tasks.py | 6 +-- workflow/rocoto/workflow_xml.py | 4 +- 7 files changed, 107 insertions(+), 34 deletions(-) diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 784c334d82..a72a7a242d 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -151,8 +151,25 @@ export SENDDBN_NTC=${SENDDBN_NTC:-"NO"} export SENDDBN=${SENDDBN:-"NO"} export DBNROOT=${DBNROOT:-${UTILROOT:-}/fakedbn} -# APP settings -export APP=@APP@ +# APP settings; configurable by RUN +case "${RUN}" in + "gfs") + export APP=@APP@ + ;; + "gdas") + export APP=@APP@ + ;; + "enkfgfs") + export APP=@APP@ + ;; + "enkfgdas") + export APP=@APP@ + ;; + *) + echo "FATAL ERROR: Unrecognized RUN (${RUN})!" + exit 1 + ;; +esac shopt -s extglob # Adjust APP based on RUN @@ -217,7 +234,7 @@ case "${CASE}" in ;; *) echo "FATAL ERROR: Unrecognized CASE ${CASE}, ABORT!" - exit 1 + exit 2 ;; esac @@ -256,8 +273,8 @@ case "${APP}" in fi ;; *) - echo "Unrecognized APP: '${APP}'" - exit 1 + echo "FATAL ERROR: Unrecognized APP: '${APP}'" + exit 3 ;; esac @@ -461,8 +478,8 @@ export FHMAX_FITS=132 export HPSSARCH="@HPSSARCH@" # save data to HPSS archive export LOCALARCH="@LOCALARCH@" # save data to local archive if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then - echo "Both HPSS and local archiving selected. Please choose one or the other." - exit 2 + echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other." + exit 4 fi export ARCH_CYC=00 # Archive data at this cycle for warm_start capability export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index a694129e38..ee721907e8 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -41,7 +41,6 @@ def __init__(self, conf: Configuration) -> None: f'{", ".join(self.VALID_MODES)}\n') self.net = base['NET'] - self.model_app = base.get('APP', 'ATM') self.do_atm = base.get('DO_ATM', True) self.do_wave = base.get('DO_WAVE', False) self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False) @@ -63,6 +62,7 @@ def __init__(self, conf: Configuration) -> None: self.do_goes = base.get('DO_GOES', False) self.do_mos = base.get('DO_MOS', False) self.do_extractvars = base.get('DO_EXTRACTVARS', False) + self.gfs_cyc = base.get('gfs_cyc') self.do_hpssarch = base.get('HPSSARCH', False) @@ -97,28 +97,27 @@ def __init__(self, conf: Configuration) -> None: def _init_finalize(self, conf: Configuration): print("Finalizing initialize") - # Get a list of all possible config_files that would be part of the application + # Get a list of all possible config files that would be part of the application self.configs_names = self._get_app_configs() - # Source the config files for the jobs in the application without specifying a RUN - self.configs = {'_no_run': self._source_configs(conf)} - - # Update the base config dictionary based on application - self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base']) + # Get the list of valid runs for the configuration + self.runs = self.get_valid_runs() - # Save base in the internal state since it is often needed - base = self.configs['_no_run']['base'] + # Initialize the task_names, configs, and model_apps dictionaries + self.task_names = dict.fromkeys(self.runs) + self.model_apps = dict.fromkeys(self.runs) + self.configs = dict.fromkeys(self.runs) - # Get more configuration options into the class attributes - self.gfs_cyc = base.get('gfs_cyc') + # Now configure the experiment for each valid run + for run in self.runs: - # Get task names for the application - self.task_names = self.get_task_names() + # Get task names, configs, and APPs for the application + self.task_names[run] = self.get_task_names(run) - # Finally, source the configuration files for each valid `RUN` - for run in self.task_names.keys(): self.configs[run] = self._source_configs(conf, run=run, log=False) + self.model_apps[run] = self.configs[run]['base'].get('APP', 'ATM') + # Update the base config dictionary based on application and RUN self.configs[run]['base'] = self._update_base(self.configs[run]['base']) @@ -184,7 +183,23 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru return configs @abstractmethod - def get_task_names(self, run="_no_run") -> Dict[str, List[str]]: + def get_valid_runs(self) -> List[str]: + ''' + Create a list of RUNs for the configuation. + + Parameters + ---------- + None + + Returns + ------- + Dict[str, List[str]]: Lists of tasks for each RUN. + + ''' + pass + + @abstractmethod + def get_task_names(self, run: str) -> Dict[str, List[str]]: ''' Create a list of task names for each RUN valid for the configuation. diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index afb4072596..3a27808045 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -47,7 +47,15 @@ def _update_base(base_in): return base_out - def get_task_names(self): + def get_valid_runs(self): + """ + Return the GEFS RUN. + """ + + # Only one RUN (should be gefs) is allowed as specified in __init__ + return [self.run] + + def get_task_names(self, run): tasks = ['stage_ic'] @@ -84,4 +92,4 @@ def get_task_names(self): tasks += ['arch', 'cleanup'] - return {f"{self.run}": tasks} + return tasks diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 4bb473f454..87a6ab43e2 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -130,7 +130,28 @@ def _update_base(base_in): return GFSCycledAppConfig.get_gfs_cyc_dates(base_in) - def get_task_names(self): + def get_valid_runs(self): + """ + Get a list of valid RUNs in cycled MODE. + """ + + # The gdas run is always present for the cycled application + runs = ["gdas"] + + # Are we running the early cycle deterministic forecast? + if self.gfs_cyc > 0: + runs.append("gfs") + + # Ensembles? Add the valid run based on eupd_runs. + if self.do_hybvar: + if 'gdas' in self.eupd_runs: + runs.append("enkfgdas") + if 'gfs' in self.eupd_runs: + runs.append("enkfgfs") + + return runs + + def get_task_names(self, run: str): """ Get the task names for all the tasks in the cycled application. Note that the order of the task names matters in the XML. @@ -306,7 +327,11 @@ def get_task_names(self): enkfgfs_tasks.remove("esnowrecen") tasks['enkfgfs'] = enkfgfs_tasks - return tasks + if run not in tasks: + raise KeyError(f"FATAL ERROR: GFS cycled experiment is not configured " + f"for the input run ({run})") + + return tasks[run] @staticmethod def get_gfs_cyc_dates(base: Dict[str, Any]) -> Dict[str, Any]: diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index 93551ac0cc..421d046b80 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -82,7 +82,15 @@ def _update_base(base_in): return base_out - def get_task_names(self): + def get_valid_runs(self): + """ + Return the specified RUN for forecast-only MODE. + """ + + # Only one RUN is allowed for forecast-only mode as specified in __init__ + return [self.run] + + def get_task_names(self, run: str): """ Get the task names for all the tasks in the forecast-only application. Note that the order of the task names matters in the XML. @@ -157,4 +165,4 @@ def get_task_names(self): tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks - return {f"{self.run}": tasks} + return tasks diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 89da933d00..8f2f612ea6 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -855,8 +855,8 @@ def fcst(self): try: task = fcst_map[self.app_config.mode]() except KeyError: - raise NotImplementedError(f'{self.app_config.mode} is not a valid type.\n' + - 'Currently supported forecast types are:\n' + + raise NotImplementedError(f'{self.app_config.mode} is not a valid type.\n' + f'Currently supported forecast types are:\n' f'{" | ".join(fcst_map.keys())}') return task @@ -868,7 +868,7 @@ def _fcst_forecast_only(self): dependencies.append(rocoto.add_dependency(dep_dict)) if self.app_config.do_wave and self.run in self.app_config.wave_runs: - wave_job = 'waveprep' if self.app_config.model_app in ['ATMW'] else 'waveinit' + wave_job = 'waveprep' if self.app_config.model_apps[self.run] in ['ATMW'] else 'waveinit' dep_dict = {'type': 'task', 'name': f'{self.run}{wave_job}'} dependencies.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py index 3ad7c4bd91..03c7d8b4b9 100644 --- a/workflow/rocoto/workflow_xml.py +++ b/workflow/rocoto/workflow_xml.py @@ -18,8 +18,8 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None: self._app_config = app_config self.rocoto_config = rocoto_config - # Use the generic config.base (without RUN specified) - self._base = self._app_config.configs['_no_run']['base'] + # Use the first config.base (sourced with an arbitrary RUN) + self._base = self._app_config.configs[next(iter(self._app_config.configs))]['base'] self.preamble = self._get_preamble() self.definitions = self._get_definitions()