Skip to content

Commit

Permalink
Add ability to compress ocean and ice products (#3111)
Browse files Browse the repository at this point in the history
This PR adds the ability to optionally compress ocean and ice products
for the GEFS reforecast. Compression can be set using a new
`config.base` variable called `DO_OCNICE_COMPRESS`. For the reforecast,
the ice history data and the 0p25 ocean post-processed data are gzipped
at the end of the ice and ocean prod tasks, respectively. The archive
task was also modified to allow either gzipped or non-gzipped ocean and
ice data to be sent to HPSS, which is controlled by
`DO_OCNICE_COMPRESS`.

Furthermore, an option was added to skip the ocean and ice
post-processing within the ocean and ice prod jobs, respectively. This
can be controlled by `DO_ICE_INTERP` and `DO_OCN_INTERP` in
`config.base`. In order to meet the reforecast requirements,
`DO_ICE_INTERP` is set to `NO` and `DO_OCN_INTERP` is set to `YES`
by default.

This PR also accomplishes the following:

- [x] The default WCOSS2 `STMP` and `PTMP` were modified to
`/lfs/h3/emc/gefstemp/` because this is the space where the reforecast
will be performed.
- [x] The wavepostpnt task has been removed. 
- [x] If a group account is used, divide the stmp and ptmp directories
by `SUDO_USER`.
- [x] The clean_up job has been modified for the reforecast.
  • Loading branch information
EricSinsky-NOAA authored Nov 22, 2024
1 parent 96d20b0 commit e020fce
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 31 deletions.
8 changes: 7 additions & 1 deletion parm/archive/gefs_extracted_ice.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ gefs_ice:
{% do members.append("mem" ~ '%03d' % mem_nm ) %}
{% endfor %}

{# Archived ice files carry a .gz suffix when compression is enabled #}
{% set fileext = ".nc.gz" if DO_OCNICE_COMPRESS else ".nc" %}

{% for mem in members %}
{% set tmpl_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
Expand All @@ -25,7 +31,7 @@ gefs_ice:
# Select netcdf files to copy to the atardir
{% if path_exists(COMIN_ICE_HISTORY) %}
{% for fhr in range(FHMIN_GFS + FHOUT_ICE_GFS, FHMAX_GFS + FHOUT_ICE_GFS, FHOUT_ICE_GFS) %}
{% set file_name = head ~ FHOUT_ICE_GFS ~ "hr_avg" ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %}
{% set file_name = head ~ FHOUT_ICE_GFS ~ "hr_avg" ~ ".f" ~ '%03d'|format(fhr) ~ fileext %}
{% set file_path = COMIN_ICE_HISTORY ~ "/" ~ file_name %}
- "{{ file_path | relpath(ROTDIR)}}"
{% endfor %}
Expand Down
8 changes: 7 additions & 1 deletion parm/archive/gefs_extracted_ocean.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ gefs_ocean:
{% do members.append("mem" ~ '%03d' % mem_nm ) %}
{% endfor %}

{# Archived ocean files carry a .gz suffix when compression is enabled #}
{% set fileext = ".nc.gz" if DO_OCNICE_COMPRESS else ".nc" %}

{% set res = (OCNRES|string())[0] ~ "p" ~ (OCNRES|string())[-2:] %}

{% for mem in members %}
Expand All @@ -28,7 +34,7 @@ gefs_ocean:
{% set netcdf_grid_dir = COMIN_OCEAN_NETCDF ~ "/" ~ res %}
{% if path_exists(netcdf_grid_dir) %}
{% for fhr in range(FHMIN_GFS + FHOUT_OCN_GFS, FHMAX_GFS + FHOUT_OCN_GFS, FHOUT_OCN_GFS) %}
{% set file_name = head ~ res ~ ".f" ~ '%03d'|format(fhr) ~ ".nc" %}
{% set file_name = head ~ res ~ ".f" ~ '%03d'|format(fhr) ~ fileext %}
{% set file_path = netcdf_grid_dir ~ "/" ~ file_name %}
- "{{ file_path | relpath(ROTDIR)}}"
{% endfor %}
Expand Down
10 changes: 0 additions & 10 deletions parm/archive/gefs_extracted_wave.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,4 @@ gefs_wave:
{% endfor %}
{% endif %}

{% set COMIN_WAVE_STATION = COM_WAVE_STATION_TMPL | replace_tmpl(tmpl_dict) %}
# Select station files to copy to the atardir
{% if path_exists(COMIN_WAVE_STATION) %}
{% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.spec_tar.gz" %}
- "{{ file_path | relpath(ROTDIR)}}"
{% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.cbull_tar" %}
- "{{ file_path | relpath(ROTDIR)}}"
{% set file_path = COMIN_WAVE_STATION ~ "/" ~ RUN ~ "wave.t" ~ cycle_HH ~ "z.bull_tar" %}
- "{{ file_path | relpath(ROTDIR)}}"
{% endif %}
{% endfor %}
16 changes: 14 additions & 2 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,16 @@ export COMINsyn="@COMINsyn@"

# USER specific paths
export HOMEDIR="@HOMEDIR@"
export STMP="@STMP@"
export PTMP="@PTMP@"

# Scratch-space roots on WCOSS2, defaulted to the GEFS reforecast project
# space.  When running under a group account (SUDO_USER is set), divide the
# stmp/ptmp trees per actual user so group members do not collide.
SUDO_USER=${SUDO_USER:-""}
if [[ -z ${SUDO_USER} ]]; then
  export STMP="/lfs/h3/emc/gefstemp/stmp/${USER}/stmp"
  export PTMP="/lfs/h3/emc/gefstemp/ptmp/${USER}/ptmp"
else
  export STMP="/lfs/h3/emc/gefstemp/${USER}/${SUDO_USER}/stmp"
  # BUGFIX: this path previously pointed at /lfs/h2, inconsistent with STMP
  # above and with the documented /lfs/h3/emc/gefstemp default; use h3.
  export PTMP="/lfs/h3/emc/gefstemp/${USER}/${SUDO_USER}/ptmp"
fi

export NOSCRUB="@NOSCRUB@"

# Base directories for various builds
Expand Down Expand Up @@ -340,6 +348,10 @@ export HPSSICARCH="/NCEPDEV/emc-marine/2year/Neil.Barton/ICS/HR4/C384mx025" # IC
export DO_DOWNLOAD_ANLY="YES" # Set to YES to download f03 replay analysis from AWS, set to NO to copy f03 analysis locally
export ANLYDIR="" # f03 replay analysis directory on disk. May be left blank if DO_DOWNLOAD_ANLY is set to YES

export DO_ICE_INTERP="NO" # Set to YES to perform ice post-processing in ice_prod task
export DO_OCN_INTERP="YES" # Set to YES to perform ocean post-processing in ocean_prod task
export DO_OCNICE_COMPRESS="YES" # Set to YES to compress ocean and ice products

export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh.

# Number of regional collectives to create soundings for
Expand Down
2 changes: 1 addition & 1 deletion scripts/exglobal_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main():
'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR',
'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD',
'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS',
'OFFSET_START_HOUR']
'OFFSET_START_HOUR', 'DO_OCNICE_COMPRESS']

archive_dict = AttrDict()
for key in keys:
Expand Down
9 changes: 8 additions & 1 deletion scripts/exglobal_cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ RDATE=$(date --utc +%Y%m%d%H -d "${PDY} ${cyc} -${FHMAX_GFS} hours")
if (( GDATE < RDATE )); then
RDATE=${GDATE}
fi
deletion_target="${ROTDIR}/${RUN}.${RDATE:0:8}"

# Delete ROTDIR data from current cycle
# NOTE(review): removes the whole ${RUN}.<PDY> directory for the cycle that
# just ran — assumes nothing downstream still needs it. Confirm against the
# reforecast workflow's archive ordering.
deletion_target="${ROTDIR}/${RUN}.${PDY:0:8}"
if [[ -d ${deletion_target} ]]; then rm -rf "${deletion_target}"; fi

# Delete ROTDIR data from previous cycle
# NOTE(review): the "00 -6 hours" arithmetic hardcodes a 00z cycle and a
# 6-hour step; for any cyc this always resolves to the previous calendar
# day. Presumably intentional for the daily-00z reforecast — TODO confirm,
# otherwise "${PDY} ${cyc} -${assim_freq} hours" would be the general form.
PDYprevcyc=$(date --utc +%Y%m%d%H -d "${PDY} 00 -6 hours")
deletion_target_prevcyc="${ROTDIR}/${RUN}.${PDYprevcyc:0:8}"
if [[ -d ${deletion_target_prevcyc} ]]; then rm -rf "${deletion_target_prevcyc}"; fi

# sync and wait to avoid filesystem synchronization issues
sync && sleep 1

Expand Down
36 changes: 23 additions & 13 deletions scripts/exglobal_oceanice_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,41 @@ def main():
keys = ['HOMEgfs', 'DATA', 'current_cycle', 'RUN', 'NET',
f'COM_{oceanice.task_config.component.upper()}_HISTORY',
f'COM_{oceanice.task_config.component.upper()}_GRIB',
f'COM_{oceanice.task_config.component.upper()}_NETCDF',
'APRUN_OCNICEPOST',
'component', 'forecast_hour', 'valid_datetime', 'avg_period',
'model_grid', 'product_grids', 'oceanice_yaml']
'model_grid', 'product_grids', 'oceanice_yaml',
'DO_OCNICE_COMPRESS']

oceanice_dict = AttrDict()

for key in keys:
oceanice_dict[key] = oceanice.task_config[key]

# Initialize the DATA/ directory; copy static data
oceanice.initialize(oceanice_dict)
if oceanice.task_config.do_interp:

# Initialize the DATA/ directory; copy static data
oceanice.initialize(oceanice_dict)

for grid in oceanice_dict.product_grids:

for grid in oceanice_dict.product_grids:
logger.info(f"Processing {grid} grid")

logger.info(f"Processing {grid} grid")
# Configure DATA/ directory for execution; prepare namelist etc.
oceanice.configure(oceanice_dict, grid)

# Configure DATA/ directory for execution; prepare namelist etc.
oceanice.configure(oceanice_dict, grid)
# Run the oceanice post executable to interpolate and create grib2 files
oceanice.execute(oceanice_dict, grid)

# Run the oceanice post executable to interpolate and create grib2 files
oceanice.execute(oceanice_dict, grid)
# Subset raw model data to create netCDF products
oceanice.subset(oceanice_dict)

# Subset raw model data to create netCDF products
oceanice.subset(oceanice_dict)
# Copy processed output from execute and subset
oceanice.finalize(oceanice_dict)

# Copy processed output from execute and subset
oceanice.finalize(oceanice_dict)
# Compress ocean and ice data
if oceanice_dict.DO_OCNICE_COMPRESS:
oceanice.compress(oceanice_dict)


if __name__ == '__main__':
Expand Down
31 changes: 30 additions & 1 deletion ush/python/pygfs/task/oceanice_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
Task,
add_to_datetime, to_timedelta,
WorkflowException,
Executable)
Executable,
which)

logger = getLogger(__name__.split('.')[-1])

Expand Down Expand Up @@ -61,8 +62,10 @@ def __init__(self, config: Dict[str, Any]) -> None:
forecast_hour = self.task_config.FORECAST_HOUR
if self.task_config.COMPONENT == 'ice':
interval = self.task_config.FHOUT_ICE_GFS
do_interp = self.task_config.DO_ICE_INTERP
if self.task_config.COMPONENT == 'ocean':
interval = self.task_config.FHOUT_OCN_GFS
do_interp = self.task_config.DO_OCN_INTERP

# TODO: This is a bit of a hack, but it works for now
# FIXME: find a better way to provide the averaging period
Expand All @@ -76,6 +79,7 @@ def __init__(self, config: Dict[str, Any]) -> None:
'avg_period': avg_period,
'model_grid': model_grid,
'interval': interval,
'do_interp': do_interp,
'product_grids': self.VALID_PRODUCT_GRIDS[model_grid]}
)
self.task_config = AttrDict(**self.task_config, **localdict)
Expand Down Expand Up @@ -342,3 +346,28 @@ def finalize(config: Dict) -> None:

