diff --git a/msc_pygeoapi/loader/citypageweather_realtime.py b/msc_pygeoapi/loader/citypageweather_realtime.py index 5337d7e8..bb91417e 100644 --- a/msc_pygeoapi/loader/citypageweather_realtime.py +++ b/msc_pygeoapi/loader/citypageweather_realtime.py @@ -35,12 +35,18 @@ import logging from lxml import etree import os +from pathlib import Path from msc_pygeoapi import cli_options from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector from msc_pygeoapi.env import MSC_PYGEOAPI_BASEPATH from msc_pygeoapi.loader.base import BaseLoader -from msc_pygeoapi.util import configure_es_connection +from msc_pygeoapi.util import ( + DATETIME_RFC3339_FMT, + configure_es_connection, + _get_element, + safe_cast_to_number, +) LOGGER = logging.getLogger(__name__) @@ -48,7 +54,7 @@ DAYS_TO_KEEP = 30 # Index settings -INDEX_NAME = 'current_conditions' +INDEX_NAME = 'citypageweather_realtime' NATIONAL_CITIES = [ 'Calgary', @@ -57,10 +63,10 @@ 'Fredericton', 'Halifax', 'Iqaluit', - u'Montréal', - u'Ottawa (Kanata - Orléans)', + 'Montréal', + 'Ottawa (Kanata - Orléans)', 'Prince George', - u'Québec', + 'Québec', 'Regina', 'Saskatoon', 'St. John\'s', @@ -73,169 +79,1006 @@ 'Yellowknife', ] -SETTINGS = { - 'settings': { - 'number_of_shards': 1, - 'number_of_replicas': 0 - }, - 'mappings': { - 'properties': { - 'geometry': { - 'type': 'geo_shape' + +CPW_PROPERTIES = { + 'properties': { + 'identifier': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'name': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}, }, + }, + 'url': { + 'type': 'object', 'properties': { - 'properties': { - 'identifier': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'name': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'nom': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'station_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'stations_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'icon': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'cond_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'cond_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } - }, - 'temp': { - "type": "float" - }, - 'dewpoint': { - "type": "float" - }, - 'windchill': { - "type": "integer" - }, - 'pres_en': { - "type": "float" + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}, + }, + }, + 'currentConditions': { + 'type': 'object', + 'properties': { + 'icon': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'timestamp': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + 'format': 'date_time_no_millis', + }, + 'fr': { + 'type': 'date', + 'format': 'date_time_no_millis', + }, }, - 'pres_fr': { - "type": "float" + }, + 'relativeHumidity': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'prestnd_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'wind': { + 'type': 'object', + 'properties': { + 'speed': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'integer', + }, + 'fr': { + 'type': 'integer', + }, + }, + }, + }, + }, + 'gust': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'integer', + }, + 'fr': { + 'type': 'integer', + }, + }, + }, + }, + }, + 'direction': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'bearing': { + 'type': 'object', + 'properties': { + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, + }, + }, }, - 'prestnd_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'temperature': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'rel_hum': { - "type": "integer" + }, + 'dewpoint': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'speed': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'windChill': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'gust': { - "type": "integer" + }, + 'station': { + 'type': 'object', + 'properties': { + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'lat': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'lon': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'code': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, }, - 'direction': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'condition': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'bearing': { - "type": "float" + }, + 'pressure': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'timestamp': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + }, + }, + 'forecastGroup': { + 'type': 'object', + 'properties': { + 'timestamp': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'url_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'regionalNormals': { + 'type': 'object', + 'properties': { + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'temperature': { + 'type': 'nested', + 'properties': { + 'class': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'integer', + }, + 'fr': { + 'type': 'integer', + }, + }, + }, + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, }, - 'url_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'forecasts': { + 'type': 'nested', + 'properties': { + 'period': { + 'type': 'object', + 'properties': { + 'textForecastName': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'textForecast_name': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'cloud_precip': { + 'type': 'text', + }, + 'abbreviated_forecast': { + 'type': 'object', + 'properties': { + 'icon': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'pop': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'text_summary': { + 'type': 'text', + }, + }, + }, + 'temperatures': { + 'type': 'object', + 'properties': { + 'text_summary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'temp_high': { + 'type': 'integer', + }, + 'temp_low': { + 'type': 'integer', + }, + }, + }, + 'winds': { + 'type': 'object', + 'properties': { + 'text_summary': { + 'type': 'text', + }, + 'periods': { + 'type': 'object', + 'properties': { + 'index': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'rank': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'speed': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + 'gust': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {} # noqa + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + 'direction': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'bearing': { + 'type': 'object', + 'properties': { + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + 'precipitation': { + 'type': 'object', + 'properties': { + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'precip_periods': { + 'type': 'object', + 'properties': { + 'start': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'end': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'precip_type': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'windChill': { + 'type': 'object', + }, + 'uv': { + 'type': 'object', + 'properties': { + 'category': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'text_summary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'index': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'rel_hum': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'humidex': { + 'type': 'object', + }, }, - 'national': { - 'type': 'integer', - } - } - } + }, + }, + }, + # TODO: Add yesterdayConditions mapping + }, +} + +SETTINGS = { + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0}, + 'mappings': { + 'properties': { + 'geometry': {'type': 'geo_shape'}, + 'properties': CPW_PROPERTIES, } - } + }, } @@ -249,21 +1092,92 @@ def __init__(self, conn_config={}): self.conn = ElasticsearchConnector(conn_config) self.conn.create(INDEX_NAME, mapping=SETTINGS) + self.xml_root = None + self.lang = None + self.filepath_en = None + self.filepath_fr = None + self.wxo_lookup = None + self.sitecode = None + self.citycode = None + self.cpw_feature = { + 'type': "Feature", + 'properties': { + 'lastUpdated': datetime.now().strftime(DATETIME_RFC3339_FMT), + }, + } def load_data(self, filepath): """ fonction from base to load the data in ES - :param filepath: filepath for parsing the current condition file + :param filepath: filepath for parsing the CPW file :returns: True/False """ - with open(os.path.join(MSC_PYGEOAPI_BASEPATH, - 'resources/wxo_lookup.json')) as json_file: - wxo_lookup = json.load(json_file) + # set filepath for associated language XML + if filepath.endswith('_e.xml'): + self.filepath_en = Path(filepath) + self.filepath_fr = Path(filepath.replace('_e.xml', '_f.xml')) + elif filepath.endswith('_f.xml'): + self.filepath_fr = Path(filepath) + self.filepath_en = Path(filepath.replace('_f.xml', '_e.xml')) + + LOGGER.debug( + f'Processing XML: {self.filepath_en} and {self.filepath_fr}' + ) + + # load wxo_lookup + with open( + os.path.join(MSC_PYGEOAPI_BASEPATH, 'resources/wxo_lookup.json') + ) as json_file: + self.wxo_lookup = json.load(json_file) - data = self.xml2json_cpw(wxo_lookup, filepath) + try: + self.xml_roots = { + 'en': etree.parse(self.filepath_en).getroot(), + 'fr': etree.parse(self.filepath_fr).getroot(), + } + except Exception as err: + LOGGER.error(f'ERROR: cannot process data: {err}') + return None + + xml_creation_dates = [ + datetime.strptime( + self.xml_roots[key].find('dateTime/timeStamp').text, + '%Y%m%d%H%M%S', + ) + for key in self.xml_roots + ] + + # if difference between xmlCreation_en and xmlCreation_fr is more + # than 30 seconds, skip + xml_creation_diff_seconds = abs( + (xml_creation_dates[0] - xml_creation_dates[1]).total_seconds() + ) + if xml_creation_diff_seconds > 30: + LOGGER.warning( + 'File creation times differ by more than 30 seconds. ' + 'Skipping loading...' + ) + return False + else: + LOGGER.debug( + f'File creation times differ by {xml_creation_diff_seconds} ' + 'seconds. Proceeding...' + ) + + self.sitecode = os.path.basename(filepath)[:-6] + try: + self.citycode = self.wxo_lookup[self.sitecode]['citycode'] + except KeyError: + LOGGER.error( + f'ERROR: cannot find sitecode {self.sitecode} key in WxO ' + 'lookup table.' + ) + return False + + data = self.xml2json_cpw() if data: try: @@ -271,7 +1185,7 @@ def load_data(self, filepath): index=INDEX_NAME, id=data['properties']['identifier'], doc_as_upsert=True, - doc=data + doc=data, ) LOGGER.debug(f'Result: {r}') return True @@ -279,224 +1193,853 @@ def load_data(self, filepath): LOGGER.warning(f'Error indexing: {err}') return False - def _get_element(self, node, path, attrib=None): + def _node_to_dict(self, node, lang=None): """ - Convenience function to resolve lxml.etree.Element handling + Convert an lxml.etree.Element to a dict - :param node: xml node - :param path: path in the xml node - :param attrib: attribute to get in the node + :param node: `lxml.etree.Element` node - returns: attribute as text or None + returns: `dict` representation of xml node """ - val = node.find(path) - if attrib is not None and val is not None: - return val.attrib.get(attrib) - if hasattr(val, 'text') and val.text not in [None, '']: - return val.text + if node is not None: + # print node raw tag + # if node has no attributes, just return the text + if not node.attrib and node.text: + if lang: + return {lang: safe_cast_to_number(node.text)} + else: + return safe_cast_to_number(node.text) + else: + node_dict = {} + for attrib in node.attrib: + if node.attrib[attrib]: + if lang: + node_dict[attrib] = { + lang: safe_cast_to_number(node.attrib[attrib]) + } + else: + node_dict[attrib] = safe_cast_to_number( + node.attrib[attrib] + ) + + if node.text: + if lang: + node_dict['value'] = {lang: safe_cast_to_number(node.text)} + else: + node_dict['value'] = safe_cast_to_number(node.text) + + return node_dict + return None - def if_none(self, type_, value): + def _deep_merge(self, d1, d2): """ - Convenience fonction to avoid errors when - converting to int or float + Deep merge two dictionaries + :param d1: `dict` to merge into + :param d2: `dict` to merge from - :param type_: f for float and i for int - :param value: value to convert to float/int + :returns: `dict` of merged dictionaries + """ + for key in d2: + if key in d1: + if isinstance(d1[key], dict) and isinstance(d2[key], dict): + self._deep_merge(d1[key], d2[key]) + else: + d1[key] = d2[key] + else: + d1[key] = d2[key] + return d1 - :returns: converted variable + def _set_nested_value(self, d, keys, value): """ + Set nested value in dictionary, and merges dictionaries if they + already exist at path + :param d: `dict` to set value in + :param keys: `list` of keys + :param value: value to set - try: - if type_ == 'f': - variable = float(value) if value else None - elif type_ == 'i': - variable = int(value) if value else None - except ValueError: - variable = value + :returns: `dict` of modified dictionary + """ + for key in keys[:-1]: + d = d.setdefault(key, {}) + + if keys[-1] in d: + # try to merge dictionaries + if isinstance(value, dict): + for k, v in value.items(): + if k in d[keys[-1]]: + if isinstance(v, dict): + d[keys[-1]][k] = self._deep_merge( + d[keys[-1]][k], v + ) + else: + d[keys[-1]][k] = v + else: + d[keys[-1]][k] = v + else: + d[keys[-1]] = value + else: + d[keys[-1]] = value - return variable + return d - def xml2json_cpw(self, wxo_lookup, xml): + def _get_utc_timestamp(self, node): """ - main for generating weather data + Get timestamp from node + :param node: `lxml.etree.Element` node - :param wxo_lookup: json file to have the city id - :param xml: xml file to parse and convert to json + :returns: `dict` of timestamp + """ + timestamp = node.find('timeStamp') + if timestamp is not None: + dt = datetime.strptime(timestamp.text, '%Y%m%d%H%M%S') + return {self.lang: dt.strftime('%Y-%m-%dT%H:%M:%SZ')} + return None - :returns: xml as json object + def _set_cpw_location(self): """ + Set location and identifier information for the citypageweather object - feature = {} - row = {} + :returns: `dict` of modified citypageweather object + """ - LOGGER.debug(f'Processing XML: {xml}') - LOGGER.debug('Fetching English elements') + self.cpw_feature['properties']['identifier'] = self.citycode - try: - root = etree.parse(xml).getroot() - except Exception as err: - LOGGER.error(f'ERROR: cannot process data: {err}') - return None + location = self.xml_root.find('location') + if location is not None: + self._set_nested_value( + self.cpw_feature['properties'], + ['name'], + {self.lang: location.find('name').text}, + ) - if root.findall("currentConditions/"): - sitecode = os.path.basename(xml)[:-6] - try: - citycode = wxo_lookup[sitecode]['citycode'] - except KeyError as err: - LOGGER.error(f'ERROR: cannot find sitecode {sitecode}: {err}') + self._set_nested_value( + self.cpw_feature['properties'], + ['region'], + {self.lang: location.find('region').text}, + ) + + lon = location.find('name').attrib.get('lon') + lat = location.find('name').attrib.get('lat') + + lon, lon_dir = float(lon[:-1]), lon[-1] + lat, lat_dir = float(lat[:-1]), lat[-1] + + if lon_dir in ['W', 'O']: + lon *= -1 # west means negative longitude + if lat_dir == 'S': + lat *= -1 # south means negative latitude + + self.cpw_feature['geometry'] = { + 'type': 'Point', + 'coordinates': [lon, lat, 0.0], + } + + if self.lang == 'en': + self._set_nested_value( + self.cpw_feature['properties'], + ['url'], + { + self.lang: f'https://weather.gc.ca/city/pages/{self.citycode}_metric_e.html' # noqa + }, + ) + else: + self._set_nested_value( + self.cpw_feature['properties'], + ['url'], + { + self.lang: f'https://meteo.gc.ca/city/pages/{self.citycode}_metric_f.html' # noqa + }, + ) + + return self.cpw_feature + + def _set_cpw_current_conditions(self): + """ + Set current conditions information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ - location_name = root.find('location/name') - x = float(location_name.attrib.get('lon')[:-1]) - y = float(location_name.attrib.get('lat')[:-1]) + current_conditions = self.xml_root.find("currentConditions") - if location_name.attrib.get('lat')[-1] == 'S': - y *= -1 # south means negative latitude - elif location_name.attrib.get('lon')[-1] in ['W', 'O']: - x *= -1 # west means negative longitude + if current_conditions is not None and len(current_conditions): - feature['geom'] = [x, y, 0.0] - icon = self._get_element(root, 'currentConditions/iconCode') + current_conditions_dict = {} - if icon: - row['icon'] = f'https://weather.gc.ca/weathericons/{icon}.gif' + iconCode = current_conditions_dict['iconCode'] = ( + self._node_to_dict(current_conditions.find('iconCode')) + ) + + if iconCode and 'value' in iconCode: + current_conditions_dict['iconCode'][ + 'url' + ] = f'https://weather.gc.ca/weathericons/{current_conditions_dict["iconCode"]["value"]:02d}.gif' # noqa + + for date in self.xml_root.findall( + "currentConditions/dateTime" + "[@zone='UTC'][@name='observation']" + ): + timestamp = self._get_utc_timestamp(date) + if timestamp: + current_conditions_dict['timestamp'] = timestamp + + kv_mapping = { + 'relativeHumidity': 'relativeHumidity', + 'wind': [ + 'wind/speed', + 'wind/gust', + 'wind/direction', + 'wind/bearing', + ], + 'pressure': 'pressure', + 'temperature': 'temperature', + 'dewpoint': 'dewpoint', + 'windChill': 'windChill', + 'station': 'station', + 'condition': 'condition', + } + + for key, value in kv_mapping.items(): + if isinstance(value, list): + _dict = {} + for val in value: + node = current_conditions.find(val) + if node is not None and node.text: + _dict[val.split('/')[-1]] = self._node_to_dict( + current_conditions.find(val), self.lang + ) + if _dict: + current_conditions_dict[key] = _dict + else: + node = current_conditions.find(value) + if node is not None and (node.attrib or node.text): + current_conditions_dict[key] = self._node_to_dict( + current_conditions.find(value), self.lang + ) + + if self.cpw_feature.get('properties', {}).get( + 'currentConditions', {} + ): + existing_dict = self.cpw_feature['properties'][ + 'currentConditions' + ] + current_conditions_dict = self._deep_merge( + existing_dict, current_conditions_dict + ) else: - row['icon'] = None + self.cpw_feature['properties'][ + 'currentConditions' + ] = current_conditions_dict - for dates in root.findall("currentConditions/dateTime" - "[@zone='UTC'][@name='observation']"): - timestamp = dates.find('timeStamp') - if timestamp is not None: - dt2 = datetime.strptime(timestamp.text, '%Y%m%d%H%M%S') - row['timestamp'] = dt2.strftime('%Y-%m-%dT%H:%M:%SZ') - - row['rel_hum'] = self._get_element( - root, - 'currentConditions/relativeHumidity') - row['speed'] = self._get_element(root, - 'currentConditions/wind/speed') - row['gust'] = self._get_element(root, - 'currentConditions/wind/gust') - row['direction'] = self._get_element( - root, - 'currentConditions/wind/direction') - row['bearing'] = self._get_element( - root, 'currentConditions/wind/bearing') - row['temp'] = self._get_element( - root, 'currentConditions/temperature') - row['dewpoint'] = self._get_element( - root, 'currentConditions/dewpoint') - row['windchill'] = self._get_element( - root, 'currentConditions/windChill') - - if xml.endswith('e.xml'): - row['name'] = self._get_element(root, 'location/name') - row['station_en'] = self._get_element( - root, 'currentConditions/station') - row['cond_en'] = self._get_element( - root, 'currentConditions/condition') - row['pres_en'] = self._get_element( - root, 'currentConditions/pressure') - row['prestnd_en'] = self._get_element( - root, - 'currentConditions/pressure', - 'tendency') - row['url_en'] = f'https://weather.gc.ca/city/pages/{citycode}_metric_e.html' # noqa - - row['national'] = 0 - if row['name'] in NATIONAL_CITIES: - row['national'] = 1 - - LOGGER.debug('Adding feature') - LOGGER.debug('Setting geometry') - - conditions = { - 'type': "Feature", - 'properties': { - 'identifier': citycode, - 'name': row['name'], - 'station_en': row['station_en'], - 'icon': row['icon'], - 'cond_en': row['cond_en'], - 'temp': self.if_none('f', row['temp']), - 'dewpoint': self.if_none('f', row['dewpoint']), - 'windchill': self.if_none('i', row['windchill']), - 'pres_en': self.if_none('f', row['pres_en']), - 'prestnd_en': row['prestnd_en'], - 'rel_hum': self.if_none('i', row['rel_hum']), - 'speed': self.if_none('i', row['speed']), - 'gust': self.if_none('i', row['gust']), - 'direction_en': row['direction'], - 'bearing': self.if_none('f', row['bearing']), - 'timestamp': row['timestamp'], - 'url_en': row['url_en'], - 'national': int(row['national']) - }, - 'geometry': { - 'type': "Point", - 'coordinates': feature['geom'] + return self.cpw_feature + + def _set_cpw_forecast_group_regional_normals(self): + """ + Set regional normals information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + regional_normals = self.xml_root.find('forecastGroup/regionalNormals') + if regional_normals is not None and len(regional_normals): + regional_normals_dict = {} + textSummary = regional_normals.find('textSummary') + temperatures = regional_normals.findall('temperature') + + if textSummary.text: + self._set_nested_value( + regional_normals_dict, + ['textSummary'], + {self.lang: textSummary.text}, + ) + + regional_high_lows = [ + self._node_to_dict(temp, self.lang) for temp in temperatures + ] + + # if properties.forecast_group.regionalNormals.temperature + # exists, retrieve them and update them with the new values + if ( + self.cpw_feature.get('properties', {}) + .get('forecastGroup', {}) + .get('regionalNormals', {}) + .get('temperature', {}) + ): + for i, temp in enumerate(regional_high_lows): + existing_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['regionalNormals']['temperature'][i] + regional_high_lows[i] = self._deep_merge( + existing_dict, temp + ) + + if regional_high_lows: + regional_normals_dict['temperature'] = regional_high_lows + + if ( + self.cpw_feature.get('properties', {}) + .get('forecastGroup', {}) + .get('regionalNormals', {}) + ): + existing_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['regionalNormals'] + regional_normals_dict = self._deep_merge( + existing_dict, regional_normals_dict + ) + else: + self.cpw_feature['properties']['forecastGroup'][ + 'regionalNormals' + ] = regional_normals_dict + + return self.cpw_feature + + def _set_forecast_general_info(self, forecast, forecast_dict): + """ + Set general forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + period_dict = self._node_to_dict(forecast.find('period')) + + self._set_nested_value( + forecast_dict, + ['period', 'textForecastName'], + {self.lang: period_dict.get('textForecastName')}, + ) + self._set_nested_value( + forecast_dict, + ['period', 'value'], + {self.lang: period_dict.get('value')}, + ) + self._set_nested_value( + forecast_dict, + ['textSummary'], + {self.lang: _get_element(forecast, 'textSummary')}, + ) + + return forecast_dict + + def _set_forecast_cloud_precip(self, forecast_elem, forecast_dict): + """ + Set cloud precipitation forecast information for + the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + cloud_precip = forecast_elem.find('cloudPrecip') + if cloud_precip is not None and len(cloud_precip): + self._set_nested_value( + forecast_dict, + ['cloudPrecip'], + {self.lang: cloud_precip.find('textSummary').text}, + ) + + return forecast_dict + + def _set_forecast_abbreviated_forecast(self, forecast_elem, forecast_dict): + """ + Set abbreviated forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + abbreviated_forecast = forecast_elem.find('abbreviatedForecast') + if abbreviated_forecast is not None and len(abbreviated_forecast): + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'textSummary'], + {self.lang: abbreviated_forecast.find('textSummary').text}, + ) + + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'icon'], + self._node_to_dict(abbreviated_forecast.find('iconCode')), + ) + + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'icon', 'url'], + f'https://weather.gc.ca/weathericons/{forecast_dict["abbreviatedForecast"]["icon"]["value"]:02d}.gif', # noqa + ) + + return forecast_dict + + def _set_forecast_temperatures(self, forecast_elem, forecast_dict): + """ + Set temperatures forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + temperatures = forecast_elem.find('temperatures') + if temperatures is not None and len(temperatures): + self._set_nested_value( + forecast_dict, + ['temperatures', 'textSummary'], + {self.lang: temperatures.find('textSummary').text}, + ) + + temps = [] + for i, temp in enumerate(temperatures.findall('temperature')): + temp_dict = self._node_to_dict(temp, self.lang) + # get existing forecast_dict['temperatures']['temperature'][i] + # if it exists + if i < len( + forecast_dict['temperatures'].get('temperature', []) + ): + existing_dict = forecast_dict['temperatures'][ + 'temperature' + ][i] + for key in existing_dict.keys(): + if key in temp_dict: + existing_dict[key] = { + **existing_dict[key], + **temp_dict[key], + } + else: + temps.append(temp_dict) + if temps: + self._set_nested_value( + forecast_dict, ['temperatures', 'temperature'], temps + ) + + return forecast_dict + + def _set_forecast_winds(self, forecast_elem, forecast_dict): + """ + Set winds forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + winds = forecast_elem.find('winds') + if winds is not None and len(winds): + if winds.find('textSummary') is not None: + self._set_nested_value( + forecast_dict, + ['winds', 'textSummary'], + {self.lang: _get_element(winds, 'textSummary')}, + ) + + periods = [] + for i, period in enumerate(winds.findall('wind')): + wind_period_dict = {} + attrs = ['index', 'rank'] + for attr in attrs: + wind_period_dict[attr] = { + self.lang: safe_cast_to_number(period.attrib.get(attr)) } - } - elif xml.endswith('f.xml'): - LOGGER.debug(f'Processing {xml}') - - row['nom'] = self._get_element(root, 'location/name') - row['station_fr'] = self._get_element( - root, 'currentConditions/station') - row['cond_fr'] = self._get_element( - root, 'currentConditions/condition') - row['pres_fr'] = self._get_element( - root, 'currentConditions/pressure') - row['prestnd_fr'] = self._get_element( - root, - 'currentConditions/pressure', - 'tendency') - row['url_fr'] = f'https://meteo.gc.ca/city/pages/{citycode}_metric_f.html' # noqa - - row['national'] = 0 - if row['nom'] in NATIONAL_CITIES: - row['national'] = 1 - - LOGGER.debug('Adding feature') - LOGGER.debug('Setting geometry') - - conditions = { - 'type': "Feature", - 'properties': { - 'identifier': citycode, - 'nom': row['nom'], - 'station_fr': row['station_fr'], - 'icon': row['icon'], - 'cond_fr': row['cond_fr'], - 'temp': self.if_none('f', row['temp']), - 'dewpoint': self.if_none('f', row['dewpoint']), - 'windchill': self.if_none('i', row['windchill']), - 'pres_fr': self.if_none('f', row['pres_fr']), - 'prestnd_fr': row['prestnd_fr'], - 'rel_hum': self.if_none('i', row['rel_hum']), - 'speed': self.if_none('i', row['speed']), - 'gust': self.if_none('i', row['gust']), - 'direction_fr': row['direction'], - 'bearing': self.if_none('f', row['bearing']), - 'timestamp': row['timestamp'], - 'url_fr': row['url_fr'], - 'national': int(row['national'])}, - 'geometry': { - 'type': "Point", - 'coordinates': feature['geom'] + nodes = ['speed', 'gust', 'direction', 'bearing'] + for node in nodes: + wind_period_dict[node] = self._node_to_dict( + period.find(node), self.lang + ) + + if i < len(forecast_dict.get('winds', {}).get('periods', [])): + existing_dict = forecast_dict['winds']['periods'][i] + forecast_dict['winds']['periods'][i] = self._deep_merge( + existing_dict, wind_period_dict + ) + else: + periods.append(wind_period_dict) + + if periods: + self._set_nested_value( + forecast_dict, ['winds', 'periods'], periods + ) + + return forecast_dict + + def _set_forecast_precipitation(self, forecast_elem, forecast_dict): + """ + Set precipitation forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + precipitation = forecast_elem.find('precipitation') + if precipitation is not None and len(precipitation): + + # set textSummary if it exists + if precipitation.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['precipitation', 'textSummary'], + {self.lang: _get_element(precipitation, 'textSummary')}, + ) + + # get precipitation periods + precip_periods = [] + for i, precip_type in enumerate( + precipitation.findall('precipType') + ): + if precip_type.attrib.get('start') and precip_type.attrib.get( + 'end' + ): + precip_type_dict = self._node_to_dict( + precip_type, self.lang + ) + if i < len( + forecast_dict.get('precipitation', {}).get( + 'precipPeriods', [] + ) + ): + existing_dict = forecast_dict['precipitation'][ + 'precipPeriods' + ][i] + forecast_dict['precipitation']['precipPeriods'][i] = ( + self._deep_merge(existing_dict, precip_type_dict) + ) + else: + precip_periods.append(precip_type_dict) + + if precip_periods: + self._set_nested_value( + forecast_dict, + ['precipitation', 'precipPeriods'], + precip_periods, + ) + + # get accumulation + accumulation = precipitation.find('accumulation') + if accumulation is not None and len(accumulation): + name = accumulation.find('name').text + amount = accumulation.find('amount') + + if name or amount: + accumulation_dict = { + 'name': {self.lang: name}, + 'amount': self._node_to_dict(amount, self.lang), } + if 'accumulation' in forecast_dict['precipitation']: + existing_accumulation_dict = forecast_dict[ + 'precipitation' + ]['accumulation'] + forecast_dict['precipitation']['accumulation'] = ( + self._deep_merge( + existing_accumulation_dict, accumulation_dict + ) + ) + + if 'accumulation' not in forecast_dict['precipitation']: + forecast_dict['precipitation']['accumulation'] = { + 'name': {self.lang: name}, + 'amount': self._node_to_dict(amount, self.lang), + } + + return forecast_dict + + def _set_forecast_windchill(self, forecast_elem, forecast_dict): + """ + Set windchill forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + windchill = forecast_elem.find('windChill') + if windchill is not None and len(windchill): + if windchill.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'textSummary'], + {self.lang: windchill.find('textSummary').text}, + ) + + if windchill.find('calculated').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'calculated'], + {self.lang: windchill.find('calculated').text}, + ) + + if windchill.find('frostbite').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'frostbite'], + {self.lang: windchill.find('frostbite').text}, + ) + + return forecast_dict + + def _set_forecast_uv(self, forecast, forecast_dict): + """ + Set uv forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + uv = forecast.find('uv') + if uv is not None and len(uv): + uv_dict = {} + if uv.attrib.get('category'): + uv_dict['category'] = {self.lang: uv.attrib.get('category')} + + if ( + uv.find('textSummary') is not None + and uv.find('textSummary').text + ): + uv_dict['textSummary'] = { + self.lang: uv.find('textSummary').text } - conditions['properties'] = {key:val for key, val in conditions['properties'].items() if val is not None} # noqa - return conditions + if uv.find('index').text: + uv_dict['index'] = {self.lang: uv.find('index').text} - else: - LOGGER.warning( - f'No current conditions found. Skippping file {xml}.' + if 'uv' in forecast_dict: + existing_dict = forecast_dict['uv'] + forecast_dict['uv'] = self._deep_merge(existing_dict, uv_dict) + else: + forecast_dict['uv'] = uv_dict + + return forecast_dict + + def _set_forecast_rel_hum(self, forecast_elem, forecast_dict): + """ + Set relative humidity forecast information for + the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + rel_hum = forecast_elem.find('relativeHumidity') + if rel_hum is not None: + self._set_nested_value( + forecast_dict, + ['relativeHumidity'], + self._node_to_dict(rel_hum, self.lang), ) - return None + + return forecast_dict + + def _set_forecast_humidex(self, forecast_elem, forecast_dict): + """ + Set humidex forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + humidex = forecast_elem.find('humidex') + if humidex is not None and len(humidex): + if humidex.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['humidex', 'textSummary'], + {self.lang: humidex.find('textSummary').text}, + ) + + if humidex.find('calculated').text: + self._set_nested_value( + forecast_dict, + ['humidex', 'calculated'], + {self.lang: humidex.find('calculated').text}, + ) + + return forecast_dict + + def _set_forecast_visibility(self, forecast_elem, forecast_dict): + """ + Set visibility forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + visibility = forecast_elem.find('visibility') + if visibility is not None and len(visibility): + visibility_dict = {} + for child in visibility: + if 'Visib' in child.tag and child.text: + visibility_dict['textSummary'] = { + self.lang: child.find('textSummary').text + } + visibility_dict['cause'] = child.attrib.get('cause') + + if visibility_dict: + self._set_nested_value( + forecast_dict, ['visibility'], visibility_dict + ) + + return forecast_dict + + def _set_forecast_snowlevel(self, forecast_elem, forecast_dict): + """ + Set snowlevel forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + snow_level = forecast_elem.find('snowLevel') + if snow_level is not None and len(snow_level): + snow_level_dict = {} + if snow_level.find('textSummary').text: + snow_level_dict['textSummary'] = { + self.lang: snow_level.find('textSummary').text + } + + if snow_level_dict: + self._set_nested_value( + forecast_dict, ['snowLevel'], snow_level_dict + ) + + return forecast_dict + + def _set_forecast_frost(self, forecast_elem, forecast_dict): + """ + Set frost forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + frost = forecast_elem.find('frost') + if frost is not None and len(frost): + frost_dict = {} + if frost.find('textSummary').text: + frost_dict['textSummary'] = { + self.lang: frost.find('textSummary').text + } + + if frost_dict: + self._set_nested_value(forecast_dict, ['frost'], frost_dict) + + return forecast_dict + + def _set_cpw_forecast_group(self): + """ + Set forecast group information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + forecast_group = self.xml_root.find("forecastGroup") + + if forecast_group is not None and len(forecast_group): + if not 'forecastGroup' in self.cpw_feature['properties']: # noqa + self.cpw_feature['properties']['forecastGroup'] = {} + + for date in forecast_group.findall( + "dateTime" "[@zone='UTC'][@name='forecastIssue']" + ): + timestamp = self._get_utc_timestamp(date) + if timestamp is not None: + self._set_nested_value( + self.cpw_feature['properties']['forecastGroup'], + ['timestamp'], + timestamp, + ) + + self._set_cpw_forecast_group_regional_normals() + + # iterate over forecasts and populate + forecasts = forecast_group.findall("forecast") + + if ( + forecasts is not None + and len(forecasts) + and 'forecasts' + not in self.cpw_feature['properties']['forecastGroup'] + ): + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ] = [] + + for i, forecast_elem in enumerate(forecasts): + # if index exists in + # self.cpw_feature['properties']['forecastGroup']['forecasts'] + # use it, otherwise create a new dict + if i < len( + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ] + ): + forecast_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['forecasts'][i] + else: + forecast_dict = {} + + # get list of all self._get_forecast_* functions and call each + # function to populate each forecast object + set_forecast_funcs = [ + getattr(self, f) + for f in dir(self) + if f.startswith('_set_forecast_') + ] + + for forecast_func in set_forecast_funcs: + forecast_dict = forecast_func(forecast_elem, forecast_dict) + + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ].append(forecast_dict) + + if not self.cpw_feature['properties'].get('forecast_group'): + self.cpw_feature['properties'].pop('forecast_group', None) + + return self.cpw_feature + + def xml2json_cpw(self): + """ + main for generating weather data + + :param wxo_lookup: json file to have the city id + :param xml: xml file to parse and convert to json + + :returns: xml as json object + """ + + for lang, xml_root in self.xml_roots.items(): + self.xml_root = xml_root + self.lang = lang + self._set_cpw_location() + self._set_cpw_current_conditions() + self._set_cpw_forecast_group() + + return self.cpw_feature @click.group() @@ -542,7 +2085,7 @@ def add(ctx, file_, directory, es, username, password, ignore_certs): @click.pass_context @cli_options.OPTION_DAYS( default=DAYS_TO_KEEP, - help=f'Delete documents older than n days (default={DAYS_TO_KEEP})' + help=f'Delete documents older than n days (default={DAYS_TO_KEEP})', ) @cli_options.OPTION_ELASTICSEARCH() @cli_options.OPTION_ES_USERNAME() @@ -558,18 +2101,11 @@ def clean_records(ctx, days, es, username, password, ignore_certs): conn = ElasticsearchConnector(conn_config) older_than = (datetime.now() - timedelta(days=days)).strftime( - '%Y-%m-%d %H:%M') + '%Y-%m-%d %H:%M' + ) click.echo(f'Deleting documents older than {older_than} ({days} days)') - query = { - 'query': { - 'range': { - 'properties.datetime': { - 'lte': older_than - } - } - } - } + query = {'query': {'range': {'properties.datetime': {'lte': older_than}}}} conn.Elasticsearch.delete_by_query(index=INDEX_NAME, body=query) @@ -580,9 +2116,7 @@ def clean_records(ctx, days, es, username, password, ignore_certs): @cli_options.OPTION_ES_USERNAME() @cli_options.OPTION_ES_PASSWORD() @cli_options.OPTION_ES_IGNORE_CERTS() -@cli_options.OPTION_YES( - prompt='Are you sure you want to delete this index?' -) +@cli_options.OPTION_YES(prompt='Are you sure you want to delete this index?') def delete_index(ctx, es, username, password, ignore_certs): """Delete current conditions index""" diff --git a/msc_pygeoapi/provider/cpw_elasticsearch.py b/msc_pygeoapi/provider/cpw_elasticsearch.py new file mode 100644 index 00000000..346c62f2 --- /dev/null +++ b/msc_pygeoapi/provider/cpw_elasticsearch.py @@ -0,0 +1,562 @@ +# ================================================================= +# +# Authors: Etienne Pelletier +# +# Copyright (c) 2024 Etienne Pelletier +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= +from typing import Dict +from collections import OrderedDict +import json +import logging + +from elasticsearch import exceptions, helpers +from elasticsearch_dsl import Search, Q + +from pygeoapi.provider.base import ProviderConnectionError, ProviderQueryError +from pygeoapi.models.cql import CQLModel, get_next_node +from pygeoapi.util import get_envelope, crs_transform + +from msc_pygeoapi.provider.elasticsearch import MSCElasticsearchProvider + + +LOGGER = logging.getLogger(__name__) + + +class CPWElasticsearchProvider(MSCElasticsearchProvider): + """CPW Elasticsearch Provider""" + + def __init__(self, provider_def): + """ + Initialize object + + :param provider_def: provider definition + + :returns: msc_pygeoapi.provider.elasticsearch.CPWElasticsearchProvider + """ + self._nested_fields = [] + super().__init__(provider_def) + + def _assign_field(self, field_name, field_def): + """ + Assign field based on Elasticsearch field definition + + :param field_name: field name + :param field_def: field definition + + :returns: None + """ + + if 'properties' in field_def: + for k, v in field_def['properties'].items(): + self._assign_field(f'{field_name}.{k}', v) + self._fields[field_name] = {'type': 'object'} + + if 'type' in field_def: + if field_def['type'] == 'text': + self._fields[field_name] = {'type': 'string'} + elif field_def['type'] == 'date': + self._fields[field_name] = {'type': 'string', 'format': 'date'} + elif field_def['type'] in ('float', 'long'): + self._fields[field_name] = { + 'type': 'number', + 'format': field_def['type'], + } + elif field_def['type'] == 'nested': + self._nested_fields.append(field_name) + self._fields[field_name] = {'type': field_def['type']} + else: + self._fields[field_name] = {'type': field_def['type']} + + def get_fields(self): + """ + Get provider field information (names, types) + + :returns: dict of fields + """ + if not self._fields: + ii = self.es.indices.get( + index=self.index_name, allow_no_indices=False + ) + + LOGGER.debug(f'Response: {ii}') + try: + if '*' not in self.index_name: + mappings = ii[self.index_name]['mappings'] + p = mappings['properties']['properties'] + else: + LOGGER.debug('Wildcard index; setting from first match') + index_name_ = list(ii.keys())[0] + p = ii[index_name_]['mappings']['properties']['properties'] + except KeyError: + LOGGER.warning('Trying for alias') + alias_name = next(iter(ii)) + p = ii[alias_name]['mappings']['properties']['properties'] + except IndexError: + LOGGER.warning('could not get fields; returning empty set') + return {} + + for k, v in p['properties'].items(): + self._assign_field(k, v) + + return self._fields + + @crs_transform + def query( + self, + offset=0, + limit=10, + resulttype='results', + bbox=[], + datetime_=None, + properties=[], + sortby=[], + select_properties=[], + skip_geometry=False, + q=None, + filterq=None, + **kwargs, + ): + """ + query Elasticsearch index + + :param offset: starting record to return (default 0) + :param limit: number of records to return (default 10) + :param resulttype: return results or hit limit (default results) + :param bbox: bounding box [minx,miny,maxx,maxy] + :param datetime_: temporal (datestamp or extent) + :param properties: list of tuples (name, value) + :param sortby: list of dicts (property, order) + :param select_properties: list of property names + :param skip_geometry: bool of whether to skip geometry (default False) + :param q: full-text search term(s) + :param filterq: filter object + + :returns: dict of 0..n GeoJSON features + """ + + self.select_properties = select_properties + + query = {'track_total_hits': True, 'query': {'bool': {'filter': []}}} + filter_ = [] + + feature_collection = {'type': 'FeatureCollection', 'features': []} + + if resulttype == 'hits': + LOGGER.debug('hits only specified') + limit = 0 + + if bbox: + LOGGER.debug('processing bbox parameter') + minx, miny, maxx, maxy = bbox + bbox_filter = { + 'geo_shape': { + 'geometry': { + 'shape': { + 'type': 'envelope', + 'coordinates': [[minx, maxy], [maxx, miny]], + }, + 'relation': 'intersects', + } + } + } + + query['query']['bool']['filter'].append(bbox_filter) + + if datetime_ is not None: + LOGGER.debug('processing datetime parameter') + if self.time_field is None: + LOGGER.error('time_field not enabled for collection') + raise ProviderQueryError() + + time_field = self.mask_prop(self.time_field) + + if '/' in datetime_: # envelope + LOGGER.debug('detected time range') + time_begin, time_end = datetime_.split('/') + + range_ = { + 'range': {time_field: {'gte': time_begin, 'lte': time_end}} + } + if time_begin == '..': + range_['range'][time_field].pop('gte') + elif time_end == '..': + range_['range'][time_field].pop('lte') + + filter_.append(range_) + + else: # time instant + LOGGER.debug('detected time instant') + filter_.append({'match': {time_field: datetime_}}) + + LOGGER.debug(filter_) + query['query']['bool']['filter'].append(*filter_) + + if properties: + LOGGER.debug('processing properties') + for prop in properties: + prop_name = self.mask_prop(prop[0]) + matching_nested_field = next( + (f for f in self._nested_fields if prop[0].startswith(f)), + None, + ) + if matching_nested_field: + prop_values = prop[1].split('|') + occur = 'should' if '|' in prop[1] else 'must' + pf = { + 'nested': { + 'path': f'{self.mask_prop(matching_nested_field)}', + 'query': {'bool': {occur: []}}, + }, + } + for prop_value in prop_values: + pf['nested']['query']['bool'][occur].append( + {'match': {prop_name: {'query': prop_value}}} + ) + query['query']['bool']['filter'].append(pf) + else: + pf = {'match': {prop_name: {'query': prop[1]}}} + query['query']['bool']['filter'].append(pf) + + if '|' not in prop[1]: + pf['match'][prop_name]['minimum_should_match'] = '100%' + + if sortby: + LOGGER.debug('processing sortby') + query['sort'] = [] + for sort in sortby: + LOGGER.debug(f'processing sort object: {sort}') + + sp = sort['property'] + + if ( + self.fields[sp]['type'] == 'string' + and self.fields[sp].get('format') != 'date' + ): + LOGGER.debug('setting ES .raw on property') + sort_property = f'{self.mask_prop(sp)}.raw' + else: + sort_property = self.mask_prop(sp) + + sort_order = 'asc' + if sort['order'] == '-': + sort_order = 'desc' + + sort_ = {sort_property: {'order': sort_order}} + query['sort'].append(sort_) + + if q is not None: + LOGGER.debug('Adding free-text search') + query['query']['bool']['must'] = {'query_string': {'query': q}} + + query['_source'] = { + 'excludes': [ + 'properties._metadata-payload', + 'properties._metadata-schema', + 'properties._metadata-format', + ] + } + + if self.properties or self.select_properties: + LOGGER.debug('filtering properties') + + all_properties = self.get_properties() + + query['_source'] = { + 'includes': list(map(self.mask_prop, all_properties)) + } + + query['_source']['includes'].append('id') + query['_source']['includes'].append('type') + query['_source']['includes'].append('geometry') + + if skip_geometry: + LOGGER.debug('excluding geometry') + try: + query['_source']['excludes'] = ['geometry'] + except KeyError: + query['_source'] = {'excludes': ['geometry']} + try: + LOGGER.debug('querying Elasticsearch') + if filterq: + LOGGER.debug(f'adding cql object: {filterq.json()}') + query = update_query(input_query=query, cql=filterq) + LOGGER.debug(json.dumps(query, indent=4)) + + LOGGER.debug('Testing for ES scrolling') + if offset + limit > 10000: + gen = helpers.scan( + client=self.es, + query=query, + preserve_order=True, + index=self.index_name, + ) + results = {'hits': {'total': limit, 'hits': []}} + for i in range(offset + limit): + try: + if i >= offset: + results['hits']['hits'].append(next(gen)) + else: + next(gen) + except StopIteration: + break + + matched = len(results['hits']['hits']) + offset + returned = len(results['hits']['hits']) + else: + es_results = self.es.search( + index=self.index_name, from_=offset, size=limit, **query + ) + results = es_results + matched = es_results['hits']['total']['value'] + returned = len(es_results['hits']['hits']) + + except exceptions.ConnectionError as err: + LOGGER.error(err) + raise ProviderConnectionError() + except exceptions.RequestError as err: + LOGGER.error(err) + raise ProviderQueryError() + except exceptions.NotFoundError as err: + LOGGER.error(err) + raise ProviderQueryError() + + feature_collection['numberMatched'] = matched + + if resulttype == 'hits': + return feature_collection + + feature_collection['numberReturned'] = returned + + LOGGER.debug('serializing features') + for feature in results['hits']['hits']: + feature_ = self.esdoc2geojson(feature) + feature_collection['features'].append(feature_) + + return feature_collection + + def esdoc2geojson(self, doc): + """ + generate GeoJSON `dict` from ES document + + :param doc: `dict` of ES document + + :returns: GeoJSON `dict` + """ + + feature_ = {} + feature_thinned = {} + + LOGGER.debug('Fetching id and geometry from GeoJSON document') + feature_ = doc['_source'] + + try: + id_ = doc['_source']['properties'][self.id_field] + except KeyError as err: + LOGGER.debug(f'Missing field: {err}') + id_ = doc['_source'].get('id', doc['_id']) + + feature_['id'] = id_ + feature_['geometry'] = doc['_source'].get('geometry') + if self.properties or self.select_properties: + LOGGER.debug('Filtering properties') + all_properties = self.get_properties() + LOGGER.debug(all_properties) + feature_thinned = { + 'id': id_, + 'type': feature_['type'], + 'geometry': feature_.get('geometry'), + 'properties': OrderedDict(), + } + LOGGER.debug(feature_) + for p in all_properties: + try: + feature_thinned['properties'][p] = feature_['properties'][ + p + ] # noqa + except KeyError: + LOGGER.debug('Property not found in source feature.') + pass + + if feature_thinned: + return feature_thinned + else: + return feature_ + + +class ESQueryBuilder: + def __init__(self): + self._operation = None + self.must_value = {} + self.should_value = {} + self.mustnot_value = {} + self.filter_value = {} + + def must(self, must_value): + self.must_value = must_value + return self + + def should(self, should_value): + self.should_value = should_value + return self + + def must_not(self, mustnot_value): + self.mustnot_value = mustnot_value + return self + + def filter(self, filter_value): + self.filter_value = filter_value + return self + + @property + def operation(self): + return self._operation + + @operation.setter + def operation(self, value): + self._operation = value + + def build(self): + if self.must_value: + must_clause = self.must_value or {} + if self.should_value: + should_clause = self.should_value or {} + if self.mustnot_value: + mustnot_clause = self.mustnot_value or {} + if self.filter_value: + filter_clause = self.filter_value or {} + else: + filter_clause = {} + + # to figure out how to deal with logical operations + # return match_clause & range_clause + clauses = must_clause or should_clause or mustnot_clause + filters = filter_clause + if self.operation == 'and': + res = Q( + 'bool', + must=[clause for clause in clauses], + filter=[filter for filter in filters], + ) + elif self.operation == 'or': + res = Q( + 'bool', + should=[clause for clause in clauses], + filter=[filter for filter in filters], + ) + elif self.operation == 'not': + res = Q( + 'bool', + must_not=[clause for clause in clauses], + filter=[filter for filter in filters], + ) + else: + if filters: + res = Q('bool', must=[clauses], filter=[filters]) + else: + res = Q('bool', must=[clauses]) + + return res + + +def _build_query(q, cql): + + # this would be handled by the AST with the traverse of CQL model + op, node = get_next_node(cql.__root__) + q.operation = op + if isinstance(node, list): + query_list = [] + for elem in node: + op, next_node = get_next_node(elem) + if not getattr(next_node, 'between', 0) == 0: + property = next_node.between.value.__root__.__root__.property + lower = next_node.between.lower.__root__.__root__ + upper = next_node.between.upper.__root__.__root__ + query_list.append( + Q({'range': {f'{property}': {'gte': lower, 'lte': upper}}}) + ) + if not getattr(next_node, '__root__', 0) == 0: + scalars = tuple(next_node.__root__.eq.__root__) + property = scalars[0].__root__.property + value = scalars[1].__root__.__root__ + query_list.append(Q({'match': {f'{property}': f'{value}'}})) + q.must(query_list) + elif not getattr(node, 'between', 0) == 0: + property = node.between.value.__root__.__root__.property + lower = None + if not getattr(node.between.lower, '__root__', 0) == 0: + lower = node.between.lower.__root__.__root__ + upper = None + if not getattr(node.between.upper, '__root__', 0) == 0: + upper = node.between.upper.__root__.__root__ + query = Q({'range': {f'{property}': {'gte': lower, 'lte': upper}}}) + q.must(query) + elif not getattr(node, '__root__', 0) == 0: + next_op, next_node = get_next_node(node) + if not getattr(next_node, 'eq', 0) == 0: + scalars = tuple(next_node.eq.__root__) + property = scalars[0].__root__.property + value = scalars[1].__root__.__root__ + query = Q({'match': {f'{property}': f'{value}'}}) + q.must(query) + elif not getattr(node, 'intersects', 0) == 0: + property = node.intersects.__root__[0].__root__.property + if property == 'geometry': + geom_type = node.intersects.__root__[ + 1 + ].__root__.__root__.__root__.type + if geom_type == 'Polygon': + coordinates = node.intersects.__root__[ + 1 + ].__root__.__root__.__root__.coordinates + coords_list = [ + poly_coords.__root__ for poly_coords in coordinates[0] + ] + filter_ = Q( + { + 'geo_shape': { + 'geometry': { + 'shape': { + 'type': 'envelope', + 'coordinates': get_envelope(coords_list), + }, + 'relation': 'intersects', + } + } + } + ) + query_all = Q({'match_all': {}}) + q.must(query_all) + q.filter(filter_) + return q.build() + + +def update_query(input_query: Dict, cql: CQLModel): + s = Search.from_dict(input_query) + query = ESQueryBuilder() + output_query = _build_query(query, cql) + s = s.query(output_query) + + LOGGER.debug(f'Enhanced query: {json.dumps(s.to_dict())}') + return s.to_dict() diff --git a/msc_pygeoapi/util.py b/msc_pygeoapi/util.py index 27c09dca..b1604294 100644 --- a/msc_pygeoapi/util.py +++ b/msc_pygeoapi/util.py @@ -119,6 +119,27 @@ def _get_element(node, path, attrib=None): return None +def safe_cast_to_number(value): + """ + helper function to safely cast a value to a number + + :param value: value to cast + + :returns: value cast to int/float or original value if not castable + """ + + if value is None: + return value + + try: + return int(value) + except ValueError: + try: + return float(value) + except ValueError: + return value + + def strftime_rfc3339(datetimeobj): """ helper function to convert datetime object to RFC3393 compliant string.