Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add load_stac #127

Merged
merged 14 commits into from
Jul 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
from .aggregate import *
from .apply import *
from .general import *
from .load import *
from .merge import *
from .reduce import *
164 changes: 164 additions & 0 deletions openeo_processes_dask/process_implementations/cubes/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import datetime
import json
import logging
from collections.abc import Iterator
from pathlib import PurePosixPath
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import unquote, urljoin, urlparse

import planetary_computer as pc
import pyproj
import pystac_client
import stackstac
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval
from stac_validator import stac_validator

from openeo_processes_dask.process_implementations.cubes._filter import (
_reproject_bbox,
filter_bands,
filter_bbox,
filter_temporal,
)
from openeo_processes_dask.process_implementations.data_model import RasterCube
from openeo_processes_dask.process_implementations.exceptions import (
NoDataAvailable,
TemporalExtentEmpty,
)

# "NoDataAvailable": {
# "message": "There is no data available for the given extents."
# },
# "TemporalExtentEmpty": {
# "message": "The temporal extent is empty. The second instant in time must always be greater/later than the first instant in time."
# }
# Public API of this module: only the load_stac process is exported.
__all__ = ["load_stac"]

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def _validate_stac(url):
    """Validate a STAC URL with stac-validator and return its asset type.

    Args:
        url: Location of the STAC document to validate.

    Returns:
        The ``asset_type`` entry of the single message produced by
        stac-validator. Callers in this module compare it against
        "CATALOG", "COLLECTION" and "ITEM".

    Raises:
        Exception: If the validator reports the document as invalid, if it
            returns anything other than exactly one message, or if that
            message has no ``asset_type`` entry.
    """
    logger.debug(f"Validating the provided STAC url: {url}")
    stac = stac_validator.StacValidate(url)
    is_valid_stac = stac.run()
    if not is_valid_stac:
        raise Exception(
            f"The provided link is not a valid STAC. stac-validator message: {stac.message}"
        )
    if len(stac.message) != 1:
        raise Exception(
            f"stac-validator returned multiple items, not supported yet. {stac.message}"
        )
    try:
        asset_type = stac.message[0]["asset_type"]
    except (KeyError, IndexError, TypeError) as err:
        # Only lookup failures are translated; KeyboardInterrupt/SystemExit
        # are no longer swallowed, and the original cause stays attached.
        raise Exception(
            f"stac-validator returned an error: {stac.message}"
        ) from err
    return asset_type


def _search_for_parent_catalog(url):
parsed_url = urlparse(url)
root_url = parsed_url.scheme + "://" + parsed_url.netloc
catalog_url = root_url
url_parts = PurePosixPath(unquote(parsed_url.path)).parts
collection_id = url_parts[-1]
for p in url_parts:
if p != "/":
catalog_url = catalog_url + "/" + p
try:
asset_type = _validate_stac(catalog_url)
except Exception as e:
logger.debug(e)
continue
if asset_type == "CATALOG":
break
if asset_type != "CATALOG":
raise Exception(

Check warning on line 77 in openeo_processes_dask/process_implementations/cubes/load.py

View check run for this annotation

Codecov / codecov/patch

openeo_processes_dask/process_implementations/cubes/load.py#L61-L77

Added lines #L61 - L77 were not covered by tests
"It was not possible to find the root STAC Catalog starting from the provided Collection."
)
return catalog_url, collection_id

Check warning on line 80 in openeo_processes_dask/process_implementations/cubes/load.py

View check run for this annotation

Codecov / codecov/patch

openeo_processes_dask/process_implementations/cubes/load.py#L80

Added line #L80 was not covered by tests


