Skip to content

Commit

Permalink
update forecast regions loader
Browse files Browse the repository at this point in the history
  • Loading branch information
Dukestep committed Nov 7, 2023
1 parent 8b256fa commit af2a06a
Showing 1 changed file with 169 additions and 34 deletions.
203 changes: 169 additions & 34 deletions msc_pygeoapi/loader/forecast_polygons.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@
#
# =================================================================

from glob import fnmatch
import logging
import os
from pathlib import Path
import re
import requests

import click
from parse import parse
from osgeo import ogr

from msc_pygeoapi import cli_options
from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector
from msc_pygeoapi.env import MSC_PYGEOAPI_CACHEDIR
from msc_pygeoapi.loader.base import BaseLoader
from msc_pygeoapi.util import configure_es_connection

Expand Down Expand Up @@ -284,19 +288,106 @@
INDICES = [
INDEX_NAME.format(forecast_zone, detail_level)
for forecast_zone in FILE_PROPERTIES
for detail_level in ['coarse', 'detail']
for detail_level in ['hybrid', 'exag']
]

FILENAME_PATTERN = 'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_{zone}_{type}.zip' # noqa

SHAPEFILES_TO_LOAD = {
'MSC_Geography_Pkg_V6_4_0_Water_Unproj':
['water_MarStdZone_coarse_unproj.shp',
'water_MarStdZone_detail_unproj.shp'],
'MSC_Geography_Pkg_V6_4_0_Land_Unproj':
['land_CLCBaseZone_coarse_unproj.shp',
'land_CLCBaseZone_detail_unproj.shp']
'water': [
'water_MarStdZone_exag_unproj.shp',
'water_MarStdZone_hybrid_unproj.shp',
],
'land': [
'land_PubStdZone_exag_unproj.shp',
'land_PubStdZone_hybrid_unproj.shp',
],
}


def version_to_tuple(pattern, filename):
    """
    Return the (major, minor, patch) version tuple parsed from a filename.

    :param pattern: `str` filename pattern containing, at minimum, the
                    named integer-typed fields {major:d}, {minor:d} and
                    {patch:d}
    :param filename: `str` of filename to parse
    :raises ValueError: if the pattern lacks the required named fields or
                        the filename does not match the pattern
    :return: `tuple` of (major, minor, patch) ints
    """

    # check if pattern contains named fields major, minor, patch
    required_fields = ('{major:d}', '{minor:d}', '{patch:d}')
    if not all(field in pattern for field in required_fields):
        raise ValueError(
            'Pattern must contain named integer-typed fields major, minor, patch'
        )

    parsed_filename = parse(pattern, filename)

    # parse() returns None when the filename does not match the pattern;
    # fail with a clear error instead of an opaque AttributeError below
    if parsed_filename is None:
        raise ValueError(
            f'Filename {filename} does not match pattern {pattern}'
        )

    return (
        parsed_filename.named['major'],
        parsed_filename.named['minor'],
        parsed_filename.named['patch'],
    )


def download_metecode_data(directory=None):
    """
    Downloads the latest meteocode geography packages (Land and Water)
    from the MSC DataMart and stores them under <directory>/meteocode.

    NOTE(review): the function name keeps the original "metecode" spelling
    so existing callers are unaffected.

    :param directory: `str` of directory to store meteocode data
                      (defaults to MSC_PYGEOAPI_CACHEDIR)
    :raises ValueError: if no version folders can be found on DataMart
    :returns: `bool` of download status (True on success)
    """

    directory = Path(directory) if directory else Path(MSC_PYGEOAPI_CACHEDIR)

    # list available version_<X>.<Y>.<Z> folders on DataMart
    response = requests.get('https://dd.weather.gc.ca/meteocode/geodata/')
    response.raise_for_status()

    folders = re.findall(r'<a href=\"(version_.*)/".*</a>', response.text)
    versions = [
        version_to_tuple('version_{major:d}.{minor:d}.{patch:d}', folder)
        for folder in folders
    ]
    if not versions:
        # guard against an empty scrape; max([]) would raise an
        # uninformative "max() arg is an empty sequence" ValueError
        raise ValueError(
            'No meteocode version folders found on MSC DataMart'
        )

    # build the folder name for the most recent published version
    major, minor, patch = max(versions)
    latest_version_dict = {'major': major, 'minor': minor, 'patch': patch}
    most_recent_folder = f'version_{major}.{minor}.{patch}'

    # download latest version and store files in the cache directory
    files_to_download = [
        'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_Water_Unproj.zip',
        'MSC_Geography_Pkg_V{major:d}_{minor:d}_{patch:d}_Land_Unproj.zip',
    ]
    for filename in files_to_download:
        filename_with_version = filename.format(**latest_version_dict)
        filepath_ = directory / 'meteocode' / filename_with_version

        # if file already exists in cache, skip the download
        if filepath_.exists():
            LOGGER.debug(
                f'File already exists in cache: {filepath_}. Skipping download.'
            )
        else:
            response = requests.get(
                f'https://dd.weather.gc.ca/meteocode/geodata/{most_recent_folder}/{filename_with_version}'
            )
            response.raise_for_status()

            # create output directory before writing
            filepath_.parent.mkdir(parents=True, exist_ok=True)

            filepath_.write_bytes(response.content)

    return True


class ForecastPolygonsLoader(BaseLoader):
"""Forecast polygons (land/water) loader"""

