Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add load_stac #127

Merged
merged 14 commits into from
Jul 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
from .aggregate import *
from .apply import *
from .general import *
from .load import *
from .merge import *
from .reduce import *
168 changes: 168 additions & 0 deletions openeo_processes_dask/process_implementations/cubes/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import datetime
import json
import logging
from collections.abc import Iterator
from pathlib import PurePosixPath
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import unquote, urljoin, urlparse

import planetary_computer as pc
import pyproj
import pystac_client
import stackstac
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval
from stac_validator import stac_validator

from openeo_processes_dask.process_implementations.cubes._filter import (
_reproject_bbox,
filter_bands,
filter_bbox,
filter_temporal,
)
from openeo_processes_dask.process_implementations.data_model import RasterCube
from openeo_processes_dask.process_implementations.exceptions import (
NoDataAvailable,
TemporalExtentEmpty,
)

# "NoDataAvailable": {
# "message": "There is no data available for the given extents."
# },
# "TemporalExtentEmpty": {
# "message": "The temporal extent is empty. The second instant in time must always be greater/later than the first instant in time."
# }
__all__ = ["load_stac"]

logger = logging.getLogger(__name__)


def _validate_stac(url):
logger.debug(f"Validating the provided STAC url: {url}")
stac = stac_validator.StacValidate(url)
is_valid_stac = stac.run()
if not is_valid_stac:
raise Exception(
f"The provided link is not a valid STAC. stac-validator message: {stac.message}"
)
if len(stac.message) == 1:
try:
asset_type = stac.message[0]["asset_type"]
except:
raise Exception(f"stac-validator returned an error: {stac.message}")
else:
raise Exception(
f"stac-validator returned multiple items, not supported yet. {stac.message}"
)
return asset_type


def load_stac(
url: str,
spatial_extent: BoundingBox = None,
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
temporal_extent: Optional[TemporalInterval] = None,
bands: Optional[list[str]] = None,
properties: Optional[dict] = None,
**kwargs,
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
) -> RasterCube:
asset_type = _validate_stac(url)

if asset_type == "COLLECTION":
# If query parameters are passed, try to get the parent Catalog if possible/exists, to use the /search endpoint
if spatial_extent or temporal_extent or bands or properties:
# If query parameters are passed, try to get the parent Catalog if possible/exists, to use the /search endpoint
# url = "https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs"
parsed_url = urlparse(url)
root_url = parsed_url.scheme + "://" + parsed_url.netloc
catalog_url = root_url
url_parts = PurePosixPath(unquote(parsed_url.path)).parts
collection_id = url_parts[-1]
for p in url_parts:
if p != "/":
catalog_url = catalog_url + "/" + p
try:
asset_type = _validate_stac(catalog_url)
except Exception as e:
logger.debug(e)
continue
if asset_type == "CATALOG":
break
if asset_type != "CATALOG":
raise Exception(
"It was not possible to find the root STAC Catalog starting from the provided Collection."
)
clausmichele marked this conversation as resolved.
Show resolved Hide resolved

## Check if we are connecting to Microsoft Planetary Computer, where we need to sign the connection
modifier = pc.sign_inplace if "planetarycomputer" in catalog_url else None

catalog = pystac_client.Client.open(catalog_url, modifier=modifier)

query_params = {"collections": [collection_id]}

if spatial_extent is not None:
try:
spatial_extent_4326 = spatial_extent
if spatial_extent.crs is not None:
if not pyproj.crs.CRS(spatial_extent.crs).equals("EPSG:4326"):
spatial_extent_4326 = _reproject_bbox(
spatial_extent, "EPSG:4326"
)
bbox = [
spatial_extent_4326.west,
spatial_extent_4326.south,
spatial_extent_4326.east,
spatial_extent_4326.north,
]
query_params["bbox"] = bbox
except Exception as e:
raise Exception(f"Unable to parse the provided spatial extent: {e}")

if temporal_extent is not None:
start_date = None
end_date = None
if temporal_extent[0] is not None:
start_date = str(temporal_extent[0].to_numpy())
if temporal_extent[1] is not None:
end_date = str(temporal_extent[1].to_numpy())
query_params["datetime"] = [start_date, end_date]

if properties is not None:
query_params["query"] = properties

items = catalog.search(**query_params).item_collection()
if bands is not None:
stack = stackstac.stack(items, assets=bands)
else:
stack = stackstac.stack(items)
if spatial_extent is not None:
stack = filter_bbox(stack, spatial_extent)
return stack

else:
# Load the whole collection wihout filters
raise Exception(
f"No parameters for filtering provided. Loading the whole STAC Collection is not supported yet."
)

elif asset_type == "ITEM":
stac_api = pystac_client.stac_api_io.StacApiIO()
stac_dict = json.loads(stac_api.read_text(url))
stac_item = stac_api.stac_object_from_dict(stac_dict)

assets = None
if bands is not None:
assets = bands

stack = stackstac.stack(stac_item, assets=assets)

if spatial_extent is not None:
stack = filter_bbox(stack, spatial_extent)

if temporal_extent is not None:
stack = filter_temporal(stack, temporal_extent)
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
return stack

else:
raise Exception(
f"The provided URL is a STAC {asset_type}, which is not yet supported. Please provide a valid URL to a STAC Collection."
)
8 changes: 8 additions & 0 deletions openeo_processes_dask/process_implementations/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ class DimensionMissing(OpenEOException):

class BandFilterParameterMissing(OpenEOException):
pass


class NoDataAvailable(OpenEOException):
pass


class TemporalExtentEmpty(OpenEOException):
pass
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ rioxarray = { version = ">=0.12.0,<1", optional = true }
odc-algo = { version = "==0.2.3", optional = true }
openeo-pg-parser-networkx = { version = ">=2023.5.1", optional = true }
odc-geo = { version = "^0.3.2", optional = true }
stac_validator = { version = ">=3.3.1", optional = true }
stackstac = { version = ">=0.4.3", optional = true }
pystac_client = { version = ">=0.6.1", optional = true }
planetary_computer = { version = ">=0.5.1", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
Expand All @@ -45,7 +49,7 @@ pre-commit = "^2.20.0"
pytest-cov = "^4.0.0"

[tool.poetry.extras]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo"]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator"]
experimental = ["odc-algo"]
ml = ["xgboost"]

Expand Down
20 changes: 20 additions & 0 deletions tests/test_load_stac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest

from openeo_processes_dask.process_implementations.cubes.load import load_stac


def test_load_stac(temporal_interval, bounding_box):
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
url = (
"https://planetarycomputer.microsoft.com/api/stac/v1/collections/landsat-c2-l2"
)
output_cube = load_stac(
url=url,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["red"],
)

assert output_cube.openeo is not None
assert len(output_cube[output_cube.openeo.x_dim]) > 0
assert len(output_cube[output_cube.openeo.y_dim]) > 0
assert len(output_cube[output_cube.openeo.temporal_dims[0]]) > 0