diff --git a/create_graphics.py b/create_graphics.py index bb93c91..32fa84d 100644 --- a/create_graphics.py +++ b/create_graphics.py @@ -15,6 +15,7 @@ import glob from multiprocessing import Pool, Process import os +import subprocess import sys import time import zipfile @@ -32,6 +33,11 @@ AIRPORTS = 'static/Airports_locs.txt' +COMBINED_FN = 'combined_{fhr:03d}.grib2' +TMP_FN = 'combined_{fhr:03d}.tmp.grib2' + +LOG_BREAK = f"{('-' * 80)}\n{('-' * 80)}" + def create_skewt(cla, fhr, grib_path, workdir): ''' Generate arguments for parallel processing of Skew T graphics, @@ -72,7 +78,6 @@ def create_maps(cla, fhr, gribfiles, workdir): with Pool(processes=cla.nprocs) as pool: pool.starmap(parallel_maps, args) - def create_zip(png_files, zipf): ''' Create a zip file. Use a locking mechanism -- write a lock file to disk. ''' @@ -108,29 +113,21 @@ def create_zip(png_files, zipf): # Wait before trying to obtain the lock on the file time.sleep(5) -def gather_gribfiles(cla, fhr, gribfiles): +def gather_gribfiles(cla, fhr, filename, gribfiles): ''' Returns the appropriate gribfiles object for the type of graphics being generated -- whether it's for a single forecast time or all forecast lead times. ''' - # We already checked that the current file exists and is old enough, so - # assume that the earlier ones are, too. - filenames = {'01fcst': [], 'free_fcst': []} - fcst_hours = [int(fhr)] - if cla.all_leads and gribfiles is None: - fcst_hours = list(range(int(fhr) + 1)) + fcst_hour = int(fhr) - for fcst_hour in fcst_hours: - filename = os.path.join(cla.data_root, - cla.file_tmpl.format(FCST_TIME=fcst_hour)) - first_fcst = 6 if cla.images[0] == 'global' else 1 - if fcst_hour <= first_fcst: - filenames['01fcst'].append(filename) - else: - filenames['free_fcst'].append(filename) + first_fcst = 6 if cla.images[0] == 'global' else 1 + if fcst_hour <= first_fcst: + filenames['01fcst'].append(filename) + else: + filenames['free_fcst'].append(filename) if gribfiles is None or not cla.all_leads: @@ -138,7 +135,7 @@ def gather_gribfiles(cla, fhr, gribfiles): # depending on command line argument flag gribfiles = gribfile.GribFiles( - coord_dims={'fcst_hr': fcst_hours}, + coord_dims={'fcst_hr': [fhr]}, filenames=filenames, filetype=cla.file_type, model=cla.images[0], @@ -241,7 +238,11 @@ def parse_args(): parser.add_argument( '-d', dest='data_root', - help='Cycle-independant data directory location.', + help='Cycle-independant data directory location. Provide more than one \ + data path if data input files should be combined. When providing \ + multiple options, the same number of options is required for the \ + --file_tmpl flag.', + nargs='+', required=True, type=utils.path_exists, ) @@ -306,7 +307,11 @@ def parse_args(): parser.add_argument( '--file_tmpl', default='wrfnat_hrconus_{FCST_TIME:02d}.grib2', - help='File naming convention', + nargs='+', + help='File naming convention. Use FCST_TIME to indicate forecast hour. \ + Provide more than one template when data files should be combined. \ + When providing multiple options, the same number of options is required \ + for the -d flag.', \ ) parser.add_argument( '--file_type', @@ -530,6 +535,198 @@ def parallel_skewt(cla, fhr, ds, site, workdir): ) plt.close() +def pre_proc_grib_files(cla, fhr): + + ''' Use the command line argument object (cla) to determine the grib file + loaction at a given forecast hour. If multiple data input paths and file + templates are provided by user, concatenate the files and remove the + duplicates. Return the file path of the file to be used by the graphics data + handler, and whether the file is old enough. Files making it through the + combined process here are assumed to be old enough. + + Input: + cla Program command line arguments in a Namespace datastructure + fhr Forecast hour; integer + + Output + grib_path path to data used in plotting + old_enough bool stating whether the file is old enough as defined by + user settings. Combined files here are presumed old enough + by default + ''' + + if len(cla.data_root) == 1 and len(cla.file_tmpl) == 1: + # Nothing to do, return the original file location + grib_path = os.path.join(cla.data_root[0], + cla.file_tmpl[0].format(FCST_TIME=fhr)) + + old_enough = utils.old_enough(cla.data_age, grib_path) if \ + os.path.exists(grib_path) else False + return grib_path, old_enough + + # Generate a list of files to be joined. + file_list = [os.path.join(*path).format(FCST_TIME=fhr) for path in + zip(cla.data_root, cla.file_tmpl)] + for file_path in file_list: + if not os.path.exists(file_path) \ + or not utils.old_enough(cla.data_age, file_path): + return file_path, False + + print(f'Combining input files: ') + for fn in file_list: + print(f' {fn}') + + combined_fp = os.path.join(cla.output_path, + COMBINED_FN.format(fhr=fhr)) + tmp_fp = os.path.join(cla.output_path, + TMP_FN.format(fhr=fhr)) + + cmd = f'cat {" ".join(file_list)} > {tmp_fp}' + output = subprocess.run(cmd, + capture_output=True, + check=True, + shell=True, + ) + if output.returncode != 0: + msg = f'{cmd} returned exit status: {output.returncode}!' + raise OSError(msg) + + # Gather all grib2 entries from combined file + cmd = f'wgrib2 {tmp_fp} -submsg 1' + output = subprocess.run(cmd, + capture_output=True, + check=True, + shell=True, + ) + wgrib2_list = output.stdout.decode("utf-8").split('\n') + + # Create a unique list of grib fields. + uniq_list = uniq_wgrib2_list(wgrib2_list) + + # Remove duplicate grib2 entries in grib file + cmd = f'wgrib2 -i {tmp_fp} -GRIB {combined_fp}' + input_arg = '\n'.join(uniq_list).encode("utf-8") + + output = subprocess.run(cmd, + capture_output=True, + check=True, + input=input_arg, + shell=True, + ) + if output.returncode != 0: + msg = f'{cmd} returned exit status: {output.returncode}' + raise OSError(msg) + os.remove(f'{tmp_fp}') + + return f'{combined_fp}', True + +def remove_accumulated_images(cla): + + ''' Searches for all images that correspond with specs that have the + accumulate entry set to True and removes them from the list of images to + create. ''' + + for variable, levels in cla.images[1].items(): + for level in levels: + spec = cla.specs.get(variable, {}).get(level) + if not spec: + msg = f'graphics: {variable} {level}' + raise errors.NoGraphicsDefinitionForVariable(msg) + accumulate = spec.get('accumulate', False) + + if accumulate: + print(f'Will not plot {variable}:{level}') + cla.images[1][variable].remove(level) + if not cla.images[1][variable]: + del cla.images[1][variable] + +def remove_proc_grib_files(cla): + + ''' Find all processed grib files produced by this script and remove them. + ''' + + # Prepare template with all viable forecast hours -- glob accepts * + combined_fn = COMBINED_FN.format(fhr=999).replace('999', '*') + combined_fp = os.path.join(cla.output_path, combined_fn) + + combined_files = glob.glob(combined_fp) + + print(f'Removing combined files: ') + for file_path in combined_files: + print(f' {file_path}') + os.remove(file_path) + +def stage_zip_files(tiles, zip_dir): + + ''' Stage the zip files in the appropriate directory for each tile to be + plotted. Return the dictionary of zipfile paths. + + Input: + + tiles list of subregions to plot from larger domain. becomes the + subdirectory under the zip_dir + zip_dir the top level zip file directory where files are expected to + show up + + Returns: + zipfiles dictionary of tile keys, and zip directory values. + + ''' + zipfiles = {} + for tile in tiles: + tile_zip_dir = os.path.join(zip_dir, tile) + tile_zip_file = os.path.join(tile_zip_dir, 'files.zip') + print(f"checking for {tile_zip_file}") + if os.path.isfile(tile_zip_file): + os.remove(tile_zip_file) + print(f"{tile_zip_file} found and removed") + os.makedirs(tile_zip_dir, exist_ok=True) + zipfiles[tile] = tile_zip_file + return zipfiles + +def uniq_wgrib2_list(inlist): + + ''' Given a list of wgrib2 output fields, returns a uniq list of fields for + simplifying a grib2 dataset. Uniqueness is defined by the wgrib output from + field 3 (colon delimted) onward, although the original full grib record must + be included in the wgrib2 command below. + ''' + + uniq_field_set = set() + uniq_list = [] + for infield in inlist: + infield_info = infield.split(':') + if len(infield_info) <= 3: + continue + infield_str = ':'.join(infield_info[3:]) + if infield_str not in uniq_field_set: + uniq_list.append(infield) + uniq_field_set.add(infield_str) + + return uniq_list + +def zip_pngs(fhr, workdir, zipfiles): + + ''' Spin up a subprocess to zip all the png files into the staged zip files. + + Input: + + fhr integer forecast hour + workdir path to the png files + zipfiles dictionary of tile keys, and zip directory values. + + Output: + None + ''' + + for tile, zipf in zipfiles.items(): + png_files = glob.glob(os.path.join(workdir, f'*_{tile}_*{fhr:02d}.png')) + zip_proc = Process(group=None, + target=create_zip, + args=(png_files, zipf), + ) + zip_proc.start() + zip_proc.join() @utils.timer def graphics_driver(cla): @@ -548,17 +745,8 @@ def graphics_driver(cla): # Create an empty zip file if cla.zip_dir: - zipfiles = {} tiles = cla.tiles if cla.graphic_type == "maps" else ['skewt'] - for tile in tiles: - tile_zip_dir = os.path.join(cla.zip_dir, tile) - tile_zip_file = os.path.join(tile_zip_dir, 'files.zip') - print(f"checking for {tile_zip_file}") - if os.path.isfile(tile_zip_file): - os.remove(tile_zip_file) - print(f"{tile_zip_file} found and removed") - os.makedirs(tile_zip_dir, exist_ok=True) - zipfiles[tile] = tile_zip_file + zipfiles = stage_zip_files(tiles, cla.zip_dir) fcst_hours = copy.deepcopy(cla.fcst_hour) @@ -567,21 +755,52 @@ def graphics_driver(cla): gribfiles = None + # When accummulating variables for preparing a single lead time, + # load all of those into gribfiles up front. + # This is not an operational feature. Exit if files don't exist. + + first_fcst = 6 if cla.images[0] == 'global' else 0 + fcst_inc = 6 if cla.images[0] == 'global' else 1 + if len(cla.fcst_hour) == 1 and cla.all_leads: + for fhr in range(first_fcst, int(cla.fcst_hour[0]), fcst_inc): + grib_path, old_enough = pre_proc_grib_files(cla, fhr) + if not os.path.exists(grib_path) or not old_enough: + msg = (f'File {grib_path} does not exist! Cannot accumulate', + f'data for this forecast lead time!') + remove_proc_grib_files(cla) + raise FileNotFoundError(' '.join(msg)) + gribfiles = gather_gribfiles(cla, fhr, grib_path, gribfiles) + + # Allow this task to run concurrently with UPP by continuing to check for # new files as they become available. while fcst_hours: timer_sleep = time.time() for fhr in sorted(fcst_hours): - grib_path = os.path.join(cla.data_root, - cla.file_tmpl.format(FCST_TIME=fhr)) + grib_path, old_enough = pre_proc_grib_files(cla, fhr) # UPP is most likely done writing if it hasn't written in data_age # mins (default is 3 to address most CONUS-sized domains) - if os.path.exists(grib_path) and utils.old_enough(cla.data_age, grib_path): + if os.path.exists(grib_path) and old_enough: fcst_hours.remove(fhr) else: - # Try next forecast hour - print(f'Cannot find {grib_path}') + if cla.all_leads: + # Wait on the missing file for an arbitrary 90% of wait time + if time.time() - timer_end > cla.wait_time * 60 * .9: + print(f"Giving up waiting on {grib_path}. \n", + f"Removing accumulated variables from image list \n", + f"{LOG_BREAK}\n") + remove_accumulated_images(cla) + # Explicitly set -all_leads to False + cla.all_leads = False + else: + # Break out of loop, wait for the desired period, and start + # back at this forecast hour. + print(f'Waiting for {grib_path} to be available.') + break + # It's safe to continue on processing the next forecast hour + print(f'Cannot find {grib_path}, continuing to check on \n \ + next forecast hour.') continue # Create the working directory @@ -589,17 +808,15 @@ def graphics_driver(cla): f"{utils.from_datetime(cla.start_time)}{fhr:02d}") os.makedirs(workdir, exist_ok=True) - print((('-' * 80)+'\n') * 2) - print() - print(f'Graphics will be created for input file: {grib_path}') - print(f'Output graphics directory: {workdir}') - print() - print((('-' * 80)+'\n') * 2) + print(f'{LOG_BREAK}\n', + f'Graphics will be created for input file: {grib_path}\n', + f'Output graphics directory: {workdir} \n' + f'{LOG_BREAK}') if cla.graphic_type == 'skewts': create_skewt(cla, fhr, grib_path, workdir) else: - gribfiles = gather_gribfiles(cla, fhr, gribfiles) + gribfiles = gather_gribfiles(cla, fhr, grib_path, gribfiles) create_maps(cla, fhr=fhr, gribfiles=gribfiles, @@ -608,14 +825,7 @@ def graphics_driver(cla): # Zip png files and remove the originals in a subprocess if cla.zip_dir: - for tile, zipf in zipfiles.items(): - png_files = glob.glob(os.path.join(workdir, f'*_{tile}_*{fhr:02d}.png')) - zip_proc = Process(group=None, - target=create_zip, - args=(png_files, zipf), - ) - zip_proc.start() - zip_proc.join() + zip_pngs(fhr, workdir, zipfiles) # Keep track of last time we did something useful timer_end = time.time() @@ -624,23 +834,39 @@ def graphics_driver(cla): # wait_time mins. This accounts for slower UPP processes. Default for # most CONUS-sized domains is 10 mins. if time.time() - timer_end > cla.wait_time * 60: - print(f"Exiting with forecast hours remaining: {fcst_hours}") - print((('-' * 80)+'\n') * 2) + print(f"Exiting with forecast hours remaining: {fcst_hours}", + f"{LOG_BREAK}") break # Wait for a bit if it's been < 2 minutes (about the length of time UPP # takes) since starting last loop if fcst_hours and time.time() - timer_sleep < 120: - print(f"Waiting for a minute before forecast hours: {fcst_hours}") - print((('-' * 80)+'\n') * 2) + print(f"Waiting for a minute for forecast hours: {fcst_hours}", + f"{LOG_BREAK}") time.sleep(60) + remove_proc_grib_files(cla) if __name__ == '__main__': CLARGS = parse_args() CLARGS.fcst_hour = utils.fhr_list(CLARGS.fcst_hour) + # Check that the same number of entries exists in -d and --file_tmpl + if len(CLARGS.data_root) != len(CLARGS.file_tmpl): + errmsg = 'Must specify the same number of arguments for -d and --file_tmpl' + print(errmsg) + raise argparse.ArgumentError + + # Ensure wgrib command is available in environment before getting too far + # down this path... + if len(CLARGS.data_root) > 1: + retcode = subprocess.run('which wgrib2', shell=True, check=True) + if retcode.returncode != 0: + errmsg = 'Could not find wgrib2, please make sure it is loaded \n \ + in your environment.' + raise OSError(errmsg) + # Only need to load the default in memory if we're making maps. if CLARGS.graphic_type == 'maps': CLARGS.specs = load_specs(CLARGS.specs) @@ -648,8 +874,8 @@ def graphics_driver(cla): CLARGS.images = load_images(CLARGS.images) CLARGS.tiles = generate_tile_list(CLARGS.tiles) - print(f"Running script for {CLARGS.graphic_type} with args: ") - print((('-' * 80)+'\n') * 2) + print(f"Running script for {CLARGS.graphic_type} with args: ", + f"{LOG_BREAK}") for name, val in CLARGS.__dict__.items(): if name not in ['specs', 'sites']: diff --git a/pre.sh b/pre.sh index edade3f..29bdfa5 100644 --- a/pre.sh +++ b/pre.sh @@ -6,4 +6,6 @@ module use -a /contrib/miniconda3/modulefiles module load miniconda3 conda activate pygraf +module load wgrib2 + module list