From e020fce29599a272676a321d31153afc5386b8c1 Mon Sep 17 00:00:00 2001 From: Eric Sinsky - NOAA <48259628+EricSinsky-NOAA@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:00:02 -0500 Subject: [PATCH] Add ability to compress ocean and ice products (#3111) This PR adds the ability to optionally compress ocean and ice products for the GEFS reforecast. Compression can be set using a new `config.base` variable called `DO_OCNICE_COMPRESS`. For the reforecast, the ice history data and the 0p25 ocean post-processed data are gzipped at the end of the ice and ocean prod tasks, respectively. The archive task was also modified to allow either gzipped or non-gzipped ocean and ice data to be sent to HPSS, which is controlled by `DO_OCNICE_COMPRESS`. Furthermore, an option was added to skip the ocean and ice post-processing within the ocean and ice prod jobs, respectively. This can be controlled by `DO_ICE_INTERP` and `DO_OCN_INTERP` in `config.base`. In order to meet the reforecast requirements, `DO_ICE_INTERP` is set to `NO` and `DO_OCN_INTERP` is set to `YES` by default. This PR also accomplishes the following: - [x] The default WCOSS2 `STMP` and `PTMP` were modified to `/lfs/h3/emc/gefstemp/` because this is the space where the reforecast will be performed. - [x] The wavepostpnt task has been removed. - [x] If a group account is used, divide the stmp and ptmp directories by `SUDO_USER`. - [x] The clean_up job has been modified for the reforecast. 
--- parm/archive/gefs_extracted_ice.yaml.j2 | 8 ++++- parm/archive/gefs_extracted_ocean.yaml.j2 | 8 ++++- parm/archive/gefs_extracted_wave.yaml.j2 | 10 ------ parm/config/gefs/config.base | 16 ++++++++-- scripts/exglobal_archive.py | 2 +- scripts/exglobal_cleanup.sh | 9 +++++- scripts/exglobal_oceanice_products.py | 36 ++++++++++++++-------- ush/python/pygfs/task/oceanice_products.py | 31 ++++++++++++++++++- workflow/applications/gefs.py | 4 ++- workflow/rocoto/gefs_tasks.py | 6 ++++ 10 files changed, 99 insertions(+), 31 deletions(-) diff --git a/parm/archive/gefs_extracted_ice.yaml.j2 b/parm/archive/gefs_extracted_ice.yaml.j2 index 786d502f23..f46394a8cc 100644 --- a/parm/archive/gefs_extracted_ice.yaml.j2 +++ b/parm/archive/gefs_extracted_ice.yaml.j2 @@ -13,6 +13,12 @@ gefs_ice: {% do members.append("mem" ~ '%03d' % mem_nm ) %} {% endfor %} +{% if DO_OCNICE_COMPRESS %} + {% set fileext = ".nc.gz" %} +{% else %} + {% set fileext = ".nc" %} +{% endif %} + {% for mem in members %} {% set tmpl_dict = ({ '${ROTDIR}':ROTDIR, '${RUN}':RUN, @@ -25,7 +31,7 @@ gefs_ice: # Select netcdf files to copy to the atardir {% if path_exists(COMIN_ICE_HISTORY) %} {% for fhr in range(FHMIN_GFS + FHOUT_ICE_GFS, FHMAX_GFS + FHOUT_ICE_GFS, FHOUT_ICE_GFS) %} - {% set file_name = head ~ FHOUT_ICE_GFS ~ "hr_avg" ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %} + {% set file_name = head ~ FHOUT_ICE_GFS ~ "hr_avg" ~ ".f" ~ '%03d'|format(fhr) ~ fileext %} {% set file_path = COMIN_ICE_HISTORY ~ "/" ~ file_name %} - "{{ file_path | relpath(ROTDIR)}}" {% endfor %} diff --git a/parm/archive/gefs_extracted_ocean.yaml.j2 b/parm/archive/gefs_extracted_ocean.yaml.j2 index 2a5c73c102..63863a3461 100644 --- a/parm/archive/gefs_extracted_ocean.yaml.j2 +++ b/parm/archive/gefs_extracted_ocean.yaml.j2 @@ -13,6 +13,12 @@ gefs_ocean: {% do members.append("mem" ~ '%03d' % mem_nm ) %} {% endfor %} +{% if DO_OCNICE_COMPRESS %} + {% set fileext = ".nc.gz" %} +{% else %} + {% set fileext = ".nc" %} +{% endif %} + {% set 
res = (OCNRES|string())[0] ~ "p" ~ (OCNRES|string())[-2:] %} {% for mem in members %} @@ -28,7 +34,7 @@ gefs_ocean: {% set netcdf_grid_dir = COMIN_OCEAN_NETCDF ~ "/" ~ res %} {% if path_exists(netcdf_grid_dir) %} {% for fhr in range(FHMIN_GFS + FHOUT_OCN_GFS, FHMAX_GFS + FHOUT_OCN_GFS, FHOUT_OCN_GFS) %} - {% set file_name = head ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %} + {% set file_name = head ~ res ~ ".f" ~ '%03d'|format(fhr) ~ fileext %} {% set file_path = netcdf_grid_dir ~ "/" ~ file_name %} - "{{ file_path | relpath(ROTDIR)}}" {% endfor %} diff --git a/parm/archive/gefs_extracted_wave.yaml.j2 b/parm/archive/gefs_extracted_wave.yaml.j2 index e0aa07c816..2eb533d277 100644 --- a/parm/archive/gefs_extracted_wave.yaml.j2 +++ b/parm/archive/gefs_extracted_wave.yaml.j2 @@ -38,14 +38,4 @@ gefs_wave: {% endfor %} {% endif %} - {% set COMIN_WAVE_STATION = COM_WAVE_STATION_TMPL | replace_tmpl(tmpl_dict) %} - # Select station files to copy to the atardir - {% if path_exists(COMIN_WAVE_STATION) %} - {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.spec_tar.gz" %} - - "{{ file_path | relpath(ROTDIR)}}" - {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.cbull_tar" %} - - "{{ file_path | relpath(ROTDIR)}}" - {% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.bull_tar" %} - - "{{ file_path | relpath(ROTDIR)}}" - {% endif %} {% endfor %} diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index f16bd6a9bf..50ba6ad113 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -41,8 +41,16 @@ export COMINsyn="@COMINsyn@" # USER specific paths export HOMEDIR="@HOMEDIR@" -export STMP="@STMP@" -export PTMP="@PTMP@" + +SUDO_USER=${SUDO_USER:-""} +if [[ -z ${SUDO_USER} ]]; then + export STMP="/lfs/h3/emc/gefstemp/stmp/${USER}/stmp" + export PTMP="/lfs/h3/emc/gefstemp/ptmp/${USER}/ptmp" +else + export STMP="/lfs/h3/emc/gefstemp/${USER}/${SUDO_USER}/stmp" + 
export PTMP="/lfs/h3/emc/gefstemp/${USER}/${SUDO_USER}/ptmp" +fi + export NOSCRUB="@NOSCRUB@" # Base directories for various builds @@ -340,6 +348,10 @@ export HPSSICARCH="/NCEPDEV/emc-marine/2year/Neil.Barton/ICS/HR4/C384mx025" # IC export DO_DOWNLOAD_ANLY="YES" # Set to YES to download f03 replay analysis from AWS, set to NO to copy f03 analysis locally export ANLYDIR="" # f03 replay analysis directory on disk. May be left blank if DO_DOWNLOAD_ANLY is set to YES +export DO_ICE_INTERP="NO" # Set to YES to perform ice post-processing in ice_prod task +export DO_OCN_INTERP="YES" # Set to YES to perform ocean post-processing in ocean_prod task +export DO_OCNICE_COMPRESS="YES" # Set to YES to compress ocean and ice products + export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh. # Number of regional collectives to create soundings for diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 4eb6270fcd..37a2e4ad1b 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -32,7 +32,7 @@ def main(): 'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR', 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', - 'OFFSET_START_HOUR'] + 'OFFSET_START_HOUR', 'DO_OCNICE_COMPRESS'] archive_dict = AttrDict() for key in keys: diff --git a/scripts/exglobal_cleanup.sh b/scripts/exglobal_cleanup.sh index 73637a0d55..6b1bd60de8 100755 --- a/scripts/exglobal_cleanup.sh +++ b/scripts/exglobal_cleanup.sh @@ -103,9 +103,16 @@ RDATE=$(date --utc +%Y%m%d%H -d "${PDY} ${cyc} -${FHMAX_GFS} hours") if (( GDATE < RDATE )); then RDATE=${GDATE} fi -deletion_target="${ROTDIR}/${RUN}.${RDATE:0:8}" + +# Delete ROTDIR data from current cycle +deletion_target="${ROTDIR}/${RUN}.${PDY:0:8}" if [[ -d ${deletion_target} ]]; then rm -rf "${deletion_target}"; fi +# Delete ROTDIR data from previous cycle +PDYprevcyc=$(date 
--utc +%Y%m%d%H -d "${PDY} 00 -6 hours") +deletion_target_prevcyc="${ROTDIR}/${RUN}.${PDYprevcyc:0:8}" +if [[ -d ${deletion_target_prevcyc} ]]; then rm -rf "${deletion_target_prevcyc}"; fi + # sync and wait to avoid filesystem synchronization issues sync && sleep 1 diff --git a/scripts/exglobal_oceanice_products.py b/scripts/exglobal_oceanice_products.py index 9bb2b09596..64255e0a5a 100755 --- a/scripts/exglobal_oceanice_products.py +++ b/scripts/exglobal_oceanice_products.py @@ -21,31 +21,41 @@ def main(): keys = ['HOMEgfs', 'DATA', 'current_cycle', 'RUN', 'NET', f'COM_{oceanice.task_config.component.upper()}_HISTORY', f'COM_{oceanice.task_config.component.upper()}_GRIB', + f'COM_{oceanice.task_config.component.upper()}_NETCDF', 'APRUN_OCNICEPOST', 'component', 'forecast_hour', 'valid_datetime', 'avg_period', - 'model_grid', 'product_grids', 'oceanice_yaml'] + 'model_grid', 'product_grids', 'oceanice_yaml', + 'DO_OCNICE_COMPRESS'] + oceanice_dict = AttrDict() + for key in keys: oceanice_dict[key] = oceanice.task_config[key] - # Initialize the DATA/ directory; copy static data - oceanice.initialize(oceanice_dict) + if oceanice.task_config.do_interp: + + # Initialize the DATA/ directory; copy static data + oceanice.initialize(oceanice_dict) + + for grid in oceanice_dict.product_grids: - for grid in oceanice_dict.product_grids: + logger.info(f"Processing {grid} grid") - logger.info(f"Processing {grid} grid") + # Configure DATA/ directory for execution; prepare namelist etc. + oceanice.configure(oceanice_dict, grid) - # Configure DATA/ directory for execution; prepare namelist etc. 
- oceanice.configure(oceanice_dict, grid) + # Run the oceanice post executable to interpolate and create grib2 files + oceanice.execute(oceanice_dict, grid) - # Run the oceanice post executable to interpolate and create grib2 files - oceanice.execute(oceanice_dict, grid) + # Subset raw model data to create netCDF products + oceanice.subset(oceanice_dict) - # Subset raw model data to create netCDF products - oceanice.subset(oceanice_dict) + # Copy processed output from execute and subset + oceanice.finalize(oceanice_dict) - # Copy processed output from execute and subset - oceanice.finalize(oceanice_dict) + # Compress ocean and ice data + if oceanice_dict.DO_OCNICE_COMPRESS: + oceanice.compress(oceanice_dict) if __name__ == '__main__': diff --git a/ush/python/pygfs/task/oceanice_products.py b/ush/python/pygfs/task/oceanice_products.py index 39ec53b100..3de605668b 100644 --- a/ush/python/pygfs/task/oceanice_products.py +++ b/ush/python/pygfs/task/oceanice_products.py @@ -14,7 +14,8 @@ Task, add_to_datetime, to_timedelta, WorkflowException, - Executable) + Executable, + which) logger = getLogger(__name__.split('.')[-1]) @@ -61,8 +62,10 @@ def __init__(self, config: Dict[str, Any]) -> None: forecast_hour = self.task_config.FORECAST_HOUR if self.task_config.COMPONENT == 'ice': interval = self.task_config.FHOUT_ICE_GFS + do_interp = self.task_config.DO_ICE_INTERP if self.task_config.COMPONENT == 'ocean': interval = self.task_config.FHOUT_OCN_GFS + do_interp = self.task_config.DO_OCN_INTERP # TODO: This is a bit of a hack, but it works for now # FIXME: find a better way to provide the averaging period @@ -76,6 +79,7 @@ def __init__(self, config: Dict[str, Any]) -> None: 'avg_period': avg_period, 'model_grid': model_grid, 'interval': interval, + 'do_interp': do_interp, 'product_grids': self.VALID_PRODUCT_GRIDS[model_grid]} ) self.task_config = AttrDict(**self.task_config, **localdict) @@ -342,3 +346,28 @@ def finalize(config: Dict) -> None: logger.info(f"Copy processed 
data to COM/ directory") FileHandler(data_out).sync() + + @staticmethod + @logit(logger) + def compress(config: Dict) -> None: + """Perform compression at the end of the task. + Perform compression to individual files in COM/ + + Parameters + ---------- + config: Dict + Configuration dictionary for the task + + Returns + ------- + None + """ + + logger.info(f"Compress processed data in COM/ directory") + gzip_cmd = which("/usr/bin/gzip") + gzip_cmd.add_default_arg("-kf") + if config.component == 'ocean': + interpfile = config.COM_OCEAN_NETCDF + f"/0p25/gefs.{config.component}.t00z.0p25.f{config.forecast_hour:03d}.nc" + if config.component == 'ice': + interpfile = config.COM_ICE_HISTORY + f"/gefs.{config.component}.t00z.24hr_avg.f{config.forecast_hour:03d}.nc" + gzip_cmd(interpfile) diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index 99de24122c..3b2cfd59d5 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -75,11 +75,13 @@ def get_task_names(self): if self.do_ocean: tasks += ['ocean_prod'] + if self.do_ice: + tasks += ['ice_prod'] + if self.do_wave: tasks += ['wavepostsbs'] if self.do_wave_bnd: tasks += ['wavepostbndpnt', 'wavepostbndpntbll'] - tasks += ['wavepostpnt'] if self.do_extractvars: tasks += ['extractvars', 'arch'] diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 961abd819a..237e22dca8 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -531,6 +531,9 @@ def extractvars(self): if self.app_config.do_ocean: dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_ice: + dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod_#member#'} + deps.append(rocoto.add_dependency(dep_dict)) if self.app_config.do_atm: dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_prod_#member#'} deps.append(rocoto.add_dependency(dep_dict)) @@ -574,6 +577,9 @@ def arch(self): if 
self.app_config.do_ocean: dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod'} deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_ice: + dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod'} + deps.append(rocoto.add_dependency(dep_dict)) if self.app_config.do_wave: dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_grid'} deps.append(rocoto.add_dependency(dep_dict))