Skip to content

Commit

Permalink
update for sr3 configuration and callback (ECCC-MSC#356)
Browse files Browse the repository at this point in the history
rmv "lp-" from loader for testing

test updated main.yml

minor code updates

update sr3 event handler

flake8

copy paste error

removed changes to main.yml

Co-authored-by: Louis-Philippe Rousseau Lambert <[email protected]>
  • Loading branch information
RousseauLambertLP and Louis-Philippe Rousseau Lambert authored Dec 31, 2024
1 parent 355f9da commit 976aeb8
Show file tree
Hide file tree
Showing 17 changed files with 165 additions and 247 deletions.
4 changes: 2 additions & 2 deletions debian/postinst
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ export MSC_PYGEOAPI_ES_URL=http://localhost:9200
export MSC_PYGEOAPI_CACHEDIR=/tmp
export MSC_PYGEOAPI_OGC_API_URL=https://api.wxod-dev.cmc.ec.gc.ca
export MSC_PYGEOAPI_OGC_API_URL_BASEPATH=/
export MSC_PYGEOAPI_METPX_EVENT_FILE_PY=/opt/msc-pygeoapi/event/file_.py
export MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY=/opt/msc-pygeoapi/event/message.py
export MSC_PYGEOAPI_METPX_EVENT_FILE_PY=msc_pygeoapi.event.EventAfterWork
export MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY=msc_pygeoapi.event.EventAfterAccept

export XDG_CACHE_HOME=/tmp/geoadm-sarra-logs

Expand Down
16 changes: 9 additions & 7 deletions deploy/default/sarracenia/aqhi-realtime.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.air_quality.aqhi.*.*.realtime.json.#

mirror True
discard on
report_back False
discard True
report False
directory ${MSC_PYGEOAPI_CACHEDIR}
loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
accept .*
instances 2
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
18 changes: 8 additions & 10 deletions deploy/default/sarracenia/bulletins-realtime.conf
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory /dev/null
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 4
subtopic *.WXO-DD.bulletins.alphanumeric.#
mirror True
notify_only True
accept .*

plugin ${MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
subtopic *.WXO-DD.bulletins.alphanumeric.#

report_back False
mirror True
directory /tmp/bulletins
callback ${MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY}
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
report False
11 changes: 6 additions & 5 deletions deploy/default/sarracenia/cap-alerts.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory /data/geomet/feeds/hpfx
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.alerts.cap.#

callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
directory /data/geomet/feeds/hpfx
mirror True
discard True
skip 3
accept .*

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
13 changes: 7 additions & 6 deletions deploy/default/sarracenia/citypageweather.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory ${MSC_PYGEOAPI_CACHEDIR}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.citypage_weather.xml.#

directory ${MSC_PYGEOAPI_CACHEDIR}
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
slip 3
accept .*

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
strip 3
22 changes: 8 additions & 14 deletions deploy/default/sarracenia/coastal-flood-risk-index.conf
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
# TODO: Update for HPFX once the data is on HPFX
broker amqps://anonymous:[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic coastal-flooding.risk-index.#

mirror True

discard on

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}

directory ${MSC_PYGEOAPI_CACHEDIR}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}

report_back False

instances 2
accept .*
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
report False
11 changes: 6 additions & 5 deletions deploy/default/sarracenia/hurricanes.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory ${MSC_PYGEOAPI_CACHEDIR}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.hurricanes.#

directory ${MSC_PYGEOAPI_CACHEDIR}
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
slip 3
accept .*

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
21 changes: 9 additions & 12 deletions deploy/default/sarracenia/hydrometric-realtime.conf
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
subtopic *.WXO-DD.hydrometric.#
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 4
reject .*_.*_hourly_.*

discard on

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
subtopic *.WXO-DD.hydrometric.#

directory ${MSC_PYGEOAPI_CACHEDIR}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}

report_back False

callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
discard True
report False
skip 3

reject .*_.*_hourly_.*
accept .*hydrometric_StationList.csv

accept .*_.*_daily_.*
accept .*_hourly_.*
acceptUnmatched False
15 changes: 9 additions & 6 deletions deploy/default/sarracenia/marine_weather.conf
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
directory /data/geomet/feeds/hpfx
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.marine_weather.xml.#

directory /data/geomet/feeds/hpfx
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
reject .*regionList.xml
skip 3
accept .*
plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
chmod_log 0644
permLog 0644

