Skip to content

Commit

Permalink
Additions to allow APP to differ by RUN
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidHuber-NOAA committed Sep 19, 2024
1 parent 7588d2b commit c0102bf
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 34 deletions.
31 changes: 24 additions & 7 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,25 @@ export SENDDBN_NTC=${SENDDBN_NTC:-"NO"}
export SENDDBN=${SENDDBN:-"NO"}
export DBNROOT=${DBNROOT:-${UTILROOT:-}/fakedbn}

# APP settings
export APP=@APP@
# APP settings; configurable by RUN
case "${RUN}" in
"gfs")
export APP=@APP@
;;
"gdas")
export APP=@APP@
;;
"enkfgfs")
export APP=@APP@
;;
"enkfgdas")
export APP=@APP@
;;
*)
echo "FATAL ERROR: Unrecognized RUN (${RUN})!"
exit 1
;;
esac

shopt -s extglob
# Adjust APP based on RUN
Expand Down Expand Up @@ -217,7 +234,7 @@ case "${CASE}" in
;;
*)
echo "FATAL ERROR: Unrecognized CASE ${CASE}, ABORT!"
exit 1
exit 2
;;
esac

Expand Down Expand Up @@ -256,8 +273,8 @@ case "${APP}" in
fi
;;
*)
echo "Unrecognized APP: '${APP}'"
exit 1
echo "FATAL ERROR: Unrecognized APP: '${APP}'"
exit 3
;;
esac

Expand Down Expand Up @@ -461,8 +478,8 @@ export FHMAX_FITS=132
export HPSSARCH="@HPSSARCH@" # save data to HPSS archive
export LOCALARCH="@LOCALARCH@" # save data to local archive
if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then
echo "Both HPSS and local archiving selected. Please choose one or the other."
exit 2
echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other."
exit 4
fi
export ARCH_CYC=00 # Archive data at this cycle for warm_start capability
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability
Expand Down
47 changes: 31 additions & 16 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def __init__(self, conf: Configuration) -> None:
f'{", ".join(self.VALID_MODES)}\n')

self.net = base['NET']
self.model_app = base.get('APP', 'ATM')
self.do_atm = base.get('DO_ATM', True)
self.do_wave = base.get('DO_WAVE', False)
self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False)
Expand All @@ -63,6 +62,7 @@ def __init__(self, conf: Configuration) -> None:
self.do_goes = base.get('DO_GOES', False)
self.do_mos = base.get('DO_MOS', False)
self.do_extractvars = base.get('DO_EXTRACTVARS', False)
self.gfs_cyc = base.get('gfs_cyc')

self.do_hpssarch = base.get('HPSSARCH', False)

Expand Down Expand Up @@ -97,28 +97,27 @@ def __init__(self, conf: Configuration) -> None:
def _init_finalize(self, conf: Configuration):
print("Finalizing initialize")

# Get a list of all possible config_files that would be part of the application
# Get a list of all possible config files that would be part of the application
self.configs_names = self._get_app_configs()

# Source the config files for the jobs in the application without specifying a RUN
self.configs = {'_no_run': self._source_configs(conf)}

# Update the base config dictionary based on application
self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base'])
# Get the list of valid runs for the configuration
self.runs = self.get_valid_runs()

# Save base in the internal state since it is often needed
base = self.configs['_no_run']['base']
# Initialize the task_names, configs, and model_apps dictionaries
self.task_names = dict.fromkeys(self.runs)
self.model_apps = dict.fromkeys(self.runs)
self.configs = dict.fromkeys(self.runs)

# Get more configuration options into the class attributes
self.gfs_cyc = base.get('gfs_cyc')
# Now configure the experiment for each valid run
for run in self.runs:

# Get task names for the application
self.task_names = self.get_task_names()
# Get task names, configs, and APPs for the application
self.task_names[run] = self.get_task_names(run)

# Finally, source the configuration files for each valid `RUN`
for run in self.task_names.keys():
self.configs[run] = self._source_configs(conf, run=run, log=False)

self.model_apps[run] = self.configs[run]['base'].get('APP', 'ATM')

# Update the base config dictionary based on application and RUN
self.configs[run]['base'] = self._update_base(self.configs[run]['base'])

Expand Down Expand Up @@ -184,7 +183,23 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru
return configs

