From e62a3a73cd22701962b7bf210fdfee37f37a5474 Mon Sep 17 00:00:00 2001 From: "Henry R. Winterbottom" <49202169+HenryWinterbottom-NOAA@users.noreply.github.com> Date: Tue, 5 Dec 2023 09:08:12 -0700 Subject: [PATCH] Splits AWIPS jobs into separate tasks (#2094) This PR addresses issue #1228. The following is accomplished: - Separate rocoto jobs have been created beneath `jobs/rocoto/` -- `awips_20km_1p0deg.sh` and `awips_g2.sh`; these jobs replace `awips.sh`, which was calling multiple J-jobs from within a single script; - New tasks have been added to `workflow/rocoto/gfs_tasks.py` and registered in `workflow/rocoto/tasks.py` for the new AWIPS scripts; - The `gfs_cycled` and `gfs_forecast_only` modules beneath `workflow/applications` have been updated accordingly. Resolves #1228 --- .../rocoto/{awips.sh => awips_20km_1p0deg.sh} | 20 ++-- jobs/rocoto/awips_g2.sh | 57 ++++++++++++ parm/config/gfs/config.awips | 6 +- workflow/applications/gfs_cycled.py | 3 +- workflow/applications/gfs_forecast_only.py | 3 +- workflow/rocoto/gfs_tasks.py | 91 ++++++++++++------- workflow/rocoto/tasks.py | 2 +- 7 files changed, 130 insertions(+), 52 deletions(-) rename jobs/rocoto/{awips.sh => awips_20km_1p0deg.sh} (77%) create mode 100755 jobs/rocoto/awips_g2.sh diff --git a/jobs/rocoto/awips.sh b/jobs/rocoto/awips_20km_1p0deg.sh similarity index 77% rename from jobs/rocoto/awips.sh rename to jobs/rocoto/awips_20km_1p0deg.sh index 57fbd92572..e1bf623883 100755 --- a/jobs/rocoto/awips.sh +++ b/jobs/rocoto/awips_20km_1p0deg.sh @@ -19,14 +19,13 @@ source "${HOMEgfs}/ush/load_fv3gfs_modules.sh" status=$? 
(( status != 0 )) && exit "${status}" -export job="awips" +export job="awips_20km_1p0deg" export jobid="${job}.$$" -# TODO (#1228) - This script is doing more than just calling a j-job -# Also, this forces us to call the config files here instead of the j-job source "${HOMEgfs}/ush/jjob_header.sh" -e "awips" -c "base awips" -fhrlst=$(echo ${FHRLST} | sed -e 's/_/ /g; s/f/ /g; s/,/ /g') +# shellcheck disable=SC2153 +fhrlst=$(echo "${FHRLST}" | sed -e 's/_/ /g; s/f/ /g; s/,/ /g') ############################################################### @@ -45,12 +44,8 @@ for fhr3 in ${fhrlst}; do fhmax=84 if (( fhr >= fhmin && fhr <= fhmax )); then if ((fhr % 3 == 0)); then - export fcsthrs=${fhr3} - ${AWIPS20SH} - fi - - if ((fhr % 6 == 0)); then - ${AWIPSG2SH} + export fcsthrs="${fhr3}" + "${AWIPS20KM1P0DEGSH}" fi fi @@ -58,9 +53,8 @@ for fhr3 in ${fhrlst}; do fhmax=240 if (( fhr >= fhmin && fhr <= fhmax )); then if ((fhr % 6 == 0)); then - export fcsthrs=${fhr3} - ${AWIPS20SH} - ${AWIPSG2SH} + export fcsthrs="${fhr3}" + "${AWIPS20KM1P0DEGSH}" fi fi done diff --git a/jobs/rocoto/awips_g2.sh b/jobs/rocoto/awips_g2.sh new file mode 100755 index 0000000000..121c96d63f --- /dev/null +++ b/jobs/rocoto/awips_g2.sh @@ -0,0 +1,57 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +## Abstract: +## Inline awips driver script +## HOMEgfs : /full/path/to/workflow +## EXPDIR : /full/path/to/config/files +## CDATE : current analysis date (YYYYMMDDHH) +## CDUMP : cycle name (gdas / gfs) +## PDY : current date (YYYYMMDD) +## cyc : current cycle (HH) +############################################################### + +############################################################### +# Source FV3GFS workflow modules +source "${HOMEgfs}/ush/load_fv3gfs_modules.sh" +status=$? 
+(( status != 0 )) && exit "${status}" + +export job="awips_g2" +export jobid="${job}.$$" + +source "${HOMEgfs}/ush/jjob_header.sh" -e "awips" -c "base awips" + +# shellcheck disable=SC2153 +fhrlst=$(echo "${FHRLST}" | sed -e "s/_/ /g; s/f/ /g; s/,/ /g") + +############################################################### + +################################################################################ +echo +echo "=============== BEGIN AWIPS ===============" + +for fhr3 in ${fhrlst}; do + fhr=$(( 10#${fhr3} )) + if (( fhr > FHMAX_GFS )); then + echo "Nothing to process for FHR = ${fhr3}, cycle" + continue + fi + + fhmin=0 + fhmax=240 + if (( fhr >= fhmin && fhr <= fhmax )); then + if ((fhr % 6 == 0)); then + "${AWIPSG2SH}" + fi + fi +done + + +############################################################### +# Force Exit out cleanly +if [[ ${KEEPDATA:-"NO"} == "NO" ]] ; then rm -rf "${DATA}" ; fi + +exit 0 diff --git a/parm/config/gfs/config.awips b/parm/config/gfs/config.awips index 9003e9f6b0..3b78d4bb4b 100644 --- a/parm/config/gfs/config.awips +++ b/parm/config/gfs/config.awips @@ -6,10 +6,10 @@ echo "BEGIN: config.awips" # Get task specific resources -. $EXPDIR/config.resources awips +. "${EXPDIR}/config.resources" awips -export AWIPS20SH=$HOMEgfs/jobs/JGFS_ATMOS_AWIPS_20KM_1P0DEG -export AWIPSG2SH=$HOMEgfs/jobs/JGFS_ATMOS_AWIPS_G2 +export AWIPS20KM1P0DEGSH="${HOMEgfs}/jobs/JGFS_ATMOS_AWIPS_20KM_1P0DEG" +export AWIPSG2SH="${HOMEgfs}/jobs/JGFS_ATMOS_AWIPS_G2" # No. 
of concurrent awips jobs export NAWIPSGRP=42 diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 2438a8244e..4e0897f739 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -234,7 +234,8 @@ def get_task_names(self): gfs_tasks += ['gempak'] if self.do_awips: - gfs_tasks += ['awips'] + gfs_tasks += ['awips_20km_1p0deg'] + gfs_tasks += ['awips_g2'] gfs_tasks += ['fbwinds'] if self.do_npoess: diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index 01d31b08ca..1790c86a2c 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -119,7 +119,8 @@ def get_task_names(self): tasks += ['gempak'] if self.do_awips: - tasks += ['awips'] + tasks += ['awips_20km_1p0deg'] + tasks += ['awips_g2'] tasks += ['fbwinds'] if self.do_wafs: diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index a842b4704b..f69579b068 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -822,43 +822,68 @@ def fbwinds(self): return task - def awips(self): + @staticmethod + def _get_awipsgroups(cdump, config): - def _get_awipsgroups(cdump, config): + fhmin = config['FHMIN'] + fhmax = config['FHMAX'] + fhout = config['FHOUT'] - fhmin = config['FHMIN'] - fhmax = config['FHMAX'] - fhout = config['FHOUT'] + # Get a list of all forecast hours + fhrs = [] + if cdump in ['gdas']: + fhrs = range(fhmin, fhmax + fhout, fhout) + elif cdump in ['gfs']: + fhmax = np.max( + [config['FHMAX_GFS_00'], config['FHMAX_GFS_06'], config['FHMAX_GFS_12'], config['FHMAX_GFS_18']]) + fhout = config['FHOUT_GFS'] + fhmax_hf = config['FHMAX_HF_GFS'] + fhout_hf = config['FHOUT_HF_GFS'] + if fhmax > 240: + fhmax = 240 + if fhmax_hf > 240: + fhmax_hf = 240 + fhrs_hf = list(range(fhmin, fhmax_hf + fhout_hf, fhout_hf)) + fhrs = fhrs_hf + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout)) + + nawipsgrp = 
config['NAWIPSGRP'] + ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs) + + fhrs = [f'f{fhr:03d}' for fhr in fhrs] + fhrs = np.array_split(fhrs, ngrps) + fhrs = [fhr.tolist() for fhr in fhrs] + + grp = ' '.join([f'_{fhr[0]}-{fhr[-1]}' for fhr in fhrs]) + dep = ' '.join([fhr[-1] for fhr in fhrs]) + lst = ' '.join(['_'.join(fhr) for fhr in fhrs]) + + return grp, dep, lst + + def awips_20km_1p0deg(self): - # Get a list of all forecast hours - fhrs = [] - if cdump in ['gdas']: - fhrs = range(fhmin, fhmax + fhout, fhout) - elif cdump in ['gfs']: - fhmax = np.max( - [config['FHMAX_GFS_00'], config['FHMAX_GFS_06'], config['FHMAX_GFS_12'], config['FHMAX_GFS_18']]) - fhout = config['FHOUT_GFS'] - fhmax_hf = config['FHMAX_HF_GFS'] - fhout_hf = config['FHOUT_HF_GFS'] - if fhmax > 240: - fhmax = 240 - if fhmax_hf > 240: - fhmax_hf = 240 - fhrs_hf = list(range(fhmin, fhmax_hf + fhout_hf, fhout_hf)) - fhrs = fhrs_hf + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout)) + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{self.cdump}post'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) - nawipsgrp = config['NAWIPSGRP'] - ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs) + awipsenvars = self.envars.copy() + awipsenvar_dict = {'FHRGRP': '#grp#', + 'FHRLST': '#lst#', + 'ROTDIR': self._base.get('ROTDIR')} + for key, value in awipsenvar_dict.items(): + awipsenvars.append(rocoto.create_envar(name=key, value=str(value))) - fhrs = [f'f{fhr:03d}' for fhr in fhrs] - fhrs = np.array_split(fhrs, ngrps) - fhrs = [fhr.tolist() for fhr in fhrs] + varname1, varname2, varname3 = 'grp', 'dep', 'lst' + varval1, varval2, varval3 = self._get_awipsgroups(self.cdump, self._configs['awips']) + vardict = {varname2: varval2, varname3: varval3} - grp = ' '.join([f'_{fhr[0]}-{fhr[-1]}' for fhr in fhrs]) - dep = ' '.join([fhr[-1] for fhr in fhrs]) - lst = ' '.join(['_'.join(fhr) for fhr in fhrs]) + resources = 
self.get_resource('awips') + task = create_wf_task('awips_20km_1p0deg', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies, + metatask='awips_20km_1p0deg', varname=varname1, varval=varval1, vardict=vardict) - return grp, dep, lst + return task + + def awips_g2(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'{self.cdump}post'} @@ -873,12 +898,12 @@ def _get_awipsgroups(cdump, config): awipsenvars.append(rocoto.create_envar(name=key, value=str(value))) varname1, varname2, varname3 = 'grp', 'dep', 'lst' - varval1, varval2, varval3 = _get_awipsgroups(self.cdump, self._configs['awips']) + varval1, varval2, varval3 = self._get_awipsgroups(self.cdump, self._configs['awips']) vardict = {varname2: varval2, varname3: varval3} resources = self.get_resource('awips') - task = create_wf_task('awips', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies, - metatask='awips', varname=varname1, varval=varval1, vardict=vardict) + task = create_wf_task('awips_g2', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies, + metatask='awips_g2', varname=varname1, varval=varval1, vardict=vardict) return task diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index dea351ac12..aa52e4fa9b 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -22,7 +22,7 @@ class Tasks: 'fcst', 'post', 'ocnpost', 'verfozn', 'verfrad', 'vminmon', 'metp', 'tracker', 'genesis', 'genesis_fsu', - 'postsnd', 'awips', 'fbwinds', 'gempak', + 'postsnd', 'awips_g2', 'awips_20km_1p0deg', 'fbwinds', 'gempak', 'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep', 'npoess']