Skip to content

Commit

Permalink
refactoring to allow more flexibility
Browse files Browse the repository at this point in the history
  • Loading branch information
dchandan committed Oct 23, 2023
1 parent 6d675bc commit 65bd5bb
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 100 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ testcmip6:
python $(IMP_DIR)/CMIP6_UofT/add_CMIP6.py $(STAC_HOST) https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html

delcmip6:
curl --location --request DELETE '$(STAC_HOST)/collections/CMIP6'
curl --location --request DELETE '$(STAC_HOST)/collections/CMIP6_UofT'
@echo ""

starthost:
Expand Down
20 changes: 12 additions & 8 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from STACpopulator import STACpopulatorBase
from STACpopulator.implementations.CMIP6_UofT.extensions import DataCubeHelper
from STACpopulator.input import THREDDSLoader
from STACpopulator.input import GenericLoader, THREDDSLoader
from STACpopulator.models import GeoJSONPolygon, STACItemProperties
from STACpopulator.stac_utils import STAC_item_from_metadata, collection2literal

Expand Down Expand Up @@ -122,21 +122,16 @@ class CMIP6populator(STACpopulatorBase):
item_properties_model = CMIP6ItemProperties
item_geometry_model = GeoJSONPolygon

def __init__(self, stac_host: str, thredds_catalog_url: str, update: Optional[bool] = False) -> None:
def __init__(self, stac_host: str, data_loader: GenericLoader, update: Optional[bool] = False) -> None:
"""Constructor
:param stac_host: URL to the STAC API
:type stac_host: str
:param thredds_catalog_url: the URL to the THREDDS catalog to ingest
:type thredds_catalog_url: str
"""
data_loader = THREDDSLoader(thredds_catalog_url)

super().__init__(stac_host, data_loader, update)

def handle_ingestion_error(self, error: str, item_name: str, item_data: MutableMapping[str, Any]):
pass

def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Creates the STAC item.
Expand Down Expand Up @@ -172,5 +167,14 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
args = parser.parse_args()

LOGGER.info(f"Arguments to call: {args}")
c = CMIP6populator(args.stac_host, args.thredds_catalog_URL, args.update)

mode = "full"

if mode == "full":
data_loader = THREDDSLoader(args.thredds_catalog_URL)
else:
# To be implemented
data_loader = ErrorLoader(args.error_file)

c = CMIP6populator(args.stac_host, data_loader, args.update)
c.ingest()
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
title: CMIP6
id: CMIP6_UofT
description: Coupled Model Intercomparison Project phase 6
keywords: ['CMIP', 'CMIP6', 'WCRP', 'Climate Change']
license: "CC-BY-4.0"
Expand Down
60 changes: 30 additions & 30 deletions STACpopulator/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

import pystac
import requests
import siphon
import xncml
from colorlog import ColoredFormatter
from siphon.catalog import TDSCatalog

from STACpopulator.stac_utils import numpy_to_python_datatypes
from STACpopulator.stac_utils import numpy_to_python_datatypes, url_validate

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
Expand Down Expand Up @@ -52,64 +51,65 @@ def __init__(self, thredds_catalog_url: str, depth: Optional[int] = None) -> Non
super().__init__()
self._depth = depth if depth is not None else 1000

if thredds_catalog_url.endswith(".html"):
thredds_catalog_url = thredds_catalog_url.replace(".html", ".xml")
LOGGER.info("Converting catalog URL from html to xml")
self.thredds_catalog_URL = self.validate_catalog_url(thredds_catalog_url)

self.thredds_catalog_URL = thredds_catalog_url
self.catalog = TDSCatalog(self.thredds_catalog_URL)
self.catalog_head = self.catalog
self.links.append(self.magpie_collection_link())

def magpie_collection_link(self):
"""Return Link to THREDDS catalog."""
def validate_catalog_url(self, url: str) -> str:
"""Validate the user-provided catalog URL.
:param url: URL to the THREDDS catalog
:type url: str
:raises RuntimeError: if URL is invalid or contains query parameters.
:return: a valid URL
:rtype: str
"""
if url_validate(url):
if "?" in url:
raise RuntimeError("THREDDS catalog URL should not contain query parameter")
else:
raise RuntimeError("Invalid URL")

return url.replace(".html", ".xml") if url.endswith(".html") else url

def magpie_collection_link(self) -> pystac.Link:
"""Creates a PySTAC Link for the collection that is used by Cowbird and Magpie.
:return: A PySTAC Link
:rtype: pystac.Link
"""
url = self.thredds_catalog_URL
parts = url.split("/")
i = parts.index("catalog")
service = parts[i - 1]
# service = parts[i - 1]
path = "/".join(parts[i + 1 : -1])
return pystac.Link(rel="source", target=url, media_type="text/xml", title=f"{service}:{path}")
return pystac.Link(rel="source", target=url, media_type="text/xml", title=path)

def reset(self):
"""Reset the generator."""
self.catalog_head = self.catalog

