Vectorcube applydim with UDF #213

Merged · 12 commits · Aug 7, 2023
109 changes: 40 additions & 69 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -677,13 +677,13 @@ def apply_neighborhood(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:

@process
def apply_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
data_cube = args.get_required("data", expected_type=DriverDataCube)
data_cube = args.get_required("data", expected_type=(DriverDataCube, DriverVectorCube))
process = args.get_deep("process", "process_graph", expected_type=dict)
dimension = args.get_required("dimension", expected_type=str)
dimension = args.get_required(
"dimension", expected_type=str, validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names())
)
target_dimension = args.get_optional("target_dimension", default=None, expected_type=str)
context = args.get_optional("context", default=None)
# do check_dimension here for error handling
dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process="apply_dimension")

cube = data_cube.apply_dimension(
process=process, dimension=dimension, target_dimension=target_dimension, context=context, env=env
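For context, a minimal sketch of the validator-based argument checking used in the new code; the helper below is an assumption about how `ProcessArgs.validator_one_of` behaves, not its actual implementation:

```python
# Sketch (assumption): validator_one_of returns a callable that rejects any value
# outside the allowed options, so an invalid "dimension" argument fails fast with a
# clear message instead of deep inside the backend.
from typing import Callable, List


def validator_one_of(options: List[str]) -> Callable[[str], bool]:
    def validate(value: str) -> bool:
        if value not in options:
            raise ValueError(f"Expected one of {options!r}, but got {value!r}.")
        return True

    return validate


check_dimension = validator_one_of(["x", "y", "t", "bands"])
check_dimension("t")       # passes
# check_dimension("time")  # would raise ValueError: not an actual dimension name
```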
@@ -747,10 +747,10 @@ def apply(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
def reduce_dimension(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
data_cube: DriverDataCube = args.get_required("data", expected_type=DriverDataCube)
reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
dimension = args.get_required("dimension", expected_type=str)
dimension = args.get_required(
"dimension", expected_type=str, validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names())
)
context = args.get_optional("context", default=None)
# do check_dimension here for error handling
dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process="reduce_dimension")
return data_cube.reduce_dimension(reducer=reduce_pg, dimension=dimension, context=context, env=env)


@@ -915,60 +915,35 @@ def rename_labels(args: dict, env: EvalEnv) -> DriverDataCube:
)


def _check_dimension(cube: DriverDataCube, dim: str, process: str):
"""
Helper to check/validate the requested and available dimensions of a cube.

:return: tuple (requested dimension, name of band dimension, name of temporal dimension)
"""
# Note: large part of this is support/adapting for old client
# (pre https://github.com/Open-EO/openeo-python-client/issues/93)
# TODO remove this legacy support when not necessary anymore
metadata = cube.metadata
try:
band_dim = metadata.band_dimension.name
except MetadataException:
band_dim = None
try:
temporal_dim = metadata.temporal_dimension.name
except MetadataException:
temporal_dim = None

if dim not in metadata.dimension_names():
if dim in ["spectral_bands", "bands"] and band_dim:
_log.warning("Probably old client requesting band dimension {d!r},"
" but actual band dimension name is {n!r}".format(d=dim, n=band_dim))
dim = band_dim
elif dim == "temporal" and temporal_dim:
_log.warning("Probably old client requesting temporal dimension {d!r},"
" but actual temporal dimension name is {n!r}".format(d=dim, n=temporal_dim))
dim = temporal_dim
else:
raise ProcessParameterInvalidException(
parameter="dimension", process=process,
reason="got {d!r}, but should be one of {n!r}".format(d=dim, n=metadata.dimension_names()))

return dim, band_dim, temporal_dim


@process
def aggregate_temporal(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
data_cube = args.get_required("data", expected_type=DriverDataCube)
reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
context = args.get_optional("context", default=None)
intervals = args.get_required("intervals")
reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
labels = args.get_optional("labels", default=None)
dimension = _get_time_dim_or_default(args, data_cube)
return data_cube.aggregate_temporal(intervals=intervals,labels=labels,reducer=reduce_pg, dimension=dimension, context=context)
dimension = args.get_optional(
"dimension",
default=lambda: data_cube.metadata.temporal_dimension.name,
validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names()),
)
context = args.get_optional("context", default=None)

return data_cube.aggregate_temporal(
intervals=intervals, labels=labels, reducer=reduce_pg, dimension=dimension, context=context
)
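The `dimension` argument now falls back to the cube's temporal dimension via a callable default, so the lookup only runs (and can only fail) when the argument is actually omitted. A small sketch of that resolution logic, assuming `ProcessArgs.get_optional` treats callables as default factories:

```python
# Sketch (assumption): a callable default is only invoked when the argument is missing.
from typing import Any, Callable, Union


def get_optional(args: dict, name: str, default: Union[Any, Callable[[], Any]] = None) -> Any:
    if name in args:
        return args[name]
    return default() if callable(default) else default


args = {"intervals": [["2023-01-01", "2023-02-01"]]}
dimension = get_optional(args, "dimension", default=lambda: "t")
print(dimension)  # "t": falls back to the (assumed) temporal dimension name
```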


@process_registry_100.add_function
def aggregate_temporal_period(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
data_cube = args.get_required("data", expected_type=DriverDataCube)
period = args.get_required("period")
reduce_pg = args.get_deep("reducer", "process_graph", expected_type=dict)
dimension = args.get_optional(
"dimension",
default=lambda: data_cube.metadata.temporal_dimension.name,
validator=ProcessArgs.validator_one_of(data_cube.get_dimension_names()),
)
context = args.get_optional("context", default=None)
period = args.get_required("period")
dimension = _get_time_dim_or_default(args, data_cube, "aggregate_temporal_period")

dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER)
if dry_run_tracer:
@@ -1045,24 +1020,6 @@ def _period_to_intervals(start, end, period) -> List[Tuple[pd.Timestamp, pd.Time
return intervals


def _get_time_dim_or_default(args: ProcessArgs, data_cube, process_id="aggregate_temporal"):
dimension = args.get_optional("dimension", None)
if dimension is not None:
dimension, _, _ = _check_dimension(cube=data_cube, dim=dimension, process=process_id)
else:
# default: there is a single temporal dimension
try:
dimension = data_cube.metadata.temporal_dimension.name
except MetadataException:
raise ProcessParameterInvalidException(
parameter="dimension", process=process_id,
reason="No dimension was set, and no temporal dimension could be found. Available dimensions: {n!r}".format(
n=data_cube.metadata.dimension_names()))
# do check_dimension here for error handling
dimension, band_dim, temporal_dim = _check_dimension(cube=data_cube, dim=dimension, process=process_id)
return dimension


@process_registry_100.add_function
def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
cube = args.get_required("data", expected_type=DriverDataCube)
@@ -1624,14 +1581,28 @@ def load_uploaded_files(args: dict, env: EvalEnv) -> Union[DriverVectorCube,Driv
.returns("vector-cube", schema={"type": "object", "subtype": "vector-cube"})
)
def to_vector_cube(args: Dict, env: EvalEnv):
# TODO: standardization of something like this? https://github.com/Open-EO/openeo-processes/issues/346
_log.warning("Experimental process `to_vector_cube` is deprecated, use `load_geojson` instead")
# TODO: remove this experimental/deprecated process
data = extract_arg(args, "data", process_id="to_vector_cube")
if isinstance(data, dict) and data.get("type") in {"Polygon", "MultiPolygon", "Feature", "FeatureCollection"}:
return env.backend_implementation.vector_cube_cls.from_geojson(data)
# TODO: support more inputs: string with geojson, string with WKT, list of WKT, string with URL to GeoJSON, ...
raise FeatureUnsupportedException(f"Converting {type(data)} to vector cube is not supported")


@process_registry_100.add_function(spec=read_spec("openeo-processes/2.x/proposals/load_geojson.json"))
def load_geojson(args: ProcessArgs, env: EvalEnv) -> DriverVectorCube:
data = args.get_required(
"data",
validator=ProcessArgs.validator_geojson_dict(
# TODO: also allow LineString and MultiLineString?
allowed_types=["Point", "MultiPoint", "Polygon", "MultiPolygon", "Feature", "FeatureCollection"]
),
)
properties = args.get_optional("properties", default=[], expected_type=(list, tuple))
vector_cube = env.backend_implementation.vector_cube_cls.from_geojson(data, columns_for_cube=properties)
return vector_cube
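As an illustration of how the new `load_geojson` process (the replacement for the deprecated `to_vector_cube`) could be invoked, here is a hypothetical process graph node; the GeoJSON payload and property name are made up for the example:

```python
# Hypothetical process graph node: inline GeoJSON as "data", plus a "properties"
# list selecting which feature properties end up as cube values.
load_geojson_node = {
    "process_id": "load_geojson",
    "arguments": {
        "data": {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "geometry": {"type": "Point", "coordinates": [5.0, 51.2]},
                    "properties": {"population": 1234},
                },
            ],
        },
        "properties": ["population"],
    },
}
```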


@non_standard_process(
ProcessSpec("get_geometries", description="Reads vector data from a file or a URL or get geometries from a FeatureCollection")
.param('filename', description="filename or http url of a vector file", schema={"type": "string"}, required=False)
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
@@ -1 +1 @@
__version__ = "0.60.0a1"
__version__ = "0.61.0a1"
72 changes: 57 additions & 15 deletions openeo_driver/datacube.py
@@ -1,25 +1,26 @@
import abc
import inspect
import io
import logging
import zipfile
from pathlib import Path
from typing import List, Union, Optional, Dict, Any, Tuple, Sequence
import io
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import geopandas as gpd
import numpy
import openeo.udf
import pandas
import pyproj
import requests
import shapely.geometry
import shapely.geometry.base
import shapely.ops
import xarray
from pyproj import CRS
import requests

from openeo.metadata import CollectionMetadata
from openeo.util import ensure_dir, str_truncate
import openeo.udf
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
from pyproj import CRS

from openeo_driver.datastructs import ResolutionMergeArgs, SarBackscatterArgs, StacAsset
from openeo_driver.errors import FeatureUnsupportedException, InternalException
from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
from openeo_driver.util.ioformats import IOFORMATS
@@ -61,6 +62,9 @@ def __eq__(self, o: object) -> bool:
return True
return False

def get_dimension_names(self) -> List[str]:
return self.metadata.dimension_names()

def _not_implemented(self):
"""Helper to raise a NotImplemented exception containing method name"""
raise NotImplementedError("DataCube method not implemented: {m!r}".format(m=inspect.stack()[1].function))
@@ -221,6 +225,9 @@ class DriverVectorCube:
COLUMN_SELECTION_ALL = "all"
COLUMN_SELECTION_NUMERICAL = "numerical"

# Xarray cube attribute to indicate that it is a dummy cube
CUBE_ATTR_VECTOR_CUBE_DUMMY = "vector_cube_dummy"

def __init__(
self,
geometries: gpd.GeoDataFrame,
@@ -281,14 +288,21 @@ def from_geodataframe(
elif columns_for_cube == cls.COLUMN_SELECTION_ALL:
columns_for_cube = available_columns
elif isinstance(columns_for_cube, list):
# TODO #114 limit to subset with available columns (and automatically fill in missing columns with nodata)?
columns_for_cube = columns_for_cube
else:
raise ValueError(columns_for_cube)
assert isinstance(columns_for_cube, list)

if columns_for_cube:
cube_df = data[columns_for_cube]
existing = [c for c in columns_for_cube if c in available_columns]
to_add = [c for c in columns_for_cube if c not in available_columns]
if existing:
cube_df = data[existing]
if to_add:
cube_df.loc[:, to_add] = numpy.nan
else:
cube_df = pandas.DataFrame(index=data.index, columns=to_add)

# TODO: remove `columns_for_cube` from geopandas data frame?
# Enabling that triggers failure of some existing tests that use `aggregate_spatial`
# to "enrich" a vector cube with pre-existing properties
@@ -308,7 +322,14 @@ def from_geodataframe(
return cls(geometries=geometries_df, cube=cube)

else:
return cls(geometries=data)
# Use 1D dummy cube of NaN values
cube: xarray.DataArray = xarray.DataArray(
data=numpy.full(shape=[data.shape[0]], fill_value=numpy.nan),
dims=[cls.DIM_GEOMETRIES],
coords={cls.DIM_GEOMETRIES: data.geometry.index.to_list()},
attrs={cls.CUBE_ATTR_VECTOR_CUBE_DUMMY: True},
)
return cls(geometries=data, cube=cube)
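A standalone illustration of the "dummy cube" constructed above: a one-dimensional all-NaN `DataArray` along the geometries dimension, flagged with an attribute so downstream code can distinguish it from real data (the dimension name `"geometries"` and the coordinate values are assumptions for this example):

```python
import numpy
import xarray

n_geometries = 3
dummy_cube = xarray.DataArray(
    data=numpy.full(shape=[n_geometries], fill_value=numpy.nan),
    dims=["geometries"],
    coords={"geometries": list(range(n_geometries))},
    attrs={"vector_cube_dummy": True},
)
print(dummy_cube.attrs.get("vector_cube_dummy"))  # True
print(bool(dummy_cube.isnull().all()))            # True: only placeholder values
```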

@classmethod
def from_fiona(
@@ -400,7 +421,7 @@ def _as_geopandas_df(
"""Join geometries and cube as a geopandas dataframe"""
# TODO: avoid copy?
df = self._geometries.copy(deep=True)
if self._cube is not None:
if self._cube is not None and not self._cube.attrs.get(self.CUBE_ATTR_VECTOR_CUBE_DUMMY):
assert self._cube.dims[0] == self.DIM_GEOMETRIES
# TODO: better way to combine cube with geometries
# Flatten multiple (non-geometry) dimensions from cube to new properties in geopandas dataframe
@@ -426,6 +447,16 @@ def to_wkt(self) -> List[str]:
wkts = [str(g) for g in self._geometries.geometry]
return wkts

def to_internal_json(self) -> dict:
"""
Export to an internal JSON-style representation.
Subject to change any time: not intended for public consumption, just for (unit) test purposes.
"""
return {
"geometries": shapely.geometry.mapping(self._geometries),
"cube": self._cube.to_dict(data="array") if self._cube is not None else None,
}
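For reference, a rough sketch of what `to_internal_json()` could return for a single-point vector cube carrying the dummy cube; the exact key layout depends on `shapely.geometry.mapping` and `xarray.DataArray.to_dict(data="array")`, so treat this as illustrative only:

```python
# Illustrative (assumed) shape of DriverVectorCube.to_internal_json() output.
example_internal_json = {
    "geometries": {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": (5.0, 51.2)},
                "properties": {"population": 1234},
            },
        ],
    },
    "cube": {
        "dims": ["geometries"],
        "data": [float("nan")],
        "coords": {"geometries": {"dims": ["geometries"], "data": [0], "attrs": {}}},
        "attrs": {"vector_cube_dummy": True},
        "name": None,
    },
}
```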

def get_crs(self) -> pyproj.CRS:
return self._geometries.crs or pyproj.CRS.from_epsg(4326)

@@ -485,7 +516,7 @@ def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]
# TODO: eliminate these legacy, non-standard formats?
from openeo_driver.save_result import AggregatePolygonResult, JSONResult

if self._cube is None:
if self._cube is None or self._cube.attrs.get(self.CUBE_ATTR_VECTOR_CUBE_DUMMY):
# No cube (or only a dummy placeholder cube): no real data to return (in legacy style), so let's just return a `null` per geometry.
return JSONResult(data=[None] * self.geometry_count())

@@ -511,6 +542,12 @@ def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]
f"Unsupported cube configuration {cube.dims} for _write_legacy_aggregate_polygon_result_json"
)

def get_dimension_names(self) -> List[str]:
if self._cube is None:
return [self.DIM_GEOMETRIES]
else:
return list(self._cube.dims)

def get_bounding_box(self) -> Tuple[float, float, float, float]:
# TODO: cache bounding box?
# TODO #114 #141 Open-EO/openeo-geopyspark-driver#239: option to buffer point geometries (if any)
@@ -596,18 +633,23 @@ def apply_dimension(
context: Optional[dict] = None,
env: EvalEnv,
) -> "DriverVectorCube":
# Is callback a single run_udf node process?
single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)

if single_run_udf:
# Process with single "run_udf" node
# TODO: check provided dimension with actual dimension of the cube
if dimension in (self.DIM_BANDS, self.DIM_PROPERTIES) and target_dimension is None:
if (
dimension == self.DIM_GEOMETRIES
or (dimension in {self.DIM_BANDS, self.DIM_PROPERTIES}.intersection(self.get_dimension_names()))
and target_dimension is None
):
log.warning(
f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
)
# TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
# TODO: data chunking (e.g. large feature collections)
gdf = self._as_geopandas_df()
feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
# TODO: dedicated UDF signature to indicate to work on vector cube through a feature collection based API
udf_data = openeo.udf.UdfData(
proj={"EPSG": self._geometries.crs.to_epsg()},
feature_collection_list=[feature_collection],
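To make the UDF path above concrete: conceptually, a user-defined function on this path gets the vector cube's geometries and properties as a GeoDataFrame (wrapped in an `openeo.udf.FeatureCollection`), derives new values per geometry, and returns the modified frame. The sketch below only shows that idea with plain geopandas; it deliberately does not claim the actual backend UDF entry-point signature:

```python
import geopandas as gpd
import shapely.geometry


def my_vector_udf(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Hypothetical per-feature computation: add a derived property column."""
    gdf = gdf.copy()
    gdf["lon"] = gdf.geometry.x  # one derived value per (point) geometry
    return gdf


features = gpd.GeoDataFrame(
    {"population": [1234, 5678]},
    geometry=[shapely.geometry.Point(5.0, 51.2), shapely.geometry.Point(4.4, 50.8)],
    crs="EPSG:4326",
)
print(my_vector_udf(features))
```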
8 changes: 5 additions & 3 deletions openeo_driver/dummy/dummy_backend.py
@@ -181,10 +181,12 @@ def __init__(self, metadata: CollectionMetadata = None):
self.apply_tiles = Mock(name="apply_tiles", return_value=self)
self.apply_tiles_spatiotemporal = Mock(name="apply_tiles_spatiotemporal", return_value=self)

# Create mock methods for remaining data cube methods that are not yet defined
already_defined = set(DummyDataCube.__dict__.keys()).union(self.__dict__.keys())
# Create mock methods for remaining DriverDataCube methods that are not yet defined directly by DummyDataCube
to_keep = set(DummyDataCube.__dict__.keys()).union(self.__dict__.keys())
to_keep.update(m for m in DriverDataCube.__dict__.keys() if m.startswith("_"))
to_keep.update(["get_dimension_names"])
for name, method in DriverDataCube.__dict__.items():
if not name.startswith('_') and name not in already_defined and callable(method):
if not name in to_keep and callable(method):
setattr(self, name, Mock(name=name, return_value=self))

for name in [n for n, m in DummyDataCube.__dict__.items() if getattr(m, '_mock_side_effect', False)]:
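The change above keeps the real `get_dimension_names` implementation on `DummyDataCube` while auto-mocking the remaining public `DriverDataCube` methods. A minimal standalone illustration of that auto-mocking pattern (class names invented for the example):

```python
# Every public callable of the base class that the dummy does not define itself
# is replaced by a Mock that returns the dummy instance, so chained calls keep working.
from unittest.mock import Mock


class Base:
    def foo(self):
        ...

    def bar(self):
        ...


class Dummy(Base):
    def foo(self):
        return "real foo"

    def __init__(self):
        to_keep = set(Dummy.__dict__.keys()).union(self.__dict__.keys())
        for name, method in Base.__dict__.items():
            if not name.startswith("_") and name not in to_keep and callable(method):
                setattr(self, name, Mock(name=name, return_value=self))


d = Dummy()
assert d.foo() == "real foo"  # explicitly defined method is kept
assert d.bar() is d           # auto-mocked method returns the instance itself
```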