diff --git a/msc_pygeoapi/loader/forecast_polygons.py b/msc_pygeoapi/loader/forecast_polygons.py index d3e6741c..b0033f1b 100644 --- a/msc_pygeoapi/loader/forecast_polygons.py +++ b/msc_pygeoapi/loader/forecast_polygons.py @@ -28,9 +28,12 @@ # # ================================================================= +from glob import fnmatch import logging import os from pathlib import Path +import re +import requests import click from parse import parse @@ -38,6 +41,7 @@ from msc_pygeoapi import cli_options from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector +from msc_pygeoapi.env import MSC_PYGEOAPI_CACHEDIR from msc_pygeoapi.loader.base import BaseLoader from msc_pygeoapi.util import configure_es_connection @@ -47,209 +51,96 @@ INDEX_NAME = 'forecast_polygons_{}_{}' FILE_PROPERTIES = { - 'water': { - 'F_MARSTDZA': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'POLY_ID': { - 'type': 'integer', - }, - 'PRIME_ID': { - 'type': 'integer', - }, - 'CLC': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'FEATURE_ID': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'NAME': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'}, - 'normalize': {'type': 'keyword', - 'normalizer': 'name_normalizer'} - } - }, - 'NOM': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'}, - 'normalize': {'type': 'keyword', - 'normalizer': 'name_normalizer'} - } - }, - 'PERIM_KM': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'AREA_KM2': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'LAT_DD': { - 'type': 'float' - }, - 'LON_DD': { - 'type': 'float' - }, - 'KIND': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'USAGE': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'DEPICTN': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'PROVINCE_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'COUNTRY_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'WATRBODY_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'version': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, + 'OBJECT_ID': { + 'type': 'integer', }, - 'land': { - 'F_CLCBZA': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'POLY_ID': { - 'type': 'integer', - }, - 'PRIME_ID': { - 'type': 'integer', - }, - 'CLC': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'FEATURE_ID': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'NAME': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'}, - 'normalize': {'type': 'keyword', - 'normalizer': 'name_normalizer'} - } - }, - 'NOM': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'}, - 'normalize': {'type': 'keyword', - 'normalizer': 'name_normalizer'} - } - }, - 'PERIM_KM': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'AREA_KM2': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'LAT_DD': { - 'type': 'float' - }, - 'LON_DD': { - 'type': 'float' - }, - 'KIND': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'USAGE': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'DEPICTN': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'PROVINCE_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'COUNTRY_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'WATRBODY_C': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, - 'version': { - 'type': 'text', - 'fields': { - 'raw': {'type': 'keyword'} - } - }, + 'CLC': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'FEATURE_ID': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'NAME': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'}, + 'normalize': {'type': 'keyword', + 'normalizer': 'name_normalizer'} + } + }, + 'NOM': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'}, + 'normalize': {'type': 'keyword', + 'normalizer': 'name_normalizer'} + } + }, + 'PERIM_KM': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'AREA_KM2': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'LAT_DD': { + 'type': 'float' + }, + 'LON_DD': { + 'type': 'float' + }, + 'KIND': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'USAGE': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'DEPICTN': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'PROVINCE_C': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'COUNTRY_C': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'WATRBODY_C': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'version': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } } } @@ -275,7 +166,7 @@ 'type': 'geo_shape' }, 'properties': { - 'properties': None + 'properties': FILE_PROPERTIES } } } @@ -283,20 +174,110 @@ INDICES = [ INDEX_NAME.format(forecast_zone, detail_level) - for forecast_zone in FILE_PROPERTIES - for detail_level in ['coarse', 'detail'] + for forecast_zone in ['land', 'water'] + for detail_level in ['hybrid', 'exag'] ] +FILENAME_PATTERN = 'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_{zone}_{type}.zip' # noqa + SHAPEFILES_TO_LOAD = { - 'MSC_Geography_Pkg_V6_4_0_Water_Unproj': - ['water_MarStdZone_coarse_unproj.shp', - 'water_MarStdZone_detail_unproj.shp'], - 'MSC_Geography_Pkg_V6_4_0_Land_Unproj': - ['land_CLCBaseZone_coarse_unproj.shp', - 'land_CLCBaseZone_detail_unproj.shp'] + 'water': [ + 'water_MarStdZone_exag_unproj.shp', + 'water_MarStdZone_hybrid_unproj.shp', + ], + 'land': [ + 'land_PubStdZone_exag_unproj.shp', + 'land_PubStdZone_hybrid_unproj.shp', + ], } +def version_to_tuple(pattern, filename): + """ + Returns a tuple of the version number when given a filename and a + format string of the filename pattern that contains at minimum + containing named fields major, minor, patch. + :param pattern: `str` of filename pattern + :param filename: `str` of filename to parse + :return: `tuple` of version number + """ + + # ccheck if pattern contains named fields major, minor, patch + if not all( + [field in pattern for field in ['{major:d}', '{minor:d}', '{patch:d}']] + ): + raise ValueError( + 'Pattern must contain named integer-typed' + ' fields major, minor, and patch.' + ) + + parsed_filename = parse( + pattern, + filename, + ) + + return ( + parsed_filename.named['major'], + parsed_filename.named['minor'], + parsed_filename.named['patch'], + ) + + +def download_metecode_data(directory=None): + """ + Downloads the latest meteocode data from DataMart and stores it in + MSC_PYGEOAPI_CACHEDIR + :param directory: `str` of directory to store meteocode data + :returns: `str` of directory where meteocode data is stored + """ + + directory = Path(directory) if directory else Path(MSC_PYGEOAPI_CACHEDIR) + # download and store in MSC_PYGEOAPI_CACHEDIR + response = requests.get('https://dd.weather.gc.ca/meteocode/geodata/') + response.raise_for_status() + + folders = re.findall(r'', response.text) + versions = [ + version_to_tuple('version_{major:d}.{minor:d}.{patch:d}', folder) + for folder in folders + ] + # create dictionary from latest version with keys major, minor, patch + latest_version_dict = dict( + zip(['major', 'minor', 'patch'], max(versions)) + ) + most_recent_folder = f'version_{latest_version_dict["major"]}.{latest_version_dict["minor"]}.{latest_version_dict["patch"]}' # noqa + + # download latest version and store files in MSC_PYGEOAPI_CACHEDIR] + files_to_download = [ + 'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_Water_Unproj.zip', + 'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_Land_Unproj.zip', + ] + for filename in files_to_download: + filename_with_version = filename.format(**latest_version_dict) + filepath_ = directory / "meteocode" / f"{filename_with_version}" + + # if file doesn't already exist in cache, download it + if filepath_.exists(): + LOGGER.debug( + f'File already exists in cache: {filepath_}.' + ' Skipping download.' + ) + else: + response = requests.get( + 'https://dd.weather.gc.ca/meteocode/geodata/' + f'{most_recent_folder}/{filename_with_version}' + ) + response.raise_for_status() + + # create output directory + filepath_.parent.mkdir(parents=True, exist_ok=True) + + with open(filepath_, 'wb') as f: + f.write(response.content) + + return True + + class ForecastPolygonsLoader(BaseLoader): """Forecast polygons (land/water) loader""" @@ -313,10 +294,6 @@ def __init__(self, conn_config={}): # create forecast polygon indices if they don't exist for index in INDICES: - zone = index.split('_')[2] - SETTINGS['mappings']['properties']['properties'][ - 'properties' - ] = FILE_PROPERTIES[zone] self.conn.create(index, SETTINGS) def parse_filename(self): @@ -326,13 +303,11 @@ def parse_filename(self): :return: `bool` of parse status """ # parse filepath - pattern = 'MSC_Geography_Pkg_V{version:w}_{zone}_{type}.zip' - filename = self.filepath.name - parsed_filename = parse(pattern, filename) + parsed_filename = parse(FILENAME_PATTERN, self.filepath.name) # set class variables - self.version = parsed_filename.named['version'].replace('_', '.') - self.zone = parsed_filename.named['zone'] + self.version = f'{parsed_filename.named["major"]}.{parsed_filename.named["minor"]}.{parsed_filename.named["patch"]}' # noqa + self.zone = parsed_filename.named['zone'].lower() return True @@ -346,15 +321,19 @@ def generate_geojson_features(self, shapefile_name): :returns: Generator of Elasticsearch actions to upsert the forecast polygons for given shapefile in zip archive """ - filepath = str((self.filepath / self.filepath.stem / - shapefile_name).resolve()) + filepath = str( + (self.filepath / shapefile_name).resolve() + ) data = ogr.Open(rf'/vsizip/{filepath}') lyr = data.GetLayer() for feature in lyr: - feature_json = feature.ExportToJson(as_object=True, - options=['RFC7946=YES']) + feature_json = feature.ExportToJson( + as_object=True, + options=['RFC7946=YES'] + ) feature_json['properties']['version'] = self.version + feature_json['id'] = feature_json['properties']['FEATURE_ID'] _id = feature_json['properties']['FEATURE_ID'] @@ -362,12 +341,12 @@ def generate_geojson_features(self, shapefile_name): action = { '_id': _id, - '_index': INDEX_NAME.format(self.zone.lower(), - shapefile_name.split('_')[2] - ), + '_index': INDEX_NAME.format( + self.zone.lower(), shapefile_name.split('_')[2] + ), '_op_type': 'update', 'doc': feature_json, - 'doc_as_upsert': True + 'doc_as_upsert': True, } yield action @@ -379,12 +358,15 @@ def load_data(self, filepath): """ self.filepath = Path(filepath) + parsed_filename = parse(FILENAME_PATTERN, self.filepath.name) # set class variables from filename self.parse_filename() LOGGER.debug(f'Received file {self.filepath}') - for shapefile in SHAPEFILES_TO_LOAD[self.filepath.stem]: + for shapefile in SHAPEFILES_TO_LOAD[ + parsed_filename.named['zone'].lower() + ]: # generate geojson features package = self.generate_geojson_features(shapefile) self.conn.submit_elastic_package(package, request_size=80000) @@ -410,7 +392,20 @@ def add(ctx, file_, directory, es, username, password, ignore_certs): """add data to system""" if all([file_ is None, directory is None]): - raise click.ClickException('Missing --file/-f or --dir/-d option') + # ask user if they want to retrieve file via HTTP + if click.confirm( + 'No file or directory specified. Do you want to retrieve the' + ' latest forecast polygons file via the MSC DataMart?', + abort=True, + ): + try: + download_metecode_data(directory=MSC_PYGEOAPI_CACHEDIR) + directory = os.path.join(MSC_PYGEOAPI_CACHEDIR, 'meteocode') + except requests.exceptions.HTTPError as err: + click.echo( + 'Could not retrieve latest forecast polygons files ' + 'from MSC DataMart.' + ) conn_config = configure_es_connection(es, username, password, ignore_certs) @@ -419,10 +414,32 @@ def add(ctx, file_, directory, es, username, password, ignore_certs): if file_ is not None: files_to_process = [file_] elif directory is not None: - for root, dirs, files in os.walk(directory): - for f in [file for file in files if file.endswith('.zip')]: - files_to_process.append(os.path.join(root, f)) - files_to_process.sort(key=os.path.getmtime) + + def get_latest_file(directory, pattern): + """ + Returns the file corresponding to the latest release + in the directory that matches the pattern + :param directory: `str` of directory to search + :param pattern: `str` of filename pattern to match + :returns: `str` of filename of latest release + """ + files = [ + file + for file in os.listdir(directory) + if fnmatch.fnmatch(file, pattern) + ] + + most_recent_release_file = max( + files, + key=lambda file: version_to_tuple(FILENAME_PATTERN, file), + ) + + return most_recent_release_file + + for zone in ['Land', 'Water']: + pattern = f'MSC_Geography_Pkg_V*_{zone}_Unproj.zip' + latest_file = get_latest_file(directory, pattern) + files_to_process.append(os.path.join(directory, latest_file)) for file_to_process in files_to_process: loader = ForecastPolygonsLoader(conn_config) @@ -451,19 +468,24 @@ def delete_indexes(ctx, index_name, es, username, password, ignore_certs): if index_name: if click.confirm( - 'Are you sure you want to delete ES index named: {}?'.format( - click.style(index_name, fg='red')), abort=True): + 'Are you sure you want to delete ES index named: {}?'.format( + click.style(index_name, fg='red') + ), + abort=True + ): LOGGER.info(f'Deleting ES index {index_name}') conn.delete(index_name) return True else: if click.confirm( - 'Are you sure you want to delete {} forecast polygon' - ' indices ({})?'.format(click.style('ALL', fg='red'), - click.style(", ".join(INDICES), - fg='red')), - abort=True): - conn.delete(",".join(INDICES)) + 'Are you sure you want to delete {} forecast polygon' + ' indices ({})?'.format( + click.style('ALL', fg='red'), + click.style(', '.join(INDICES), fg='red'), + ), + abort=True + ): + conn.delete(','.join(INDICES)) return True