def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
"""Return a generator walking a THREDDS data catalog for datasets."""
# print(f"At START catalog head is: {self.catalog_head}")
print(self.catalog_head.__dict__)
if self.catalog_head.datasets.items():
for item_name, ds in self.catalog_head.datasets.items():
attrs = self.extract_metadata(ds)
attrs = self.extract_metadata(ds.access_urls["NCML"], self.catalog_head.catalog_url, ds.url_path)
yield item_name, attrs

if self._depth > 0:
for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
print(f"catalog head is: {self.catalog_head}")
self._depth -= 1
yield from self

def extract_metadata(self, ds: siphon.catalog.Dataset) -> MutableMapping[str, Any]:
# Get URL for NCML service
url = ds.access_urls["NCML"]

print(url)
# print(self.catalog_head)
print(f"ds = {ds}")
print(ds.__dict__)
print(self.catalog_head.catalog_url)
def extract_metadata(self, ncml_url: str, catalog_url: str, dataset_path: str) -> MutableMapping[str, Any]:
LOGGER.info("Requesting NcML dataset description")
# r = requests.get(url)
r = requests.get(url, params={"catalog": self.catalog_head, "dataset": ds})

r = requests.get(ncml_url, params={"catalog": catalog_url, "dataset": dataset_path})
# Convert NcML to CF-compliant dictionary
attrs = xncml.Dataset.from_text(r.content).to_cf_dict()

attrs["attributes"] = numpy_to_python_datatypes(attrs["attributes"])

attrs["access_urls"] = ds.access_urls

return attrs


Expand Down
53 changes: 17 additions & 36 deletions STACpopulator/populator_base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import logging
import os
import sys
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, MutableMapping, Optional

import pystac
import yaml
from colorlog import ColoredFormatter

from STACpopulator.api_requests import (
Expand All @@ -15,7 +12,7 @@
stac_host_reachable,
)
from STACpopulator.input import GenericLoader
from STACpopulator.stac_utils import url_validate
from STACpopulator.stac_utils import load_collection_configuration, url_validate

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
Expand Down Expand Up @@ -44,20 +41,7 @@ def __init__(
"""

super().__init__()
self._collection_info_filename = "collection_config.yml"
self._app_directory = os.path.dirname(sys.argv[0])

if not os.path.exists(os.path.join(self._app_directory, self._collection_info_filename)):
raise RuntimeError(f"Missing {self._collection_info_filename} file for this implementation")

with open(os.path.join(self._app_directory, self._collection_info_filename)) as f:
self._collection_info = yaml.load(f, yaml.Loader)

req_definitions = ["title", "description", "keywords", "license"]
for req in req_definitions:
if req not in self._collection_info.keys():
LOGGER.error(f"'{req}' is required in the configuration file")
raise RuntimeError(f"'{req}' is required in the configuration file")
self._collection_info = load_collection_configuration()

self._ingest_pipeline = data_loader
self._stac_host = self.validate_host(stac_host)
Expand All @@ -78,7 +62,7 @@ def stac_host(self) -> str:

@property
def collection_id(self) -> str:
return self._collection_id
return self._collection_info["id"]

@property
@abstractmethod
Expand All @@ -87,15 +71,26 @@ def item_properties_model(self):
models.STACItemProperties."""
pass

@property
@abstractmethod
def item_geometry_model(self):
"""In derived classes, this property should be defined as a pydantic data model that derives from
models.STACItemProperties."""
pass

@abstractmethod
def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
pass

def validate_host(self, stac_host: str) -> str:
if not url_validate(stac_host):
raise ValueError("stac_host URL is not appropriately formatted")
if not stac_host_reachable(stac_host):
raise ValueError("stac_host is not reachable")
raise RuntimeError("stac_host is not reachable")

return stac_host

def create_stac_collection(self):
def create_stac_collection(self) -> None:
"""
Create a basic STAC collection.
Expand All @@ -114,8 +109,7 @@ def create_stac_collection(self):
)
self._collection_info["extent"] = pystac.Extent(sp_extent, tmp_extent)
self._collection_info["summaries"] = pystac.Summaries({"needs_summaries_update": ["true"]})

collection = pystac.Collection(id=self.collection_id, **self._collection_info)
collection = pystac.Collection(**self._collection_info)

collection.add_links(self._ingest_pipeline.links)

Expand All @@ -127,16 +121,3 @@ def ingest(self) -> None:
LOGGER.info(f"Creating STAC representation for {item_name}")
stac_item = self.create_stac_item(item_name, item_data)
post_stac_item(self.stac_host, self.collection_id, item_name, stac_item, self.update)
# try:
# pass
# except Exception:
# LOGGER.error(f"Failed adding STAC item {item_name}")
# self.handle_ingestion_error("Posting Error", item_name, item_data)

@abstractmethod
def handle_ingestion_error(self, error: str, item_name: str, item_data: MutableMapping[str, Any]):
pass

@abstractmethod
def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
pass
Loading

0 comments on commit 65bd5bb

Please sign in to comment.