diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile index b3bd6a917a..3c889e51d5 100644 --- a/ci/Jenkinsfile +++ b/ci/Jenkinsfile @@ -5,7 +5,7 @@ def cases = '' def GH = 'none' // Location of the custom workspaces for each machine in the CI system. They are persitent for each iteration of the PR. def NodeName = [hera: 'Hera-EMC', orion: 'Orion-EMC', hercules: 'Hercules-EMC', gaea: 'Gaea'] -def custom_workspace = [hera: '/scratch1/NCEPDEV/global/CI', orion: '/work2/noaa/stmp/CI/ORION', hercules: '/work2/noaa/global/CI/HERCULES', gaea: '/gpfs/f5/epic/proj-shared/global/CI'] +def custom_workspace = [hera: '/scratch1/NCEPDEV/global/CI_dh', orion: '/work2/noaa/stmp/CI/ORION', hercules: '/work2/noaa/global/CI/HERCULES', gaea: '/gpfs/f5/epic/proj-shared/global/CI'] def repo_url = 'git@github.com:NOAA-EMC/global-workflow.git' def STATUS = 'Passed' diff --git a/jobs/JGLOBAL_FORECAST b/jobs/JGLOBAL_FORECAST index e64a91d21c..0572f1d2d9 100755 --- a/jobs/JGLOBAL_FORECAST +++ b/jobs/JGLOBAL_FORECAST @@ -77,7 +77,7 @@ if [[ "${DO_ICE}" == "YES" ]]; then COMIN_ICE_RESTART_PREV:COM_ICE_RESTART_TMPL fi -if [[ "${DO_AERO}" == "YES" ]]; then +if [[ "${DO_AERO_FCST}" == "YES" ]]; then YMD="${PDY}" HH="${cyc}" declare_from_tmpl -rx \ COMOUT_CHEM_HISTORY:COM_CHEM_HISTORY_TMPL fi diff --git a/parm/archive/gdas.yaml.j2 b/parm/archive/gdas.yaml.j2 index 7a9e402138..fa8919a589 100644 --- a/parm/archive/gdas.yaml.j2 +++ b/parm/archive/gdas.yaml.j2 @@ -67,7 +67,7 @@ gdas: - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}oznstat" - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}radstat" {% endif %} - {% if DO_AERO and (AERO_ANL_RUN == "gdas" or AERO_ANL_RUN == "both") %} + {% if DO_AERO_ANL %} - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat" {% endif %} {% if DO_PREP_OBS_AERO %} diff --git a/parm/archive/gfs_arcdir.yaml.j2 b/parm/archive/gfs_arcdir.yaml.j2 index 58e2cdc699..98803b473c 100644 --- a/parm/archive/gfs_arcdir.yaml.j2 +++ b/parm/archive/gfs_arcdir.yaml.j2 @@ -50,7 +50,7 @@ ARCDIR ~ "/snowstat." ~ RUN ~ "." ~ cycle_YMDH ~ ".tgz"]) %} {% endif %} - {% if DO_AERO and (AERO_ANL_RUN == RUN or AERO_ANL_RUN == "both") %} + {% if DO_AERO_ANL %} {% do det_anl_files.append([COMIN_CHEM_ANALYSIS ~ "/" ~ head ~ "aerostat", ARCDIR ~ "/aerostat." ~ RUN ~ "." ~ cycle_YMDH ]) %} {% endif %} diff --git a/parm/archive/gfsa.yaml.j2 b/parm/archive/gfsa.yaml.j2 index 7cb3c770fd..4efe281120 100644 --- a/parm/archive/gfsa.yaml.j2 +++ b/parm/archive/gfsa.yaml.j2 @@ -38,7 +38,7 @@ gfsa: {% else %} - "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}gsistat" {% endif %} - {% if DO_AERO and (AERO_ANL_RUN == "gfs" or AERO_ANL_RUN == "both") %} + {% if DO_AERO_ANL %} - "{{ COMIN_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat" {% endif %} {% if DO_PREP_OBS_AERO %} diff --git a/parm/archive/master_gfs.yaml.j2 b/parm/archive/master_gfs.yaml.j2 index 3f7c2e9d14..e7187d70d5 100644 --- a/parm/archive/master_gfs.yaml.j2 +++ b/parm/archive/master_gfs.yaml.j2 @@ -33,7 +33,7 @@ datasets: {% endfilter %} {% endif %} -{% if DO_AERO and (AERO_FCST_RUN == "gfs" or AERO_FCST_RUN == "both") %} +{% if DO_AERO_FCST %} # Aerosol forecasts {% filter indent(width=4) %} {% include "chem.yaml.j2" %} diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index 848cdcfbda..acfc39db33 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -135,10 +135,8 @@ export DO_COUPLED="NO" export DO_WAVE="NO" export DO_OCN="NO" export DO_ICE="NO" -export DO_AERO="NO" export DO_EXTRACTVARS="@DO_EXTRACTVARS@" # Option to process and extract a subset of products to save on disk -export AERO_FCST_RUN="gefs" # When to run aerosol forecast: gdas, gfs, or both -export WAVE_RUN="gefs" # When to include wave suite: gdas, gfs, or both +export DO_AERO_FCST="NO" export DOBNDPNT_WAVE="NO" # The GEFS buoys file does not currently have any boundary points export DOIBP_WAV="NO" # Option to create point outputs from input boundary points export FRAC_GRID=".true." @@ -183,7 +181,7 @@ case "${APP}" in ATM) ;; ATMA) - export DO_AERO="YES" + export DO_AERO_FCST="YES" ;; ATMW) export DO_COUPLED="YES" @@ -200,7 +198,7 @@ case "${APP}" in export DO_ICE="YES" if [[ "${APP}" =~ A$ ]]; then - export DO_AERO="YES" + export DO_AERO_FCST="YES" fi if [[ "${APP}" =~ ^S2SW ]]; then diff --git a/parm/config/gefs/config.efcs b/parm/config/gefs/config.efcs index 27d7be235d..6bf0ed0a18 100644 --- a/parm/config/gefs/config.efcs +++ b/parm/config/gefs/config.efcs @@ -6,7 +6,7 @@ echo "BEGIN: config.efcs" # Turn off components in ensemble -# export DO_AERO="NO" +# export DO_AERO_FCST="NO" # export DO_OCN="NO" # export DO_ICE="NO" # export DO_WAVE="NO" @@ -19,7 +19,7 @@ string="--fv3 ${CASE}" [[ "${DO_OCN}" == "YES" ]] && string="${string} --mom6 ${OCNRES}" [[ "${DO_ICE}" == "YES" ]] && string="${string} --cice6 ${ICERES}" [[ "${DO_WAVE}" == "YES" ]] && string="${string} --ww3 ${waveGRD// /;}" -[[ "${DO_AERO}" == "YES" ]] && string="${string} --gocart" +[[ "${DO_AERO_FCST}" == "YES" ]] && string="${string} --gocart" # shellcheck disable=SC2086 source "${EXPDIR}/config.ufs" ${string} diff --git a/parm/config/gefs/config.fcst b/parm/config/gefs/config.fcst index c600c8edbf..0461c7e909 100644 --- a/parm/config/gefs/config.fcst +++ b/parm/config/gefs/config.fcst @@ -8,24 +8,12 @@ echo "BEGIN: config.fcst" export USE_ESMF_THREADING="YES" # Toggle to use ESMF-managed threading or traditional threading in UFSWM export COPY_FINAL_RESTARTS="NO" # Toggle to copy restarts from the end of GFS/GEFS Run (GDAS is handled seperately) -# Turn off waves if not used for this RUN -case ${WAVE_RUN} in - both | "${RUN/enkf}" ) ;; # Don't change - *) DO_WAVE="NO" ;; # Turn waves off -esac - -# Turn off aerosols if not used for this RUN -case ${AERO_FCST_RUN} in - both | "${RUN/enkf}" ) ;; # Don't change - *) DO_AERO="NO" ;; # Turn waves off -esac - # Source model specific information that is resolution dependent string="--fv3 ${CASE}" [[ "${DO_OCN}" == "YES" ]] && string="${string} --mom6 ${OCNRES}" [[ "${DO_ICE}" == "YES" ]] && string="${string} --cice6 ${ICERES}" [[ "${DO_WAVE}" == "YES" ]] && string="${string} --ww3 ${waveGRD// /;}" -[[ "${DO_AERO}" == "YES" ]] && string="${string} --gocart" +[[ "${DO_AERO_FCST}" == "YES" ]] && string="${string} --gocart" # We are counting on $string being multiple arguments # shellcheck disable=SC2086 source "${EXPDIR}/config.ufs" ${string} @@ -142,7 +130,7 @@ tbp="" if [[ "${progsigma}" == ".true." ]]; then tbp="_progsigma" ; fi # Radiation options -if [[ "${DO_AERO}" == "YES" ]]; then +if [[ "${DO_AERO_FCST}" == "YES" ]]; then export IAER=2011 # spectral band mapping method for aerosol optical properties else export IAER=1011 diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 690fdf919a..a730ea401c 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -65,7 +65,7 @@ case ${step} in export ntasks=1 export tasks_per_node=1 export threads_per_task=1 - export is_exclusive=True + export memory="4096M" ;; "waveinit") @@ -144,7 +144,7 @@ case ${step} in echo "MEDIATOR using (threads, PETS) = (${MEDTHREADS}, ${MEDPETS})" CHMPETS=0; CHMTHREADS=0 - if [[ "${DO_AERO}" == "YES" ]]; then + if [[ "${DO_AERO_FCST}" == "YES" ]]; then # GOCART shares the same grid and forecast tasks as FV3 (do not add write grid component tasks). (( CHMTHREADS = ATMTHREADS )) (( CHMPETS = FV3PETS )) diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 8150d2e39c..91f353360f 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -160,6 +160,7 @@ export APP=@APP@ shopt -s extglob # Adjust APP based on RUN +# If a component (WAVES, etc) needs to be turned off by RUN, set it here case "${RUN}" in enkf*) # Turn off aerosols and waves APP="${APP/%+([WA])}" @@ -175,11 +176,12 @@ export DO_COUPLED="NO" export DO_WAVE="NO" export DO_OCN="NO" export DO_ICE="NO" -export DO_AERO="NO" +DO_AERO="NO" export DO_PREP_OBS_AERO="NO" -export AERO_FCST_RUN="gdas" # When to run aerosol forecast: gdas, gfs, or both -export AERO_ANL_RUN="both" # When to run aerosol analysis: gdas, gfs, or both -export WAVE_RUN="both" # When to include wave suite: gdas, gfs, or both +aero_fcst_runs="gdas" # When to run aerosol forecast: gdas, gfs, or both +aero_anl_runs="gdas gfs" # When to run aerosol analysis: gdas, gfs, or both +export DO_AERO_FCST="NO" +export DO_AERO_ANL="NO" export DOBNDPNT_WAVE="NO" export DOIBP_WAV="NO" # Option to create point outputs from input boundary points export FRAC_GRID=".true." @@ -221,7 +223,7 @@ case "${CASE}" in ;; *) echo "FATAL ERROR: Unrecognized CASE ${CASE}, ABORT!" - exit 1 + exit 2 ;; esac @@ -229,7 +231,7 @@ case "${APP}" in ATM) ;; ATMA) - export DO_AERO="YES" + DO_AERO="YES" ;; ATMW) export DO_COUPLED="YES" @@ -246,7 +248,7 @@ case "${APP}" in export DO_ICE="YES" if [[ "${APP}" =~ A$ ]]; then - export DO_AERO="YES" + DO_AERO="YES" fi if [[ "${APP}" =~ ^S2SW ]]; then @@ -254,11 +256,21 @@ case "${APP}" in fi ;; *) - echo "Unrecognized APP: '${APP}'" - exit 2 + echo "FATAL ERROR: Unrecognized APP: '${APP}'" + exit 3 ;; esac +# Aerosol forecasts and analyses may be RUN-dependent +if [[ "${DO_AERO}" == "YES" ]]; then + if [[ ${aero_anl_runs} =~ ${RUN} ]]; then + export DO_AERO_ANL="YES" + fi + if [[ ${aero_fcst_runs} =~ ${RUN} ]]; then + export DO_AERO_FCST="YES" + fi +fi + # Surface cycle update frequency if [[ "${RUN}" =~ "gdas" ]] ; then export FHCYC=1 @@ -457,8 +469,8 @@ export FHMAX_FITS=132 export HPSSARCH="@HPSSARCH@" # save data to HPSS archive export LOCALARCH="@LOCALARCH@" # save data to local archive if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then - echo "Both HPSS and local archiving selected. Please choose one or the other." - exit 3 + echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other." + exit 4 fi export ARCH_CYC=00 # Archive data at this cycle for warm_start capability export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability diff --git a/parm/config/gfs/config.efcs b/parm/config/gfs/config.efcs index 1837cf0619..d27fd13cfa 100644 --- a/parm/config/gfs/config.efcs +++ b/parm/config/gfs/config.efcs @@ -13,7 +13,7 @@ string="--fv3 ${CASE}" [[ "${DO_OCN}" == "YES" ]] && string="${string} --mom6 ${OCNRES}" [[ "${DO_ICE}" == "YES" ]] && string="${string} --cice6 ${ICERES}" [[ "${DO_WAVE}" == "YES" ]] && string="${string} --ww3 ${waveGRD// /;}" -[[ "${DO_AERO}" == "YES" ]] && string="${string} --gocart" +[[ "${DO_AERO_FCST}" == "YES" ]] && string="${string} --gocart" # We are counting on $string being multiple arguments # shellcheck disable=SC2086 source "${EXPDIR}/config.ufs" ${string} diff --git a/parm/config/gfs/config.fcst b/parm/config/gfs/config.fcst index 571e6cafb5..b154d37114 100644 --- a/parm/config/gfs/config.fcst +++ b/parm/config/gfs/config.fcst @@ -8,24 +8,12 @@ echo "BEGIN: config.fcst" export USE_ESMF_THREADING="YES" # Toggle to use ESMF-managed threading or traditional threading in UFSWM export COPY_FINAL_RESTARTS="NO" # Toggle to copy restarts from the end of GFS/GEFS Run (GDAS is handled seperately) -# Turn off waves if not used for this RUN -case ${WAVE_RUN} in - both | "${RUN/enkf}" ) ;; # Don't change - *) DO_WAVE="NO" ;; # Turn waves off -esac - -# Turn off aerosols if not used for this RUN -case ${AERO_FCST_RUN} in - both | "${RUN/enkf}" ) ;; # Don't change - *) DO_AERO="NO" ;; # Turn aerosols off -esac - # Source model specific information that is resolution dependent string="--fv3 ${CASE}" [[ "${DO_OCN}" == "YES" ]] && string="${string} --mom6 ${OCNRES}" [[ "${DO_ICE}" == "YES" ]] && string="${string} --cice6 ${ICERES}" [[ "${DO_WAVE}" == "YES" ]] && string="${string} --ww3 ${waveGRD// /;}" -[[ "${DO_AERO}" == "YES" ]] && string="${string} --gocart" +[[ "${DO_AERO_FCST}" == "YES" ]] && string="${string} --gocart" # We are counting on $string being multiple arguments # shellcheck disable=SC2086 source "${EXPDIR}/config.ufs" ${string} @@ -157,7 +145,7 @@ tbp="" if [[ "${progsigma}" == ".true." ]]; then tbp="_progsigma" ; fi # Radiation options -if [[ "${DO_AERO}" == "YES" ]]; then +if [[ "${DO_AERO_FCST}" == "YES" ]]; then export IAER=2011 # spectral band mapping method for aerosol optical properties else export IAER=1011 diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index cddd1643fd..bc2a89054e 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -809,7 +809,7 @@ case ${step} in echo "MEDIATOR using (threads, PETS) = (${MEDTHREADS}, ${MEDPETS})" CHMPETS=0; CHMTHREADS=0 - if [[ "${DO_AERO}" == "YES" ]]; then + if [[ "${DO_AERO_FCST}" == "YES" ]]; then # GOCART shares the same grid and forecast tasks as FV3 (do not add write grid component tasks). (( CHMTHREADS = ATMTHREADS )) (( CHMPETS = FV3PETS )) @@ -1036,7 +1036,7 @@ case ${step} in ntasks=1 tasks_per_node=1 threads_per_task=1 - export is_exclusive=True + memory="4096M" ;; "atmensanlinit") diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 4ee9e5ed0e..2d3fa58313 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -3,7 +3,7 @@ import os from pygfs.task.archive import Archive -from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, chdir, logit +from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit # initialize root logger logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) @@ -19,7 +19,7 @@ def main(): # Pull out all the configuration keys needed to run the rest of archive steps keys = ['ATARDIR', 'current_cycle', 'FHMIN', 'FHMAX', 'FHOUT', 'RUN', 'PDY', - 'DO_VERFRAD', 'DO_VMINMON', 'DO_VERFOZN', 'DO_ICE', 'DO_AERO', 'DO_PREP_OBS_AERO', + 'DO_VERFRAD', 'DO_VMINMON', 'DO_VERFOZN', 'DO_ICE', 'DO_PREP_OBS_AERO', 'PARMgfs', 'DO_OCN', 'DO_WAVE', 'WRITE_DOPOST', 'PSLOT', 'HPSSARCH', 'DO_MOS', 'DO_JEDISNOWDA', 'LOCALARCH', 'REALTIME', 'ROTDIR', 'ARCH_WARMICFREQ', 'ARCH_FCSTICFREQ', 'ARCH_CYC', 'assim_freq', 'ARCDIR', 'SDATE', @@ -29,7 +29,7 @@ def main(): 'DOIAU', 'OCNRES', 'ICERES', 'NUM_SND_COLLECTIVES', 'FHOUT_WAV', 'FHOUT_HF_WAV', 'FHMAX_WAV', 'FHMAX_HF_WAV', 'FHMAX_WAV_GFS', 'restart_interval_gdas', 'restart_interval_gfs', - 'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR', + 'DO_AERO_ANL', 'DO_AERO_FCST', 'DOIBP_WAV', 'DO_JEDIOCNVAR', 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', 'OFFSET_START_HOUR'] diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 108cd2ed27..f1d8cdf865 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -50,7 +50,7 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str Parameters ---------- arch_dict : Dict[str, Any] - Task specific keys, e.g. runtime options (DO_AERO, DO_ICE, etc) + Task specific keys, e.g. runtime options (DO_AERO_FCST, DO_ICE, etc) Return ------ diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index ecd320d708..22e299df20 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 from typing import Dict, List, Any -from datetime import timedelta from hosts import Host -from wxflow import Configuration, to_timedelta +from wxflow import Configuration from abc import ABC, ABCMeta, abstractmethod __all__ = ['AppConfig'] @@ -31,94 +30,83 @@ def __init__(self, conf: Configuration) -> None: self.scheduler = Host().scheduler + # Get the most basic settings from config.base to determine + # experiment type ({NET}_{MODE}) base = conf.parse_config('config.base') self.mode = base['MODE'] - if self.mode not in self.VALID_MODES: raise NotImplementedError(f'{self.mode} is not a valid application mode.\n' f'Valid application modes are:\n' f'{", ".join(self.VALID_MODES)}\n') self.net = base['NET'] - self.model_app = base.get('APP', 'ATM') - self.do_atm = base.get('DO_ATM', True) - self.do_wave = base.get('DO_WAVE', False) - self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False) - self.do_ocean = base.get('DO_OCN', False) - self.do_ice = base.get('DO_ICE', False) - self.do_aero = base.get('DO_AERO', False) - self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False) - self.do_bufrsnd = base.get('DO_BUFRSND', False) - self.do_gempak = base.get('DO_GEMPAK', False) - self.do_awips = base.get('DO_AWIPS', False) - self.do_verfozn = base.get('DO_VERFOZN', True) - self.do_verfrad = base.get('DO_VERFRAD', True) - self.do_vminmon = base.get('DO_VMINMON', True) - self.do_tracker = base.get('DO_TRACKER', True) - self.do_genesis = base.get('DO_GENESIS', True) - self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False) - self.do_metp = base.get('DO_METP', False) - self.do_upp = not base.get('WRITE_DOPOST', True) - self.do_goes = base.get('DO_GOES', False) - self.do_mos = base.get('DO_MOS', False) - self.do_extractvars = base.get('DO_EXTRACTVARS', False) - - self.do_hpssarch = base.get('HPSSARCH', False) - - self.nens = base.get('NMEM_ENS', 0) - self.fcst_segments = base.get('FCST_SEGMENTS', None) - self.interval_gfs = to_timedelta(f"{base.get('INTERVAL_GFS')}H") - - if not AppConfig.is_monotonic(self.fcst_segments): - raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') - - self.wave_runs = None - if self.do_wave: - wave_run = base.get('WAVE_RUN', 'BOTH').lower() - if wave_run in ['both']: - self.wave_runs = ['gfs', 'gdas'] - elif wave_run in ['gfs', 'gdas']: - self.wave_runs = [wave_run] - - self.aero_anl_runs = None - self.aero_fcst_runs = None - if self.do_aero: - aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower() - if aero_anl_run in ['both']: - self.aero_anl_runs = ['gfs', 'gdas'] - elif aero_anl_run in ['gfs', 'gdas']: - self.aero_anl_runs = [aero_anl_run] - aero_fcst_run = base.get('AERO_FCST_RUN', None).lower() - if aero_fcst_run in ['both']: - self.aero_fcst_runs = ['gfs', 'gdas'] - elif aero_fcst_run in ['gfs', 'gdas']: - self.aero_fcst_runs = [aero_fcst_run] + print(f"Generating the XML for a {self.mode}_{self.net} case") def _init_finalize(self, conf: Configuration): - print("Finalizing initialize") - - # Get a list of all possible config_files that would be part of the application - self.configs_names = self._get_app_configs() - - # Source the config files for the jobs in the application without specifying a RUN - self.configs = {'_no_run': self._source_configs(conf)} - - # Update the base config dictionary based on application - self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base']) + ''' + Finalize object initialization calling subclass methods + ''' - # Save base in the internal state since it is often needed - base = self.configs['_no_run']['base'] + # Get run-, net-, and mode-based options + self.run_options = self._get_run_options(conf) - # Get task names for the application + # Get task names and runs for the application self.task_names = self.get_task_names() - # Finally, source the configuration files for each valid `RUN` - for run in self.task_names.keys(): + # Initialize the configs and model_apps dictionaries + self.configs = dict.fromkeys(self.runs) + + # Now configure the experiment for each valid run + for run in self.runs: self.configs[run] = self._source_configs(conf, run=run, log=False) - # Update the base config dictionary based on application and RUN - self.configs[run]['base'] = self._update_base(self.configs[run]['base']) + def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: + ''' + Determine the do_* and APP options for each RUN by sourcing config.base + for each RUN and collecting the flags into self.run_options. Note that + this method is overloaded so additional NET- and MODE-dependent flags + can be set. + ''' + + run_options = {run: {} for run in dict.fromkeys(self.runs)} + for run in self.runs: + # Read config.base with RUN specified + run_base = conf.parse_config('config.base', RUN=run) + + run_options[run]['app'] = run_base.get('APP', 'ATM') + run_options[run]['do_wave_bnd'] = run_base.get('DOBNDPNT_WAVE', False) + run_options[run]['do_bufrsnd'] = run_base.get('DO_BUFRSND', False) + run_options[run]['do_gempak'] = run_base.get('DO_GEMPAK', False) + run_options[run]['do_awips'] = run_base.get('DO_AWIPS', False) + run_options[run]['do_verfozn'] = run_base.get('DO_VERFOZN', True) + run_options[run]['do_verfrad'] = run_base.get('DO_VERFRAD', True) + run_options[run]['do_vminmon'] = run_base.get('DO_VMINMON', True) + run_options[run]['do_tracker'] = run_base.get('DO_TRACKER', True) + run_options[run]['do_genesis'] = run_base.get('DO_GENESIS', True) + run_options[run]['do_genesis_fsu'] = run_base.get('DO_GENESIS_FSU', False) + run_options[run]['do_metp'] = run_base.get('DO_METP', False) + run_options[run]['do_upp'] = not run_base.get('WRITE_DOPOST', True) + run_options[run]['do_goes'] = run_base.get('DO_GOES', False) + run_options[run]['do_mos'] = run_base.get('DO_MOS', False) + run_options[run]['do_extractvars'] = run_base.get('DO_EXTRACTVARS', False) + + run_options[run]['do_atm'] = run_base.get('DO_ATM', True) + run_options[run]['do_wave'] = run_base.get('DO_WAVE', False) + run_options[run]['do_ocean'] = run_base.get('DO_OCN', False) + run_options[run]['do_ice'] = run_base.get('DO_ICE', False) + run_options[run]['do_prep_obs_aero'] = run_base.get('DO_PREP_OBS_AERO', False) + run_options[run]['do_aero_anl'] = run_base.get('DO_AERO_ANL', False) + run_options[run]['do_aero_fcst'] = run_base.get('DO_AERO_FCST', False) + + run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False) + run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None) + + if not AppConfig.is_monotonic(run_options[run]['fcst_segments']): + raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') + + # Return the dictionary of run options + return run_options @abstractmethod def _get_app_configs(self): @@ -150,13 +138,12 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru Every config depends on "config.base" """ - configs = dict() - - # Return config.base as well - configs['base'] = conf.parse_config('config.base', RUN=run) + # Include config.base by its lonesome and update it + configs = {'base': conf.parse_config('config.base', RUN=run)} + configs['base'] = self._update_base(configs['base']) # Source the list of all config_files involved in the application - for config in self.configs_names: + for config in self._get_app_configs(run): # All must source config.base first files = ['config.base'] @@ -182,9 +169,9 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru return configs @abstractmethod - def get_task_names(self, run="_no_run") -> Dict[str, List[str]]: + def get_task_names(self, run: str) -> Dict[str, List[str]]: ''' - Create a list of task names for each RUN valid for the configuation. + Create a list of valid RUNs and a dict of task names for each RUN valid for the configuation. Parameters ---------- @@ -192,7 +179,7 @@ def get_task_names(self, run="_no_run") -> Dict[str, List[str]]: Returns ------- - Dict[str, List[str]]: Lists of tasks for each RUN. + Dict[str, List[str]]: Lists of all tasks for each RUN. ''' pass diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index 9d1d5c3dc4..33545eb2ec 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -1,4 +1,5 @@ from applications.applications import AppConfig +from typing import Dict, Any from wxflow import Configuration @@ -12,28 +13,38 @@ def __init__(self, conf: Configuration): base = conf.parse_config('config.base') self.run = base.get('RUN', 'gefs') + self.runs = [self.run] - def _get_app_configs(self): + def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: + + run_options = super()._get_run_options(conf) + + run_options[self.run]['nens'] = conf.parse_config('config.base').get('NMEM_ENS', 0) + + return run_options + + def _get_app_configs(self, run): """ Returns the config_files that are involved in gefs """ + options = self.run_options[run] configs = ['stage_ic', 'fcst', 'atmos_products', 'arch', 'cleanup'] - if self.nens > 0: + if options['nens'] > 0: configs += ['efcs', 'atmos_ensstat'] - if self.do_wave: + if options['do_wave']: configs += ['waveinit', 'wavepostsbs', 'wavepostpnt'] - if self.do_wave_bnd: + if options['do_wave_bnd']: configs += ['wavepostbndpnt', 'wavepostbndpntbll'] - if self.do_ocean or self.do_ice: + if options['do_ocean'] or options['do_ice']: configs += ['oceanice_products'] - if self.do_aero: + if options['do_aero_fcst']: configs += ['prep_emissions'] - if self.do_extractvars: + if options['do_extractvars']: configs += ['extractvars'] return configs @@ -48,37 +59,38 @@ def _update_base(base_in): def get_task_names(self): + options = self.run_options[self.run] tasks = ['stage_ic'] - if self.do_wave: + if options['do_wave']: tasks += ['waveinit'] - if self.do_aero: + if options['do_aero_fcst']: tasks += ['prep_emissions'] tasks += ['fcst'] - if self.nens > 0: + if options['nens'] > 0: tasks += ['efcs'] tasks += ['atmos_prod'] - if self.nens > 0: + if options['nens'] > 0: tasks += ['atmos_ensstat'] - if self.do_ocean: + if options['do_ocean']: tasks += ['ocean_prod'] - if self.do_ice: + if options['do_ice']: tasks += ['ice_prod'] - if self.do_wave: + if options['do_wave']: tasks += ['wavepostsbs'] - if self.do_wave_bnd: + if options['do_wave_bnd']: tasks += ['wavepostbndpnt', 'wavepostbndpntbll'] tasks += ['wavepostpnt'] - if self.do_extractvars: + if options['do_extractvars']: tasks += ['extractvars', 'arch'] tasks += ['cleanup'] diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index e85e8b159f..2d16b6a59c 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -1,7 +1,6 @@ -from typing import Dict, Any from applications.applications import AppConfig -from wxflow import Configuration, to_timedelta -from datetime import timedelta +from typing import Dict, Any +from wxflow import Configuration class GFSCycledAppConfig(AppConfig): @@ -11,113 +10,136 @@ class GFSCycledAppConfig(AppConfig): def __init__(self, conf: Configuration): super().__init__(conf) + # Re-read config.base without RUN specified to get the basic settings for + # cycled cases to be able to determine valid runs base = conf.parse_config('config.base') - self.do_hybvar = base.get('DOHYBVAR', False) - self.do_fit2obs = base.get('DO_FIT2OBS', True) - self.do_jediatmvar = base.get('DO_JEDIATMVAR', False) - self.do_jediatmens = base.get('DO_JEDIATMENS', False) - self.do_jediocnvar = base.get('DO_JEDIOCNVAR', False) - self.do_jedisnowda = base.get('DO_JEDISNOWDA', False) - self.do_mergensst = base.get('DO_MERGENSST', False) - self.do_vrfy_oceanda = base.get('DO_VRFY_OCEANDA', False) - - self.lobsdiag_forenkf = False - self.eupd_runs = None - if self.do_hybvar: - self.lobsdiag_forenkf = base.get('lobsdiag_forenkf', False) - eupd_run = base.get('EUPD_CYC', 'gdas').lower() - if eupd_run in ['both']: - self.eupd_runs = ['gfs', 'gdas'] - elif eupd_run in ['gfs', 'gdas']: - self.eupd_runs = [eupd_run] - - def _get_app_configs(self): + + self.ens_runs = [] + + if base.get('DOHYBVAR', False): + ens_run = base.get('EUPD_CYC', 'gdas').lower() + if ens_run in ['both']: + self.ens_runs = ['gfs', 'gdas'] + elif ens_run in ['gfs', 'gdas']: + self.ens_runs = [ens_run] + + # Now construct self.runs the desired XML order (gdas, enkfgdas, gfs, enkfgfs) + self.runs = ["gdas"] # We always have a 'gdas' run + self.runs.append('enkfgdas') if 'gdas' in self.ens_runs else 0 + self.runs.append("gfs") if base['INTERVAL_GFS'] > 0 else 0 + self.runs.append('enkfgfs') if 'gfs' in self.ens_runs and "gfs" in self.runs else 0 + + def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: + + run_options = super()._get_run_options(conf) + + for run in self.runs: + base = conf.parse_config('config.base', RUN=run) + + run_options[run]['do_hybvar'] = base.get('DOHYBVAR', False) + run_options[run]['nens'] = base.get('NMEM_ENS', 0) + if run_options[run]['do_hybvar']: + run_options[run]['lobsdiag_forenkf'] = base.get('lobsdiag_forenkf', False) + + run_options[run]['do_fit2obs'] = base.get('DO_FIT2OBS', True) + run_options[run]['do_jediatmvar'] = base.get('DO_JEDIATMVAR', False) + run_options[run]['do_jediatmens'] = base.get('DO_JEDIATMENS', False) + run_options[run]['do_jediocnvar'] = base.get('DO_JEDIOCNVAR', False) + run_options[run]['do_jedisnowda'] = base.get('DO_JEDISNOWDA', False) + run_options[run]['do_mergensst'] = base.get('DO_MERGENSST', False) + run_options[run]['do_vrfy_oceanda'] = base.get('DO_VRFY_OCEANDA', False) + + return run_options + + def _get_app_configs(self, run): """ - Returns the config_files that are involved in the cycled app + Returns the config files that are involved in the cycled app """ + options = self.run_options[run] configs = ['prep'] - if self.do_jediatmvar: + if options['do_jediatmvar']: configs += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] else: configs += ['anal', 'analdiag'] - if self.do_jediocnvar: + if options['do_jediocnvar']: configs += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] - if self.do_hybvar: + if options['do_hybvar']: configs += ['marineanlletkf', 'ocnanalecen'] configs += ['marineanlchkpt', 'marineanlfinal'] - if self.do_vrfy_oceanda: + if options['do_vrfy_oceanda']: configs += ['ocnanalvrfy'] - if self.do_ocean or self.do_ice: + if options['do_ocean'] or options['do_ice']: configs += ['oceanice_products'] configs += ['stage_ic', 'sfcanl', 'analcalc', 'fcst', 'upp', 'atmos_products', 'arch', 'cleanup'] - if self.do_hybvar: - if self.do_jediatmens: - configs += ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] + if options['do_hybvar']: + if options['do_jediatmens']: + configs += ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', + 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] else: configs += ['eobs', 'eomg', 'ediag', 'eupd'] configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] - if self.do_fit2obs: + if options['do_fit2obs']: configs += ['fit2obs'] - if self.do_verfozn: + if options['do_verfozn']: configs += ['verfozn'] - if self.do_verfrad: + if options['do_verfrad']: configs += ['verfrad'] - if self.do_vminmon: + if options['do_vminmon']: configs += ['vminmon'] - if self.do_tracker: + if options['do_tracker']: configs += ['tracker'] - if self.do_genesis: + if options['do_genesis']: configs += ['genesis'] - if self.do_genesis_fsu: + if options['do_genesis_fsu']: configs += ['genesis_fsu'] - if self.do_metp: + if options['do_metp']: configs += ['metp'] - if self.do_gempak: + if options['do_gempak']: configs += ['gempak'] - if self.do_goes: + if options['do_goes']: configs += ['npoess'] - if self.do_bufrsnd: + if options['do_bufrsnd']: configs += ['postsnd'] - if self.do_awips: + if options['do_awips']: configs += ['awips'] - if self.do_wave: + if options['do_wave']: configs += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostpnt'] - if self.do_wave_bnd: + if options['do_wave_bnd']: configs += ['wavepostbndpnt', 'wavepostbndpntbll'] - if self.do_gempak: + if options['do_gempak']: configs += ['wavegempak'] - if self.do_awips: + if options['do_awips']: configs += ['waveawipsbulls', 'waveawipsgridded'] - if self.do_aero: + if options['do_aero_anl']: configs += ['aeroanlgenb', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: + if options['do_prep_obs_aero']: configs += ['prepobsaero'] - if self.do_jedisnowda: + if options['do_jedisnowda']: configs += ['snowanl'] - if self.do_hybvar: + if options['do_hybvar']: configs += ['esnowrecen'] - if self.do_mos: + if options['do_mos']: configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', @@ -132,178 +154,169 @@ def _update_base(base_in): def get_task_names(self): """ - Get the task names for all the tasks in the cycled application. - Note that the order of the task names matters in the XML. - This is the place where that order is set. + Get the task names for each valid run in this cycled configuration. + NOTE: The order of the task names matters in the XML. + This is the place where that order is set. """ - gdas_gfs_common_tasks_before_fcst = ['prep'] - gdas_gfs_common_cleanup_tasks = ['arch', 'cleanup'] - - if self.do_jediatmvar: - gdas_gfs_common_tasks_before_fcst += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] - else: - gdas_gfs_common_tasks_before_fcst += ['anal'] - - if self.do_jediocnvar: - gdas_gfs_common_tasks_before_fcst += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] - if self.do_hybvar: - gdas_gfs_common_tasks_before_fcst += ['marineanlletkf', 'ocnanalecen'] - gdas_gfs_common_tasks_before_fcst += ['marineanlchkpt', 'marineanlfinal'] - if self.do_vrfy_oceanda: - gdas_gfs_common_tasks_before_fcst += ['ocnanalvrfy'] - - gdas_gfs_common_tasks_before_fcst += ['sfcanl', 'analcalc'] - - if self.do_jedisnowda: - gdas_gfs_common_tasks_before_fcst += ['snowanl'] - - wave_prep_tasks = ['waveinit', 'waveprep'] - wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll'] - wave_post_tasks = ['wavepostsbs', 'wavepostpnt'] - - hybrid_tasks = [] - hybrid_after_eupd_tasks = [] - if self.do_hybvar: - if self.do_jediatmens: - hybrid_tasks += ['atmensanlinit', 'atmensanlfv3inc', 'atmensanlfinal', 'echgres'] - hybrid_tasks += ['atmensanlobs', 'atmensanlsol'] if self.lobsdiag_forenkf else ['atmensanlletkf'] - else: - hybrid_tasks += ['eobs', 'eupd', 'echgres'] - hybrid_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg'] - if self.do_jedisnowda: - hybrid_tasks += ['esnowrecen'] - hybrid_after_eupd_tasks += ['stage_ic', 'ecen', 'esfc', 'efcs', 'epos', 'earc', 'cleanup'] - - # Collect all "gdas" cycle tasks - gdas_tasks = gdas_gfs_common_tasks_before_fcst.copy() - - if not self.do_jediatmvar: - gdas_tasks += ['analdiag'] - - if self.do_wave and 'gdas' in self.wave_runs: - gdas_tasks += wave_prep_tasks - - if self.do_aero and 'gdas' in self.aero_anl_runs: - gdas_tasks += ['aeroanlgenb', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: - gdas_tasks += ['prepobsaero'] - - gdas_tasks += ['stage_ic', 'atmanlupp', 'atmanlprod', 'fcst'] - - if self.do_upp: - gdas_tasks += ['atmupp'] - gdas_tasks += ['atmos_prod'] - - if self.do_wave and 'gdas' in self.wave_runs: - if self.do_wave_bnd: - gdas_tasks += wave_bndpnt_tasks - gdas_tasks += wave_post_tasks - - if self.do_fit2obs: - gdas_tasks += ['fit2obs'] - - if self.do_verfozn: - gdas_tasks += ['verfozn'] - - if self.do_verfrad: - gdas_tasks += ['verfrad'] - - if self.do_vminmon: - gdas_tasks += ['vminmon'] - - if self.do_gempak: - gdas_tasks += ['gempak', 'gempakmetancdc'] - - gdas_tasks += gdas_gfs_common_cleanup_tasks - - # Collect "gfs" cycle tasks - gfs_tasks = gdas_gfs_common_tasks_before_fcst.copy() - - if self.do_wave and 'gfs' in self.wave_runs: - gfs_tasks += wave_prep_tasks - - if self.do_aero and 'gfs' in self.aero_anl_runs: - gfs_tasks += ['aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: - gfs_tasks += ['prepobsaero'] - - gfs_tasks += ['atmanlupp', 'atmanlprod', 'fcst'] - - if self.do_ocean: - gfs_tasks += ['ocean_prod'] - - if self.do_ice: - gfs_tasks += ['ice_prod'] - - if self.do_upp: - gfs_tasks += ['atmupp'] - gfs_tasks += ['atmos_prod'] - - if self.do_goes: - gfs_tasks += ['goesupp'] - - if self.do_vminmon: - gfs_tasks += ['vminmon'] - - if self.do_tracker: - gfs_tasks += ['tracker'] - - if self.do_genesis: - gfs_tasks += ['genesis'] - - if self.do_genesis_fsu: - gfs_tasks += ['genesis_fsu'] - - if self.do_metp: - gfs_tasks += ['metp'] - - if self.do_wave and 'gfs' in self.wave_runs: - if self.do_wave_bnd: - gfs_tasks += wave_bndpnt_tasks - gfs_tasks += wave_post_tasks - if self.do_gempak: - gfs_tasks += ['wavegempak'] - if self.do_awips: - gfs_tasks += ['waveawipsbulls', 'waveawipsgridded'] - - if self.do_bufrsnd: - gfs_tasks += ['postsnd'] - - if self.do_gempak: - gfs_tasks += ['gempak'] - gfs_tasks += ['gempakmeta'] - gfs_tasks += ['gempakncdcupapgif'] - if self.do_goes: - gfs_tasks += ['npoess_pgrb2_0p5deg'] - gfs_tasks += ['gempakpgrb2spec'] - - if self.do_awips: - gfs_tasks += ['awips_20km_1p0deg', 'fbwind'] - - if self.do_mos: - gfs_tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', - 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', - 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', - 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - - gfs_tasks += gdas_gfs_common_cleanup_tasks - - tasks = dict() - tasks['gdas'] = gdas_tasks - - if self.do_hybvar and 'gdas' in self.eupd_runs: - enkfgdas_tasks = hybrid_tasks + hybrid_after_eupd_tasks - tasks['enkfgdas'] = enkfgdas_tasks - - # Add RUN=gfs tasks if running early cycle - if self.interval_gfs > to_timedelta("0H"): - tasks['gfs'] = gfs_tasks + # Start with a dictionary of empty task lists for each valid run + task_names = {run: [] for run in self.runs} - if self.do_hybvar and 'gfs' in self.eupd_runs: - enkfgfs_tasks = hybrid_tasks + hybrid_after_eupd_tasks - enkfgfs_tasks.remove("echgres") - enkfgfs_tasks.remove("esnowrecen") - tasks['enkfgfs'] = enkfgfs_tasks + for run in self.runs: + options = self.run_options[run] - return tasks + # Common gdas and gfs tasks before fcst + if run in ['gdas', 'gfs']: + task_names[run] += ['prep'] + if options['do_jediatmvar']: + task_names[run] += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] + else: + task_names[run] += ['anal'] + + if options['do_jediocnvar']: + task_names[run] += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] + if options['do_hybvar']: + task_names[run] += ['marineanlletkf', 'ocnanalecen'] + task_names[run] += ['marineanlchkpt', 'marineanlfinal'] + if options['do_vrfy_oceanda']: + task_names[run] += ['ocnanalvrfy'] + + task_names[run] += ['sfcanl', 'analcalc'] + + if options['do_jedisnowda']: + task_names[run] += ['snowanl'] + + wave_prep_tasks = ['waveinit', 'waveprep'] + wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll'] + wave_post_tasks = ['wavepostsbs', 'wavepostpnt'] + + # gdas- and gfs-specific analysis tasks + if run == 'gdas': + if not options['do_jediatmvar']: + task_names[run] += ['analdiag'] + + if options['do_wave']: + task_names[run] += wave_prep_tasks + + if options['do_aero_anl']: + task_names[run] += ['aeroanlgenb'] + + else: + if options['do_wave']: + task_names[run] += wave_prep_tasks + + if options['do_aero_anl']: + task_names[run] += ['aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] + + if options['do_prep_obs_aero']: + task_names[run] += ['prepobsaero'] + + # Staging is gdas-specific + if run == 'gdas': + task_names[run] += ['stage_ic'] + + task_names[run] += ['atmanlupp', 'atmanlprod', 'fcst'] + + # gfs-specific products + if run == 'gfs': + if options['do_ocean']: + task_names[run] += ['ocean_prod'] + + if options['do_ice']: + task_names[run] += ['ice_prod'] + + if options['do_upp']: + task_names[run] += ['atmupp'] + task_names[run] += ['atmos_prod'] + + # GOES post-processing (gfs only) + if run == 'gfs': + if options['do_goes']: + task_names[run] += ['goesupp'] + + # Only fit to obs and verify ozone and radiance during gdas cycles + if run == "gdas": + if options['do_fit2obs']: + task_names[run] += ['fit2obs'] + if options['do_verfozn']: + task_names[run] += ['verfozn'] + if options['do_verfrad']: + task_names[run] += ['verfrad'] + + if options['do_vminmon']: + task_names[run] += ['vminmon'] + + # gfs-only verification/tracking + if run == 'gfs': + if options['do_tracker']: + task_names[run] += ['tracker'] + + if options['do_genesis']: + task_names[run] += ['genesis'] + + if options['do_genesis_fsu']: + task_names[run] += ['genesis_fsu'] + + if options['do_metp']: + task_names[run] += ['metp'] + + if options['do_wave']: + if options['do_wave_bnd']: + task_names[run] += wave_bndpnt_tasks + task_names[run] += wave_post_tasks + # wave gempak and awips jobs are gfs-specific + if run == 'gfs': + if options['do_gempak']: + task_names[run] += ['wavegempak'] + if options['do_awips']: + task_names[run] += ['waveawipsbulls', 'waveawipsgridded'] + + # gdas- and gfs-specific downstream products + if run == 'gdas': + if options['do_gempak']: + task_names[run] += ['gempak', 'gempakmetancdc'] + else: + if options['do_bufrsnd']: + task_names[run] += ['postsnd'] + + if options['do_gempak']: + task_names[run] += ['gempak'] + task_names[run] += ['gempakmeta'] + task_names[run] += ['gempakncdcupapgif'] + if options['do_goes']: + task_names[run] += ['npoess_pgrb2_0p5deg'] + task_names[run] += ['gempakpgrb2spec'] + + if options['do_awips']: + task_names[run] += ['awips_20km_1p0deg', 'fbwind'] + + if options['do_mos']: + task_names[run] += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', + 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', + 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', + 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] + + # Last two items + task_names[run] += ['arch', 'cleanup'] + + # Ensemble tasks + elif 'enkf' in run: + + if options['do_jediatmens']: + task_names[run] += ['atmensanlinit', 'atmensanlfv3inc', 'atmensanlfinal'] + # Only run echgres for the gdas cycle + task_names[run] += ['echgres'] if 'gdas' in run else 0 + if options['lobsdiag_forenkf']: + task_names[run] += ['atmensanlobs', 'atmensanlsol'] + else: + task_names[run] += ['atmensanlletkf'] + + else: + task_names[run] += ['eobs', 'eupd'] + task_names[run].append('echgres') if 'gdas' in run else 0 + task_names[run] += ['ediag'] if options['lobsdiag_forenkf'] else ['eomg'] + task_names[run].append('esnowrecen') if options['do_jedisnowda'] and 'gdas' in run else 0 + + task_names[run] += ['stage_ic', 'ecen', 'esfc', 'efcs', 'epos', 'earc', 'cleanup'] + + return task_names diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index fb1d2cdb8f..fffdab6ef9 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -1,5 +1,6 @@ from applications.applications import AppConfig from wxflow import Configuration +from typing import Dict, Any class GFSForecastOnlyAppConfig(AppConfig): @@ -11,62 +12,70 @@ def __init__(self, conf: Configuration): super().__init__(conf) base = conf.parse_config('config.base') - self.aero_fcst_run = base.get('AERO_FCST_RUN', 'BOTH').lower() self.run = base.get('RUN', 'gfs') - self.exp_warm_start = base.get('EXP_WARM_START', False) + self.runs = [self.run] - def _get_app_configs(self): + def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: + + run_options = super()._get_run_options(conf) + + run_options[self.run]['exp_warm_start'] = conf.parse_config('config.base').get('EXP_WARM_START', False) + + return run_options + + def _get_app_configs(self, run): """ Returns the config_files that are involved in the forecast-only app """ + options = self.run_options[run] configs = ['stage_ic', 'fcst', 'arch', 'cleanup'] - if self.do_atm: + if options['do_atm']: - if self.do_upp or self.do_goes: + if options['do_upp'] or options['do_goes']: configs += ['upp'] configs += ['atmos_products'] - if self.do_aero: - if not self.exp_warm_start: + if options['do_aero_fcst']: + if not options['exp_warm_start']: configs += ['aerosol_init'] - if self.do_tracker: + if options['do_tracker']: configs += ['tracker'] - if self.do_genesis: + if options['do_genesis']: configs += ['genesis'] - if self.do_genesis_fsu: + if options['do_genesis_fsu']: configs += ['genesis_fsu'] - if self.do_metp: + if options['do_metp']: configs += ['metp'] - if self.do_bufrsnd: + if options['do_bufrsnd']: configs += ['postsnd'] - if self.do_gempak: + if options['do_gempak']: configs += ['gempak'] - if self.do_awips: + if options['do_awips']: configs += ['awips'] - if self.do_ocean or self.do_ice: + if options['do_ocean'] or options['do_ice']: configs += ['oceanice_products'] - if self.do_wave: + if options['do_wave']: configs += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostpnt'] - if self.do_wave_bnd: + if options['do_wave_bnd']: configs += ['wavepostbndpnt', 'wavepostbndpntbll'] - if self.do_gempak: + if options['do_gempak']: configs += ['wavegempak'] - if self.do_awips: + if options['do_awips']: configs += ['waveawipsbulls', 'waveawipsgridded'] - if self.do_mos: + if options['do_mos']: configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', @@ -90,66 +99,64 @@ def get_task_names(self): """ tasks = ['stage_ic'] + options = self.run_options[self.run] - if self.do_aero: - aero_fcst_run = self.aero_fcst_run - if self.run in aero_fcst_run or aero_fcst_run == "both": - if not self.exp_warm_start: - tasks += ['aerosol_init'] + if options['do_aero_fcst'] and not options['exp_warm_start']: + tasks += ['aerosol_init'] - if self.do_wave: + if options['do_wave']: tasks += ['waveinit'] # tasks += ['waveprep'] # TODO - verify if waveprep is executed in forecast-only mode when APP=ATMW|S2SW tasks += ['fcst'] - if self.do_atm: + if options['do_atm']: - if self.do_upp: + if options['do_upp']: tasks += ['atmupp'] tasks += ['atmos_prod'] - if self.do_goes: + if options['do_goes']: tasks += ['goesupp'] - if self.do_tracker: + if options['do_tracker']: tasks += ['tracker'] - if self.do_genesis: + if options['do_genesis']: tasks += ['genesis'] - if self.do_genesis_fsu: + if options['do_genesis_fsu']: tasks += ['genesis_fsu'] - if self.do_metp: + if options['do_metp']: tasks += ['metp'] - if self.do_bufrsnd: + if options['do_bufrsnd']: tasks += ['postsnd'] - if self.do_gempak: + if options['do_gempak']: tasks += ['gempak', 'gempakmeta', 'gempakncdcupapgif', 'gempakpgrb2spec'] - if self.do_awips: + if options['do_awips']: tasks += ['awips_20km_1p0deg', 'fbwind'] - if self.do_ocean: + if options['do_ocean']: tasks += ['ocean_prod'] - if self.do_ice: + if options['do_ice']: tasks += ['ice_prod'] - if self.do_wave: - if self.do_wave_bnd: + if options['do_wave']: + if options['do_wave_bnd']: tasks += ['wavepostbndpnt', 'wavepostbndpntbll'] tasks += ['wavepostsbs', 'wavepostpnt'] - if self.do_gempak: + if options['do_gempak']: tasks += ['wavegempak'] - if self.do_awips: + if options['do_awips']: tasks += ['waveawipsbulls', 'waveawipsgridded'] - if self.do_mos: + if options['do_mos']: tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index e9338c90df..468ce01008 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -1,7 +1,6 @@ from applications.applications import AppConfig from rocoto.tasks import Tasks import rocoto.rocoto as rocoto -from datetime import datetime, timedelta class GEFSTasks(Tasks): @@ -44,10 +43,6 @@ def waveinit(self): return task def prep_emissions(self): - deps = [] - dep_dict = {'type': 'task', 'name': f'gefs_stage_ic'} - deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps) resources = self.get_resource('prep_emissions') task_name = 'gefs_prep_emissions' @@ -69,17 +64,17 @@ def fcst(self): dep_dict = {'type': 'task', 'name': f'gefs_stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'gefs_wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero: + if self.options['do_aero_fcst']: dep_dict = {'type': 'task', 'name': f'gefs_prep_emissions'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 fcst_vars = self.envars.copy() fcst_envars_dict = {'FCST_SEGMENT': '#seg#'} @@ -115,17 +110,17 @@ def efcs(self): dep_dict = {'type': 'task', 'name': f'gefs_stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'gefs_wave_init'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero: + if self.options['do_aero_fcst']: dep_dict = {'type': 'task', 'name': f'gefs_prep_emissions'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 resources = self.get_resource('efcs') # Kludge to work around bug in rocoto with serial metatasks nested @@ -434,7 +429,7 @@ def wavepostpnt(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'gefs_wave_post_bndpnt_bull_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -471,16 +466,16 @@ def wavepostpnt(self): def extractvars(self): deps = [] - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': 'gefs_wave_post_grid_mem#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_atm: + if self.options['do_atm']: dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -520,23 +515,23 @@ def arch(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_ensstat'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_grid'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_pnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt_bull'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_extractvars: + if self.options['do_extractvars']: dep_dict = {'type': 'metatask', 'name': 'gefs_extractvars'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') @@ -560,7 +555,7 @@ def arch(self): def cleanup(self): deps = [] - if self.app_config.do_extractvars: + if self.options['do_extractvars']: dep_dict = {'type': 'task', 'name': 'arch'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) @@ -569,18 +564,18 @@ def cleanup(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_ensstat'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_grid'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_pnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt_bull'} diff --git a/workflow/rocoto/gefs_xml.py b/workflow/rocoto/gefs_xml.py index a5dfd5140e..f0ea407e34 100644 --- a/workflow/rocoto/gefs_xml.py +++ b/workflow/rocoto/gefs_xml.py @@ -16,7 +16,7 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None: def get_cycledefs(self): sdate = self._base['SDATE_GFS'] edate = self._base['EDATE'] - interval = self._app_config.interval_gfs + interval = self._base['interval_gfs'] sdate_str = sdate.strftime("%Y%m%d%H%M") edate_str = edate.strftime("%Y%m%d%H%M") interval_str = timedelta_to_HMS(interval) diff --git a/workflow/rocoto/gfs_cycled_xml.py b/workflow/rocoto/gfs_cycled_xml.py index eef77ba7fc..dfeefd1402 100644 --- a/workflow/rocoto/gfs_cycled_xml.py +++ b/workflow/rocoto/gfs_cycled_xml.py @@ -24,7 +24,7 @@ def get_cycledefs(self): sdate_str = sdate.strftime("%Y%m%d%H%M") strings.append(f'\t{sdate_str} {edate_str} {interval_str}') - interval_gfs = self._app_config.interval_gfs + interval_gfs = self._base['interval_gfs'] if interval_gfs > to_timedelta("0H"): sdate_gfs = self._base['SDATE_GFS'] diff --git a/workflow/rocoto/gfs_forecast_only_xml.py b/workflow/rocoto/gfs_forecast_only_xml.py index a4d5b0878b..018bdfaef2 100644 --- a/workflow/rocoto/gfs_forecast_only_xml.py +++ b/workflow/rocoto/gfs_forecast_only_xml.py @@ -14,15 +14,15 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None: def get_cycledefs(self): sdate_gfs = self._base['SDATE_GFS'] edate_gfs = self._base['EDATE'] - interval_gfs = self._app_config.interval_gfs + interval_gfs = self._base['interval_gfs'] strings = [] sdate_gfs_str = sdate_gfs.strftime("%Y%m%d%H%M") edate_gfs_str = edate_gfs.strftime("%Y%m%d%H%M") interval_gfs_str = timedelta_to_HMS(interval_gfs) strings.append(f'\t{sdate_gfs_str} {edate_gfs_str} {interval_gfs_str}') - date2 = sdate_gfs + interval_gfs - if date2 <= edate_gfs: + date2_gfs = sdate_gfs + interval_gfs + if date2_gfs <= edate_gfs: date2_gfs_str = date2_gfs.strftime("%Y%m%d%H%M") strings.append(f'\t{date2_gfs_str} {edate_gfs_str} {interval_gfs_str}') diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 461241450e..616248c110 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -44,7 +44,7 @@ def prep(self): dump_path = self._template_to_rocoto_cycstring(self._base["COM_OBSDMP_TMPL"], {'DMPDIR': dmpdir, 'DUMP_SUFFIX': dump_suffix}) - gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_runs else False + gfs_enkf = True if self.options['do_hybvar'] and 'gfs' in self.app_config.ens_runs else False deps = [] dep_dict = {'type': 'metatask', 'name': 'gdas_atmos_prod', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} @@ -58,7 +58,7 @@ def prep(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) cycledef = self.run - if self.run in ['gfs'] and gfs_enkf and self.app_config.interval_gfs != 6: + if self.run in ['gfs'] and gfs_enkf and self._base['INTERVAL_GFS'] != 6: cycledef = 'gdas' resources = self.get_resource('prep') @@ -148,9 +148,9 @@ def aerosol_init(self): # Calculate offset based on RUN = gfs | gdas interval = None if self.run in ['gfs']: - interval = self._base['INTERVAL_GFS'] + interval = self._base['interval_gfs'] elif self.run in ['gdas']: - interval = self._base['INTERVAL'] + interval = self._base['interval'] offset = timedelta_to_HMS(-interval) # Files from previous cycle @@ -187,7 +187,7 @@ def anal(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}_prep'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'metatask', 'name': 'enkfgdas_epmn', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -214,12 +214,12 @@ def anal(self): def sfcanl(self): deps = [] - if self.app_config.do_jediatmvar: + if self.options['do_jediatmvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_anal'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}_snowanl'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -246,14 +246,14 @@ def sfcanl(self): def analcalc(self): deps = [] - if self.app_config.do_jediatmvar: + if self.options['do_jediatmvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_anal'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_sfcanl'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar and self.run in ['gdas']: + if self.options['do_hybvar'] and self.run in ['gdas']: dep_dict = {'type': 'task', 'name': 'enkfgdas_echgres', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -328,7 +328,7 @@ def atmanlinit(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}_prepatmiodaobs'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'metatask', 'name': 'enkfgdas_epmn', 'offset': f"-{timedelta_to_HMS(self._base['interval_gdas'])}"} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -336,7 +336,7 @@ def atmanlinit(self): dependencies = rocoto.create_dependency(dep=deps) interval_gfs = self._base["INTERVAL_GFS"] - gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_runs else False + gfs_enkf = True if self.options['do_hybvar'] and 'gfs' in self.app_config.ens_runs else False cycledef = self.run if self.run in ['gfs'] and gfs_enkf and interval_gfs != 6: @@ -486,7 +486,7 @@ def aeroanlinit(self): dep_dict = {'type': 'task', 'name': f'{self.run}_prep'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_prep_obs_aero: + if self.options['do_prep_obs_aero']: dep_dict = {'type': 'task', 'name': f'{self.run}_prepobsaero'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -770,12 +770,12 @@ def ocnanalecen(self): def marineanlchkpt(self): deps = [] - if self.app_config.do_hybvar: + if self.options['do_hybvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_ocnanalecen'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlvar'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_mergensst: + if self.options['do_mergensst']: data = f'&ROTDIR;/{self.run}.@Y@m@d/@H/atmos/{self.run}.t@Hz.sfcanl.nc' dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) @@ -866,18 +866,16 @@ def _fcst_forecast_only(self): dep_dict = {'type': 'task', 'name': f'{self.run}_stage_ic'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave and self.run in self.app_config.wave_runs: - wave_job = 'waveprep' if self.app_config.model_app in ['ATMW'] else 'waveinit' + if self.options['do_wave']: + wave_job = 'waveprep' if self.options['app'] in ['ATMW'] else 'waveinit' dep_dict = {'type': 'task', 'name': f'{self.run}_{wave_job}'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero and \ - self.run in self.app_config.aero_fcst_runs and \ - not self._base['EXP_WARM_START']: + if self.options['do_aero_fcst'] and not self._base['EXP_WARM_START']: # Calculate offset based on RUN = gfs | gdas interval = None if self.run in ['gfs']: - interval = to_timedelta(f"{self._base['INTERVAL_GFS']}H") + interval = self._base['interval_gfs'] elif self.run in ['gdas']: interval = self._base['assim_freq'] offset = timedelta_to_HMS(-interval) @@ -891,7 +889,7 @@ def _fcst_forecast_only(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) if self.run in ['gfs']: - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 else: num_fcst_segments = 1 @@ -930,15 +928,15 @@ def _fcst_cycled(self): dep = rocoto.add_dependency(dep_dict) dependencies = rocoto.create_dependency(dep=dep) - if self.app_config.do_jediocnvar: + if self.options['do_jediocnvar']: dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlfinal'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_aero and self.run in self.app_config.aero_anl_runs: + if self.options['do_aero_anl']: dep_dict = {'type': 'task', 'name': f'{self.run}_aeroanlfinal'} dependencies.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}_snowanl'} dependencies.append(rocoto.add_dependency(dep_dict)) @@ -949,7 +947,7 @@ def _fcst_cycled(self): dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=dependencies) - if self.app_config.do_wave and self.run in self.app_config.wave_runs: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'{self.run}_waveprep'} dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) @@ -957,7 +955,7 @@ def _fcst_cycled(self): cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run if self.run in ['gfs']: - num_fcst_segments = len(self.app_config.fcst_segments) - 1 + num_fcst_segments = len(self.options['fcst_segments']) - 1 else: num_fcst_segments = 1 @@ -1264,7 +1262,7 @@ def wavepostpnt(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'{self.run}_fcst'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostbndpntbll'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) @@ -1834,8 +1832,8 @@ def metp(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}_arch'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.interval_gfs < to_timedelta('24H'): - n_lookback = self.app_config.interval_gfs // to_timedelta('6H') + if self._base["interval_gfs"] < to_timedelta("24H"): + n_lookback = self._base["interval_gfs"] // to_timedelta("6H") for lookback in range(1, n_lookback + 1): deps2 = [] dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch', 'condition': 'not'} @@ -2242,54 +2240,54 @@ def arch(self): if self.run in ['gfs']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_vminmon: + if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'} deps.append(rocoto.add_dependency(dep_dict)) elif self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_fit2obs: + if self.options['do_fit2obs']: dep_dict = {'type': 'task', 'name': f'{self.run}_fit2obs'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_verfozn: + if self.options['do_verfozn']: dep_dict = {'type': 'task', 'name': f'{self.run}_verfozn'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_verfrad: + if self.options['do_verfrad']: dep_dict = {'type': 'task', 'name': f'{self.run}_verfrad'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_vminmon: + if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_tracker: + if self.run in ['gfs'] and self.options['do_tracker']: dep_dict = {'type': 'task', 'name': f'{self.run}_tracker'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_genesis: + if self.run in ['gfs'] and self.options['do_genesis']: dep_dict = {'type': 'task', 'name': f'{self.run}_genesis'} deps.append(rocoto.add_dependency(dep_dict)) - if self.run in ['gfs'] and self.app_config.do_genesis_fsu: + if self.run in ['gfs'] and self.options['do_genesis_fsu']: dep_dict = {'type': 'task', 'name': f'{self.run}_genesis_fsu'} deps.append(rocoto.add_dependency(dep_dict)) # Post job dependencies dep_dict = {'type': 'metatask', 'name': f'{self.run}_atmos_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave: + if self.options['do_wave']: dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostsbs'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostpnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_wave_bnd: + if self.options['do_wave_bnd']: dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostbndpnt'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ocean: + if self.options['do_ocean']: if self.run in ['gfs']: dep_dict = {'type': 'metatask', 'name': f'{self.run}_ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_ice: + if self.options['do_ice']: if self.run in ['gfs']: dep_dict = {'type': 'metatask', 'name': f'{self.run}_ice_prod'} deps.append(rocoto.add_dependency(dep_dict)) # MOS job dependencies - if self.run in ['gfs'] and self.app_config.do_mos: + if self.run in ['gfs'] and self.options['do_mos']: mos_jobs = ["stn_prep", "grd_prep", "ext_stn_prep", "ext_grd_prep", "stn_fcst", "grd_fcst", "ext_stn_fcst", "ext_grd_fcst", "stn_prdgen", "grd_prdgen", "ext_stn_prdgen", "ext_grd_prdgen", @@ -2327,7 +2325,7 @@ def cleanup(self): dep_dict = {'type': 'task', 'name': f'{self.run}_arch'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_gempak: + if self.options['do_gempak']: if self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}_gempakmetancdc'} deps.append(rocoto.add_dependency(dep_dict)) @@ -2336,13 +2334,13 @@ def cleanup(self): deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_gempakncdcupapgif'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_goes: + if self.options['do_goes']: dep_dict = {'type': 'metatask', 'name': f'{self.run}_gempakgrb2spec'} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_npoess_pgrb2_0p5deg'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_metp and self.run in ['gfs']: + if self.options['do_metp'] and self.run in ['gfs']: deps2 = [] # taskvalid only handles regular tasks, so just check the first metp job exists dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_metpg2g1', 'condition': 'not'} @@ -2457,7 +2455,7 @@ def ediag(self): def eupd(self): deps = [] - if self.app_config.lobsdiag_forenkf: + if self.options['lobsdiag_forenkf']: dep_dict = {'type': 'task', 'name': f'{self.run}_ediag'} else: dep_dict = {'type': 'metatask', 'name': f'{self.run}_eomg'} @@ -2588,7 +2586,7 @@ def atmensanlletkf(self): def atmensanlfv3inc(self): deps = [] - if self.app_config.lobsdiag_forenkf: + if self.options['lobsdiag_forenkf']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlsol'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlletkf'} @@ -2666,7 +2664,7 @@ def _get_ecengroups(): deps = [] dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_analcalc'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jediatmens: + if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_eupd'} @@ -2707,17 +2705,15 @@ def _get_ecengroups(): def esfc(self): - # eupd_run = 'gdas' if 'gdas' in self.app_config.eupd_runs else 'gfs' - deps = [] dep_dict = {'type': 'task', 'name': f'{self.run.replace("enkf","")}_analcalc'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jediatmens: + if self.options['do_jediatmens']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmensanlfinal'} else: dep_dict = {'type': 'task', 'name': f'{self.run}_eupd'} deps.append(rocoto.add_dependency(dep_dict)) - if self.app_config.do_jedisnowda: + if self.options['do_jedisnowda']: dep_dict = {'type': 'task', 'name': f'{self.run}_esnowrecen'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index df56f90718..2aee48835f 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -10,7 +10,7 @@ class Tasks: - SERVICE_TASKS = ['arch', 'earc'] + SERVICE_TASKS = ['arch', 'earc', 'stage_ic', 'cleanup'] VALID_TASKS = ['aerosol_init', 'stage_ic', 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup", 'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal', @@ -44,6 +44,9 @@ def __init__(self, app_config: AppConfig, run: str) -> None: # Get the configs for the specified RUN self._configs = self.app_config.configs[run] + # Get the workflow options for the specified RUN + self.options = self.app_config.run_options[run] + # Update the base config for the application self._configs['base'] = self.app_config._update_base(self._configs['base']) @@ -245,6 +248,6 @@ def get_task(self, task_name, *args, **kwargs): try: return getattr(self, task_name, *args, **kwargs)() except AttributeError: - raise AttributeError(f'"{task_name}" is not a valid task.\n' + - 'Valid tasks are:\n' + + raise AttributeError(f'"{task_name}" is not a valid task.\n' + f'Valid tasks are:\n' f'{", ".join(Tasks.VALID_TASKS)}') diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py index 3ad7c4bd91..bed19ad5ee 100644 --- a/workflow/rocoto/workflow_xml.py +++ b/workflow/rocoto/workflow_xml.py @@ -7,6 +7,7 @@ from typing import Dict from applications.applications import AppConfig from rocoto.workflow_tasks import get_wf_tasks +from wxflow import to_timedelta import rocoto.rocoto as rocoto from abc import ABC, abstractmethod @@ -18,8 +19,10 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None: self._app_config = app_config self.rocoto_config = rocoto_config - # Use the generic config.base (without RUN specified) - self._base = self._app_config.configs['_no_run']['base'] + # Use the first config.base (sourced with an arbitrary RUN) + self._base = self._app_config.configs[next(iter(self._app_config.configs))]['base'] + self._base['interval_gdas'] = to_timedelta(f'{self._base["assim_freq"]}H') + self._base['interval_gfs'] = to_timedelta(f'{self._base["INTERVAL_GFS"]}H') self.preamble = self._get_preamble() self.definitions = self._get_definitions()