reject .*regionList.xml
18 changes: 10 additions & 8 deletions deploy/default/sarracenia/metnotes.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic *.WXO-DD.metnotes.#
mirror True
discard on
report_back False

directory ${MSC_PYGEOAPI_CACHEDIR}
loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
accept .*
instances 2
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}}
mirror True
discard True
report False
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
22 changes: 8 additions & 14 deletions deploy/default/sarracenia/swob-realtime.conf
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
instances 4

subtopic *.WXO-DD.observations.swob-ml.#

mirror True

discard on

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}

directory ${MSC_PYGEOAPI_CACHEDIR}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}

report_back False

instances 4
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
report False
skip 3
accept .*
22 changes: 8 additions & 14 deletions deploy/default/sarracenia/thunderstorm-outlook.conf
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
# TODO: Update for HPFX once the data is on HPFX
broker amqps://anonymous:[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 2

subtopic thunderstorm-outlooks.#

mirror True

discard on

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}

directory ${MSC_PYGEOAPI_CACHEDIR}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}

report_back False

instances 2
accept .*
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
report False
22 changes: 8 additions & 14 deletions deploy/default/sarracenia/umos-realtime.conf
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
broker amqps://[email protected]
queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
exchange xpublic
queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME}
instances 4

subtopic *.WXO-DD.model_gem_global.stat-post-processing.#
subtopic *.WXO-DD.model_gem_regional.stat-post-processing.#

mirror True

discard on

plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}

directory ${MSC_PYGEOAPI_CACHEDIR}

loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}

report_back False

instances 4
accept .*
callback ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY}
mirror True
discard True
logLevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL}
report False
4 changes: 2 additions & 2 deletions msc-pygeoapi.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ export MSC_PYGEOAPI_ES_URL=http://${MSC_PYGEOAPI_ES_USERNAME}:${MSC_PYGEOAPI_ES_
export MSC_PYGEOAPI_CACHEDIR=/tmp
export MSC_PYGEOAPI_OGC_API_URL=https://api.wxod-dev.cmc.ec.gc.ca
export MSC_PYGEOAPI_OGC_API_URL_BASEPATH=/
export MSC_PYGEOAPI_METPX_EVENT_FILE_PY=/opt/msc-pygeoapi/event/file_.py
export MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY=/opt/msc-pygeoapi/event/message.py
export MSC_PYGEOAPI_METPX_EVENT_FILE_PY=msc_pygeoapi.event.EventAfterWork
export MSC_PYGEOAPI_METPX_EVENT_MESSAGE_PY=msc_pygeoapi.event.EventAfterAccept
export MSC_PYGEOAPI_TEMPLATES=theme/templates
export MSC_PYGEOAPI_STATIC=theme/static
export MSC_PYGEOAPI_LOCALE=locale
Expand Down
65 changes: 65 additions & 0 deletions msc_pygeoapi/event/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# =================================================================
#
# Author: Tom Kralidis <[email protected]>
# Louis-Philippe Rousseau-Lambert
# <[email protected]>
# Etienne Pelletier <[email protected]>
#
# Copyright (c) 2021 Tom Kralidis
# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert
# Copyright (c) 2024 Etienne Pelletier
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand All @@ -26,3 +31,63 @@
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import logging

from sarracenia.flowcb import FlowCB

LOGGER = logging.getLogger(__name__)


class EventBase(FlowCB):

def process_messages(self, worklist) -> bool:
"""
Process messages from the worklist
:param worklist: `sarracenia.flow.worklist`
:returns: `bool`
"""

for msg in worklist.incoming:

try:
from msc_pygeoapi.handler.core import CoreHandler

filepath = f"{msg['new_dir']}/{msg['new_file']}"
LOGGER.debug(f'Filepath: {filepath}')
handler = CoreHandler(filepath)
result = handler.handle()
LOGGER.debug(f'Result: {result}')
except Exception as err:
LOGGER.error(f'Error handling message: {err}')
worklist.failed.append(msg)
return False

return True


class EventAfterWork(EventBase):

def after_work(self, worklist) -> None:
"""
sarracenia after_work dispatcher
:param worklist: `sarracenia.flow.worklist`
:returns: `bool`
"""
return self.process_messages(worklist)


class EventAfterAccept(EventBase):

def after_accept(self, worklist) -> None:
"""
sarracenia after_accept dispatcher
:param worklist: `sarracenia.flow.worklist`
:returns: `bool`
"""
return self.process_messages(worklist)
Loading

0 comments on commit 976aeb8

Please sign in to comment.