Skip to content

Commit

Permalink
Add an archive task to GEFS system to archive files locally (NOAA-EMC…
Browse files Browse the repository at this point in the history
…#2816)

- This task is an extension of the empty arch job previously merged. 
- This feature adds an archive task to GEFS system to archive files
locally.
- This feature archives files in ensstat directory. 

Resolves NOAA-EMC#2698
Refs NOAA-EMC#832 NOAA-EMC#2772
  • Loading branch information
AntonMFernando-NOAA committed Sep 7, 2024
1 parent ac93a9b commit 2816c3b
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 14 deletions.
3 changes: 0 additions & 3 deletions jobs/rocoto/arch_test.sh

This file was deleted.

38 changes: 38 additions & 0 deletions parm/archive/gefs_arcdir.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{% set cycle_HH = current_cycle | strftime("%H") %}
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set cycle_YMD = current_cycle | to_YMD %}
{% set head = RUN + ".t" + cycle_HH + "z." %}

# Declare the GEFS_ARCH where atmos data will be sent
{% set GEFS_ARCH = ROTDIR ~ "/gefsarch" %}

{% set file_set = [] %}

{% set tmpl_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':cycle_YMD,
'${HH}':cycle_HH,
'${GRID}': '1p00',
'${MEMDIR}': 'ensstat' }) %}

{% set COMIN_ATMOS_ENSSTAT_1p00 = COM_ATMOS_GRIB_GRID_TMPL | replace_tmpl(tmpl_dict) %}

# Select ensstat files to copy to the arcdir
{% if RUN == "gefs" %}
{% set ensstat_files = [] %}
{% if path_exists(COMIN_ATMOS_ENSSTAT_1p00) %}
{% for fhr in range(FHMIN_GFS, FHMAX_GFS + FHOUT_GFS, FHOUT_GFS) %}
{% do ensstat_files.append([COMIN_ATMOS_ENSSTAT_1p00 ~ "/" ~ head ~ "mean.pres_." ~
"1p00" ~ ".f" ~ '%03d'|format(fhr) ~ ".grib2",
GEFS_ARCH]) %}
{% endfor %}
{% endif %}
{% endif %}
{% set file_set = ensstat_files %}
# Actually write the yaml
mkdir:
- "{{ GEFS_ARCH }}"
copy:
{% for source_dest_pair in file_set %}
- {{ source_dest_pair }}
{% endfor %}
File renamed without changes.
2 changes: 2 additions & 0 deletions parm/config/gefs/yaml/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ base:
FCST_BREAKPOINTS: "48"
REPLAY_ICS: "NO"
USE_OCN_PERTURB_FILES: "false"
HPSSARCH: "NO"
LOCALARCH: "NO"
6 changes: 4 additions & 2 deletions scripts/exgdas_enkf_earc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ def main():
'DOHYBVAR', 'DOIAU_ENKF', 'IAU_OFFSET', 'DOIAU',
'DO_CALC_INCREMENT', 'assim_freq', 'ARCH_CYC',
'ARCH_WARMICFREQ', 'ARCH_FCSTICFREQ',
'IAUFHRS_ENKF']
'IAUFHRS_ENKF', 'NET']

archive_dict = AttrDict()
for key in keys:
archive_dict[key] = archive.task_config[key]
archive_dict[key] = archive.task_config.get(key)
if archive_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

# Also import all COMIN* directory and template variables
for key in archive.task_config.keys():
Expand Down
12 changes: 8 additions & 4 deletions scripts/exglobal_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ def main():
'restart_interval_gdas', 'restart_interval_gfs',
'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR',
'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS',
'IAUFHRS', 'DO_FIT2OBS']
'IAUFHRS', 'DO_FIT2OBS', 'NET']

archive_dict = AttrDict()
for key in keys:
archive_dict[key] = archive.task_config[key]
archive_dict[key] = archive.task_config.get(key)
if archive_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

# Also import all COMIN* and COMOUT* directory and template variables
for key in archive.task_config.keys():
if key.startswith("COMIN_") or key.startswith("COMOUT_"):
archive_dict[key] = archive.task_config[key]
if key.startswith("COM_") or key.startswith("COMIN_") or key.startswith("COMOUT_"):
archive_dict[key] = archive.task_config.get(key)
if archive_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

cwd = os.getcwd()

Expand Down
8 changes: 4 additions & 4 deletions ush/python/pygfs/task/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str
if not os.path.isdir(arch_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({arch_dict.ROTDIR}) does not exist!")

if arch_dict.RUN == "gefs":
raise NotImplementedError("FATAL ERROR: Archiving is not yet set up for GEFS runs")

if arch_dict.RUN in ["gdas", "gfs"]:

# Copy the cyclone track files and rename the experiments
Expand All @@ -75,7 +72,7 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str
archive_parm = os.path.join(arch_dict.PARMgfs, "archive")

# Collect the dataset to archive locally
arcdir_j2yaml = os.path.join(archive_parm, "arcdir.yaml.j2")
arcdir_j2yaml = os.path.join(archive_parm, f"{arch_dict.NET}_arcdir.yaml.j2")

# Add the glob.glob function for capturing log filenames
# TODO remove this kludge once log filenames are explicit
Expand Down Expand Up @@ -117,6 +114,9 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str
self.tar_cmd = ""
return arcdir_set, []

if arch_dict.NET == "gefs":
raise NotImplementedError("GEFS archiving is not yet implemented!")

master_yaml = "master_" + arch_dict.RUN + ".yaml.j2"

parsed_sets = parse_j2yaml(os.path.join(archive_parm, master_yaml),
Expand Down
2 changes: 1 addition & 1 deletion workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ def arch(self):
'envars': self.envars,
'cycledef': 'gefs',
'dependency': dependencies,
'command': f'{self.HOMEgfs}/jobs/rocoto/arch_test.sh',
'command': f'{self.HOMEgfs}/jobs/rocoto/arch.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
Expand Down

0 comments on commit 2816c3b

Please sign in to comment.