def load_stac(
    url: str,
    spatial_extent: Optional[BoundingBox] = None,
    temporal_extent: Optional[TemporalInterval] = None,
    bands: Optional[list[str]] = None,
    properties: Optional[dict] = None,
) -> RasterCube:
    """Load a STAC Collection or Item into a data cube via stackstac.

    The URL is validated first. For a Collection, at least one filter must be
    given; the parent Catalog is located and queried through its search
    endpoint. For an Item, the document is read directly from ``url``.

    Args:
        url: URL of a STAC Collection or Item.
        spatial_extent: Optional bounding box. For Collections it is sent to
            the search as an EPSG:4326 bbox (reprojected when its ``crs`` is
            set and differs); in all cases the stacked result is also
            filtered with ``filter_bbox``.
        temporal_extent: Optional interval. For Collections it is sent to the
            search as ``datetime``; for Items the stacked result is filtered
            with ``filter_temporal``.
        bands: Optional asset names, passed to stackstac as ``assets``.
        properties: Optional mapping, passed to the search as ``query``.

    Returns:
        The stacked (and, where applicable, filtered) data cube.

    Raises:
        NoDataAvailable: If the Collection search returns no items.
        Exception: If the URL is not a valid STAC Collection or Item, if a
            Collection is requested without any filter, or if the spatial
            extent cannot be converted into a bbox.
    """
    asset_type = _validate_stac(url)

    if asset_type == "COLLECTION":
        if not (spatial_extent or temporal_extent or bands or properties):
            # Loading the whole collection without filters
            raise Exception(
                "No parameters for filtering provided. Loading the whole STAC Collection is not supported yet."
            )

        # Query parameters were passed: get the parent Catalog, if it exists,
        # so that its /search endpoint can be used.
        catalog_url, collection_id = _search_for_parent_catalog(url)

        # Check if we are connecting to Microsoft Planetary Computer, where we need to sign the connection
        modifier = pc.sign_inplace if "planetarycomputer" in catalog_url else None

        catalog = pystac_client.Client.open(catalog_url, modifier=modifier)

        query_params = {"collections": [collection_id]}

        if spatial_extent is not None:
            try:
                spatial_extent_4326 = spatial_extent
                if spatial_extent.crs is not None:
                    if not pyproj.crs.CRS(spatial_extent.crs).equals("EPSG:4326"):
                        spatial_extent_4326 = _reproject_bbox(
                            spatial_extent, "EPSG:4326"
                        )
                bbox = [
                    spatial_extent_4326.west,
                    spatial_extent_4326.south,
                    spatial_extent_4326.east,
                    spatial_extent_4326.north,
                ]
                query_params["bbox"] = bbox
            except Exception as e:
                # Chain the cause so the original traceback is not lost.
                raise Exception(
                    f"Unable to parse the provided spatial extent: {e}"
                ) from e

        if temporal_extent is not None:
            start_date = None
            end_date = None
            if temporal_extent[0] is not None:
                start_date = str(temporal_extent[0].to_numpy())
            if temporal_extent[1] is not None:
                end_date = str(temporal_extent[1].to_numpy())
            query_params["datetime"] = [start_date, end_date]

        if properties is not None:
            query_params["query"] = properties

        items = catalog.search(**query_params).item_collection()

        if len(items) == 0:
            # Report an empty result with the dedicated openEO error instead
            # of handing an empty collection to stackstac.
            raise NoDataAvailable(
                "There is no data available for the given extents."
            )

    elif asset_type == "ITEM":
        stac_api = pystac_client.stac_api_io.StacApiIO()
        stac_dict = json.loads(stac_api.read_text(url))
        items = stac_api.stac_object_from_dict(stac_dict)

    else:
        raise Exception(
            f"The provided URL is a STAC {asset_type}, which is not yet supported. Please provide a valid URL to a STAC Collection or Item."
        )

    if bands is not None:
        stack = stackstac.stack(items, assets=bands)
    else:
        stack = stackstac.stack(items)

    if spatial_extent is not None:
        stack = filter_bbox(stack, spatial_extent)

    # For Collections the temporal extent was already applied by the search.
    if temporal_extent is not None and asset_type == "ITEM":
        stack = filter_temporal(stack, temporal_extent)

    return stack
8 changes: 8 additions & 0 deletions openeo_processes_dask/process_implementations/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ class DimensionMissing(OpenEOException):

class BandFilterParameterMissing(OpenEOException):
    # NOTE(review): no usage is visible in this view; judging by the name this
    # signals a band filter invoked without a required parameter -- confirm
    # against the call sites.
    pass


class NoDataAvailable(OpenEOException):
    """There is no data available for the given extents."""


class TemporalExtentEmpty(OpenEOException):
    """The temporal extent is empty.

    The second instant in time must always be greater/later than the first
    instant in time.
    """
2 changes: 1 addition & 1 deletion openeo_processes_dask/specs/openeo-processes
Submodule openeo-processes updated 125 files
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ rioxarray = { version = ">=0.12.0,<1", optional = true }
odc-algo = { version = "==0.2.3", optional = true }
openeo-pg-parser-networkx = { version = ">=2023.5.1", optional = true }
odc-geo = { version = "^0.3.2", optional = true }
stac_validator = { version = ">=3.3.1", optional = true }
stackstac = { version = ">=0.4.3", optional = true }
pystac_client = { version = ">=0.6.1", optional = true }
planetary_computer = { version = ">=0.5.1", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
Expand All @@ -45,7 +49,7 @@ pre-commit = "^2.20.0"
pytest-cov = "^4.0.0"

[tool.poetry.extras]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo"]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator"]
experimental = ["odc-algo"]
ml = ["xgboost"]

Expand Down
Loading