From de8706702ead0630beb54d868f83aa2cb23f8f79 Mon Sep 17 00:00:00 2001 From: RussTreadon-NOAA <26926959+RussTreadon-NOAA@users.noreply.github.com> Date: Mon, 1 Jul 2024 09:29:14 -0400 Subject: [PATCH] Update for JCB policies and stage DA job files with Jinja2-templates (#2700) This PR updates the `gdas.cd` hash to bring in new JCB conventions. Resolves #2699 From #2654 This PR will move much of the staging code that take place in the python initialization subroutines of the variational and ensemble DA jobs into Jinja2-templated YAML files to be passed into the wxflow file handler. Much of the staging has already been done this way, but this PR simply expands that strategy. The old Python routines that were doing this staging are now removed. This is part of a broader refactoring of the pygfs tasking. wxflow PR [#30](https://github.com/NOAA-EMC/wxflow/pull/30) is a companion to this PR. Co-authored-by: danholdaway Co-authored-by: DavidNew-NOAA Co-authored-by: DavidNew-NOAA <134300700+DavidNew-NOAA@users.noreply.github.com> Co-authored-by: Dan Holdaway <27729500+danholdaway@users.noreply.github.com> --- env/HERCULES.env | 10 + jobs/rocoto/aeroanlfinal.sh | 5 - jobs/rocoto/aeroanlinit.sh | 6 - jobs/rocoto/aeroanlrun.sh | 6 - jobs/rocoto/atmanlfinal.sh | 5 - jobs/rocoto/atmanlfv3inc.sh | 6 - jobs/rocoto/atmanlinit.sh | 6 - jobs/rocoto/atmanlvar.sh | 6 - jobs/rocoto/atmensanlfinal.sh | 5 - jobs/rocoto/atmensanlfv3inc.sh | 6 - jobs/rocoto/atmensanlinit.sh | 6 - jobs/rocoto/atmensanlletkf.sh | 6 - jobs/rocoto/oceanice_products.sh | 6 - jobs/rocoto/prepatmiodaobs.sh | 5 +- jobs/rocoto/prepobsaero.sh | 6 - jobs/rocoto/prepsnowobs.sh | 5 +- jobs/rocoto/snowanl.sh | 6 - jobs/rocoto/upp.sh | 12 +- parm/config/gfs/config.atmanl | 7 +- parm/config/gfs/config.atmensanl | 1 + parm/gdas/staging/atm_berror_gsibec.yaml.j2 | 8 + parm/gdas/staging/atm_lgetkf_bkg.yaml.j2 | 32 +++ parm/gdas/staging/atm_var_bkg.yaml.j2 | 14 ++ parm/gdas/staging/atm_var_fv3ens.yaml.j2 | 24 ++ scripts/exglobal_cleanup.sh | 6 +- scripts/exglobal_prep_snow_obs.py | 2 +- sorc/gdas.cd | 2 +- sorc/wxflow | 2 +- ush/detect_machine.sh | 4 +- ush/load_fv3gfs_modules.sh | 5 + ush/load_ufsda_modules.sh | 5 + ush/python/pygfs/task/aero_analysis.py | 32 +-- ush/python/pygfs/task/aero_emissions.py | 4 +- ush/python/pygfs/task/aero_prepobs.py | 18 +- ush/python/pygfs/task/analysis.py | 110 +-------- ush/python/pygfs/task/archive.py | 9 +- ush/python/pygfs/task/atm_analysis.py | 244 +++----------------- ush/python/pygfs/task/atmens_analysis.py | 60 ++--- ush/python/pygfs/task/oceanice_products.py | 39 ++-- ush/python/pygfs/task/snow_analysis.py | 22 +- ush/python/pygfs/task/upp.py | 17 +- workflow/hosts.py | 6 +- 42 files changed, 248 insertions(+), 538 deletions(-) create mode 100644 parm/gdas/staging/atm_berror_gsibec.yaml.j2 create mode 100644 parm/gdas/staging/atm_lgetkf_bkg.yaml.j2 create mode 100644 parm/gdas/staging/atm_var_bkg.yaml.j2 create mode 100644 parm/gdas/staging/atm_var_fv3ens.yaml.j2 diff --git a/env/HERCULES.env b/env/HERCULES.env index 77e57e066d..79424f8639 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -132,6 +132,16 @@ case ${step} in [[ ${NTHREADS_OCNANAL} -gt ${nth_max} ]] && export NTHREADS_OCNANAL=${nth_max} export APRUN_OCNANAL="${launcher} -n ${npe_ocnanalrun} --cpus-per-task=${NTHREADS_OCNANAL}" ;; +"ocnanalecen") + + export APRUNCFP="${launcher} -n \$ncmd ${mpmd_opt}" + + nth_max=$((npe_node_max / npe_node_ocnanalecen)) + + export NTHREADS_OCNANALECEN=${nth_ocnanalecen:-${nth_max}} + [[ ${NTHREADS_OCNANALECEN} -gt ${nth_max} ]] && export NTHREADS_OCNANALECEN=${nth_max} + export APRUN_OCNANALECEN="${launcher} -n ${npe_ocnanalecen} --cpus-per-task=${NTHREADS_OCNANALECEN}" +;; "ocnanalchkpt") export APRUNCFP="${launcher} -n \$ncmd ${mpmd_opt}" diff --git a/jobs/rocoto/aeroanlfinal.sh b/jobs/rocoto/aeroanlfinal.sh index 16bb6887fd..39dea71810 100755 --- a/jobs/rocoto/aeroanlfinal.sh +++ b/jobs/rocoto/aeroanlfinal.sh @@ -11,11 +11,6 @@ status=$? export job="aeroanlfinal" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE" diff --git a/jobs/rocoto/aeroanlinit.sh b/jobs/rocoto/aeroanlinit.sh index 9aaf255782..7a1cf885c1 100755 --- a/jobs/rocoto/aeroanlinit.sh +++ b/jobs/rocoto/aeroanlinit.sh @@ -11,12 +11,6 @@ status=$? export job="aeroanlinit" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_INITIALIZE" diff --git a/jobs/rocoto/aeroanlrun.sh b/jobs/rocoto/aeroanlrun.sh index bcd86e3fbf..529bb2d7d1 100755 --- a/jobs/rocoto/aeroanlrun.sh +++ b/jobs/rocoto/aeroanlrun.sh @@ -11,12 +11,6 @@ status=$? export job="aeroanlrun" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_RUN" diff --git a/jobs/rocoto/atmanlfinal.sh b/jobs/rocoto/atmanlfinal.sh index 3d3c3ba9e6..a12894ed1e 100755 --- a/jobs/rocoto/atmanlfinal.sh +++ b/jobs/rocoto/atmanlfinal.sh @@ -11,11 +11,6 @@ status=$? export job="atmanlfinal" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATM_ANALYSIS_FINALIZE" diff --git a/jobs/rocoto/atmanlfv3inc.sh b/jobs/rocoto/atmanlfv3inc.sh index effc18cee5..5261c15f09 100755 --- a/jobs/rocoto/atmanlfv3inc.sh +++ b/jobs/rocoto/atmanlfv3inc.sh @@ -11,12 +11,6 @@ status=$? export job="atmanlfv3inc" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATM_ANALYSIS_FV3_INCREMENT" diff --git a/jobs/rocoto/atmanlinit.sh b/jobs/rocoto/atmanlinit.sh index 13c7d8710b..5329200590 100755 --- a/jobs/rocoto/atmanlinit.sh +++ b/jobs/rocoto/atmanlinit.sh @@ -11,12 +11,6 @@ status=$? export job="atmanlinit" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATM_ANALYSIS_INITIALIZE" diff --git a/jobs/rocoto/atmanlvar.sh b/jobs/rocoto/atmanlvar.sh index 812e3c706a..7df7f59dd1 100755 --- a/jobs/rocoto/atmanlvar.sh +++ b/jobs/rocoto/atmanlvar.sh @@ -11,12 +11,6 @@ status=$? export job="atmanlvar" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATM_ANALYSIS_VARIATIONAL" diff --git a/jobs/rocoto/atmensanlfinal.sh b/jobs/rocoto/atmensanlfinal.sh index 5ffaa92754..fc29bdd9af 100755 --- a/jobs/rocoto/atmensanlfinal.sh +++ b/jobs/rocoto/atmensanlfinal.sh @@ -11,11 +11,6 @@ status=$? export job="atmensanlfinal" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE" diff --git a/jobs/rocoto/atmensanlfv3inc.sh b/jobs/rocoto/atmensanlfv3inc.sh index bb44ddc3a0..7f57e8d618 100755 --- a/jobs/rocoto/atmensanlfv3inc.sh +++ b/jobs/rocoto/atmensanlfv3inc.sh @@ -11,12 +11,6 @@ status=$? export job="atmensanlfv3inc" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_FV3_INCREMENT" diff --git a/jobs/rocoto/atmensanlinit.sh b/jobs/rocoto/atmensanlinit.sh index 2c2204548a..1cd8129df6 100755 --- a/jobs/rocoto/atmensanlinit.sh +++ b/jobs/rocoto/atmensanlinit.sh @@ -11,12 +11,6 @@ status=$? export job="atmensanlinit" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE" diff --git a/jobs/rocoto/atmensanlletkf.sh b/jobs/rocoto/atmensanlletkf.sh index b4a1a73a80..0ca86bfb43 100755 --- a/jobs/rocoto/atmensanlletkf.sh +++ b/jobs/rocoto/atmensanlletkf.sh @@ -11,12 +11,6 @@ status=$? export job="atmensanlletkf" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_ATMENS_ANALYSIS_LETKF" diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh index eb704fb35f..2a3b617d05 100755 --- a/jobs/rocoto/oceanice_products.sh +++ b/jobs/rocoto/oceanice_products.sh @@ -12,12 +12,6 @@ source "${HOMEgfs}/ush/preamble.sh" status=$? if (( status != 0 )); then exit "${status}"; fi -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - export job="oceanice_products" export jobid="${job}.$$" diff --git a/jobs/rocoto/prepatmiodaobs.sh b/jobs/rocoto/prepatmiodaobs.sh index 0e69eda5c9..26629a514f 100755 --- a/jobs/rocoto/prepatmiodaobs.sh +++ b/jobs/rocoto/prepatmiodaobs.sh @@ -12,11 +12,10 @@ export job="prepatmobs" export jobid="${job}.$$" ############################################################### -# setup python path for workflow and ioda utilities -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" +# setup python path for ioda utilities # shellcheck disable=SC2311 pyiodaPATH="${HOMEgfs}/sorc/gdas.cd/build/lib/python$(detect_py_ver)/" -PYTHONPATH="${pyiodaPATH}:${wxflowPATH}:${PYTHONPATH}" +PYTHONPATH="${pyiodaPATH}:${PYTHONPATH}" export PYTHONPATH ############################################################### diff --git a/jobs/rocoto/prepobsaero.sh b/jobs/rocoto/prepobsaero.sh index 89da7547e8..5d65ff8a02 100755 --- a/jobs/rocoto/prepobsaero.sh +++ b/jobs/rocoto/prepobsaero.sh @@ -11,12 +11,6 @@ status=$? export job="prepobsaero" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_PREP_OBS_AERO" diff --git a/jobs/rocoto/prepsnowobs.sh b/jobs/rocoto/prepsnowobs.sh index cff082bab2..3f23bc16a5 100755 --- a/jobs/rocoto/prepsnowobs.sh +++ b/jobs/rocoto/prepsnowobs.sh @@ -12,12 +12,11 @@ export job="prepsnowobs" export jobid="${job}.$$" ############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" +# setup python path for ioda utilities # shellcheck disable=SC2311 pyiodaPATH="${HOMEgfs}/sorc/gdas.cd/build/lib/python$(detect_py_ver)/" gdasappPATH="${HOMEgfs}/sorc/gdas.cd/sorc/iodaconv/src:${pyiodaPATH}" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}:${gdasappPATH}" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}:${gdasappPATH}" export PYTHONPATH ############################################################### diff --git a/jobs/rocoto/snowanl.sh b/jobs/rocoto/snowanl.sh index 627dd860f4..97df7a46c7 100755 --- a/jobs/rocoto/snowanl.sh +++ b/jobs/rocoto/snowanl.sh @@ -11,12 +11,6 @@ status=$? export job="snowanl" export jobid="${job}.$$" -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - ############################################################### # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_SNOW_ANALYSIS" diff --git a/jobs/rocoto/upp.sh b/jobs/rocoto/upp.sh index da0180472d..c3f128ab02 100755 --- a/jobs/rocoto/upp.sh +++ b/jobs/rocoto/upp.sh @@ -29,18 +29,18 @@ if [[ "${MACHINE_ID}" = "wcoss2" ]]; then module load python/3.8.6 module load crtm/2.4.0 # TODO: This is only needed when UPP_RUN=goes. Is there a better way to handle this? set_trace + + # Add wxflow to PYTHONPATH + wxflowPATH="${HOMEgfs}/ush/python" + PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush:${wxflowPATH}" + export PYTHONPATH + else . "${HOMEgfs}/ush/load_fv3gfs_modules.sh" status=$? if (( status != 0 )); then exit "${status}"; fi fi -############################################################### -# setup python path for workflow utilities and tasks -wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" -PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" -export PYTHONPATH - export job="upp" export jobid="${job}.$$" diff --git a/parm/config/gfs/config.atmanl b/parm/config/gfs/config.atmanl index 7879b8b683..9a06088ecc 100644 --- a/parm/config/gfs/config.atmanl +++ b/parm/config/gfs/config.atmanl @@ -15,14 +15,17 @@ export INTERP_METHOD='barycentric' if [[ ${DOHYBVAR} = "YES" ]]; then # shellcheck disable=SC2153 export CASE_ANL=${CASE_ENS} - export BERROR_YAML="background_error_hybrid_${STATICB_TYPE}_${LOCALIZATION_TYPE}" + export BERROR_YAML="atmosphere_background_error_hybrid_${STATICB_TYPE}_${LOCALIZATION_TYPE}" else export CASE_ANL=${CASE} - export BERROR_YAML="background_error_static_${STATICB_TYPE}" + export BERROR_YAML="atmosphere_background_error_static_${STATICB_TYPE}" fi export CRTM_FIX_YAML="${PARMgfs}/gdas/atm_crtm_coeff.yaml.j2" export JEDI_FIX_YAML="${PARMgfs}/gdas/atm_jedi_fix.yaml.j2" +export VAR_BKG_STAGING_YAML="${PARMgfs}/gdas/staging/atm_var_bkg.yaml.j2" +export BERROR_STAGING_YAML="${PARMgfs}/gdas/staging/atm_berror_${STATICB_TYPE}.yaml.j2" +export FV3ENS_STAGING_YAML="${PARMgfs}/gdas/staging/atm_var_fv3ens.yaml.j2" export layout_x_atmanl=@LAYOUT_X_ATMANL@ export layout_y_atmanl=@LAYOUT_Y_ATMANL@ diff --git a/parm/config/gfs/config.atmensanl b/parm/config/gfs/config.atmensanl index c03583659d..ddd3d88659 100644 --- a/parm/config/gfs/config.atmensanl +++ b/parm/config/gfs/config.atmensanl @@ -12,6 +12,7 @@ export INTERP_METHOD='barycentric' export CRTM_FIX_YAML="${PARMgfs}/gdas/atm_crtm_coeff.yaml.j2" export JEDI_FIX_YAML="${PARMgfs}/gdas/atm_jedi_fix.yaml.j2" +export LGETKF_BKG_STAGING_YAML="${PARMgfs}/gdas/staging/atm_lgetkf_bkg.yaml.j2" export layout_x_atmensanl=@LAYOUT_X_ATMENSANL@ export layout_y_atmensanl=@LAYOUT_Y_ATMENSANL@ diff --git a/parm/gdas/staging/atm_berror_gsibec.yaml.j2 b/parm/gdas/staging/atm_berror_gsibec.yaml.j2 new file mode 100644 index 0000000000..e6c5e41609 --- /dev/null +++ b/parm/gdas/staging/atm_berror_gsibec.yaml.j2 @@ -0,0 +1,8 @@ +{% set fname_list = ['gfs_gsi_global.nml', 'gsi-coeffs-gfs-global.nc4'] %} + +mkdir: +- '{{ DATA }}/berror' +copy: +{% for fname in fname_list %} +- ['{{ HOMEgfs }}/fix/gdas/gsibec/{{ CASE_ANL }}/{{ fname }}', '{{ DATA }}/berror'] +{% endfor %} diff --git a/parm/gdas/staging/atm_lgetkf_bkg.yaml.j2 b/parm/gdas/staging/atm_lgetkf_bkg.yaml.j2 new file mode 100644 index 0000000000..eda3dad5a7 --- /dev/null +++ b/parm/gdas/staging/atm_lgetkf_bkg.yaml.j2 @@ -0,0 +1,32 @@ +{% set ftype_list = ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data'] %} +{% set time_list = [current_cycle] %} + +mkdir: +{% for imem in range(1,NMEM_ENS+1) %} + {% set memchar = 'mem%03d' | format(imem) %} + {% set tmpl_dict = ({ '${ROTDIR}': ROTDIR, + '${RUN}': RUN, + '${YMD}': current_cycle | to_YMD, + '${HH}': current_cycle | strftime('%H'), + '${MEMDIR}': memchar }) %} +- '{{ DATA }}/bkg/{{ memchar }}' +- '{{ DATA }}/anl/{{ memchar }}' +- '{{ COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(tmpl_dict) }}' +{% endfor %} +copy: +{% for time in time_list %} + {% for imem in range(1,NMEM_ENS+1) %} + {% set memchar = 'mem%03d' | format(imem) %} + {% set tmpl_dict = ({ '${ROTDIR}': ROTDIR, + '${RUN}': 'enkfgdas', + '${YMD}': previous_cycle | to_YMD, + '${HH}': previous_cycle | strftime('%H'), + '${MEMDIR}': memchar }) %} +- ['{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ time | to_fv3time }}.coupler.res', '{{ DATA }}/bkg/{{ memchar }}/'] + {% for ftype in ftype_list %} + {% for itile in range(1,7) %} +- ['{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ time | to_fv3time }}.{{ ftype }}.tile{{ itile }}.nc', '{{ DATA }}/bkg/{{ memchar }}/'] + {% endfor %} + {% endfor %} + {% endfor %} +{% endfor %} diff --git a/parm/gdas/staging/atm_var_bkg.yaml.j2 b/parm/gdas/staging/atm_var_bkg.yaml.j2 new file mode 100644 index 0000000000..37af833649 --- /dev/null +++ b/parm/gdas/staging/atm_var_bkg.yaml.j2 @@ -0,0 +1,14 @@ +{% set ftype_list = ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data'] %} +{% set time_list = [current_cycle] %} + +mkdir: +- '{{ DATA }}/bkg' +copy: +{% for time in time_list %} +- ['{{ COM_ATMOS_RESTART_PREV }}/{{ time | to_fv3time }}.coupler.res', '{{ DATA }}/bkg/'] + {% for ftype in ftype_list %} + {% for itile in range(1,ntiles+1) %} +- ['{{ COM_ATMOS_RESTART_PREV }}/{{ time | to_fv3time }}.{{ ftype }}.tile{{ itile }}.nc', '{{ DATA }}/bkg/'] + {% endfor %} + {% endfor %} +{% endfor %} diff --git a/parm/gdas/staging/atm_var_fv3ens.yaml.j2 b/parm/gdas/staging/atm_var_fv3ens.yaml.j2 new file mode 100644 index 0000000000..e499c86d57 --- /dev/null +++ b/parm/gdas/staging/atm_var_fv3ens.yaml.j2 @@ -0,0 +1,24 @@ +{% set ftype_list = ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data'] %} +{% set time_list = [current_cycle] %} + +mkdir: +{% for imem in range(1,NMEM_ENS+1) %} +- '{{ DATA }}/ens/{{ 'mem%03d' | format(imem) }}' +{% endfor %} +copy: +{% for time in time_list %} + {% for imem in range(1,NMEM_ENS+1) %} + {% set memchar = 'mem%03d' | format(imem) %} + {% set tmpl_dict = ({ '${ROTDIR}': ROTDIR, + '${RUN}': 'enkfgdas', + '${YMD}': previous_cycle | to_YMD, + '${HH}': previous_cycle | strftime('%H'), + '${MEMDIR}': memchar }) %} +- ['{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ time | to_fv3time }}.coupler.res', '{{ DATA }}/ens/{{ memchar }}/'] + {% for ftype in ftype_list %} + {% for itile in range(1,ntiles+1) %} +- ['{{ COM_ATMOS_RESTART_TMPL | replace_tmpl(tmpl_dict) }}/{{ time | to_fv3time }}.{{ ftype }}.tile{{ itile }}.nc', '{{ DATA }}/ens/{{ memchar }}/'] + {% endfor %} + {% endfor %} + {% endfor %} +{% endfor %} diff --git a/scripts/exglobal_cleanup.sh b/scripts/exglobal_cleanup.sh index 1150ca6d1d..dcf1baef31 100755 --- a/scripts/exglobal_cleanup.sh +++ b/scripts/exglobal_cleanup.sh @@ -14,13 +14,13 @@ rm -rf "${DATAROOT}/${RUN}efcs"*"${PDY:-}${cyc}" # Search and delete files/directories from DATAROOT/ older than ${purge_every_days} days # purge_every_days should be a positive integer -purge_every_days=3 +#purge_every_days=3 # Find and delete files older than ${purge_every_days} days -find "${DATAROOT}/"* -type f -mtime "+${purge_every_days}" -exec rm -f {} \; +#find "${DATAROOT}/"* -type f -mtime "+${purge_every_days}" -exec rm -f {} \; # Find and delete directories older than ${purge_every_days} days -find "${DATAROOT}/"* -type d -mtime "+${purge_every_days}" -exec rm -rf {} \; +#find "${DATAROOT}/"* -type d -mtime "+${purge_every_days}" -exec rm -rf {} \; echo "Cleanup ${DATAROOT} completed!" ############################################################### diff --git a/scripts/exglobal_prep_snow_obs.py b/scripts/exglobal_prep_snow_obs.py index 5107d9c935..d4998a7d84 100755 --- a/scripts/exglobal_prep_snow_obs.py +++ b/scripts/exglobal_prep_snow_obs.py @@ -21,5 +21,5 @@ # Instantiate the snow prepare task SnowAnl = SnowAnalysis(config) SnowAnl.prepare_GTS() - if f"{ SnowAnl.runtime_config.cyc }" == '18': + if f"{ SnowAnl.task_config.cyc }" == '18': SnowAnl.prepare_IMS() diff --git a/sorc/gdas.cd b/sorc/gdas.cd index 368c9c5db9..e3644a98c3 160000 --- a/sorc/gdas.cd +++ b/sorc/gdas.cd @@ -1 +1 @@ -Subproject commit 368c9c5db9b5ea62e72937b6d1b0f753adb9be40 +Subproject commit e3644a98c362d7321f9e3081a4e55947885ed2bf diff --git a/sorc/wxflow b/sorc/wxflow index 8406beeea4..5dad7dd61c 160000 --- a/sorc/wxflow +++ b/sorc/wxflow @@ -1 +1 @@ -Subproject commit 8406beeea410118cdfbd8300895b2b2878eadba6 +Subproject commit 5dad7dd61cebd9b3f2b163b3b06bb75eae1860a9 diff --git a/ush/detect_machine.sh b/ush/detect_machine.sh index 683ee0db7f..cfd0fa97e2 100755 --- a/ush/detect_machine.sh +++ b/ush/detect_machine.sh @@ -75,8 +75,8 @@ elif [[ -d /scratch1 ]]; then MACHINE_ID=hera elif [[ -d /work ]]; then # We are on MSU Orion or Hercules - if [[ -d /apps/other ]]; then - # We are on Hercules + mount=$(findmnt -n -o SOURCE /home) + if [[ ${mount} =~ "hercules" ]]; then MACHINE_ID=hercules else MACHINE_ID=orion diff --git a/ush/load_fv3gfs_modules.sh b/ush/load_fv3gfs_modules.sh index ae0e381db4..5f6afb7e35 100755 --- a/ush/load_fv3gfs_modules.sh +++ b/ush/load_fv3gfs_modules.sh @@ -30,6 +30,11 @@ esac module list +# Add wxflow to PYTHONPATH +wxflowPATH="${HOMEgfs}/ush/python" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush:${wxflowPATH}" +export PYTHONPATH + # Restore stack soft limit: ulimit -S -s "${ulimit_s}" unset ulimit_s diff --git a/ush/load_ufsda_modules.sh b/ush/load_ufsda_modules.sh index d7aa08e1ae..8117d3f359 100755 --- a/ush/load_ufsda_modules.sh +++ b/ush/load_ufsda_modules.sh @@ -51,6 +51,11 @@ esac module list pip list +# Add wxflow to PYTHONPATH +wxflowPATH="${HOMEgfs}/ush/python" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush:${wxflowPATH}" +export PYTHONPATH + # Restore stack soft limit: ulimit -S -s "${ulimit_s}" unset ulimit_s diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py index 16d2735090..69a992d7d4 100644 --- a/ush/python/pygfs/task/aero_analysis.py +++ b/ush/python/pygfs/task/aero_analysis.py @@ -29,33 +29,33 @@ class AerosolAnalysis(Analysis): def __init__(self, config): super().__init__(config) - _res = int(self.config['CASE'][1:]) - _res_anl = int(self.config['CASE_ANL'][1:]) - _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config['assim_freq']}H") / 2) - _jedi_yaml = os.path.join(self.runtime_config.DATA, f"{self.runtime_config.CDUMP}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") + _res = int(self.task_config['CASE'][1:]) + _res_anl = int(self.task_config['CASE_ANL'][1:]) + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) + _jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.aerovar.yaml") # Create a local dictionary that is repeatedly used across this class local_dict = AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, - 'npz_ges': self.config.LEVS - 1, - 'npz': self.config.LEVS - 1, + 'npz_ges': self.task_config.LEVS - 1, + 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, - 'npz_anl': self.config['LEVS'] - 1, + 'npz_anl': self.task_config['LEVS'] - 1, 'AERO_WINDOW_BEGIN': _window_begin, - 'AERO_WINDOW_LENGTH': f"PT{self.config['assim_freq']}H", - 'aero_bkg_fhr': map(int, str(self.config['aero_bkg_times']).split(',')), - 'OPREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.", + 'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", + 'aero_bkg_fhr': map(int, str(self.task_config['aero_bkg_times']).split(',')), + 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", 'jedi_yaml': _jedi_yaml, } ) - # task_config is everything that this task should need - self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + # Extend task_config with local_dict + self.task_config = AttrDict(**self.task_config, **local_dict) @logit(logger) def initialize(self: Analysis) -> None: @@ -157,8 +157,8 @@ def finalize(self: Analysis) -> None: archive.add(diaggzip, arcname=os.path.basename(diaggzip)) # copy full YAML from executable to ROTDIR - src = os.path.join(self.task_config['DATA'], f"{self.task_config['CDUMP']}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") - dest = os.path.join(self.task_config.COM_CHEM_ANALYSIS, f"{self.task_config['CDUMP']}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") + src = os.path.join(self.task_config['DATA'], f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.aerovar.yaml") + dest = os.path.join(self.task_config.COM_CHEM_ANALYSIS, f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.aerovar.yaml") yaml_copy = { 'mkdir': [self.task_config.COM_CHEM_ANALYSIS], 'copy': [[src, dest]] diff --git a/ush/python/pygfs/task/aero_emissions.py b/ush/python/pygfs/task/aero_emissions.py index 17d2f528e4..5f2d4c6840 100644 --- a/ush/python/pygfs/task/aero_emissions.py +++ b/ush/python/pygfs/task/aero_emissions.py @@ -42,7 +42,9 @@ def __init__(self, config: Dict[str, Any]) -> None: localdict = AttrDict( {'variable_used_repeatedly': local_variable} ) - self.task_config = AttrDict(**self.config, **self.runtime_config, **localdict) + + # Extend task_config with localdict + self.task_config = AttrDict(**self.task_config, **localdict) @staticmethod @logit(logger) diff --git a/ush/python/pygfs/task/aero_prepobs.py b/ush/python/pygfs/task/aero_prepobs.py index f2344241a9..d8396fe3ca 100644 --- a/ush/python/pygfs/task/aero_prepobs.py +++ b/ush/python/pygfs/task/aero_prepobs.py @@ -24,23 +24,23 @@ class AerosolObsPrep(Task): def __init__(self, config: Dict[str, Any]) -> None: super().__init__(config) - _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config['assim_freq']}H") / 2) - _window_end = add_to_datetime(self.runtime_config.current_cycle, +to_timedelta(f"{self.config['assim_freq']}H") / 2) + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) + _window_end = add_to_datetime(self.task_config.current_cycle, +to_timedelta(f"{self.task_config['assim_freq']}H") / 2) local_dict = AttrDict( { 'window_begin': _window_begin, 'window_end': _window_end, - 'sensors': str(self.config['SENSORS']).split(','), - 'data_dir': self.config['VIIRS_DATA_DIR'], + 'sensors': str(self.task_config['SENSORS']).split(','), + 'data_dir': self.task_config['VIIRS_DATA_DIR'], 'input_files': '', - 'OPREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z.", - 'APREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z." + 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z." } ) # task_config is everything that this task should need - self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + self.task_config = AttrDict(**self.task_config, **local_dict) @logit(logger) def initialize(self) -> None: @@ -64,8 +64,8 @@ def initialize(self) -> None: self.task_config.prepaero_config = self.get_obsproc_config(sensor) # generate converter YAML file - template = f"{self.runtime_config.CDUMP}.t{self.runtime_config['cyc']:02d}z.prepaero_viirs_{sensor}.yaml" - _prepaero_yaml = os.path.join(self.runtime_config.DATA, template) + template = f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.prepaero_viirs_{sensor}.yaml" + _prepaero_yaml = os.path.join(self.task_config.DATA, template) self.task_config.prepaero_yaml.append(_prepaero_yaml) logger.debug(f"Generate PrepAeroObs YAML file: {_prepaero_yaml}") save_as_yaml(self.task_config.prepaero_config, _prepaero_yaml) diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index b668ac3980..e407cf1765 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -27,7 +27,7 @@ class Analysis(Task): def __init__(self, config: Dict[str, Any]) -> None: super().__init__(config) # Store location of GDASApp jinja2 templates - self.gdasapp_j2tmpl_dir = os.path.join(self.config.PARMgfs, 'gdas') + self.gdasapp_j2tmpl_dir = os.path.join(self.task_config.PARMgfs, 'gdas') def initialize(self) -> None: super().initialize() @@ -54,7 +54,7 @@ def get_jedi_config(self, algorithm: Optional[str] = None) -> Dict[str, Any]: ---------- algorithm (optional) : str Name of the algorithm to use in the JEDI configuration. Will override the algorithm - set in the self.config.JCB_<>_YAML file + set in the self.task_config.JCB_<>_YAML file Returns ---------- @@ -120,7 +120,7 @@ def get_obs_dict(self) -> Dict[str, Any]: basename = os.path.basename(obfile) copylist.append([os.path.join(self.task_config['COM_OBS'], basename), obfile]) obs_dict = { - 'mkdir': [os.path.join(self.runtime_config['DATA'], 'obs')], + 'mkdir': [os.path.join(self.task_config['DATA'], 'obs')], 'copy': copylist } return obs_dict @@ -161,7 +161,7 @@ def get_bias_dict(self) -> Dict[str, Any]: # TODO: Why is this specific to ATMOS? bias_dict = { - 'mkdir': [os.path.join(self.runtime_config.DATA, 'bc')], + 'mkdir': [os.path.join(self.task_config.DATA, 'bc')], 'copy': copylist } return bias_dict @@ -180,7 +180,7 @@ def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: Li List of increment variables to add to the background """ - for itile in range(1, self.config.ntiles + 1): + for itile in range(1, self.task_config.ntiles + 1): inc_path = inc_file_tmpl.format(tilenum=itile) bkg_path = bkg_file_tmpl.format(tilenum=itile) with Dataset(inc_path, mode='r') as incfile, Dataset(bkg_path, mode='a') as rstfile: @@ -194,44 +194,6 @@ def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: Li except (AttributeError, RuntimeError): pass # checksum is missing, move on - @logit(logger) - def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method is a placeholder for now... will be possibly made generic at a later date - - Parameters - ---------- - task_config: Dict - a dictionary containing all of the configuration needed for the task - - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - bkg_dict = {'foo': 'bar'} - return bkg_dict - - @logit(logger) - def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of background error files to copy - - This method is a placeholder for now... will be possibly made generic at a later date - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - - Returns - ---------- - berror_dict: Dict - a dictionary containing the list of background error files to copy for FileHandler - """ - berror_dict = {'foo': 'bar'} - return berror_dict - @logit(logger) def link_jediexe(self) -> None: """Compile a dictionary of background error files to copy @@ -258,68 +220,6 @@ def link_jediexe(self) -> None: return exe_dest - @staticmethod - @logit(logger) - def get_fv3ens_dict(config: Dict[str, Any]) -> Dict[str, Any]: - """Compile a dictionary of ensemble member restarts to copy - - This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer) - that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - - Returns - ---------- - ens_dict: Dict - a dictionary containing the list of ensemble member restart files to copy for FileHandler - """ - # NOTE for now this is FV3 restart files and just assumed to be fh006 - - # define template - template_res = config.COM_ATMOS_RESTART_TMPL - prev_cycle = config.previous_cycle - tmpl_res_dict = { - 'ROTDIR': config.ROTDIR, - 'RUN': config.RUN, - 'YMD': to_YMD(prev_cycle), - 'HH': prev_cycle.strftime('%H'), - 'MEMDIR': None - } - - # construct ensemble member file list - dirlist = [] - enslist = [] - for imem in range(1, config.NMEM_ENS + 1): - memchar = f"mem{imem:03d}" - - # create directory path for ensemble member restart - dirlist.append(os.path.join(config.DATA, config.dirname, f'mem{imem:03d}')) - - # get FV3 restart files, this will be a lot simpler when using history files - tmpl_res_dict['MEMDIR'] = memchar - rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get) - run_dir = os.path.join(config.DATA, config.dirname, memchar) - - # atmens DA needs coupler - basename = f'{to_fv3time(config.current_cycle)}.coupler.res' - enslist.append([os.path.join(rst_dir, basename), os.path.join(config.DATA, config.dirname, memchar, basename)]) - - # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data - for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: - template = f'{to_fv3time(config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' - for itile in range(1, config.ntiles + 1): - basename = template.format(tilenum=itile) - enslist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - ens_dict = { - 'mkdir': dirlist, - 'copy': enslist, - } - return ens_dict - @staticmethod @logit(logger) def tgz_diags(statfile: str, diagdir: str) -> None: diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index d0722552e1..953a856192 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -35,12 +35,13 @@ def __init__(self, config: Dict[str, Any]) -> None: """ super().__init__(config) - rotdir = self.config.ROTDIR + os.sep + rotdir = self.task_config.ROTDIR + os.sep # Find all absolute paths in the environment and get their relative paths from ${ROTDIR} path_dict = self._gen_relative_paths(rotdir) - self.task_config = AttrDict(**self.config, **self.runtime_config, **path_dict) + # Extend task_config with path_dict + self.task_config = AttrDict(**self.task_config, **path_dict) @logit(logger) def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): @@ -297,7 +298,7 @@ def _create_tarball(target: str, fileset: List) -> None: @logit(logger) def _gen_relative_paths(self, root_path: str) -> Dict: - """Generate a dict of paths in self.config relative to root_path + """Generate a dict of paths in self.task_config relative to root_path Parameters ---------- @@ -314,7 +315,7 @@ def _gen_relative_paths(self, root_path: str) -> Dict: """ rel_path_dict = {} - for key, value in self.config.items(): + for key, value in self.task_config.items(): if isinstance(value, str): if root_path in value: rel_path = value.replace(root_path, "") diff --git a/ush/python/pygfs/task/atm_analysis.py b/ush/python/pygfs/task/atm_analysis.py index 95545c57a4..4e9d37335c 100644 --- a/ush/python/pygfs/task/atm_analysis.py +++ b/ush/python/pygfs/task/atm_analysis.py @@ -28,35 +28,35 @@ class AtmAnalysis(Analysis): def __init__(self, config): super().__init__(config) - _res = int(self.config.CASE[1:]) - _res_anl = int(self.config.CASE_ANL[1:]) - _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config.assim_freq}H") / 2) - _jedi_yaml = os.path.join(self.runtime_config.DATA, f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atmvar.yaml") + _res = int(self.task_config.CASE[1:]) + _res_anl = int(self.task_config.CASE_ANL[1:]) + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + _jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmvar.yaml") # Create a local dictionary that is repeatedly used across this class local_dict = AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, - 'npz_ges': self.config.LEVS - 1, - 'npz': self.config.LEVS - 1, + 'npz_ges': self.task_config.LEVS - 1, + 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, - 'npz_anl': self.config.LEVS - 1, + 'npz_anl': self.task_config.LEVS - 1, 'ATM_WINDOW_BEGIN': _window_begin, - 'ATM_WINDOW_LENGTH': f"PT{self.config.assim_freq}H", - 'OPREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.", + 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", + 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", 'jedi_yaml': _jedi_yaml, - 'atm_obsdatain_path': f"{self.runtime_config.DATA}/obs/", - 'atm_obsdataout_path': f"{self.runtime_config.DATA}/diags/", + 'atm_obsdatain_path': f"{self.task_config.DATA}/obs/", + 'atm_obsdataout_path': f"{self.task_config.DATA}/diags/", 'BKG_TSTEP': "PT1H" # Placeholder for 4D applications } ) - # task_config is everything that this task should need - self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + # Extend task_config with local_dict + self.task_config = AttrDict(**self.task_config, **local_dict) @logit(logger) def initialize(self: Analysis) -> None: @@ -85,22 +85,22 @@ def initialize(self: Analysis) -> None: # stage static background error files, otherwise it will assume ID matrix logger.info(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}") - FileHandler(self.get_berror_dict(self.task_config)).sync() + if self.task_config.STATICB_TYPE != 'identity': + berror_staging_dict = parse_j2yaml(self.task_config.BERROR_STAGING_YAML, self.task_config) + else: + berror_staging_dict = {} + FileHandler(berror_staging_dict).sync() # stage ensemble files for use in hybrid background error if self.task_config.DOHYBVAR: logger.debug(f"Stage ensemble files for DOHYBVAR {self.task_config.DOHYBVAR}") - localconf = AttrDict() - keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN', - 'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles'] - for key in keys: - localconf[key] = self.task_config[key] - localconf.RUN = 'enkfgdas' - localconf.dirname = 'ens' - FileHandler(self.get_fv3ens_dict(localconf)).sync() + fv3ens_staging_dict = parse_j2yaml(self.task_config.FV3ENS_STAGING_YAML, self.task_config) + FileHandler(fv3ens_staging_dict).sync() # stage backgrounds - FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync() + logger.info(f"Staging background files from {self.task_config.VAR_BKG_STAGING_YAML}") + bkg_staging_dict = parse_j2yaml(self.task_config.VAR_BKG_STAGING_YAML, self.task_config) + FileHandler(bkg_staging_dict).sync() # generate variational YAML file logger.debug(f"Generate variational YAML file: {self.task_config.jedi_yaml}") @@ -140,7 +140,7 @@ def variational(self: Analysis) -> None: @logit(logger) def init_fv3_increment(self: Analysis) -> None: # Setup JEDI YAML file - self.task_config.jedi_yaml = os.path.join(self.runtime_config.DATA, + self.task_config.jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.JCB_ALGO}.yaml") save_as_yaml(self.get_jedi_config(self.task_config.JCB_ALGO), self.task_config.jedi_yaml) @@ -198,8 +198,8 @@ def finalize(self: Analysis) -> None: # copy full YAML from executable to ROTDIR logger.info(f"Copying {self.task_config.jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS}") - src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmvar.yaml") - dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmvar.yaml") + src = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmvar.yaml") + dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmvar.yaml") logger.debug(f"Copying {src} to {dest}") yaml_copy = { 'mkdir': [self.task_config.COM_ATMOS_ANALYSIS], @@ -244,7 +244,7 @@ def finalize(self: Analysis) -> None: cdate = to_fv3time(self.task_config.current_cycle) cdate_inc = cdate.replace('.', '_') src = os.path.join(self.task_config.DATA, 'anl', f"atminc.{cdate_inc}z.nc4") - dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, f'{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc') + dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, f'{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atminc.nc') logger.debug(f"Copying {src} to {dest}") inc_copy = { 'copy': [[src, dest]] @@ -253,189 +253,3 @@ def finalize(self: Analysis) -> None: def clean(self): super().clean() - - @logit(logger) - def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method constructs a dictionary of FV3 restart files (coupler, core, tracer) - that are needed for global atm DA and returns said dictionary for use by the FileHandler class. - - Parameters - ---------- - task_config: Dict - a dictionary containing all of the configuration needed for the task - - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - # NOTE for now this is FV3 restart files and just assumed to be fh006 - - # get FV3 restart files, this will be a lot simpler when using history files - rst_dir = os.path.join(task_config.COM_ATMOS_RESTART_PREV) # for now, option later? - run_dir = os.path.join(task_config.DATA, 'bkg') - - # Start accumulating list of background files to copy - bkglist = [] - - # atm DA needs coupler - basename = f'{to_fv3time(task_config.current_cycle)}.coupler.res' - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - # atm DA needs core, srf_wnd, tracer, phy_data, sfc_data - for ftype in ['core', 'srf_wnd', 'tracer']: - template = f'{to_fv3time(self.task_config.current_cycle)}.fv_{ftype}.res.tile{{tilenum}}.nc' - for itile in range(1, task_config.ntiles + 1): - basename = template.format(tilenum=itile) - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - for ftype in ['phy_data', 'sfc_data']: - template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' - for itile in range(1, task_config.ntiles + 1): - basename = template.format(tilenum=itile) - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - bkg_dict = { - 'mkdir': [run_dir], - 'copy': bkglist, - } - return bkg_dict - - @logit(logger) - def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of background error files to copy - - This method will construct a dictionary of either bump of gsibec background - error files for global atm DA and return said dictionary for use by the - FileHandler class. - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - - Returns - ---------- - berror_dict: Dict - a dictionary containing the list of atm background error files to copy for FileHandler - """ - SUPPORTED_BERROR_STATIC_MAP = {'identity': self._get_berror_dict_identity, - 'bump': self._get_berror_dict_bump, - 'gsibec': self._get_berror_dict_gsibec} - - try: - berror_dict = SUPPORTED_BERROR_STATIC_MAP[config.STATICB_TYPE](config) - except KeyError: - raise KeyError(f"{config.STATICB_TYPE} is not a supported background error type.\n" + - f"Currently supported background error types are:\n" + - f'{" | ".join(SUPPORTED_BERROR_STATIC_MAP.keys())}') - - return berror_dict - - @staticmethod - @logit(logger) - def _get_berror_dict_identity(config: Dict[str, Any]) -> Dict[str, List[str]]: - """Identity BE does not need any files for staging. - - This is a private method and should not be accessed directly. - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - Returns - ---------- - berror_dict: Dict - Empty dictionary [identity BE needs not files to stage] - """ - logger.info(f"Identity background error does not use staged files. Return empty dictionary") - return {} - - @staticmethod - @logit(logger) - def _get_berror_dict_bump(config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of atm bump background error files to copy - - This method will construct a dictionary of atm bump background error - files for global atm DA and return said dictionary to the parent - - This is a private method and should not be accessed directly. - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - - Returns - ---------- - berror_dict: Dict - a dictionary of atm bump background error files to copy for FileHandler - """ - # BUMP atm static-B needs nicas, cor_rh, cor_rv and stddev files. - b_dir = config.BERROR_DATA_DIR - b_datestr = to_fv3time(config.BERROR_DATE) - berror_list = [] - for ftype in ['cor_rh', 'cor_rv', 'stddev']: - coupler = f'{b_datestr}.{ftype}.coupler.res' - berror_list.append([ - os.path.join(b_dir, coupler), os.path.join(config.DATA, 'berror', coupler) - ]) - - template = '{b_datestr}.{ftype}.fv_tracer.res.tile{{tilenum}}.nc' - for itile in range(1, config.ntiles + 1): - tracer = template.format(tilenum=itile) - berror_list.append([ - os.path.join(b_dir, tracer), os.path.join(config.DATA, 'berror', tracer) - ]) - - nproc = config.ntiles * config.layout_x * config.layout_y - for nn in range(1, nproc + 1): - berror_list.append([ - os.path.join(b_dir, f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc'), - os.path.join(config.DATA, 'berror', f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc') - ]) - - # create dictionary of background error files to stage - berror_dict = { - 'mkdir': [os.path.join(config.DATA, 'berror')], - 'copy': berror_list, - } - return berror_dict - - @staticmethod - @logit(logger) - def _get_berror_dict_gsibec(config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of atm gsibec background error files to copy - - This method will construct a dictionary of atm gsibec background error - files for global atm DA and return said dictionary to the parent - - This is a private method and should not be accessed directly. - - Parameters - ---------- - config: Dict - a dictionary containing all of the configuration needed - - Returns - ---------- - berror_dict: Dict - a dictionary of atm gsibec background error files to copy for FileHandler - """ - # GSI atm static-B needs namelist and coefficient files. - b_dir = os.path.join(config.HOMEgfs, 'fix', 'gdas', 'gsibec', config.CASE_ANL) - berror_list = [] - for ftype in ['gfs_gsi_global.nml', 'gsi-coeffs-gfs-global.nc4']: - berror_list.append([ - os.path.join(b_dir, ftype), - os.path.join(config.DATA, 'berror', ftype) - ]) - - # create dictionary of background error files to stage - berror_dict = { - 'mkdir': [os.path.join(config.DATA, 'berror')], - 'copy': berror_list, - } - return berror_dict diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index 37ac613736..bd5112050e 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -29,22 +29,22 @@ class AtmEnsAnalysis(Analysis): def __init__(self, config): super().__init__(config) - _res = int(self.config.CASE_ENS[1:]) - _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config.assim_freq}H") / 2) - _jedi_yaml = os.path.join(self.runtime_config.DATA, f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atmens.yaml") + _res = int(self.task_config.CASE_ENS[1:]) + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + _jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens.yaml") # Create a local dictionary that is repeatedly used across this class local_dict = AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, - 'npz_ges': self.config.LEVS - 1, - 'npz': self.config.LEVS - 1, + 'npz_ges': self.task_config.LEVS - 1, + 'npz': self.task_config.LEVS - 1, 'ATM_WINDOW_BEGIN': _window_begin, - 'ATM_WINDOW_LENGTH': f"PT{self.config.assim_freq}H", - 'OPREFIX': f"{self.config.EUPD_CYC}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN - 'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.", + 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", + 'OPREFIX': f"{self.task_config.EUPD_CYC}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", 'jedi_yaml': _jedi_yaml, 'atm_obsdatain_path': f"./obs/", 'atm_obsdataout_path': f"./diags/", @@ -52,8 +52,8 @@ def __init__(self, config): } ) - # task_config is everything that this task should need - self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + # Extend task_config with local_dict + self.task_config = AttrDict(**self.task_config, **local_dict) @logit(logger) def initialize(self: Analysis) -> None: @@ -77,27 +77,6 @@ def initialize(self: Analysis) -> None: """ super().initialize() - # Make member directories in DATA for background and in DATA and ROTDIR for analysis files - # create template dictionary for output member analysis directories - template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL - tmpl_inc_dict = { - 'ROTDIR': self.task_config.ROTDIR, - 'RUN': self.task_config.RUN, - 'YMD': to_YMD(self.task_config.current_cycle), - 'HH': self.task_config.current_cycle.strftime('%H') - } - dirlist = [] - for imem in range(1, self.task_config.NMEM_ENS + 1): - dirlist.append(os.path.join(self.task_config.DATA, 'bkg', f'mem{imem:03d}')) - dirlist.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}')) - - # create output directory path for member analysis - tmpl_inc_dict['MEMDIR'] = f"mem{imem:03d}" - incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) - dirlist.append(incdir) - - FileHandler({'mkdir': dirlist}).sync() - # stage CRTM fix files logger.info(f"Staging CRTM fix files from {self.task_config.CRTM_FIX_YAML}") crtm_fix_list = parse_j2yaml(self.task_config.CRTM_FIX_YAML, self.task_config) @@ -110,13 +89,8 @@ def initialize(self: Analysis) -> None: # stage backgrounds logger.info(f"Stage ensemble member background files") - localconf = AttrDict() - keys = ['COM_ATMOS_RESTART_TMPL', 'previous_cycle', 'ROTDIR', 'RUN', - 'NMEM_ENS', 'DATA', 'current_cycle', 'ntiles'] - for key in keys: - localconf[key] = self.task_config[key] - localconf.dirname = 'bkg' - FileHandler(self.get_fv3ens_dict(localconf)).sync() + bkg_staging_dict = parse_j2yaml(self.task_config.LGETKF_BKG_STAGING_YAML, self.task_config) + FileHandler(bkg_staging_dict).sync() # generate ensemble da YAML file logger.debug(f"Generate ensemble da YAML file: {self.task_config.jedi_yaml}") @@ -171,7 +145,7 @@ def letkf(self: Analysis) -> None: @logit(logger) def init_fv3_increment(self: Analysis) -> None: # Setup JEDI YAML file - self.task_config.jedi_yaml = os.path.join(self.runtime_config.DATA, + self.task_config.jedi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.JCB_ALGO}.yaml") save_as_yaml(self.get_jedi_config(self.task_config.JCB_ALGO), self.task_config.jedi_yaml) @@ -235,8 +209,8 @@ def finalize(self: Analysis) -> None: # copy full YAML from executable to ROTDIR logger.info(f"Copying {self.task_config.jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}") - src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml") - dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml") + src = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens.yaml") + dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atmens.yaml") logger.debug(f"Copying {src} to {dest}") yaml_copy = { 'mkdir': [self.task_config.COM_ATMOS_ANALYSIS_ENS], @@ -265,7 +239,7 @@ def finalize(self: Analysis) -> None: tmpl_inc_dict['MEMDIR'] = memchar incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) src = os.path.join(self.task_config.DATA, 'anl', memchar, f"atminc.{cdate_inc}z.nc4") - dest = os.path.join(incdir, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc") + dest = os.path.join(incdir, f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.atminc.nc") # copy increment logger.debug(f"Copying {src} to {dest}") diff --git a/ush/python/pygfs/task/oceanice_products.py b/ush/python/pygfs/task/oceanice_products.py index 690aac9542..98b57ae801 100644 --- a/ush/python/pygfs/task/oceanice_products.py +++ b/ush/python/pygfs/task/oceanice_products.py @@ -49,39 +49,40 @@ def __init__(self, config: Dict[str, Any]) -> None: """ super().__init__(config) - if self.config.COMPONENT not in self.VALID_COMPONENTS: - raise NotImplementedError(f'{self.config.COMPONENT} is not a valid model component.\n' + + if self.task_config.COMPONENT not in self.VALID_COMPONENTS: + raise NotImplementedError(f'{self.task_config.COMPONENT} is not a valid model component.\n' + 'Valid model components are:\n' + f'{", ".join(self.VALID_COMPONENTS)}') - model_grid = f"mx{self.config[self.COMPONENT_RES_MAP[self.config.COMPONENT]]:03d}" + model_grid = f"mx{self.task_config[self.COMPONENT_RES_MAP[self.task_config.COMPONENT]]:03d}" - valid_datetime = add_to_datetime(self.runtime_config.current_cycle, to_timedelta(f"{self.config.FORECAST_HOUR}H")) + valid_datetime = add_to_datetime(self.task_config.current_cycle, to_timedelta(f"{self.task_config.FORECAST_HOUR}H")) - if self.config.COMPONENT == 'ice': - offset = int(self.runtime_config.current_cycle.strftime("%H")) % self.config.FHOUT_ICE_GFS + if self.task_config.COMPONENT == 'ice': + offset = int(self.task_config.current_cycle.strftime("%H")) % self.task_config.FHOUT_ICE_GFS # For CICE cases where offset is not 0, forecast_hour needs to be adjusted based on the offset. # TODO: Consider FHMIN when calculating offset. if offset != 0: - forecast_hour = self.config.FORECAST_HOUR - int(self.runtime_config.current_cycle.strftime("%H")) + forecast_hour = self.task_config.FORECAST_HOUR - int(self.task_config.current_cycle.strftime("%H")) # For the first forecast hour, the interval may be different from the intervals of subsequent forecast hours - if forecast_hour <= self.config.FHOUT_ICE_GFS: - interval = self.config.FHOUT_ICE_GFS - int(self.runtime_config.current_cycle.strftime("%H")) + if forecast_hour <= self.task_config.FHOUT_ICE_GFS: + interval = self.task_config.FHOUT_ICE_GFS - int(self.task_config.current_cycle.strftime("%H")) else: - interval = self.config.FHOUT_ICE_GFS + interval = self.task_config.FHOUT_ICE_GFS else: - forecast_hour = self.config.FORECAST_HOUR - interval = self.config.FHOUT_ICE_GFS - if self.config.COMPONENT == 'ocean': - forecast_hour = self.config.FORECAST_HOUR - interval = self.config.FHOUT_OCN_GFS + forecast_hour = self.task_config.FORECAST_HOUR + interval = self.task_config.FHOUT_ICE_GFS + if self.task_config.COMPONENT == 'ocean': + forecast_hour = self.task_config.FORECAST_HOUR + interval = self.task_config.FHOUT_OCN_GFS # TODO: This is a bit of a hack, but it works for now # FIXME: find a better way to provide the averaging period avg_period = f"{forecast_hour-interval:03d}-{forecast_hour:03d}" + # Extend task_config with localdict localdict = AttrDict( - {'component': self.config.COMPONENT, + {'component': self.task_config.COMPONENT, 'forecast_hour': forecast_hour, 'valid_datetime': valid_datetime, 'avg_period': avg_period, @@ -89,11 +90,11 @@ def __init__(self, config: Dict[str, Any]) -> None: 'interval': interval, 'product_grids': self.VALID_PRODUCT_GRIDS[model_grid]} ) - self.task_config = AttrDict(**self.config, **self.runtime_config, **localdict) + self.task_config = AttrDict(**self.task_config, **localdict) # Read the oceanice_products.yaml file for common configuration - logger.info(f"Read the ocean ice products configuration yaml file {self.config.OCEANICEPRODUCTS_CONFIG}") - self.task_config.oceanice_yaml = parse_j2yaml(self.config.OCEANICEPRODUCTS_CONFIG, self.task_config) + logger.info(f"Read the ocean ice products configuration yaml file {self.task_config.OCEANICEPRODUCTS_CONFIG}") + self.task_config.oceanice_yaml = parse_j2yaml(self.task_config.OCEANICEPRODUCTS_CONFIG, self.task_config) logger.debug(f"oceanice_yaml:\n{pformat(self.task_config.oceanice_yaml)}") @staticmethod diff --git a/ush/python/pygfs/task/snow_analysis.py b/ush/python/pygfs/task/snow_analysis.py index 9a5c7fcab0..9656b00a8e 100644 --- a/ush/python/pygfs/task/snow_analysis.py +++ b/ush/python/pygfs/task/snow_analysis.py @@ -32,27 +32,27 @@ class SnowAnalysis(Analysis): def __init__(self, config): super().__init__(config) - _res = int(self.config['CASE'][1:]) - _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config['assim_freq']}H") / 2) - _letkfoi_yaml = os.path.join(self.runtime_config.DATA, f"{self.runtime_config.RUN}.t{self.runtime_config['cyc']:02d}z.letkfoi.yaml") + _res = int(self.task_config['CASE'][1:]) + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) + _letkfoi_yaml = os.path.join(self.task_config.DATA, f"{self.task_config.RUN}.t{self.task_config['cyc']:02d}z.letkfoi.yaml") # Create a local dictionary that is repeatedly used across this class local_dict = AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, - 'npz_ges': self.config.LEVS - 1, - 'npz': self.config.LEVS - 1, + 'npz_ges': self.task_config.LEVS - 1, + 'npz': self.task_config.LEVS - 1, 'SNOW_WINDOW_BEGIN': _window_begin, - 'SNOW_WINDOW_LENGTH': f"PT{self.config['assim_freq']}H", - 'OPREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z.", - 'APREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z.", + 'SNOW_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", + 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", 'jedi_yaml': _letkfoi_yaml } ) - # task_config is everything that this task should need - self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + # Extend task_config with local_dict + self.task_config = AttrDict(**self.task_config, **local_dict) @logit(logger) def prepare_GTS(self) -> None: @@ -114,7 +114,7 @@ def _gtsbufr2iodax(exe, yaml_file): # 1. generate bufr2ioda YAML files # 2. execute bufr2ioda.x for name in prep_gts_config.bufr2ioda.keys(): - gts_yaml = os.path.join(self.runtime_config.DATA, f"bufr_{name}_snow.yaml") + gts_yaml = os.path.join(self.task_config.DATA, f"bufr_{name}_snow.yaml") logger.info(f"Generate BUFR2IODA YAML file: {gts_yaml}") temp_yaml = parse_j2yaml(prep_gts_config.bufr2ioda[name], localconf) save_as_yaml(temp_yaml, gts_yaml) diff --git a/ush/python/pygfs/task/upp.py b/ush/python/pygfs/task/upp.py index 7db50e1582..7e42e07c64 100644 --- a/ush/python/pygfs/task/upp.py +++ b/ush/python/pygfs/task/upp.py @@ -46,26 +46,27 @@ def __init__(self, config: Dict[str, Any]) -> None: """ super().__init__(config) - if self.config.UPP_RUN not in self.VALID_UPP_RUN: - raise NotImplementedError(f'{self.config.UPP_RUN} is not a valid UPP run type.\n' + + if self.task_config.UPP_RUN not in self.VALID_UPP_RUN: + raise NotImplementedError(f'{self.task_config.UPP_RUN} is not a valid UPP run type.\n' + 'Valid UPP_RUN values are:\n' + f'{", ".join(self.VALID_UPP_RUN)}') - valid_datetime = add_to_datetime(self.runtime_config.current_cycle, to_timedelta(f"{self.config.FORECAST_HOUR}H")) + valid_datetime = add_to_datetime(self.task_config.current_cycle, to_timedelta(f"{self.task_config.FORECAST_HOUR}H")) + # Extend task_config with localdict localdict = AttrDict( - {'upp_run': self.config.UPP_RUN, - 'forecast_hour': self.config.FORECAST_HOUR, + {'upp_run': self.task_config.UPP_RUN, + 'forecast_hour': self.task_config.FORECAST_HOUR, 'valid_datetime': valid_datetime, 'atmos_filename': f"atm_{valid_datetime.strftime('%Y%m%d%H%M%S')}.nc", 'flux_filename': f"sfc_{valid_datetime.strftime('%Y%m%d%H%M%S')}.nc" } ) - self.task_config = AttrDict(**self.config, **self.runtime_config, **localdict) + self.task_config = AttrDict(**self.task_config, **localdict) # Read the upp.yaml file for common configuration - logger.info(f"Read the UPP configuration yaml file {self.config.UPP_CONFIG}") - self.task_config.upp_yaml = parse_j2yaml(self.config.UPP_CONFIG, self.task_config) + logger.info(f"Read the UPP configuration yaml file {self.task_config.UPP_CONFIG}") + self.task_config.upp_yaml = parse_j2yaml(self.task_config.UPP_CONFIG, self.task_config) logger.debug(f"upp_yaml:\n{pformat(self.task_config.upp_yaml)}") @staticmethod diff --git a/workflow/hosts.py b/workflow/hosts.py index 2334a3ac35..cd0cfe0083 100644 --- a/workflow/hosts.py +++ b/workflow/hosts.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import os +import socket from pathlib import Path from wxflow import YAMLFile @@ -39,10 +40,7 @@ def detect(cls): if os.path.exists('/scratch1/NCEPDEV'): machine = 'HERA' elif os.path.exists('/work/noaa'): - if os.path.exists('/apps/other'): - machine = 'HERCULES' - else: - machine = 'ORION' + machine = socket.gethostname().split("-", 1)[0].upper() elif os.path.exists('/lfs4/HFIP'): machine = 'JET' elif os.path.exists('/lfs/f1'):