From f85e9293df2b0af192ab31ad4db4e0d57f929967 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Thu, 6 Oct 2022 19:19:19 +0200
Subject: [PATCH] fixup! fixup! fixup! Issue #114/#141 convert inline GeoJSON in aggregate_spatial to VectorCube

---
 openeo_driver/ProcessGraphDeserializer.py    |  9 +++-
 openeo_driver/datacube.py                    | 49 ++++++++++----------
 tests/data/pg/1.0/run_udf_on_timeseries.json |  4 ++
 tests/test_views_execute.py                  |  6 ++-
 4 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py
index 08eab421..d88982a1 100644
--- a/openeo_driver/ProcessGraphDeserializer.py
+++ b/openeo_driver/ProcessGraphDeserializer.py
@@ -1321,6 +1321,10 @@ def run_udf(args: dict, env: EvalEnv):
     # TODO #114 add support for DriverVectorCube
     if isinstance(data, AggregatePolygonResult):
         pass
+    if isinstance(data, DriverVectorCube):
+        # TODO: this is a temporary adaptation to old-style save results. Better to have proper DriverVectorCube support in run_udf?
+        data = data.to_legacy_save_result()
+
     if isinstance(data, (DelayedVector, dict)):
         if isinstance(data, dict):
             data = DelayedVector.from_json_dict(data)
@@ -1338,7 +1342,10 @@ def run_udf(args: dict, env: EvalEnv):
         )
     else:
         raise ProcessParameterInvalidException(
-            parameter='data', process='run_udf', reason=f"Invalid data type {type(data)!r} expected raster-cube.")
+            parameter="data",
+            process="run_udf",
+            reason=f"Unsupported data type {type(data)}.",
+        )
 
     _log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
     result_data = openeo.udf.run_udf_code(udf, data)
diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py
index 5025f8a0..9b3b3a79 100644
--- a/openeo_driver/datacube.py
+++ b/openeo_driver/datacube.py
@@ -280,40 +280,39 @@ def write_assets(
     def to_multipolygon(self) -> shapely.geometry.MultiPolygon:
         return shapely.ops.unary_union(self._geometries.geometry)
 
-    def _write_legacy_aggregate_polygon_result_json(
-        self, directory: Path
-    ) -> Dict[str, StacAsset]:
-        """Export to legacy AggregatePolygonResult JSON format"""
-        # TODO: eliminate this legacy, non-standard format?
+    def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]:
+        """
+        Export to legacy AggregatePolygonResult/JSONResult objects.
+        Provided as a temporary adaptation layer while migrating to real vector cubes.
+        """
+        # TODO: eliminate these legacy, non-standard formats?
         from openeo_driver.save_result import AggregatePolygonResult, JSONResult
 
-        def write_spatiotemporal(cube: xarray.DataArray) -> Dict[str, StacAsset]:
-            """Export to legacy AggregatePolygonResult JSON format"""
+        cube = self._cube
+        # TODO: more flexible temporal/band dimension detection?
+        if cube.dims == (self.DIM_GEOMETRIES, "t"):
+            # Add single band dimension
+            cube = cube.expand_dims({"bands": ["band"]}, axis=-1)
+        if cube.dims == (self.DIM_GEOMETRIES, "t", "bands"):
             cube = cube.transpose("t", self.DIM_GEOMETRIES, "bands")
             timeseries = {
                 t.item(): t_slice.values.tolist()
                 for t, t_slice in zip(cube.coords["t"], cube)
             }
-            result = AggregatePolygonResult(timeseries=timeseries, regions=self)
-            return result.write_assets(directory=directory / "ignored")
-
-        def write_spatial(cube: xarray.DataArray) -> Dict[str, StacAsset]:
+            return AggregatePolygonResult(timeseries=timeseries, regions=self)
+        elif cube.dims == (self.DIM_GEOMETRIES, "bands"):
             cube = cube.transpose(self.DIM_GEOMETRIES, "bands")
-            result = JSONResult(data=cube.values.tolist())
-            return result.write_assets(directory / "ignored")
+            return JSONResult(data=cube.values.tolist())
+        raise ValueError(
+            f"Unsupported cube configuration {cube.dims} for to_legacy_save_result"
+        )
 
-        cube = self._cube
-        # TODO: more flexible temporal/band dimension detection?
-        if cube.dims == (self.DIM_GEOMETRIES, "t"):
-            return write_spatiotemporal(cube.expand_dims({"bands": ["band"]}, axis=-1))
-        elif cube.dims == (self.DIM_GEOMETRIES, "t", "bands"):
-            return write_spatiotemporal(cube)
-        elif cube.dims == (self.DIM_GEOMETRIES, "bands"):
-            return write_spatial(cube)
-        else:
-            raise ValueError(
-                f"Unsupported cube configuration {cube.dims} for _write_legacy_aggregate_polygon_result_json"
-            )
+    def _write_legacy_aggregate_polygon_result_json(
+        self, directory: Path
+    ) -> Dict[str, StacAsset]:
+        """Export to legacy AggregatePolygonResult JSON format"""
+        # TODO: eliminate this legacy, non-standard format?
+        return self.to_legacy_save_result().write_assets(directory=directory)
 
     def get_bounding_box(self) -> Tuple[float, float, float, float]:
         return tuple(self._geometries.total_bounds)
diff --git a/tests/data/pg/1.0/run_udf_on_timeseries.json b/tests/data/pg/1.0/run_udf_on_timeseries.json
index 7c4d107f..b25cc3e8 100644
--- a/tests/data/pg/1.0/run_udf_on_timeseries.json
+++ b/tests/data/pg/1.0/run_udf_on_timeseries.json
@@ -12,6 +12,10 @@
       "temporal_extent": [
         "2017-11-21",
         "2017-11-21"
+      ],
+      "bands": [
+        "B02",
+        "B03"
       ]
     }
   },
diff --git a/tests/test_views_execute.py b/tests/test_views_execute.py
index ed2f4af0..1611789d 100644
--- a/tests/test_views_execute.py
+++ b/tests/test_views_execute.py
@@ -1115,7 +1115,11 @@ def test_run_udf_on_json(api100, udf_code):
         preprocess=lambda s: s.replace('"PLACEHOLDER_UDF"', repr(udf_code))
     )
     resp = api100.check_result(process_graph)
-    assert resp.json == {'len': 2, 'keys': ['2015-07-06T00:00:00Z', '2015-08-22T00:00:00Z'], 'values': [[[2.345]], [[None]]]}
+    assert resp.json == {
+        "len": 2,
+        "keys": ["2015-07-06T00:00:00Z", "2015-08-22T00:00:00Z"],
+        "values": [[[2.345, None]], [[2.0, 3.0]]],
+    }
 
 
 @pytest.mark.parametrize("udf_code", [
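
Reviewer note, not part of the patch: a minimal self-contained sketch of the dimension-based dispatch that to_legacy_save_result() introduces, using plain xarray. The DIM_GEOMETRIES constant and the toy data below are assumptions for illustration only.

    # Sketch of the dims-tuple dispatch, assuming a standalone script.
    import xarray

    DIM_GEOMETRIES = "geometries"  # assumed stand-in for DriverVectorCube.DIM_GEOMETRIES

    def legacy_target(cube: xarray.DataArray) -> str:
        """Mirror the dims-based dispatch of to_legacy_save_result()."""
        if cube.dims == (DIM_GEOMETRIES, "t"):
            # Promote a (geometries, t) cube to a single-band cube, as the patch does.
            cube = cube.expand_dims({"bands": ["band"]}, axis=-1)
        if cube.dims == (DIM_GEOMETRIES, "t", "bands"):
            return "AggregatePolygonResult"  # spatiotemporal timeseries JSON
        elif cube.dims == (DIM_GEOMETRIES, "bands"):
            return "JSONResult"  # plain nested lists
        raise ValueError(f"Unsupported cube configuration {cube.dims}")

    # A (geometries, t) cube: two geometries, two timestamps.
    cube = xarray.DataArray(
        [[1.0, 2.0], [3.0, 4.0]],
        dims=(DIM_GEOMETRIES, "t"),
        coords={"t": ["2015-07-06T00:00:00Z", "2015-08-22T00:00:00Z"]},
    )
    assert legacy_target(cube) == "AggregatePolygonResult"

The explicit if/elif chain keeps the two supported layouts obvious, while any unrecognized dimension configuration fails fast with a descriptive error instead of silently producing a malformed legacy result.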