@abstractmethod
def get_task_names(self, run="_no_run") -> Dict[str, List[str]]:
def get_valid_runs(self) -> List[str]:
'''
Create a list of RUNs for the configuation.
Parameters
----------
None
Returns
-------
Dict[str, List[str]]: Lists of tasks for each RUN.
'''
pass

@abstractmethod
def get_task_names(self, run: str) -> Dict[str, List[str]]:
'''
Create a list of task names for each RUN valid for the configuation.
Expand Down
12 changes: 10 additions & 2 deletions workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@ def _update_base(base_in):

return base_out

def get_task_names(self):
def get_valid_runs(self):
"""
Return the GEFS RUN.
"""

# Only one RUN (should be gefs) is allowed as specified in __init__
return [self.run]

def get_task_names(self, run):

tasks = ['stage_ic']

Expand Down Expand Up @@ -84,4 +92,4 @@ def get_task_names(self):

tasks += ['arch', 'cleanup']

return {f"{self.run}": tasks}
return tasks
29 changes: 27 additions & 2 deletions workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,28 @@ def _update_base(base_in):

return GFSCycledAppConfig.get_gfs_cyc_dates(base_in)

def get_task_names(self):
def get_valid_runs(self):
"""
Get a list of valid RUNs in cycled MODE.
"""

# The gdas run is always present for the cycled application
runs = ["gdas"]

# Are we running the early cycle deterministic forecast?
if self.gfs_cyc > 0:
runs.append("gfs")

# Ensembles? Add the valid run based on eupd_runs.
if self.do_hybvar:
if 'gdas' in self.eupd_runs:
runs.append("enkfgdas")
if 'gfs' in self.eupd_runs:
runs.append("enkfgfs")

return runs

def get_task_names(self, run: str):
"""
Get the task names for all the tasks in the cycled application.
Note that the order of the task names matters in the XML.
Expand Down Expand Up @@ -306,7 +327,11 @@ def get_task_names(self):
enkfgfs_tasks.remove("esnowrecen")
tasks['enkfgfs'] = enkfgfs_tasks

return tasks
if run not in tasks:
raise KeyError(f"FATAL ERROR: GFS cycled experiment is not configured "
f"for the input run ({run})")

return tasks[run]

@staticmethod
def get_gfs_cyc_dates(base: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
12 changes: 10 additions & 2 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,15 @@ def _update_base(base_in):

return base_out

def get_task_names(self):
def get_valid_runs(self):
"""
Return the specified RUN for forecast-only MODE.
"""

# Only one RUN is allowed for forecast-only mode as specified in __init__
return [self.run]

def get_task_names(self, run: str):
"""
Get the task names for all the tasks in the forecast-only application.
Note that the order of the task names matters in the XML.
Expand Down Expand Up @@ -157,4 +165,4 @@ def get_task_names(self):

tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks

return {f"{self.run}": tasks}
return tasks
6 changes: 3 additions & 3 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,8 +855,8 @@ def fcst(self):
try:
task = fcst_map[self.app_config.mode]()
except KeyError:
raise NotImplementedError(f'{self.app_config.mode} is not a valid type.\n' +
'Currently supported forecast types are:\n' +
raise NotImplementedError(f'{self.app_config.mode} is not a valid type.\n'
f'Currently supported forecast types are:\n'
f'{" | ".join(fcst_map.keys())}')

return task
Expand All @@ -868,7 +868,7 @@ def _fcst_forecast_only(self):
dependencies.append(rocoto.add_dependency(dep_dict))

if self.app_config.do_wave and self.run in self.app_config.wave_runs:
wave_job = 'waveprep' if self.app_config.model_app in ['ATMW'] else 'waveinit'
wave_job = 'waveprep' if self.app_config.model_apps[self.run] in ['ATMW'] else 'waveinit'
dep_dict = {'type': 'task', 'name': f'{self.run}{wave_job}'}
dependencies.append(rocoto.add_dependency(dep_dict))

Expand Down
4 changes: 2 additions & 2 deletions workflow/rocoto/workflow_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None:
self._app_config = app_config
self.rocoto_config = rocoto_config

# Use the generic config.base (without RUN specified)
self._base = self._app_config.configs['_no_run']['base']
# Use the first config.base (sourced with an arbitrary RUN)
self._base = self._app_config.configs[next(iter(self._app_config.configs))]['base']

self.preamble = self._get_preamble()
self.definitions = self._get_definitions()
Expand Down

0 comments on commit c0102bf

Please sign in to comment.