Add wis2 compliant mqtt publishing #223

Open
wants to merge 8 commits into
base: main
37 changes: 33 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
volumes:
# - ts-data:/home/postgres/pgdata/data # for timescale image
- ts-data:/var/lib/postgresql # for postgres image
- ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration
- ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration
- ./datastore/database/healthcheck_postgis_uptime.sh:/healthcheck_postgis_uptime.sh:ro # for the healthcheck
environment:
- EXTRA_CONF_DIR=/etc/conf_settings
Expand Down Expand Up @@ -105,10 +105,17 @@ services:
environment:
- DSHOST=${DSHOST:-store}
- DSPORT=${DSPORT:-50050}
- MQTT_HOST=${MQTT_HOST}
- MQTT_HOST=${MQTT_HOST:-mqtt}
- MQTT_USERNAME=${MQTT_USERNAME}
Contributor review comment:

  • MQTT_USERNAME=${MQTT_USERNAME:-}
  • MQTT_PASSWORD=${MQTT_PASSWORD:-}

- MQTT_PASSWORD=${MQTT_PASSWORD}
- MQTT_TLS=True
- MQTT_PORT=${MQTT_PORT:-1883}
- MQTT_TLS=${MQTT_TLS:-False}
- WIS2_TOPIC=${WIS2_TOPIC:-'TEMP_TOPIC'}
- WIS2_MQTT_HOST=${MQTT_HOST:-mqtt_wis2}
- WIS2_MQTT_USERNAME=${MQTT_USERNAME:-}
- WIS2_MQTT_PASSWORD=${MQTT_PASSWORD:-}
- WIS2_MQTT_TLS=${MQTT_TLS:-False}
- WIS2_MQTT_PORT=${MQTT_PORT:-1884}
- INGEST_LOGLEVEL
- GUNICORN_CMD_ARGS=--bind 0.0.0.0:8001 --workers=4 --access-logfile -
depends_on:
Expand All @@ -123,6 +130,23 @@ services:
volumes:
- ./ingest/test/output:/app/output

mqtt:
image: eclipse-mosquitto
restart: unless-stopped
ports:
- "1883:1883"
volumes:
- ./mosquitto:/etc/mosquitto
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf

mqtt_wis2:
image: eclipse-mosquitto
restart: unless-stopped
ports:
- "1884:1884"
volumes:
- ./mosquitto:/etc/mosquitto
- ./mosquitto/mosquitto_wis2.conf:/mosquitto/config/mosquitto.conf
client:
profiles: ["test"]
build:
Expand Down Expand Up @@ -208,7 +232,12 @@ services:
depends_on:
db:
condition: service_healthy
command: ["--collector.stat_statements", "--collector.stat_user_tables", "--collector.stat_activity_autovacuum"]
command:
[
"--collector.stat_statements",
"--collector.stat_user_tables",
"--collector.stat_activity_autovacuum",
]

grafana:
profiles: ["monitoring"]
Expand Down
12 changes: 10 additions & 2 deletions ingest/README.md
@@ -1,4 +1,5 @@
# e-soh-event-queue

## Environment variables

| Variable | Default Value | Description |
Expand All @@ -9,17 +10,24 @@
| `MQTT_USERNAME` | | Username for authentication with the MQTT broker. |
| `MQTT_PASSWORD` | | Password for authentication with the MQTT broker. |
| `MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.|
| `WIS2_MQTT_HOST` | | Host address for the WIS2 MQTT broker. |
| `WIS2_MQTT_USERNAME` | | Username for authentication with the WIS2 MQTT broker. |
| `WIS2_MQTT_PASSWORD` | | Password for authentication with the WIS2 MQTT broker. |
| `WIS2_MQTT_TLS` | `False` | Whether to use TLS (True/False) for the WIS2 MQTT connection. Defaults to `False`.|
| `WIS2_METADATA_RECORD_ID` | | The WIS2 global metadata record ID for this data service.|
| `WIS2_TOPIC` | | The WIS2 MQTT topic the messages should be published under. |
| `EDR_API_URL` | | If the EDR API is hosted on a different URL than the ingest API, set this to the correct URL for the EDR API.|
| `PROMETHEUS_MULTIPROC_DIR` | `/tmp/metrics` | Directory for Prometheus multiprocess mode metrics. Defaults to `/tmp/metrics`. |
| `INGEST_LOGLEVEL` | | Logging level for the ingestion process. |
| `GUNICORN_CMD_ARGS` | | Command-line arguments for configuring Gunicorn, a Python WSGI HTTP Server. |
| `FASTAPI_ROOT_PATH` | | If this API is behind a proxy, this needs to be set to the root path. |
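
The `MQTT_*` and `WIS2_MQTT_*` variables in the table above share one shape, so they could be read with a single helper. The following is a minimal sketch, not the code in this PR: `load_mqtt_conf` and its `prefix` and `environ` parameters are hypothetical names, and the fallbacks (`"False"` for TLS, `8883` for the port) are assumptions modelled on the `mqtt_wis2_configuration` block in `ingest/api/main.py`.

```python
import os
from typing import Mapping, Optional

# Strings treated as "true" when parsing the *_MQTT_TLS variables.
TRUTHY = ("true", "1", "t")


def load_mqtt_conf(prefix: str = "", environ: Mapping[str, str] = os.environ) -> Optional[dict]:
    """Build a broker config from <prefix>MQTT_* variables (hypothetical helper).

    Returns None when no host is set, so the caller can skip that broker.
    """
    host = environ.get(f"{prefix}MQTT_HOST")
    if not host:
        return None
    return {
        "host": host,
        "username": environ.get(f"{prefix}MQTT_USERNAME"),
        "password": environ.get(f"{prefix}MQTT_PASSWORD"),
        # Assumed fallbacks: TLS off and port 8883 when the variables are unset.
        "enable_tls": environ.get(f"{prefix}MQTT_TLS", "False").lower() in TRUTHY,
        "port": int(environ.get(f"{prefix}MQTT_PORT", 8883)),
    }
```

Calling `load_mqtt_conf("WIS2_")` would then yield either a config dict or `None`, which matches how the WIS2 broker is treated as optional.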


## Dev install
To install in dev mode run `pip install --editable .` from the top level of this repository.

To install in dev mode run `pip install --editable .` from the top level of this repository.

## C++ dependencies

| Module | Version |
| --------------- | ------- |
| libeccodes-data | 2.24.2 |
Expand Down
72 changes: 72 additions & 0 deletions ingest/api/generate_wis2_payload.py
@@ -0,0 +1,72 @@
import os
import json


from api.model import Geometry
from api.model import Link
from api.wis2_model import Wis2MessageSchema
from api.wis2_model import Properties
from api.wis2_model import Content


def get_api_timeseries_query(location_id: str, baseURL: str) -> str:
    query = "/collections/observations/locations/" + location_id
    baseURL = os.getenv("EDR_API_URL", baseURL)
    return baseURL + query


def generate_wis2_topic() -> str:
    """This function will generate the WIS2 compliant topic name"""
    wis2_topic = os.getenv("WIS2_TOPIC")
    if not wis2_topic:
        raise ValueError("WIS2_TOPIC env variable not set. Aborting publish to wis2")
    return wis2_topic


def generate_wis2_payload(message: dict, request_url: str) -> Wis2MessageSchema:
    """
    This function will generate the WIS2 compliant payload based on the JSON schema for ESOH
    """
    wis2_payload = Wis2MessageSchema(
        type="Feature",
        id=message["id"],
        version="v04",
        geometry=Geometry(**message["geometry"]),
        properties=Properties(
            producer=message["properties"]["naming_authority"],
            data_id=message["properties"]["data_id"],
            metadata_id=os.getenv(
                "WIS2_METADATA_RECORD_ID", None
            ),  # Need to figure out how we generate this? Is it static or dynamic?
            datetime=message["properties"]["datetime"],
            pubtime=message["properties"]["pubtime"],
            content=Content(
                value=json.dumps(
                    {
                        "type": "Feature",
                        "geometry": message["geometry"],
                        "properties": {
                            "observation": message["properties"]["content"]["value"],
                            "CF_standard_name": message["properties"]["content"]["standard_name"],
                            "unit": message["properties"]["content"]["unit"],
                        },
                    },
                    separators=(",", ":"),
                ),
                unit=message["properties"]["content"]["unit"],
                encoding="utf-8",
            ),
        ),
        links=(
            [
                Link(
                    href=get_api_timeseries_query(message["properties"]["platform"], request_url),
                    rel="canonical",
                    type="application/prs.coverage+json",
                )
            ]
        )
        + (lambda x: x if x else [])(message["links"]),
    )

    return wis2_payload
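
To make the resulting message shape easier to review, here is a minimal sketch that mirrors `generate_wis2_payload` with plain dicts instead of the pydantic models. `sketch_wis2_payload` and the sample message are hypothetical and the sample values are made up; unlike the PR code, the sketch omits `metadata_id` and tolerates a missing `links` key.

```python
import json


def sketch_wis2_payload(message: dict, base_url: str) -> dict:
    """Plain-dict approximation of the WIS2 payload built in this file."""
    props = message["properties"]
    content = props["content"]
    # The observation is embedded as a compact JSON string (no spaces after separators).
    embedded = {
        "type": "Feature",
        "geometry": message["geometry"],
        "properties": {
            "observation": content["value"],
            "CF_standard_name": content["standard_name"],
            "unit": content["unit"],
        },
    }
    canonical = {
        "href": f"{base_url}/collections/observations/locations/{props['platform']}",
        "rel": "canonical",
        "type": "application/prs.coverage+json",
    }
    return {
        "type": "Feature",
        "id": message["id"],
        "version": "v04",
        "geometry": message["geometry"],
        "properties": {
            "producer": props["naming_authority"],
            "data_id": props["data_id"],
            "datetime": props["datetime"],
            "pubtime": props["pubtime"],
            "content": {
                "value": json.dumps(embedded, separators=(",", ":")),
                "unit": content["unit"],
                "encoding": "utf-8",
            },
        },
        # The canonical link comes first, followed by any links already on the message.
        "links": [canonical] + (message.get("links") or []),
    }


# Made-up sample message, for illustration only.
sample = {
    "id": "example-id",
    "geometry": {"type": "Point", "coordinates": [10.0, 60.0]},
    "properties": {
        "naming_authority": "example.org",
        "data_id": "example-data-id",
        "datetime": "2024-01-01T00:00:00Z",
        "pubtime": "2024-01-01T00:05:00Z",
        "platform": "0-20000-0-00001",
        "content": {"value": "1.5", "standard_name": "air_temperature", "unit": "degC"},
    },
}
payload = sketch_wis2_payload(sample, "https://example.org")
```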
52 changes: 41 additions & 11 deletions ingest/api/ingest.py
@@ -1,6 +1,8 @@
import logging
from typing import Union

from api.generate_wis2_payload import generate_wis2_payload
from api.generate_wis2_payload import generate_wis2_topic
import grpc
import json

Expand Down Expand Up @@ -28,26 +30,38 @@ def __init__(
self,
mqtt_conf: dict,
uuid_prefix: str,
mqtt_WIS2_conf: dict | None = None,
):
self.uuid_prefix = uuid_prefix
self.client = None
if mqtt_conf["host"]:
try:
self.WIS2_client = None
try:
if mqtt_conf["host"]:
self.client = connect_mqtt(mqtt_conf)
except Exception as e:
logger.error("Failed to establish connection to mqtt, " + "\n" + str(e))
raise e
if mqtt_WIS2_conf:
self.WIS2_client = connect_mqtt(mqtt_WIS2_conf)
except Exception as e:
logger.error(
"Failed to establish connection to mqtt, "
+ "\n"
+ str(e)
+ "\n"
+ json.dumps(mqtt_conf)
+ "\n"
+ json.dumps(mqtt_WIS2_conf)
)
raise e

async def ingest(self, message: Union[str, object]):
async def ingest(self, message: Union[str, object], publishWIS2: bool, baseURL: str):
"""
This method calls all methods for deciding the input type, builds the mqtt messages, and
publishes them.

"""
messages = build_messages(message, self.uuid_prefix)
await self.publish_messages(messages)
await self.publish_messages(messages, publishWIS2, baseURL)

async def publish_messages(self, messages: list):
async def publish_messages(self, messages: list, publishWIS2: bool, baseURL: str):
"""
This method accepts a list of json strings ready to be ingested to the datastore
and published to the mqtt topic.
Expand All @@ -60,7 +74,7 @@ async def publish_messages(self, messages: list):
logger.error("Failed to reach datastore, " + "\n" + str(e))
raise HTTPException(status_code=500, detail="API could not reach datastore")

if self.client is not None:
if self.client or self.WIS2_client:
for msg in messages:
topic = (
msg["properties"]["naming_authority"]
Expand All @@ -76,11 +90,27 @@ async def publish_messages(self, messages: list):
msg["properties"]["level"] = level_string
msg["properties"]["period"] = period_iso
try:
send_message(topic, json.dumps(msg), self.client)
logger.debug("Successfully published to mqtt")
if self.client:
send_message(topic, json.dumps(msg), self.client)
logger.debug("Successfully published to mqtt")
except Exception as e:
logger.error("Failed to publish to mqtt, " + str(e))
raise HTTPException(
status_code=500,
detail="Data ingested to datastore. But unable to publish to mqtt",
)
try:
if publishWIS2 and self.WIS2_client:
send_message(
generate_wis2_topic(),
generate_wis2_payload(msg, baseURL).model_dump(exclude_unset=True, exclude_none=True),
self.WIS2_client,
)
logger.debug("Successfully published to mqtt")
except Exception as e:
logger.error("Failed to publish to WIS2 mqtt, " + str(e))
print(e)
raise HTTPException(
status_code=500,
detail="Data ingested to datastore. But unable to publish to WIS2 mqtt",
)
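
The routing in `publish_messages` (always publish to the primary broker when it is configured, and publish to the WIS2 broker only when `publishWIS2` is set and a WIS2 client exists) can be exercised without a real broker. This is a minimal sketch under stated assumptions: `publish_one` and the `send` callable are hypothetical stand-ins for the loop body and `send_message`, and the same JSON string is sent to both brokers for brevity, whereas the PR builds a separate WIS2 payload.

```python
import json
from typing import Callable, List, Optional


def publish_one(
    msg: dict,
    topic: str,
    wis2_topic: str,
    publish_wis2: bool,
    client: Optional[object],
    wis2_client: Optional[object],
    send: Callable[[str, str, object], None],
) -> List[str]:
    """Send msg to each configured broker and report which ones were used."""
    used = []
    if client is not None:
        send(topic, json.dumps(msg), client)
        used.append("mqtt")
    # WIS2 publishing is opt-in per request and requires a configured WIS2 client.
    if publish_wis2 and wis2_client is not None:
        send(wis2_topic, json.dumps(msg), wis2_client)
        used.append("wis2")
    return used


# Stub sender that records calls instead of talking to a broker.
calls = []


def record(topic: str, payload: str, client: object) -> None:
    calls.append((client, topic, payload))
```

Passing a recording stub as `send` makes it possible to check which brokers a request would reach for each combination of flag and configuration.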
35 changes: 30 additions & 5 deletions ingest/api/main.py
@@ -1,8 +1,10 @@
import logging
import os

from api.utilities import get_base_url_from_request
from fastapi import FastAPI
from fastapi import UploadFile
from fastapi import Request
from pydantic import BaseModel

from typing import List
Expand Down Expand Up @@ -37,23 +39,46 @@ class Response(BaseModel):
"port": int(os.getenv("MQTT_PORT", 8883)),
}

ingester = IngestToPipeline(mqtt_conf=mqtt_configuration, uuid_prefix="uuid")
mqtt_wis2_configuration = {
"host": os.getenv("WIS2_MQTT_HOST", None),
"username": os.getenv("WIS2_MQTT_USERNAME", None),
"password": os.getenv("WIS2_MQTT_PASSWORD", None),
"enable_tls": os.getenv("WIS2_MQTT_TLS", "False").lower() in ("true", "1", "t"),
"port": int(os.getenv("WIS2_MQTT_PORT", 8883)),
}

if not all(
[
mqtt_wis2_configuration["host"],
]
):
mqtt_wis2_configuration = None

ingester = IngestToPipeline(
mqtt_conf=mqtt_configuration,
uuid_prefix="uuid",
mqtt_WIS2_conf=mqtt_wis2_configuration,
)

app = FastAPI(root_path=os.getenv("FASTAPI_ROOT_PATH", ""))
add_metrics(app)


@app.post("/bufr")
async def upload_bufr_file(files: UploadFile):
async def upload_bufr_file(http_request: Request, files: UploadFile, publishWIS2: bool = False):
contents = await files.read()
json_data = build_json_payload(contents)
await post_json(json_data)
await post_json(http_request, json_data, publishWIS2)

return Response(status_message="Successfully ingested", status_code=200)


@app.post("/json")
async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Response:
async def post_json(
http_request: Request,
request: JsonMessageSchema | List[JsonMessageSchema],
publishWIS2: bool = False,
) -> Response:
status = "Successfully ingested"
if isinstance(request, list):
hash_list = [i.__hash__() for i in request]
Expand All @@ -65,6 +90,6 @@ async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Res
else:
json_data = [request.model_dump(exclude_none=True)]

await ingester.ingest(json_data)
await ingester.ingest(json_data, publishWIS2, get_base_url_from_request(http_request))

return Response(status_message=status, status_code=200)
7 changes: 4 additions & 3 deletions ingest/api/model.py
Expand Up @@ -40,7 +40,8 @@ class Geometry(BaseModel):

class Integrity(BaseModel):
method: Literal["sha256", "sha384", "sha512", "sha3-256", "sha3-384", "sha3-512"] = Field(
..., description="A specific set of methods for calculating the checksum algorithms"
...,
description="A specific set of methods for calculating the checksum algorithms",
)
value: str = Field(..., description="Checksum value.")

Expand Down Expand Up @@ -290,7 +291,8 @@ class Properties(BaseModel):
)
content: Content = Field(..., description="Actual data content")
integrity: Optional[Integrity] = Field(
None, description="Specifies a checksum to be applied to the data to ensure that the download is accurate."
None,
description="Specifies a checksum to be applied to the data to ensure that the download is accurate.",
)

@field_validator("period", mode="before")
Expand Down Expand Up @@ -320,7 +322,6 @@ def check_datetime_iso(self) -> "Properties":

@model_validator(mode="after")
def validate_wigos_id(self):

blocks = self.platform.split("-")
assert len(blocks) == 4, f"Not enough blocks in input 'platform', '{self.platform}'"
for i in blocks[:-1]:
Expand Down
1 change: 0 additions & 1 deletion ingest/api/send_mqtt.py
Expand Up @@ -7,7 +7,6 @@


def connect_mqtt(mqtt_conf: dict):

def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
logger.info("Connected to MQTT Broker!")
Expand Down
5 changes: 5 additions & 0 deletions ingest/api/utilities.py
@@ -1,6 +1,11 @@
from fastapi import Request
import isodate


def get_base_url_from_request(http_request: Request) -> str:
return f"{http_request.url.components.scheme}://{http_request.url.components.netloc}"


def seconds_to_iso_8601_duration(seconds: int) -> str:
duration = isodate.Duration(seconds=seconds)
iso_duration = isodate.duration_isoformat(duration)
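
For reference, the scheme-plus-netloc extraction done by `get_base_url_from_request` can be reproduced on a plain URL string with the standard library. This is a minimal sketch: `base_url_from_string` is a hypothetical helper, and it assumes the request URL's `components` behave like the result of `urllib.parse.urlsplit`.

```python
from urllib.parse import urlsplit


def base_url_from_string(url: str) -> str:
    """Return scheme://host[:port] for a URL, dropping path, query and fragment."""
    parts = urlsplit(url)
    return f"{parts.scheme}://{parts.netloc}"


example = base_url_from_string("https://example.org:8001/json?publishWIS2=true")
```

Note that the netloc keeps the port, so links generated from the base URL point back at the same host and port the request arrived on unless `EDR_API_URL` overrides it.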
Expand Down