Cordex extension and trying to build on higher level abstractions #64

Open
wants to merge 28 commits into
base: master
Changes from 17 commits
3c85233
typos
huard Oct 3, 2024
f1c5683
work on extension abstraction and cordex example
huard Oct 10, 2024
c0bdeb2
removed json from .gitignore. Add CORDEX6 json schema
huard Oct 10, 2024
3a4b6ef
merge
huard Oct 10, 2024
b7d9b75
embedding datacube and thredds extension in the base logic
huard Oct 10, 2024
98c6420
got it to work
huard Oct 10, 2024
33d78b9
cordex implementation
huard Oct 11, 2024
0212008
add missing item_geometry_model
huard Oct 11, 2024
04d94df
get the cordex extension to work with the stac-populator cli.
huard Oct 11, 2024
ac0cec2
add some notes and comments
huard Oct 11, 2024
03e5738
clean-up
huard Oct 11, 2024
8cceeab
remove break
huard Oct 11, 2024
8b583bb
suggestions from review
huard Oct 15, 2024
8db0847
added `apply` method to extension helpers
huard Oct 15, 2024
df35431
include schemas in installation source
huard Oct 15, 2024
c6e68e8
Put generic STAC item logic into BaseSTAC class
huard Oct 15, 2024
d76fdd8
docstring
huard Oct 16, 2024
adb1065
added xscen extension. automated the creation of an alias generator f…
huard Oct 16, 2024
fd49161
added license_type to xscen schema. Test over ncml
huard Oct 17, 2024
08ccd42
update README and CHANGES.
huard Oct 17, 2024
2fb0eb6
Merge branch 'xscen_extension' into cordex_extension
huard Oct 17, 2024
312694c
suggestions from Mischa
huard Oct 17, 2024
a847c3f
need to pass properties dict
huard Oct 17, 2024
72a8271
suggestions from review
huard Oct 17, 2024
4817d66
upgrade numpy type conversion function
huard Oct 15, 2024
2b43022
add extensions automatically
huard Oct 17, 2024
118810c
rename _extensions to _helpers
huard Oct 18, 2024
d1d838e
format test data jsons
huard Oct 18, 2024
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ build
*.jsonl
*.json
huard marked this conversation as resolved.

## Exclude schemas
!schemas/**/*.json

# Old Submodule Path
# Could be used locally
pyessv-archive/
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -47,7 +47,7 @@ docker-build:
docker build "$(APP_ROOT)" -f "$(APP_ROOT)/docker/Dockerfile" -t "$(DOCKER_TAG)"

del_docker_volume: stophost
docker volume rm stac-populator_stac-db
docker volume rm docker_stac-db
huard marked this conversation as resolved.

resethost: del_docker_volume starthost

1 change: 1 addition & 0 deletions STACpopulator/cli.py
Expand Up @@ -14,6 +14,7 @@


def add_parser_args(parser: argparse.ArgumentParser) -> dict[str, Callable]:
"""Common CLI arguments for all implementations."""
parser.add_argument(
"--version",
"-V",
Empty file.
360 changes: 360 additions & 0 deletions STACpopulator/extensions/base.py
@@ -0,0 +1,360 @@
"""
# Base classes for STAC extensions

What we have:
- `Loader`, which returns attributes.
- An external json schema describing a subset of the attributes returned by the Loader. This schema might preclude
additional properties, so it cannot be applied wholesale to the Loader's output. (maybe overkill since not a lot of schemas can be found in the wild...)
- `data model` describing the content we want included in the catalog. It includes a subset of the schema properties,
as well as additional attributes desired by the catalog admins.

Desiderata:
- Not having to replicate existing validation logic in the schema
- Not having to create a modified schema
- Being able to supplement the schema validation by pydantic validation logic
- Streamline the creation of new data models (reduce boilerplate, allow subclassing)
- Developer-friendly validation error messages


How-to:
- Instructions to create basic datamodel from schema (codegen)



"""
from __future__ import annotations

from datetime import datetime
import json
import jsonschema
import logging
from typing import Any, Dict, Generic, TypeVar, Union, cast, Optional
from pydantic import (BaseModel, create_model, Field, FilePath, field_validator, model_validator, HttpUrl, ConfigDict,
PrivateAttr)
import pystac
from pystac.extensions import item_assets
from pystac.extensions.base import (
ExtensionManagementMixin,
PropertiesExtension,
SummariesExtension,
)
from pystac import STACValidationError
from pystac.extensions.base import S # generic pystac.STACObject
from STACpopulator.models import AnyGeometry, GeoJSONPolygon
from STACpopulator.stac_utils import (
ServiceType,
ncattrs_to_bbox,
ncattrs_to_geometry,
)
import types
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSHelper

T = TypeVar("T", pystac.Collection, pystac.Item, pystac.Asset, item_assets.AssetDefinition)

LOGGER = logging.getLogger(__name__)


class ExtensionHelper(BaseModel):
"""Base class for dataset properties going into the catalog.

Subclass this with attributes.

Attributes
----------
_prefix : str
If not None, this prefix is added to ingested data before the jsonschema validation.
_schema_uri : str
URI of the json schema to validate against.
_schema_exclude : list[str]
Properties not meant to be validated by json schema, but still included in the data model.
"""
_prefix: str = PrivateAttr()
_schema_uri: FilePath = PrivateAttr(None)
_schema_exclude: list[str] = PrivateAttr([])

model_config = ConfigDict(populate_by_name=True, extra="ignore")

@model_validator(mode="before")
@classmethod
def validate_jsonschema(cls, data):
"""Validate the data model against the json schema, if given."""
# Load schema
uri = cls._schema_uri.default
if uri is not None:
schema = json.load(open(uri))
Comment on lines +88 to +90
Collaborator

This could be improved with a requests-based file handler that accepts either a local or a remote URI, but that is not blocking for this PR.

Collaborator Author

I was unsure how to deal with references within a schema if it was not local.

Contributor

The jsonschema library that we're using can resolve remote references with requests. See the example here (which uses httpx, not requests, but the idea is the same).

I agree that this can be left for another PR, though.
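
As a rough illustration of the "local or remote URI" idea raised in this thread, here is a minimal sketch using only the standard library. The `load_schema` helper and the toy schema are hypothetical and not part of the PR; the sketch does not handle `file://` URIs, and it does not address the harder question above of resolving `$ref` references inside a remote schema.

```python
import json
import tempfile
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def load_schema(uri: str) -> dict:
    """Load a JSON schema from a local path or an http(s) URL (hypothetical helper)."""
    parsed = urlparse(str(uri))
    if parsed.scheme in ("http", "https"):
        # Remote schema: fetch it over the network.
        with urllib.request.urlopen(str(uri), timeout=10) as response:
            return json.load(response)
    # Anything else is treated as a local file path (file:// URIs are not handled).
    with Path(uri).open(encoding="utf-8") as f:
        return json.load(f)


# Usage with a local file; the schema content is a toy example.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "schema.json"
    path.write_text(json.dumps({"type": "object", "required": ["example:field"]}))
    schema = load_schema(str(path))
```

Using a context manager also closes the file handle, which the bare `json.load(open(uri))` in the diff leaves to the garbage collector.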

validator_cls = jsonschema.validators.validator_for(schema)
validator_cls.check_schema(schema)
validator = validator_cls(schema)

attrs = {f"{cls._prefix.default}:{k}": v for (k,v) in data.items() if k not in cls._schema_exclude.default}
errors = list(validator.iter_errors(attrs))
if errors:
raise ValueError(errors)

return data

def apply(self, item, add_if_missing=False):
"""Add extension for the properties of the dataset to the STAC item.
The extension class is created dynamically from the properties.
"""
ExtSubCls = metacls_extension(self._prefix, schema_uri=str(self._schema_uri))
item_ext = ExtSubCls.ext(item, add_if_missing=add_if_missing)
item_ext.apply(self.model_dump(mode="json", by_alias=True))
return item


class BaseSTAC(BaseModel):
"""Base class for STAC item data models.

Attributes
----------
geometry : AnyGeometry
The geometry of the dataset.
bbox : list[float]
The bounding box of the dataset.
start_datetime : datetime
The start datetime of the dataset.
end_datetime : datetime
The end datetime of the dataset.
extensions : list[str]
Name of the class attributes that point to STAC extension helper classes. Those extension classes should have an `apply` method.
"""
# STAC item properties
geometry: AnyGeometry | None
bbox: list[float]
start_datetime: datetime
end_datetime: datetime

extensions: list = []
Contributor @mishaschwartz, Oct 17, 2024

Is there a reason these can't be a list of the extensions themselves? Why have this as a list of strings referring to class attributes?

Collaborator Author

Well, I'm not sure then how we would pass the relevant attributes to each helper. I'll try to see if I can do something about this.

Collaborator

I think it is better to let the helpers add them automatically, as they might change over time (e.g.: new extension version with modified attributes).

Contributor

Sorry, I'll try to explain a bit better (or maybe I'm not understanding the issue).

Right now we do something like this:

class BaseSTAC:
    ...
    def stac_item(self) -> "pystac.Item":
        ...
        for ext in self.extensions:
            getattr(self, ext).apply(item)

class THREDDSCatalogDataModel(BaseSTAC):
    ...
    properties: ExtensionHelper
    datacube: DataCubeHelper
    thredds: THREDDSHelper
    ...
    extensions: list = ["properties", "datacube", "thredds"]

Why can't we do this?

class BaseSTAC:
    ...
    def stac_item(self) -> "pystac.Item":
        ...
        for ext in self.extensions:
            ext.apply(item)

class THREDDSCatalogDataModel(BaseSTAC):
    ...
    extensions: list = [ExtensionHelper, DataCubeHelper, THREDDSHelper]

Collaborator Author

Because of data ingestion.

For example, the THREDDS extension needs to be passed THREDDSHelper(data['data']["access_urls"])

I currently manage this with model_validator:

    @model_validator(mode="before")
    @classmethod
    def thredds_helper(cls, data):
        """Instantiate the THREDDSHelper."""
        data["thredds"] = THREDDSHelper(data['data']["access_urls"])
        return data

I'm not sure how we'd do that with your proposal without adding some obscure magic.
What I've now done is automatically detect extensions from the annotation (if it's a Helper subclass). Hope that's ok for now.
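
The annotation-based detection described here could look roughly like the sketch below. It is written with plain classes rather than pydantic models, and every name in it (`Helper`, `DataModel`, `helper_names`, the dict-based item) is a stand-in chosen for illustration, not the code in this PR.

```python
from typing import get_type_hints


class Helper:
    """Stand-in for the helper base class; only `apply` matters here."""

    def __init__(self, data):
        self.data = data

    def apply(self, item: dict) -> dict:
        raise NotImplementedError


class DataCubeHelper(Helper):
    def apply(self, item: dict) -> dict:
        item.setdefault("applied", []).append("datacube")
        return item


class THREDDSHelper(Helper):
    def apply(self, item: dict) -> dict:
        item.setdefault("applied", []).append("thredds")
        return item


class DataModel:
    # Annotated attributes; only the Helper subclasses count as helpers.
    data: dict
    datacube: DataCubeHelper
    thredds: THREDDSHelper

    def __init__(self, data: dict):
        # Data ingestion stays in the model, so helpers need not know the loader.
        self.data = data
        self.datacube = DataCubeHelper(data)
        self.thredds = THREDDSHelper(data["access_urls"])

    @classmethod
    def helper_names(cls) -> list[str]:
        """Names of attributes whose annotation is a Helper subclass."""
        hints = get_type_hints(cls)
        return [
            name
            for name, typ in hints.items()
            if isinstance(typ, type) and issubclass(typ, Helper)
        ]

    def stac_item(self) -> dict:
        item = {"id": "example"}  # a plain dict stands in for pystac.Item
        for name in self.helper_names():
            getattr(self, name).apply(item)
        return item


names = DataModel.helper_names()
item = DataModel({"access_urls": {}}).stac_item()
```

With this approach the explicit `extensions = ["properties", "datacube", "thredds"]` list becomes unnecessary, at the cost of making the set of helpers implicit in the type hints.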

Collaborator @fmigneault, Oct 18, 2024

I was actually misinterpreting the question: I read extensions as STAC.Item.extensions (i.e., the URIs of the applied extensions). I think we should rename the attribute, because that is very confusing. It should be helpers, to highlight that these are helpers with extended capabilities for applying the STAC extensions (and sometimes non-extension attributes).

Ideally, we should have something like:

helpers: list[Type[Helper]] = [ExtensionHelper, DataCubeHelper, THREDDSHelper]

Because only classes of the helpers are used (not instances), they should be able to receive the item data dynamically for the apply() call.

If the data source is needed, the Helper base class could have it as a required argument for apply() or in its __init__(), whichever makes more sense.

Collaborator Author

Renamed _extensions to _helpers.

I don't see how I could implement your proposal, without hard-coding the data ingestion logic into the helpers themselves, which would couple them tightly with the Loader, which I thought we should avoid.

Collaborator

Minor nitpick.

I think the BaseSTAC class should define something like:

class BaseSTAC(abc.ABC):
   @classmethod
   @abc.abstractmethod
   def helpers(cls) -> list[Type[Helper]]:
       raise NotImplementedError

This way, a smart IDE flags right away, in any derived class, that helpers must be overridden.

I think helpers should be used instead of _helpers because it is part of the "public" interface of that class, for anyone that derives a new implementation from it.
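
To make the effect of the abstract classmethod concrete, here is a small sketch with plain classes. `Incomplete` and `Complete` are hypothetical subclasses, and whether `abc.ABC` combines cleanly with the pydantic-based `BaseSTAC` in this PR is an assumption that is not checked here.

```python
import abc


class Helper:
    """Stand-in for the helper base class."""


class BaseSTAC(abc.ABC):
    @classmethod
    @abc.abstractmethod
    def helpers(cls) -> list[type[Helper]]:
        raise NotImplementedError


class Incomplete(BaseSTAC):
    """Forgets to override `helpers`."""


class Complete(BaseSTAC):
    @classmethod
    def helpers(cls) -> list[type[Helper]]:
        return [Helper]


# Instantiating a subclass that did not override the abstract method fails.
try:
    Incomplete()
    error = None
except TypeError as exc:
    error = exc

complete = Complete()  # works, because `helpers` is overridden
```

Note that the check happens at instantiation time; defining `Incomplete` by itself raises nothing, so the early warning comes from the IDE or a type checker rather than from Python.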


model_config = ConfigDict(populate_by_name=True, extra="ignore", arbitrary_types_allowed=True)

@property
def uid(self) -> str:
"""Return a unique ID. When subclassing, use a combination of properties uniquely identifying a dataset."""
# TODO: Should this be an abstract method?
import uuid
return str(uuid.uuid4())
Comment on lines +154 to +159
Collaborator

Should probably be a class method.

As a property, the name feels like an attribute to the current model. However, it seems to be employed as a "generate new UUID" helper method for each STAC Item created.

Also, import at the top.
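
The concern can be illustrated with a short sketch: a property backed by `uuid.uuid4()` returns a different value on every access, which is surprising for something that reads like an attribute. The class names and the `new_uid` method below are hypothetical, shown only to contrast the two spellings.

```python
import uuid  # imported at module level, as suggested above


class WithProperty:
    @property
    def uid(self) -> str:
        # Looks like a stable attribute, but generates a new ID on each access.
        return str(uuid.uuid4())


class WithClassMethod:
    @classmethod
    def new_uid(cls) -> str:
        # The call syntax makes it explicit that a fresh ID is generated.
        return str(uuid.uuid4())


model = WithProperty()
first, second = model.uid, model.uid  # two accesses, two different IDs
generated = WithClassMethod.new_uid()
```

A subclass that derives the ID from dataset properties, as the docstring in the diff recommends, would not have this problem, since the same inputs would yield the same ID.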


def stac_item(self) -> "pystac.Item":
"""Create a STAC item and add extensions."""
item = pystac.Item(
id=self.uid,
geometry=self.geometry.model_dump(),
bbox=self.bbox,
properties={
"start_datetime": str(self.start_datetime),
huard marked this conversation as resolved.
"end_datetime": str(self.end_datetime),
},
datetime=None,
)

# Add extensions
for ext in self.extensions:
getattr(self, ext).apply(item)

try:
item.validate()
except STACValidationError as e:
raise Exception("Failed to validate STAC item") from e

return json.loads(json.dumps(item.to_dict()))


class THREDDSCatalogDataModel(BaseSTAC):
"""Base class ingesting attributes loaded by `THREDDSLoader` and creating a STAC item.

This is meant to be subclassed for each extension.

It includes two validation mechanisms:
- pydantic validation using type hints, and
- json schema validation.
"""
# Data from loader
data: dict

# Extensions classes
properties: ExtensionHelper
datacube: DataCubeHelper
thredds: THREDDSHelper

extensions: list = ["properties", "datacube", "thredds"]

model_config = ConfigDict(populate_by_name=True, extra="ignore", arbitrary_types_allowed=True)

@classmethod
def from_data(cls, data):
"""Instantiate class from data provided by THREDDS Loader.
"""
# This is where we match the Loader's output to the STAC item and extensions inputs. If we had multiple
# loaders, that's probably the only thing that would be different between them.
return cls(data=data,
start_datetime=data["groups"]["CFMetadata"]["attributes"]["time_coverage_start"],
end_datetime=data["groups"]["CFMetadata"]["attributes"]["time_coverage_end"],
geometry=ncattrs_to_geometry(data),
bbox=ncattrs_to_bbox(data),
)

@model_validator(mode="before")
@classmethod
def properties_helper(cls, data):
"""Instantiate the properties helper."""
data["properties"] = data['data']['attributes']
return data

@model_validator(mode="before")
@classmethod
def datacube_helper(cls, data):
"""Instantiate the DataCubeHelper."""
data["datacube"] = DataCubeHelper(data['data'])
return data

@model_validator(mode="before")
@classmethod
def thredds_helper(cls, data):
"""Instantiate the THREDDSHelper."""
data["thredds"] = THREDDSHelper(data['data']["access_urls"])
return data


def metacls_extension(name, schema_uri):
"""Create an extension class dynamically from the properties."""
cls_name = f"{name.upper()}Extension"

bases = (MetaExtension,
Generic[T],
PropertiesExtension,
ExtensionManagementMixin[Union[pystac.Asset, pystac.Item, pystac.Collection]]
)

attrs = {"name": name, "schema_uri": schema_uri}
return types.new_class(name=cls_name, bases=bases, kwds=None, exec_body=lambda ns: ns.update(attrs))


class MetaExtension:
name: str
schema_uri: str

def apply(self, properties: dict[str, Any]) -> None:
"""Applies CMIP6 Extension properties to the extended
:class:`~pystac.Item` or :class:`~pystac.Asset`.
"""
for prop, val in properties.items():
self._set_property(prop, val)

@classmethod
def get_schema_uri(cls) -> str:
"""We have already validated the JSON schema."""
return cls.schema_uri

@classmethod
def has_extension(cls, obj: S):
# FIXME: this override should be removed once an official and versioned schema is released
# ignore the original implementation logic for a version regex
# since in our case, the VERSION_REGEX is not fulfilled (ie: using 'main' branch, no tag available...)
ext_uri = cls.get_schema_uri()
return obj.stac_extensions is not None and any(uri == ext_uri for uri in obj.stac_extensions)

@classmethod
def ext(cls, obj: T, add_if_missing: bool = False) -> "Extension[T]":
"""Extends the given STAC Object with properties from the
:stac-ext:`Extension`.

This extension can be applied to instances of :class:`~pystac.Item` or
:class:`~pystac.Asset`.

Raises:

pystac.ExtensionTypeError : If an invalid object type is passed.
"""
cls_map = {pystac.Item: MetaItemExtension}

for key, meta in cls_map.items():
if isinstance(obj, key):
# cls.ensure_has_extension(obj, add_if_missing)
kls = extend_type(key, meta, cls[key])
return cast(cls[T], kls(obj))
else:
raise pystac.ExtensionTypeError(cls._ext_error_message(obj))


def extend_type(stac, cls, ext):
"""Create an extension subclass for different STAC objects.

Note: This is super confusing... we should come up with some better nomenclature.

Parameters
----------
stac: pystac.Item, pystac.Asset, pystac.Collection
The STAC object.
cls: MetaItemExtension
The generic extension class for the STAC object.
ext: MetaExtension[T]
The meta extension class.
"""
cls_name = f"{stac.__name__ }{ext.__name__}"
return types.new_class(cls_name, (cls, ext), {}, lambda ns: ns)


class MetaItemExtension:
"""A concrete implementation of :class:`Extension` on an :class:`~pystac.Item`
that extends the properties of the Item to include properties defined in the
:stac-ext:`Extension`.

This class should generally not be instantiated directly. Instead, call
:meth:`Extension.ext` on an :class:`~pystac.Item` to extend it.
"""
def __init__(self, item: pystac.Item):
self.item = item
self.properties = item.properties

def get_assets(
self,
service_type: Optional[ServiceType] = None,
) -> dict[str, pystac.Asset]:
"""Get the item's assets where eo:bands are defined.

Args:
service_type: If set, filter the assets such that only those with a
matching :class:`~STACpopulator.stac_utils.ServiceType` are returned.

Returns:
Dict[str, Asset]: A dictionary of assets that match ``service_type``
if set or else all of this item's assets where service types are defined.
"""
return {
key: asset
for key, asset in self.item.get_assets().items()
if (service_type is ServiceType and service_type.value in asset.extra_fields)
or any(ServiceType.from_value(field, default=None) is ServiceType for field in asset.extra_fields)
}

def __repr__(self) -> str:
return f"<{self.__class__.__name__} Item id={self.item.id}>"


# TODO: Add the other STAC item meta extensions

def schema_properties(schema: dict) -> list[str]:
"""Return the list of properties described by schema."""
out = []
for key, val in schema["properties"].items():
prefix, name = key.split(":") if ":" in key else (None, key)
out.append(name)
return out


def model_from_schema(model_name, schema: dict):
"""Create pydantic BaseModel from JSON schema."""
type_map = {"string": str, "number": float, "integer": int, "boolean": bool, "array": list, "object": dict,
None: Any}

fields = {}
for key, val in schema["properties"].items():
prefix, name = key.split(":") if ":" in key else (None, key)
typ = type_map[val.get("type")]
default = ... if key in schema["required"] else None
fields[name] = (typ, Field(default, alias=key))
return create_model(model_name, **fields)

2 changes: 1 addition & 1 deletion STACpopulator/extensions/cmip6.py
Expand Up @@ -51,7 +51,7 @@

SchemaName = Literal["cmip6"]
# FIXME: below reference (used as ID in the schema itself) should be updated once the extension is officially released
# SCHEMA_URI: str = "https://stac-extensions.github.io/cmip6/v1.0.0/schema.json"
# SCHEMA_URI: str = "https://raw.githubusercontent.com/stac-extensions/cmip6/refs/heads/main/json-schema/schema.json"
# below is the temporary resolvable URI
SCHEMA_URI: str = "https://raw.githubusercontent.com/dchandan/stac-extension-cmip6/main/json-schema/schema.json"
PREFIX = f"{get_args(SchemaName)[0]}:"