From 8e51bf443ce275b4589a98a820680c685f3c4329 Mon Sep 17 00:00:00 2001 From: Etienne Pelletier Date: Fri, 26 Apr 2024 17:33:07 +0000 Subject: [PATCH] update citypageweather realtime loader --- deploy/default/msc-pygeoapi-config.yml | 63 ++++++++++++++ .../loader/citypageweather_realtime.py | 82 +++++++++++++++---- 2 files changed, 127 insertions(+), 18 deletions(-) diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml index e5fa2d72..80c7fe8f 100644 --- a/deploy/default/msc-pygeoapi-config.yml +++ b/deploy/default/msc-pygeoapi-config.yml @@ -679,6 +679,69 @@ resources: - DAYS_WITH_VALID_SUNSHINE - COOLING_DEGREE_DAYS - HEATING_DEGREE_DAYS + current-conditions: + type: collection + title: + en: Current Weather Conditions - City Page Weather + fr: Conditions météorologiques actuelles - Prévisions météorologiques par ville + description: + en: Current conditions for select Canadian cities + fr: Conditions actuelles pour une sélection de villes canadiennes + keywords: + en: [Meteorology, Weather] + fr: [Météorologie, Temps] + crs: + - CRS84 + links: + - type: text/html + rel: canonical + title: + en: Weather forecast files by city + fr: Prévisions météorologiques par ville + href: + en: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_en/ + fr: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_fr/ + hreflang: + en: en-CA + fr: fr-CA + extents: + spatial: + bbox: [-145.27, 37.3, -48.11, 87.61] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + temporal: + begin: null + end: null # or empty + providers: + - type: feature + default: true + name: Elasticsearch + data: ${MSC_PYGEOAPI_ES_URL}/current_conditions + id_field: identifier + time_field: timestamp + properties: + - identifier + - name + - station_en + - station_fr + - icon + - cond_en + - cond_fr + - temp + - dewpoint + - pres_en + - pres_fr + - prestnd_en + - prestnd_fr + - rel_hum + - speed + - direction_en + - direction_fr + - bearing + - timestamp + - url_en + - url_fr + - national + - nom climate-daily: type: collection diff --git a/msc_pygeoapi/loader/citypageweather_realtime.py b/msc_pygeoapi/loader/citypageweather_realtime.py index 02273699..63b91f75 100644 --- a/msc_pygeoapi/loader/citypageweather_realtime.py +++ b/msc_pygeoapi/loader/citypageweather_realtime.py @@ -260,17 +260,19 @@ def load_data(self, filepath): data = self.xml2json_cpw(wxo_lookup, filepath) - try: - r = self.conn.Elasticsearch.index( - index=INDEX_NAME, - id=data['properties']['identifier'], - body=data - ) - LOGGER.debug(f'Result: {r}') - return True - except Exception as err: - LOGGER.warning(f'Error indexing: {err}') - return False + if data: + try: + r = self.conn.Elasticsearch.update( + index=INDEX_NAME, + id=data['properties']['identifier'], + doc_as_upsert=True, + doc=data, + ) + LOGGER.debug(f'Result: {r}') + return True + except Exception as err: + LOGGER.warning(f'Error indexing: {err}') + return False def _get_element(self, node, path, attrib=None): """ @@ -301,10 +303,13 @@ def if_none(self, type_, value): :returns: converted variable """ - if type_ == 'f': - variable = float(value) if value else 'null' - elif type_ == 'i': - variable = int(value) if value else 'null' + try: + if type_ == 'f': + variable = float(value) if value else 'null' + elif type_ == 'i': + variable = int(value) if value else 'null' + except ValueError: + variable = value return variable @@ -328,8 +333,11 @@ def xml2json_cpw(self, wxo_lookup, xml): root = etree.parse(xml).getroot() except Exception as err: LOGGER.error(f'ERROR: cannot process data: {err}') + return None + + current_conditions = root.findall("currentConditions/") - if root.findall("currentConditions/"): + if current_conditions: sitecode = os.path.basename(xml)[:-6] try: citycode = wxo_lookup[sitecode]['citycode'] @@ -416,7 +424,7 @@ def xml2json_cpw(self, wxo_lookup, xml): 'rel_hum': self.if_none('i', row['rel_hum']), 'speed': self.if_none('i', row['speed']), 'gust': self.if_none('i', row['gust']), - 'direction': row['direction'], + 'direction_en': row['direction'], 'bearing': self.if_none('f', row['bearing']), 'timestamp': row['timestamp'], 'url_en': row['url_en'], @@ -467,7 +475,7 @@ def xml2json_cpw(self, wxo_lookup, xml): 'rel_hum': self.if_none('i', row['rel_hum']), 'speed': self.if_none('i', row['speed']), 'gust': self.if_none('i', row['gust']), - 'direction': row['direction'], + 'direction_fr': row['direction'], 'bearing': self.if_none('f', row['bearing']), 'timestamp': row['timestamp'], 'url_fr': row['url_fr'], @@ -481,6 +489,10 @@ def xml2json_cpw(self, wxo_lookup, xml): conditions['properties'] = {key:val for key, val in conditions['properties'].items() if val != 'null'} # noqa return conditions + else: + LOGGER.warning(f'No current conditions found. Skippping file {xml}.') + return None + @click.group() def citypageweather(): @@ -488,6 +500,39 @@ def citypageweather(): pass +@click.command() +@click.pass_context +@cli_options.OPTION_FILE() +@cli_options.OPTION_DIRECTORY() +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +def add(ctx, file_, directory, es, username, password, ignore_certs): + """adds data to system""" + + if all([file_ is None, directory is None]): + raise click.ClickException('Missing --file/-f or --dir/-d option') + + conn_config = configure_es_connection(es, username, password, ignore_certs) + + files_to_process = [] + + if file_ is not None: + files_to_process = [file_] + elif directory is not None: + for root, dirs, files in os.walk(directory): + for f in [file for file in files if file.endswith('.xml')]: + files_to_process.append(os.path.join(root, f)) + files_to_process.sort(key=os.path.getmtime) + + for file_to_process in files_to_process: + loader = CitypageweatherRealtimeLoader(conn_config) + loader.load_data(file_to_process) + + click.echo('Done') + + @click.command() @click.pass_context @cli_options.OPTION_DAYS( @@ -542,5 +587,6 @@ def delete_index(ctx, es, username, password, ignore_certs): conn.delete(INDEX_NAME) +citypageweather.add_command(add) citypageweather.add_command(clean_records) citypageweather.add_command(delete_index)