From 277831b95ef433c010aedfd4e18330c3da3a94f1 Mon Sep 17 00:00:00 2001 From: Etienne Pelletier Date: Thu, 28 Nov 2024 18:10:53 +0000 Subject: [PATCH] initial weather story loader --- deploy/default/sarracenia/aqhi-realtime.conf | 2 +- .../sarracenia/bulletins-realtime.conf | 2 +- deploy/default/sarracenia/cap-alerts.conf | 2 +- .../default/sarracenia/citypageweather.conf | 2 +- .../sarracenia/coastal-flood-risk-index.conf | 2 +- deploy/default/sarracenia/hurricanes.conf | 2 +- .../sarracenia/hydrometric-realtime.conf | 2 +- deploy/default/sarracenia/marine_weather.conf | 2 +- deploy/default/sarracenia/metnotes.conf | 2 +- deploy/default/sarracenia/swob-realtime.conf | 2 +- .../sarracenia/thunderstorm-outlook.conf | 2 +- deploy/default/sarracenia/umos-realtime.conf | 2 +- deploy/default/sarracenia/weather-story.conf | 13 ++ msc-pygeoapi.env | 1 - msc_pygeoapi/event/__init__.py | 65 +++++++++ msc_pygeoapi/event/event_plugin.py | 73 ---------- msc_pygeoapi/loader/__init__.py | 7 +- msc_pygeoapi/loader/weather_story_realtime.py | 130 ++++++++++++++++++ msc_pygeoapi/plugin.py | 4 + 19 files changed, 229 insertions(+), 88 deletions(-) create mode 100644 deploy/default/sarracenia/weather-story.conf delete mode 100644 msc_pygeoapi/event/event_plugin.py create mode 100644 msc_pygeoapi/loader/weather_story_realtime.py diff --git a/deploy/default/sarracenia/aqhi-realtime.conf b/deploy/default/sarracenia/aqhi-realtime.conf index d2bb5da7..71208b5f 100644 --- a/deploy/default/sarracenia/aqhi-realtime.conf +++ b/deploy/default/sarracenia/aqhi-realtime.conf @@ -10,4 +10,4 @@ discard True report False directory ${MSC_PYGEOAPI_CACHEDIR} logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork diff --git a/deploy/default/sarracenia/bulletins-realtime.conf b/deploy/default/sarracenia/bulletins-realtime.conf index 45bca4a5..8eb01f61 100644 --- a/deploy/default/sarracenia/bulletins-realtime.conf +++ b/deploy/default/sarracenia/bulletins-realtime.conf @@ -7,6 +7,6 @@ subtopic *.WXO-DD.bulletins.alphanumeric.# mirror True directory /tmp/bulletins -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterAccept logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} report False diff --git a/deploy/default/sarracenia/cap-alerts.conf b/deploy/default/sarracenia/cap-alerts.conf index 988cc5dc..c5a07631 100644 --- a/deploy/default/sarracenia/cap-alerts.conf +++ b/deploy/default/sarracenia/cap-alerts.conf @@ -5,7 +5,7 @@ instances 2 subtopic *.WXO-DD.alerts.cap.# -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork directory /data/geomet/feeds/hpfx mirror True discard True diff --git a/deploy/default/sarracenia/citypageweather.conf b/deploy/default/sarracenia/citypageweather.conf index 1f04184a..4151d627 100644 --- a/deploy/default/sarracenia/citypageweather.conf +++ b/deploy/default/sarracenia/citypageweather.conf @@ -6,7 +6,7 @@ instances 2 subtopic *.WXO-DD.citypage_weather.xml.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True strip 3 diff --git a/deploy/default/sarracenia/coastal-flood-risk-index.conf b/deploy/default/sarracenia/coastal-flood-risk-index.conf index e07777d5..d9b4982a 100644 --- a/deploy/default/sarracenia/coastal-flood-risk-index.conf +++ b/deploy/default/sarracenia/coastal-flood-risk-index.conf @@ -7,7 +7,7 @@ instances 2 subtopic coastal-flooding.risk-index.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} diff --git a/deploy/default/sarracenia/hurricanes.conf b/deploy/default/sarracenia/hurricanes.conf index 79d7200b..5cb3ed64 100644 --- a/deploy/default/sarracenia/hurricanes.conf +++ b/deploy/default/sarracenia/hurricanes.conf @@ -6,7 +6,7 @@ instances 2 subtopic *.WXO-DD.hurricanes.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True slip 3 diff --git a/deploy/default/sarracenia/hydrometric-realtime.conf b/deploy/default/sarracenia/hydrometric-realtime.conf index f7cf54b6..7503884d 100644 --- a/deploy/default/sarracenia/hydrometric-realtime.conf +++ b/deploy/default/sarracenia/hydrometric-realtime.conf @@ -6,7 +6,7 @@ instances 4 subtopic *.WXO-DD.hydrometric.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} discard True report False diff --git a/deploy/default/sarracenia/marine_weather.conf b/deploy/default/sarracenia/marine_weather.conf index 07961c75..774675e9 100644 --- a/deploy/default/sarracenia/marine_weather.conf +++ b/deploy/default/sarracenia/marine_weather.conf @@ -6,7 +6,7 @@ instances 2 subtopic *.WXO-DD.marine_weather.xml.# directory /data/geomet/feeds/hpfx -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True skip 3 diff --git a/deploy/default/sarracenia/metnotes.conf b/deploy/default/sarracenia/metnotes.conf index bf9799b6..042209ec 100644 --- a/deploy/default/sarracenia/metnotes.conf +++ b/deploy/default/sarracenia/metnotes.conf @@ -6,7 +6,7 @@ instances 2 subtopic *.WXO-DD.metnotes.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True report False diff --git a/deploy/default/sarracenia/swob-realtime.conf b/deploy/default/sarracenia/swob-realtime.conf index 1348c0db..8c070b2a 100644 --- a/deploy/default/sarracenia/swob-realtime.conf +++ b/deploy/default/sarracenia/swob-realtime.conf @@ -6,7 +6,7 @@ instances 4 subtopic *.WXO-DD.observations.swob-ml.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} diff --git a/deploy/default/sarracenia/thunderstorm-outlook.conf b/deploy/default/sarracenia/thunderstorm-outlook.conf index 0ee16906..692a66a9 100644 --- a/deploy/default/sarracenia/thunderstorm-outlook.conf +++ b/deploy/default/sarracenia/thunderstorm-outlook.conf @@ -7,7 +7,7 @@ instances 2 subtopic thunderstorm-outlooks.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} diff --git a/deploy/default/sarracenia/umos-realtime.conf b/deploy/default/sarracenia/umos-realtime.conf index 4b907a61..0a043583 100644 --- a/deploy/default/sarracenia/umos-realtime.conf +++ b/deploy/default/sarracenia/umos-realtime.conf @@ -7,7 +7,7 @@ subtopic *.WXO-DD.model_gem_global.stat-post-processing.# subtopic *.WXO-DD.model_gem_regional.stat-post-processing.# directory ${MSC_PYGEOAPI_CACHEDIR} -callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +callback msc_pygeoapi.event.EventAfterWork mirror True discard True logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} diff --git a/deploy/default/sarracenia/weather-story.conf b/deploy/default/sarracenia/weather-story.conf new file mode 100644 index 00000000..f92a9b7f --- /dev/null +++ b/deploy/default/sarracenia/weather-story.conf @@ -0,0 +1,13 @@ +broker amqps://snlw001@goc-dx.science.gc.ca +exchange xs_snlw001 +queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME} +topic_prefix v03.post +instances 2 + +subtopic # + +strip 2 +directory ${MSC_PYGEOAPI_CACHEDIR} +callback msc_pygeoapi.event.EventAfterWork +logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} +report False diff --git a/msc-pygeoapi.env b/msc-pygeoapi.env index 7138c3d3..dcc3b8b1 100644 --- a/msc-pygeoapi.env +++ b/msc-pygeoapi.env @@ -8,7 +8,6 @@ export MSC_PYGEOAPI_ES_URL=http://${MSC_PYGEOAPI_ES_USERNAME}:${MSC_PYGEOAPI_ES_ export MSC_PYGEOAPI_CACHEDIR=/tmp export MSC_PYGEOAPI_OGC_API_URL=https://api.wxod-dev.cmc.ec.gc.ca export MSC_PYGEOAPI_OGC_API_URL_BASEPATH=/ -export MSC_PYGEOAPI_METPX_FLOW_CALLBACK=msc_pygeoapi.event.event_plugin.Event export MSC_PYGEOAPI_TEMPLATES=theme/templates export MSC_PYGEOAPI_STATIC=theme/static export MSC_PYGEOAPI_LOCALE=locale diff --git a/msc_pygeoapi/event/__init__.py b/msc_pygeoapi/event/__init__.py index 9394230c..1ed01c4c 100644 --- a/msc_pygeoapi/event/__init__.py +++ b/msc_pygeoapi/event/__init__.py @@ -1,8 +1,13 @@ # ================================================================= # # Author: Tom Kralidis +# Louis-Philippe Rousseau-Lambert +# +# Etienne Pelletier # # Copyright (c) 2021 Tom Kralidis +# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert +# Copyright (c) 2024 Etienne Pelletier # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -26,3 +31,63 @@ # OTHER DEALINGS IN THE SOFTWARE. # # ================================================================= +import logging + +from sarracenia.flowcb import FlowCB + +LOGGER = logging.getLogger(__name__) + + +class EventBase(FlowCB): + + def process_messages(self, worklist) -> bool: + """ + Process messages from the worklist + + :param worklist: `sarracenia.flow.worklist` + + :returns: `bool` + """ + + for msg in worklist.incoming: + + try: + from msc_pygeoapi.handler.core import CoreHandler + + filepath = f"{msg['new_dir']}/{msg['new_file']}" + LOGGER.debug(f'Filepath: {filepath}') + handler = CoreHandler(filepath) + result = handler.handle() + LOGGER.debug(f'Result: {result}') + except Exception as err: + LOGGER.error(f'Error handling message: {err}') + worklist.failed.append(msg) + return False + + return True + + +class EventAfterWork(EventBase): + + def after_work(self, worklist) -> None: + """ + sarracenia after_work dispatcher + + :param worklist: `sarracenia.flow.worklist` + + :returns: `bool` + """ + return self.process_messages(worklist) + + +class EventAfterAccept(EventBase): + + def after_accept(self, worklist) -> None: + """ + sarracenia after_accept dispatcher + + :param worklist: `sarracenia.flow.worklist` + + :returns: `bool` + """ + return self.process_messages(worklist) diff --git a/msc_pygeoapi/event/event_plugin.py b/msc_pygeoapi/event/event_plugin.py deleted file mode 100644 index 14c24b5e..00000000 --- a/msc_pygeoapi/event/event_plugin.py +++ /dev/null @@ -1,73 +0,0 @@ -# ================================================================= -# -# Author: Tom Kralidis -# Louis-Philippe Rousseau-Lambert -# -# Etienne Pelletier -# -# Copyright (c) 2023 Tom Kralidis -# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert -# Copyright (c) 2024 Etienne Pelletier -# -# Permission is hereby granted, free of charge, to any person -# obtaining a copy of this software and associated documentation -# files (the "Software"), to deal in the Software without -# restriction, including without limitation the rights to use, -# copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following -# conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES -# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. -# -# ================================================================= -import logging - -from sarracenia.flowcb import FlowCB - -LOGGER = logging.getLogger(__name__) - - -class Event(FlowCB): - """sr3 plugin callback""" - - # replace the previous sr2 on_file event - def after_work(self, worklist) -> None: - """ - sarracenia dispatcher - - :param worklist: `sarracenia.flowcb` - - :returns: None - """ - - for msg in worklist.incoming: - - try: - from msc_pygeoapi.handler.core import CoreHandler - - filepath = f"{msg['new_dir']}/{msg['new_file']}" - LOGGER.debug(f'Filepath: {filepath}') - handler = CoreHandler(filepath) - result = handler.handle() - LOGGER.debug(f'Result: {result}') - except Exception as err: - LOGGER.error(f'Error handling message: {err}') - worklist.failed.append(msg) - return False - - # replace the previous sr2 on_message event - after_accept = after_work - - def __repr__(self): - return '' diff --git a/msc_pygeoapi/loader/__init__.py b/msc_pygeoapi/loader/__init__.py index c0e8f868..7ac1bc2c 100644 --- a/msc_pygeoapi/loader/__init__.py +++ b/msc_pygeoapi/loader/__init__.py @@ -72,8 +72,11 @@ def metadata(): ('msc_pygeoapi.loader.nwp_dataset_footprints', 'nwp_dataset_footprints'), ('msc_pygeoapi.loader.umos_realtime', 'umos_realtime'), ('msc_pygeoapi.loader.thunderstorm_outlook', 'thunderstorm_outlook'), - ('msc_pygeoapi.loader.coastal_flood_risk_index', - 'coastal_flood_risk_index') + ( + 'msc_pygeoapi.loader.coastal_flood_risk_index', + 'coastal_flood_risk_index', + ), + ('msc_pygeoapi.loader.weather_story_realtime', 'weather_story_realtime'), ) for module, name in commands: diff --git a/msc_pygeoapi/loader/weather_story_realtime.py b/msc_pygeoapi/loader/weather_story_realtime.py new file mode 100644 index 00000000..c9cd969c --- /dev/null +++ b/msc_pygeoapi/loader/weather_story_realtime.py @@ -0,0 +1,130 @@ +import click +import json + +from msc_pygeoapi import cli_options +from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector +from msc_pygeoapi.loader.base import BaseLoader +from msc_pygeoapi.util import configure_es_connection + +DAYS_TO_KEEP = 7 +INDEX_BASENAME = 'weather_story' +MAPPINGS = { + "properties": { + "geometry": { + "properties": { + "coordinates": {"type": "float"}, + "type": { + "type": "text", + "fields": { + "keyword": {"type": "keyword", "ignore_above": 256} + }, + }, + } + } + } +} + +SETTINGS = { + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0}, + 'mappings': MAPPINGS, +} + + +class WeatherStoryRealtimeLoader(BaseLoader): + def __init__(self, conn_config={}): + """initializer""" + + BaseLoader.__init__(self) + + self.conn = ElasticsearchConnector(conn_config) + self.filepath = None + + SETTINGS['mappings'] = MAPPINGS + self.conn.create(INDEX_BASENAME, SETTINGS) + + def load_data(self, filepath: str) -> bool: + """ + loads data from event to target + + :param filepath: filepath to data on disk + + :returns: `bool` of status result + """ + + with open(filepath, 'r') as fh: + json_data = json.load(fh) + + features = json_data.get('features') + + for feature in features: + properties = feature.get('properties') + if 'subregion_name' in properties: + id_ = f"{str(properties['prov_name_en'][0])}-{properties['subregion_name']}" # noqa + else: + id_ = f"{str(properties['prov_name_en'][0])}" + + self.conn.Elasticsearch.index( + index="weather_story", + id=id_, + body=feature, + ) + + return True + + +# CLI options to interact manually with Weather Story file for testing +@click.group() +def weather_story_realtime(): + """Manages Weather Story index""" + pass + + +@click.command() +@click.pass_context +@cli_options.OPTION_FILE() +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +def add(ctx, file_, es, username, password, ignore_certs): + """add data to system""" + + if file_ is None: + raise click.ClickException('Missing --file/-f') + + conn_config = configure_es_connection( + es, username, password, ignore_certs + ) + + loader = WeatherStoryRealtimeLoader(conn_config) + result = loader.load_data(file_) + + if not result: + click.echo('features not generated') + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_YES(prompt='Are you sure you want to delete this index?') +def delete_index(ctx, es, username, password, ignore_certs): + """Delete Weather Story index""" + + conn_config = configure_es_connection( + es, username, password, ignore_certs + ) + conn = ElasticsearchConnector(conn_config) + + all_indexes = f'{INDEX_BASENAME}' + + click.echo(f'Deleting index {all_indexes}') + conn.delete(all_indexes) + + click.echo('Done') + + +weather_story_realtime.add_command(add) +weather_story_realtime.add_command(delete_index) diff --git a/msc_pygeoapi/plugin.py b/msc_pygeoapi/plugin.py index e702137c..587a612a 100644 --- a/msc_pygeoapi/plugin.py +++ b/msc_pygeoapi/plugin.py @@ -91,6 +91,10 @@ 'coastal_flood_risk_index': { 'filename_pattern': 'CoastalFloodRiskIndex', 'handler': 'msc_pygeoapi.loader.coastal_flood_risk_index.CoastalFloodRiskIndexLoader' # noqa + }, + 'weather-story': { + 'filename_pattern': 'wxstory_', + 'handler': 'msc_pygeoapi.loader.weather_story_realtime.WeatherStoryRealtimeLoader' # noqa } } }