Skip to content

Commit

Permalink
initial weather story loader
Browse files Browse the repository at this point in the history
  • Loading branch information
Dukestep committed Dec 9, 2024
1 parent 814204c commit 9034492
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 2 deletions.
14 changes: 14 additions & 0 deletions deploy/default/sarracenia/weather-story.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
broker amqps://[email protected]
exchange xs_snlw001
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
topic_prefix v03.post
instances 2

subtopic #

strip 2
directory ${MSC_PYGEOAPI_CACHEDIR}
callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK}
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
discard True
report False
7 changes: 5 additions & 2 deletions msc_pygeoapi/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ def metadata():
('msc_pygeoapi.loader.nwp_dataset_footprints', 'nwp_dataset_footprints'),
('msc_pygeoapi.loader.umos_realtime', 'umos_realtime'),
('msc_pygeoapi.loader.thunderstorm_outlook', 'thunderstorm_outlook'),
('msc_pygeoapi.loader.coastal_flood_risk_index',
'coastal_flood_risk_index')
(
'msc_pygeoapi.loader.coastal_flood_risk_index',
'coastal_flood_risk_index',
),
('msc_pygeoapi.loader.weather_story_realtime', 'weather_story_realtime'),
)

for module, name in commands:
Expand Down
130 changes: 130 additions & 0 deletions msc_pygeoapi/loader/weather_story_realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import click
import json

from msc_pygeoapi import cli_options
from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector
from msc_pygeoapi.loader.base import BaseLoader
from msc_pygeoapi.util import configure_es_connection

DAYS_TO_KEEP = 7
INDEX_BASENAME = 'weather_story'
MAPPINGS = {
"properties": {
"geometry": {
"properties": {
"coordinates": {"type": "float"},
"type": {
"type": "text",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
},
},
}
}
}
}

SETTINGS = {
'settings': {'number_of_shards': 1, 'number_of_replicas': 0},
'mappings': MAPPINGS,
}


class WeatherStoryRealtimeLoader(BaseLoader):
def __init__(self, conn_config={}):
"""initializer"""

BaseLoader.__init__(self)

self.conn = ElasticsearchConnector(conn_config)
self.filepath = None

SETTINGS['mappings'] = MAPPINGS
self.conn.create(INDEX_BASENAME, SETTINGS)

def load_data(self, filepath: str) -> bool:
"""
loads data from event to target
:param filepath: filepath to data on disk
:returns: `bool` of status result
"""

with open(filepath, 'r') as fh:
json_data = json.load(fh)

features = json_data.get('features')

for feature in features:
properties = feature.get('properties')
if 'subregion_name' in properties:
id_ = f"{str(properties['prov_name_en'][0])}-{properties['subregion_name']}" # noqa
else:
id_ = f"{str(properties['prov_name_en'][0])}"

self.conn.Elasticsearch.index(
index=f"weather_story",
id=id_,
body=feature,
)

return True


# CLI options to interact manually with Weather Story file for testing
@click.group()
def weather_story_realtime():
"""Manages Weather Story index"""
pass


@click.command()
@click.pass_context
@cli_options.OPTION_FILE()
@cli_options.OPTION_ELASTICSEARCH()
@cli_options.OPTION_ES_USERNAME()
@cli_options.OPTION_ES_PASSWORD()
@cli_options.OPTION_ES_IGNORE_CERTS()
def add(ctx, file_, es, username, password, ignore_certs):
"""add data to system"""

if file_ is None:
raise click.ClickException('Missing --file/-f')

conn_config = configure_es_connection(
es, username, password, ignore_certs
)

loader = WeatherStoryRealtimeLoader(conn_config)
result = loader.load_data(file_)

if not result:
click.echo('features not generated')


@click.command()
@click.pass_context
@cli_options.OPTION_ELASTICSEARCH()
@cli_options.OPTION_ES_USERNAME()
@cli_options.OPTION_ES_PASSWORD()
@cli_options.OPTION_ES_IGNORE_CERTS()
@cli_options.OPTION_YES(prompt='Are you sure you want to delete this index?')
def delete_index(ctx, es, username, password, ignore_certs):
"""Delete Weather Story index"""

conn_config = configure_es_connection(
es, username, password, ignore_certs
)
conn = ElasticsearchConnector(conn_config)

all_indexes = f'{INDEX_BASENAME}'

click.echo(f'Deleting index {all_indexes}')
conn.delete(all_indexes)

click.echo('Done')


weather_story_realtime.add_command(add)
weather_story_realtime.add_command(delete_index)
4 changes: 4 additions & 0 deletions msc_pygeoapi/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
'coastal_flood_risk_index': {
'filename_pattern': 'CoastalFloodRiskIndex',
'handler': 'msc_pygeoapi.loader.coastal_flood_risk_index.CoastalFloodRiskIndexLoader' # noqa
},
'weather-story': {
'filename_pattern': 'wxstory_',
'handler': 'msc_pygeoapi.loader.weather_story_realtime.WeatherStoryRealtimeLoader' # noqa
}
}
}
Expand Down

0 comments on commit 9034492

Please sign in to comment.