logger.info(f"Copy processed data to COM/ directory")
FileHandler(data_out).sync()

@staticmethod
@logit(logger)
def compress(config: Dict) -> None:
    """Compress the ocean or ice product for this forecast hour in COM/.

    Gzips the interpolated 0p25 ocean netCDF product (component 'ocean')
    or the averaged ice history file (component 'ice') in place, keeping
    the original file alongside the ``.gz`` copy.

    Parameters
    ----------
    config : Dict
        Task configuration; must provide ``component``, ``forecast_hour``,
        ``RUN``, ``current_cycle`` and the relevant COM directory
        (``COM_OCEAN_NETCDF`` or ``COM_ICE_HISTORY``).

    Returns
    -------
    None

    Raises
    ------
    OSError
        If no gzip executable can be located on PATH.
    """

    logger.info("Compress processed data in COM/ directory")

    # BUGFIX: 'which' resolves a command name via PATH; the previous
    # absolute path ("/usr/bin/gzip") defeated that lookup and would
    # break on systems where gzip lives elsewhere.
    gzip_cmd = which("gzip")
    if gzip_cmd is None:
        raise OSError("gzip executable not found on PATH")
    gzip_cmd.add_default_arg("-kf")  # -k keep original, -f overwrite stale .gz

    # Build the product filename from config instead of hardcoding the
    # RUN ("gefs") and cycle ("t00z") as before, so the task is correct
    # for any run name and cycle hour.
    cyc = config.current_cycle.strftime('%H')
    if config.component == 'ocean':
        # Only the 0p25 post-processed ocean product is compressed.
        interpfile = config.COM_OCEAN_NETCDF + f"/0p25/{config.RUN}.{config.component}.t{cyc}z.0p25.f{config.forecast_hour:03d}.nc"
    if config.component == 'ice':
        # NOTE(review): "24hr_avg" assumes FHOUT_ICE_GFS=24 (the archive
        # template builds this tag from FHOUT_ICE_GFS) — TODO confirm and
        # parameterize if other ice output intervals are ever used.
        interpfile = config.COM_ICE_HISTORY + f"/{config.RUN}.{config.component}.t{cyc}z.24hr_avg.f{config.forecast_hour:03d}.nc"
    gzip_cmd(interpfile)
4 changes: 3 additions & 1 deletion workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ def get_task_names(self):
if self.do_ocean:
tasks += ['ocean_prod']

if self.do_ice:
tasks += ['ice_prod']

if self.do_wave:
tasks += ['wavepostsbs']
if self.do_wave_bnd:
tasks += ['wavepostbndpnt', 'wavepostbndpntbll']
tasks += ['wavepostpnt']

if self.do_extractvars:
tasks += ['extractvars', 'arch']
Expand Down
6 changes: 6 additions & 0 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ def extractvars(self):
if self.app_config.do_ocean:
dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod_#member#'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_ice:
dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod_#member#'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_atm:
dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_prod_#member#'}
deps.append(rocoto.add_dependency(dep_dict))
Expand Down Expand Up @@ -574,6 +577,9 @@ def arch(self):
if self.app_config.do_ocean:
dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_ice:
dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_wave:
dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_grid'}
deps.append(rocoto.add_dependency(dep_dict))
Expand Down

0 comments on commit e020fce

Please sign in to comment.