Skip to content

Commit

Permalink
Breaking CFJsonItem part 2: extracting datacube extension code
Browse files Browse the repository at this point in the history
  • Loading branch information
dchandan committed Oct 6, 2023
1 parent 2f5dc39 commit 3f821ce
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 295 deletions.
285 changes: 1 addition & 284 deletions STACpopulator/stac_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
from typing import Any, Literal, MutableMapping

import pystac
from pystac.extensions.datacube import Dimension, DimensionType, Variable, VariableType

from STACpopulator.models import STACItem, STACItemProperties
from STACpopulator.models import STACItem


def url_validate(target: str) -> bool:
Expand Down Expand Up @@ -127,288 +126,6 @@ def STAC_item_from_metadata(iid: str, attrs: MutableMapping[str, Any], item_prop
return item


class CFJsonItem:
    """Return STAC Item from CF JSON metadata, as provided by `xncml.Dataset.to_cf_dict`."""

    # Map CF axis identifiers / coordinate kinds to STAC datacube axis letters.
    axis = {"X": "x", "Y": "y", "Z": "z", "T": "t", "longitude": "x", "latitude": "y", "vertical": "z", "time": "t"}

    def __init__(self, iid: str, attrs: dict, datamodel=None):
        """
        Create STAC Item from CF JSON metadata.

        Parameters
        ----------
        iid : str
            Unique item ID.
        attrs : dict
            CF JSON metadata returned by `xncml.Dataset.to_cf_dict`.
        datamodel : pydantic.BaseModel, optional
            Data model for validating global attributes.
        """
        self.attrs = attrs
        cfmeta = attrs["groups"]["CFMetadata"]["attributes"]

        # Global attributes: temporal extent from CFMetadata plus the dataset's own attributes.
        gattrs = {
            "start_datetime": cfmeta["time_coverage_start"],
            "end_datetime": cfmeta["time_coverage_end"],
            **attrs["attributes"],
        }

        # Validate using pydantic data model if given; fall back to the generic properties model.
        datamodel = datamodel or STACItemProperties

        class MySTACItem(STACItem):
            # NOTE: annotation uses the runtime `datamodel` class so pydantic validates
            # `properties` against the caller-supplied model.
            properties: datamodel

        # Create STAC item (validated via pydantic first).
        item = MySTACItem(
            id=iid,
            geometry=self.ncattrs_to_geometry(),
            bbox=self.ncattrs_to_bbox(),
            properties=gattrs,
            datetime=None,
        )

        # Round-trip through JSON so pystac receives plain, serializable values
        # with pydantic aliases applied.
        item = pystac.Item(**json.loads(item.model_dump_json(by_alias=True)))

        # Add assets from whichever access-URL source is available.
        if "access_urls" in attrs:
            root = attrs["access_urls"]
        elif "THREDDSMetadata" in attrs["groups"]:
            root = attrs["groups"]["THREDDSMetadata"]["groups"]["services"]["attributes"]
        else:
            root = {}

        for name, url in root.items():
            asset = pystac.Asset(href=url, media_type=media_types.get(name), roles=asset_roles.get(name))
            item.add_asset(name, asset)

        self.item = item

    def to_json(self) -> str:
        """Return the STAC item serialized as a JSON string."""
        # FIX: `self.item` is a `pystac.Item` (converted in __init__), which has no
        # pydantic `model_dump_json`; also the original had no `return` statement.
        return json.dumps(self.item.to_dict())

    def ncattrs_to_geometry(self) -> MutableMapping[str, Any]:
        """Create Polygon geometry from CFMetadata geospatial bounds."""
        attrs = self.attrs["groups"]["CFMetadata"]["attributes"]
        return {
            "type": "Polygon",
            "coordinates": [
                [
                    [
                        float(attrs["geospatial_lon_min"][0]),
                        float(attrs["geospatial_lat_min"][0]),
                    ],
                    [
                        float(attrs["geospatial_lon_min"][0]),
                        float(attrs["geospatial_lat_max"][0]),
                    ],
                    [
                        float(attrs["geospatial_lon_max"][0]),
                        float(attrs["geospatial_lat_max"][0]),
                    ],
                    [
                        float(attrs["geospatial_lon_max"][0]),
                        float(attrs["geospatial_lat_min"][0]),
                    ],
                    [
                        float(attrs["geospatial_lon_min"][0]),
                        float(attrs["geospatial_lat_min"][0]),
                    ],
                ]
            ],
        }

    def ncattrs_to_bbox(self) -> list:
        """Create BBOX ([lon_min, lat_min, lon_max, lat_max]) from CFMetadata."""
        attrs = self.attrs["groups"]["CFMetadata"]["attributes"]
        return [
            float(attrs["geospatial_lon_min"][0]),
            float(attrs["geospatial_lat_min"][0]),
            float(attrs["geospatial_lon_max"][0]),
            float(attrs["geospatial_lat_max"][0]),
        ]

    def dimensions(self) -> dict:
        """Return Dimension objects required for Datacube extension."""
        dims = {}
        for name, length in self.attrs["dimensions"].items():
            v = self.attrs["variables"].get(name)
            if v:
                # FIX: original called `self.obj.ncattrs_to_bbox()`, but this class has
                # no `obj` attribute — it raised AttributeError at runtime.
                bbox = self.ncattrs_to_bbox()
                for key, criteria in coordinate_criteria.items():
                    for criterion, expected in criteria.items():
                        if v["attributes"].get(criterion, None) in expected:
                            axis = self.axis[key]
                            type_ = DimensionType.SPATIAL if axis in ["x", "y", "z"] else DimensionType.TEMPORAL

                            if v["type"] == "int":
                                extent = [0, int(length)]
                            else:  # NOTE(review): this extent logic is heuristic — confirm against real data.
                                if key == "X":
                                    extent = bbox[0], bbox[2]
                                elif key == "Y":
                                    extent = bbox[1], bbox[3]
                                else:
                                    extent = None

                            dims[name] = Dimension(
                                properties=dict(
                                    axis=axis,
                                    type=type_,
                                    # FIX: last-resort fallback was the whole
                                    # `criteria["standard_name"]` tuple; use its first
                                    # element so the description is a string.
                                    extent=extent,
                                    description=v.get(
                                        "description", v.get("long_name", criteria["standard_name"][0])
                                    ),
                                )
                            )

        return dims

    def variables(self) -> dict:
        """Return Variable objects required for Datacube extension."""
        variables = {}

        for name, meta in self.attrs["variables"].items():
            # Dimension coordinate variables are handled by `dimensions()`.
            if name in self.attrs["dimensions"]:
                continue

            attrs = meta["attributes"]
            variables[name] = Variable(
                properties=dict(
                    dimensions=meta["shape"],
                    type=VariableType.AUXILIARY.value if self.is_coordinate(attrs) else VariableType.DATA.value,
                    description=attrs.get("description", attrs.get("long_name")),
                    unit=attrs.get("units", None),
                )
            )
        return variables

    def is_coordinate(self, attrs: dict) -> bool:
        """Return whether a variable's attributes identify it as a coordinate (per CF-Xarray criteria)."""
        for key, criteria in coordinate_criteria.items():
            for criterion, expected in criteria.items():
                if attrs.get(criterion, None) in expected:
                    return True
        return False


# From CF-Xarray: heuristics to recognize coordinate variables from their metadata.
# For each coordinate kind (latitude, longitude, Z, vertical, X, Y, T, time), a
# variable matches when one of its attributes (e.g. "standard_name", "units",
# "axis") takes one of the listed values.
#
# Several kinds repeat identical value lists verbatim; the shared tuples/dict below
# remove that duplication so the lists cannot drift out of sync.

# Z-coordinate names: used for both "standard_name" and "long_name".
_Z_COORD_NAMES = (
    "model_level_number",
    "atmosphere_ln_pressure_coordinate",
    "atmosphere_sigma_coordinate",
    "atmosphere_hybrid_sigma_pressure_coordinate",
    "atmosphere_hybrid_height_coordinate",
    "atmosphere_sleve_coordinate",
    "ocean_sigma_coordinate",
    "ocean_s_coordinate",
    "ocean_s_coordinate_g1",
    "ocean_s_coordinate_g2",
    "ocean_sigma_z_coordinate",
    "ocean_double_sigma_coordinate",
)

# Vertical-coordinate names: used for both "standard_name" and "long_name".
_VERTICAL_NAMES = (
    "air_pressure",
    "height",
    "depth",
    "geopotential_height",
    "altitude",
    "height_above_geopotential_datum",
    "height_above_reference_ellipsoid",
    "height_above_mean_sea_level",
)

# "T" and "time" use identical criteria.
_TIME_CRITERIA = {
    "standard_name": ("time",),
    "_CoordinateAxisType": ("Time",),
    "axis": ("T",),
    "cartesian_axis": ("T",),
    "grads_dim": ("t",),
    "long_name": ("time",),
}

coordinate_criteria = {
    "latitude": {
        "standard_name": ("latitude",),
        "units": ("degree_north", "degree_N", "degreeN", "degrees_north", "degrees_N", "degreesN"),
        "_CoordinateAxisType": ("Lat",),
        "long_name": ("latitude",),
    },
    "longitude": {
        "standard_name": ("longitude",),
        "units": ("degree_east", "degree_E", "degreeE", "degrees_east", "degrees_E", "degreesE"),
        "_CoordinateAxisType": ("Lon",),
        "long_name": ("longitude",),
    },
    "Z": {
        "standard_name": _Z_COORD_NAMES,
        "_CoordinateAxisType": ("GeoZ", "Height", "Pressure"),
        "axis": ("Z",),
        "cartesian_axis": ("Z",),
        "grads_dim": ("z",),
        "long_name": _Z_COORD_NAMES,
    },
    "vertical": {
        "standard_name": _VERTICAL_NAMES,
        "positive": ("up", "down"),
        "long_name": _VERTICAL_NAMES,
    },
    "X": {
        "standard_name": ("projection_x_coordinate", "grid_longitude", "projection_x_angular_coordinate"),
        "_CoordinateAxisType": ("GeoX",),
        "axis": ("X",),
        "cartesian_axis": ("X",),
        "grads_dim": ("x",),
        "long_name": (
            "projection_x_coordinate",
            "grid_longitude",
            "projection_x_angular_coordinate",
            "cell index along first dimension",
        ),
    },
    "Y": {
        "standard_name": ("projection_y_coordinate", "grid_latitude", "projection_y_angular_coordinate"),
        "_CoordinateAxisType": ("GeoY",),
        "axis": ("Y",),
        "cartesian_axis": ("Y",),
        "grads_dim": ("y",),
        "long_name": (
            "projection_y_coordinate",
            "grid_latitude",
            "projection_y_angular_coordinate",
            "cell index along second dimension",
        ),
    },
    "T": dict(_TIME_CRITERIA),
    "time": dict(_TIME_CRITERIA),
}


media_types = {
"httpserver_service": "application/x-netcdf",
"opendap_service": pystac.MediaType.HTML,
Expand Down
19 changes: 8 additions & 11 deletions implementations/CMIP6-UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import argparse
import hashlib
import logging
from datetime import datetime
from typing import Any, Dict, List, Literal, MutableMapping

import pyessv
from colorlog import ColoredFormatter
from extensions import DataCubeHelper
from pydantic import AnyHttpUrl, Field, FieldValidationInfo, field_validator

from STACpopulator import STACpopulatorBase
from STACpopulator.input import THREDDSLoader
from STACpopulator.models import STACItemProperties
from STACpopulator.stac_utils import (
CFJsonItem,
STAC_item_from_metadata,
collection2literal,
)
from STACpopulator.stac_utils import STAC_item_from_metadata, collection2literal

LOGGER = logging.getLogger(__name__)
LOGFORMAT = " %(log_color)s%(levelname)s:%(reset)s %(blue)s[%(name)-30s]%(reset)s %(message)s"
Expand Down Expand Up @@ -150,11 +146,12 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any])
item = STAC_item_from_metadata(iid, item_data, self.item_properties_model)

# Add datacube extension
# try:
# dc_ext = DatacubeExtension.ext(obj.item, add_if_missing=True)
# dc_ext.apply(dimensions=obj.dimensions(), variables=obj.variables())
# except:
# LOGGER.warning(f"Failed to add Datacube extension to item {item_name}")
try:
dchelper = DataCubeHelper(item_data)
dc_ext = DatacubeExtension.ext(item, add_if_missing=True)
dc_ext.apply(dimensions=dchelper.dimensions(), variables=dchelper.variables())
except:
LOGGER.warning(f"Failed to add Datacube extension to item {item_name}")

# print(obj.item.to_dict())
# return obj.item.to_dict()
Expand Down
Loading

0 comments on commit 3f821ce

Please sign in to comment.