Expand Down Expand Up @@ -326,13 +417,11 @@ def parse_filename(self):
:return: `bool` of parse status
"""
# parse filepath
pattern = 'MSC_Geography_Pkg_V{version:w}_{zone}_{type}.zip'
filename = self.filepath.name
parsed_filename = parse(pattern, filename)
parsed_filename = parse(FILENAME_PATTERN, self.filepath.name)

# set class variables
self.version = parsed_filename.named['version'].replace('_', '.')
self.zone = parsed_filename.named['zone']
self.version = f'{parsed_filename.named["major"]}.{parsed_filename.named["minor"]}.{parsed_filename.named["patch"]}' # noqa
self.zone = parsed_filename.named['zone'].lower()

return True

Expand All @@ -346,14 +435,16 @@ def generate_geojson_features(self, shapefile_name):
:returns: Generator of Elasticsearch actions to upsert the forecast
polygons for given shapefile in zip archive
"""
filepath = str((self.filepath / self.filepath.stem /
shapefile_name).resolve())
filepath = str(
(self.filepath / shapefile_name).resolve()
)
data = ogr.Open(rf'/vsizip/{filepath}')
lyr = data.GetLayer()

for feature in lyr:
feature_json = feature.ExportToJson(as_object=True,
options=['RFC7946=YES'])
feature_json = feature.ExportToJson(
as_object=True, options=['RFC7946=YES']
)
feature_json['properties']['version'] = self.version

_id = feature_json['properties']['FEATURE_ID']
Expand All @@ -362,12 +453,12 @@ def generate_geojson_features(self, shapefile_name):

action = {
'_id': _id,
'_index': INDEX_NAME.format(self.zone.lower(),
shapefile_name.split('_')[2]
),
'_index': INDEX_NAME.format(
self.zone.lower(), shapefile_name.split('_')[2]
),
'_op_type': 'update',
'doc': feature_json,
'doc_as_upsert': True
'doc_as_upsert': True,
}

yield action
Expand All @@ -379,12 +470,18 @@ def load_data(self, filepath):
"""

self.filepath = Path(filepath)
parsed_filename = parse(
FILENAME_PATTERN,
self.filepath.name
)

# set class variables from filename
self.parse_filename()
LOGGER.debug(f'Received file {self.filepath}')

for shapefile in SHAPEFILES_TO_LOAD[self.filepath.stem]:
for shapefile in SHAPEFILES_TO_LOAD[
parsed_filename.named['zone'].lower()
]:
# generate geojson features
package = self.generate_geojson_features(shapefile)
self.conn.submit_elastic_package(package, request_size=80000)
Expand All @@ -410,7 +507,20 @@ def add(ctx, file_, directory, es, username, password, ignore_certs):
"""add data to system"""

if all([file_ is None, directory is None]):
raise click.ClickException('Missing --file/-f or --dir/-d option')
# ask user if they want to retrieve file via HTTP
if click.confirm(
'No file or directory specified. Do you want to retrieve the'
' latest forecast polygons file via the MSC DataMart?',
abort=True,
):
# download latest meteocode data from DataMart and store in MSC_PYGEOAPI_CACHEDIR
try:
download_metecode_data(directory=MSC_PYGEOAPI_CACHEDIR)
directory = os.path.join(MSC_PYGEOAPI_CACHEDIR, 'meteocode')
except requests.exceptions.HTTPError as err:
click.echo(
'Could not retrieve latest forecast polygons files from MSC DataMart.'
)

conn_config = configure_es_connection(es, username, password, ignore_certs)

Expand All @@ -419,10 +529,30 @@ def add(ctx, file_, directory, es, username, password, ignore_certs):
if file_ is not None:
files_to_process = [file_]
elif directory is not None:
for root, dirs, files in os.walk(directory):
for f in [file for file in files if file.endswith('.zip')]:
files_to_process.append(os.path.join(root, f))
files_to_process.sort(key=os.path.getmtime)
def get_latest_file(directory, pattern):
"""
Returns the file corresponding to the latest release
in the directory that matches the pattern
:param directory: `str` of directory to search
:param pattern: `str` of filename pattern to match
:returns: `str` of filename of latest release
"""
files = [
file
for file in os.listdir(directory)
if fnmatch.fnmatch(file, pattern)
]

most_recent_release_file = max(
files, key=lambda file: version_to_tuple(FILENAME_PATTERN, file)
)

return most_recent_release_file

for zone in ['Land', 'Water']:
pattern = f'MSC_Geography_Pkg_V*_{zone}_Unproj.zip'
latest_file = get_latest_file(directory, pattern)
files_to_process.append(os.path.join(directory, latest_file))

for file_to_process in files_to_process:
loader = ForecastPolygonsLoader(conn_config)
Expand Down Expand Up @@ -451,19 +581,24 @@ def delete_indexes(ctx, index_name, es, username, password, ignore_certs):

if index_name:
if click.confirm(
'Are you sure you want to delete ES index named: {}?'.format(
click.style(index_name, fg='red')), abort=True):
'Are you sure you want to delete ES index named: {}?'.format(
click.style(index_name, fg='red')
),
abort=True
):
LOGGER.info(f'Deleting ES index {index_name}')
conn.delete(index_name)
return True
else:
if click.confirm(
'Are you sure you want to delete {} forecast polygon'
' indices ({})?'.format(click.style('ALL', fg='red'),
click.style(", ".join(INDICES),
fg='red')),
abort=True):
conn.delete(",".join(INDICES))
'Are you sure you want to delete {} forecast polygon'
' indices ({})?'.format(
click.style('ALL', fg='red'),
click.style(', '.join(INDICES), fg='red'),
),
abort=True
):
conn.delete(','.join(INDICES))
return True


Expand Down

0 comments on commit af2a06a

Please sign in to comment.