diff --git a/docker-compose.yml b/docker-compose.yml index a31e0b85..0c6a99ad 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: volumes: # - ts-data:/home/postgres/pgdata/data # for timescale image - ts-data:/var/lib/postgresql # for postgres image - - ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration + - ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration - ./datastore/database/healthcheck_postgis_uptime.sh:/healthcheck_postgis_uptime.sh:ro # for the healthcheck environment: - EXTRA_CONF_DIR=/etc/conf_settings @@ -105,10 +105,17 @@ services: environment: - DSHOST=${DSHOST:-store} - DSPORT=${DSPORT:-50050} - - MQTT_HOST=${MQTT_HOST} + - MQTT_HOST=${MQTT_HOST:-mqtt} - MQTT_USERNAME=${MQTT_USERNAME} - MQTT_PASSWORD=${MQTT_PASSWORD} - - MQTT_TLS=True + - MQTT_PORT=${MQTT_PORT:-1883} + - MQTT_TLS=${MQTT_TLS:-False} + - WIS2_TOPIC=${WIS2_TOPIC:-'TEMP_TOPIC'} + - WIS2_MQTT_HOST=${MQTT_HOST:-mqtt_wis2} + - WIS2_MQTT_USERNAME=${MQTT_USERNAME:-} + - WIS2_MQTT_PASSWORD=${MQTT_PASSWORD:-} + - WIS2_MQTT_TLS=${MQTT_TLS:-False} + - WIS2_MQTT_PORT=${MQTT_PORT:-1884} - INGEST_LOGLEVEL - GUNICORN_CMD_ARGS=--bind 0.0.0.0:8001 --workers=4 --access-logfile - depends_on: @@ -123,6 +130,23 @@ services: volumes: - ./ingest/test/output:/app/output + mqtt: + image: eclipse-mosquitto + restart: unless-stopped + ports: + - "1883:1883" + volumes: + - ./mosquitto:/etc/mosquitto + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + + mqtt_wis2: + image: eclipse-mosquitto + restart: unless-stopped + ports: + - "1884:1884" + volumes: + - ./mosquitto:/etc/mosquitto + - ./mosquitto/mosquitto_wis2.conf:/mosquitto/config/mosquitto.conf client: profiles: ["test"] build: @@ -208,7 +232,12 @@ services: depends_on: db: condition: service_healthy - command: ["--collector.stat_statements", "--collector.stat_user_tables", "--collector.stat_activity_autovacuum"] + command: + [ + "--collector.stat_statements", + "--collector.stat_user_tables", + "--collector.stat_activity_autovacuum", + ] grafana: profiles: ["monitoring"] diff --git a/ingest/README.md b/ingest/README.md index f1542a9c..093dae8d 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -1,4 +1,5 @@ # e-soh-event-queue + ## Enviornment variables | Variable | Default Value | Description | @@ -9,17 +10,24 @@ | `MQTT_USERNAME` | | Username for authentication with the MQTT broker. | | `MQTT_PASSWORD` | | Password for authentication with the MQTT broker. | | `MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.| +| `WIS2_MQTT_HOST` | | Host address for the MQTT broker. | +| `WIS2_MQTT_USERNAME` | | Username for authentication with the MQTT broker. | +| `WIS2_MQTT_PASSWORD` | | Password for authentication with the MQTT broker. | +| `WIS2_MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.| +| `WIS2_METADATA_RECORD_ID` | | The ID of the WIS2 global metadata ID for this data service.| +| `WIS2_TOPIC` | | The WIS2 MQTT topic the messages should be published under. | +| `EDR_API_URL` | | If the EDR API is hosted on a different URL then the ingest API, set this to the correct URL for the EDR API.| | `PROMETHEUS_MULTIPROC_DIR` | `/tmp/metrics` | Directory for Prometheus multiprocess mode metrics. Defaults to `/tmp/metrics`. | | `INGEST_LOGLEVEL` | | Logging level for the ingestion process. | | `GUNICORN_CMD_ARGS` | | Command-line arguments for configuring Gunicorn, a Python WSGI HTTP Server. | | `FASTAPI_ROOT_PATH` | | If this api is behind proxy, this need to be set to the root path | - ## Dev install -To install in dev mode run `pip install --editable .` from the top level of this repository. +To install in dev mode run `pip install --editable .` from the top level of this repository. ## C++ dependencies + | Module | Version | | --------------- | ------- | | libeccodes-data | 2.24.2 | diff --git a/ingest/api/generate_wis2_payload.py b/ingest/api/generate_wis2_payload.py new file mode 100644 index 00000000..062edf3c --- /dev/null +++ b/ingest/api/generate_wis2_payload.py @@ -0,0 +1,72 @@ +import os +import json + + +from api.model import Geometry +from api.model import Link +from api.wis2_model import Wis2MessageSchema +from api.wis2_model import Properties +from api.wis2_model import Content + + +def get_api_timeseries_query(location_id: str, baseURL: str) -> str: + query = "/collections/observations/locations/" + location_id + baseURL = os.getenv("EDR_API_URL", baseURL) + return baseURL + query + + +def generate_wis2_topic() -> str: + """This function will generate the WIS2 complient toipc name""" + wis2_topic = os.getenv("WIS2_TOPIC") + if not wis2_topic: + raise ValueError("WIS2_TOPIC env variable not set. Aborting publish to wis2") + return wis2_topic + + +def generate_wis2_payload(message: dict, request_url: str) -> Wis2MessageSchema: + """ + This function will generate the WIS2 complient payload based on the JSON schema for ESOH + """ + wis2_payload = Wis2MessageSchema( + type="Feature", + id=message["id"], + version="v04", + geometry=Geometry(**message["geometry"]), + properties=Properties( + producer=message["properties"]["naming_authority"], + data_id=message["properties"]["data_id"], + metadata_id=os.getenv( + "WIS2_METADATA_RECORD_ID", None + ), # Need to figure out how we generate this? Is it staic or dynamic? + datetime=message["properties"]["datetime"], + pubtime=message["properties"]["pubtime"], + content=Content( + value=json.dumps( + { + "type": "Feature", + "geometry": message["geometry"], + "properties": { + "observation": message["properties"]["content"]["value"], + "CF_standard_name": message["properties"]["content"]["standard_name"], + "unit": message["properties"]["content"]["unit"], + }, + }, + separators=(",", ":"), + ), + unit=message["properties"]["content"]["unit"], + encoding="utf-8", + ), + ), + links=( + [ + Link( + href=get_api_timeseries_query(message["properties"]["platform"], request_url), + rel="canonical", + type="application/prs.coverage+json", + ) + ] + ) + + (lambda x: x if x else [])(message["links"]), + ) + + return wis2_payload diff --git a/ingest/api/ingest.py b/ingest/api/ingest.py index e1780f13..5de37f86 100644 --- a/ingest/api/ingest.py +++ b/ingest/api/ingest.py @@ -1,6 +1,8 @@ import logging from typing import Union +from api.generate_wis2_payload import generate_wis2_payload +from api.generate_wis2_payload import generate_wis2_topic import grpc import json @@ -28,26 +30,38 @@ def __init__( self, mqtt_conf: dict, uuid_prefix: str, + mqtt_WIS2_conf: dict | None = None, ): self.uuid_prefix = uuid_prefix self.client = None - if mqtt_conf["host"]: - try: + self.WIS2_client = None + try: + if mqtt_conf["host"]: self.client = connect_mqtt(mqtt_conf) - except Exception as e: - logger.error("Failed to establish connection to mqtt, " + "\n" + str(e)) - raise e + if mqtt_WIS2_conf: + self.WIS2_client = connect_mqtt(mqtt_WIS2_conf) + except Exception as e: + logger.error( + "Failed to establish connection to mqtt, " + + "\n" + + str(e) + + "\n" + + json.dumps(mqtt_conf) + + "\n" + + json.dumps(mqtt_WIS2_conf) + ) + raise e - async def ingest(self, message: Union[str, object]): + async def ingest(self, message: Union[str, object], publishWIS2: bool, baseURL: str): """ This method will interpret call all methods for deciding input type, build the mqtt messages, and publish them. """ messages = build_messages(message, self.uuid_prefix) - await self.publish_messages(messages) + await self.publish_messages(messages, publishWIS2, baseURL) - async def publish_messages(self, messages: list): + async def publish_messages(self, messages: list, publishWIS2: bool, baseURL: str): """ This method accepts a list of json strings ready to be ingest to datastore and published to the mqtt topic. @@ -60,7 +74,7 @@ async def publish_messages(self, messages: list): logger.error("Failed to reach datastore, " + "\n" + str(e)) raise HTTPException(status_code=500, detail="API could not reach datastore") - if self.client is not None: + if self.client or self.WIS2_client: for msg in messages: topic = ( msg["properties"]["naming_authority"] @@ -76,11 +90,27 @@ async def publish_messages(self, messages: list): msg["properties"]["level"] = level_string msg["properties"]["period"] = period_iso try: - send_message(topic, json.dumps(msg), self.client) - logger.debug("Succesfully published to mqtt") + if self.client: + send_message(topic, json.dumps(msg), self.client) + logger.debug("Succesfully published to mqtt") except Exception as e: logger.error("Failed to publish to mqtt, " + str(e)) raise HTTPException( status_code=500, detail="Data ingested to datastore. But unable to publish to mqtt", ) + try: + if publishWIS2 and self.WIS2_client: + send_message( + generate_wis2_topic(), + generate_wis2_payload(msg, baseURL).model_dump(exclude_unset=True, exclude_none=True), + self.WIS2_client, + ) + logger.debug("Succesfully published to mqtt") + except Exception as e: + logger.error("Failed to publish to WIS2 mqtt, " + str(e)) + print(e) + raise HTTPException( + status_code=500, + detail="Data ingested to datastore. But unable to publish to WIS2 mqtt", + ) diff --git a/ingest/api/main.py b/ingest/api/main.py index 435c4ccf..b9dc7c05 100644 --- a/ingest/api/main.py +++ b/ingest/api/main.py @@ -1,8 +1,10 @@ import logging import os +from api.utilities import get_base_url_from_request from fastapi import FastAPI from fastapi import UploadFile +from fastapi import Request from pydantic import BaseModel from typing import List @@ -37,23 +39,46 @@ class Response(BaseModel): "port": int(os.getenv("MQTT_PORT", 8883)), } -ingester = IngestToPipeline(mqtt_conf=mqtt_configuration, uuid_prefix="uuid") +mqtt_wis2_configuration = { + "host": os.getenv("WIS2_MQTT_HOST", None), + "username": os.getenv("WIS2_MQTT_USERNAME", None), + "password": os.getenv("WIS2_MQTT_PASSWORD", None), + "enable_tls": os.getenv("WIS2_MQTT_TLS", "False").lower() in ("true", "1", "t"), + "port": int(os.getenv("WIS2_MQTT_PORT", 8883)), +} + +if not all( + [ + mqtt_wis2_configuration["host"], + ] +): + mqtt_wis2_configuration = None + +ingester = IngestToPipeline( + mqtt_conf=mqtt_configuration, + uuid_prefix="uuid", + mqtt_WIS2_conf=mqtt_wis2_configuration, +) app = FastAPI(root_path=os.getenv("FASTAPI_ROOT_PATH", "")) add_metrics(app) @app.post("/bufr") -async def upload_bufr_file(files: UploadFile): +async def upload_bufr_file(http_request: Request, files: UploadFile, publishWIS2: bool = False): contents = await files.read() json_data = build_json_payload(contents) - await post_json(json_data) + await post_json(http_request, json_data, publishWIS2) return Response(status_message="Successfully ingested", status_code=200) @app.post("/json") -async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Response: +async def post_json( + http_request: Request, + request: JsonMessageSchema | List[JsonMessageSchema], + publishWIS2: bool = False, +) -> Response: status = "Successfully ingested" if isinstance(request, list): hash_list = [i.__hash__() for i in request] @@ -65,6 +90,6 @@ async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Res else: json_data = [request.model_dump(exclude_none=True)] - await ingester.ingest(json_data) + await ingester.ingest(json_data, publishWIS2, get_base_url_from_request(http_request)) return Response(status_message=status, status_code=200) diff --git a/ingest/api/model.py b/ingest/api/model.py index 21820b47..5ed22530 100644 --- a/ingest/api/model.py +++ b/ingest/api/model.py @@ -40,7 +40,8 @@ class Geometry(BaseModel): class Integrity(BaseModel): method: Literal["sha256", "sha384", "sha512", "sha3-256", "sha3-384", "sha3-512"] = Field( - ..., description="A specific set of methods for calculating the checksum algorithms" + ..., + description="A specific set of methods for calculating the checksum algorithms", ) value: str = Field(..., description="Checksum value.") @@ -290,7 +291,8 @@ class Properties(BaseModel): ) content: Content = Field(..., description="Actual data content") integrity: Optional[Integrity] = Field( - None, description="Specifies a checksum to be applied to the data to ensure that the download is accurate." + None, + description="Specifies a checksum to be applied to the data to ensure that the download is accurate.", ) @field_validator("period", mode="before") @@ -320,7 +322,6 @@ def check_datetime_iso(self) -> "Properties": @model_validator(mode="after") def validate_wigos_id(self): - blocks = self.platform.split("-") assert len(blocks) == 4, f"Not enough blocks in input 'platform', '{self.platform}'" for i in blocks[:-1]: diff --git a/ingest/api/send_mqtt.py b/ingest/api/send_mqtt.py index 5baacee9..d1b72068 100644 --- a/ingest/api/send_mqtt.py +++ b/ingest/api/send_mqtt.py @@ -7,7 +7,6 @@ def connect_mqtt(mqtt_conf: dict): - def on_connect(client, userdata, flags, rc, properties=None): if rc == 0: logger.info("Connected to MQTT Broker!") diff --git a/ingest/api/utilities.py b/ingest/api/utilities.py index ac3dd9df..afd348a0 100644 --- a/ingest/api/utilities.py +++ b/ingest/api/utilities.py @@ -1,6 +1,11 @@ +from fastapi import Request import isodate +def get_base_url_from_request(http_request: Request) -> str: + return f"{http_request.url.components.scheme}://{http_request.url.components.netloc}" + + def seconds_to_iso_8601_duration(seconds: int) -> str: duration = isodate.Duration(seconds=seconds) iso_duration = isodate.duration_isoformat(duration) diff --git a/ingest/api/wis2_model.py b/ingest/api/wis2_model.py new file mode 100644 index 00000000..cb33925b --- /dev/null +++ b/ingest/api/wis2_model.py @@ -0,0 +1,76 @@ +from typing import List +from typing import Literal +from typing import Optional + +from pydantic import BaseModel +from pydantic import Field + +from api.model import Link +from api.model import Geometry + + +class Content(BaseModel): + encoding: Literal["utf-8", "base64", "gzip"] = Field(..., description="Encoding of content") + size: int | None = Field( + None, + description=( + "Number of bytes contained in the file. Together with the ``integrity`` property," + " it provides additional assurance that file content was accurately received." + "Note that the limit takes into account the data encoding used, " + "including data compression (for example `gzip`)." + ), + ) + value: str = Field(..., description="The inline content of the file.") + unit: str = Field(..., description="Unit for the data") + + class Config: + str_strip_whitespace = True + + +class Integrity(BaseModel): + method: Literal["sha256", "sha384", "sha512", "sha3-256", "sha3-384", "sha3-512"] + value: str = Field(..., desciption="The hash value for the value field in the message content.") + + +class Properties(BaseModel): + datetime: str = Field( + ..., + description="Identifies the date/time of the data being recorded, in RFC3339 format.", + ) + pubtime: str = Field( + ..., + description="Identifies the date/time of the message being published, in RFC3339 format.", + ) + data_id: str = Field( + ..., + description="Unique identifier of the data as defined by the data producer.", + ) + metadata_id: Optional[str] = Field( + ..., + description="Identifier for associated discovery metadata record to which the notification applies", + ) + producer: Optional[str] = Field( + ..., + description="Identifies the provider that initially captured and processed the source data," + " in support of data distribution on behalf of other Members", + ) + start_datetime: Optional[str] = Field( + None, + description="Identifies the start date/time date of the data being recorded, in RFC3339 format.", + ) + end_datetime: Optional[str] = Field( + None, + description="Identifies the end date/time date of the data being recorded, in RFC3339 format.", + ) + content: Optional[Content] = Field(..., description="Actual data content") + integrity: Optional[Integrity] = Field( + None, + description="Specifies a checksum to be applied to the data to ensure that the download is accurate.", + ) + + +class Wis2MessageSchema(BaseModel): + type: Literal["Feature"] + geometry: Geometry + properties: Properties + links: List[Link] = Field(..., min_length=1) diff --git a/mosquitto/mosquitto.conf b/mosquitto/mosquitto.conf new file mode 100644 index 00000000..76c1e955 --- /dev/null +++ b/mosquitto/mosquitto.conf @@ -0,0 +1,2 @@ +allow_anonymous true +listener 1883 diff --git a/mosquitto/mosquitto_wis2.conf b/mosquitto/mosquitto_wis2.conf new file mode 100644 index 00000000..4879ad3d --- /dev/null +++ b/mosquitto/mosquitto_wis2.conf @@ -0,0 +1,2 @@ +allow_anonymous true +listener 1884