From 84c24ddfb57d41a4988f495a90ccb193c01b42c2 Mon Sep 17 00:00:00 2001 From: ValentinaHutter <85164505+ValentinaHutter@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:27:18 +0100 Subject: [PATCH] update labeled array, add function for given axis (#294) --- .../process_implementations/arrays.py | 98 ++++++++++++++----- tests/test_apply.py | 90 +++++++++++++++++ 2 files changed, 164 insertions(+), 24 deletions(-) diff --git a/openeo_processes_dask/process_implementations/arrays.py b/openeo_processes_dask/process_implementations/arrays.py index 9708850..d0f31d2 100644 --- a/openeo_processes_dask/process_implementations/arrays.py +++ b/openeo_processes_dask/process_implementations/arrays.py @@ -50,9 +50,11 @@ ] -def get_labels(data, dimension="labels"): +def get_labels(data, dimension="labels", axis=0): if isinstance(data, xr.DataArray): dimension = data.dims[0] if len(data.dims) == 1 else dimension + if axis: + dimension = data.dims[axis] labels = data[dimension].values data = data.values else: @@ -82,7 +84,7 @@ def array_element( ) if isinstance(data, xr.DataArray): - dim_labels, data = get_labels(data) + dim_labels, data = get_labels(data, axis=axis) if label is not None: if len(dim_labels) == 0: @@ -143,13 +145,10 @@ def array_create_labeled(data: ArrayLike, labels: ArrayLike) -> ArrayLike: def array_modify( - data: ArrayLike, - values: ArrayLike, - index: int, - length: Optional[int] = 1, + data: ArrayLike, values: ArrayLike, index: int, length: Optional[int] = 1, axis=None ) -> ArrayLike: - labels, data = get_labels(data) - values_labels, values = get_labels(values) + labels, data = get_labels(data, axis=axis) + values_labels, values = get_labels(values, axis=axis) if index > len(data): raise ArrayElementNotAvailable( @@ -160,10 +159,25 @@ def array_modify( "At least one label exists in both arrays and the conflict must be resolved before." ) - first = data[:index] - modified = np.append(first, values) - if index + length < len(data): - modified = np.append(modified, data[index + length :]) + def modify(data): + first = data[:index] + modified = np.append(first, values) + if index + length < len(data): + modified = np.append(modified, data[index + length :]) + return modified + + if axis: + if _is_dask_array(data): + if data.size > 50000000: + raise Exception( + f"Cannot load data of shape: {data.shape} into memory. " + ) + # currently, there seems to be no way around loading the values, + # apply_along_axis cannot handle dask arrays + data = data.compute() + modified = np.apply_along_axis(modify, axis=axis, arr=data) + else: + modified = modify(data) if len(labels) > 0: first = labels[:index] @@ -250,7 +264,7 @@ def array_find( reverse: Optional[bool] = False, axis: Optional[int] = None, ) -> np.number: - labels, data = get_labels(data) + labels, data = get_labels(data, axis) if reverse: data = np.flip(data, axis=axis) @@ -290,9 +304,9 @@ def array_find_label(data: ArrayLike, label: Union[str, int, float], dim_labels= def array_filter( - data: ArrayLike, condition: Callable, context: Optional[Any] = None + data: ArrayLike, condition: Callable, context: Optional[Any] = None, axis=None ) -> ArrayLike: - labels, data = get_labels(data) + labels, data = get_labels(data, axis=axis) if not context: context = {} positional_parameters = {"x": 0} @@ -304,7 +318,17 @@ def array_filter( positional_parameters=positional_parameters, named_parameters=named_parameters, ) - data = data[filtered_data.astype(bool)] + if len(np.shape(data)) == 1: + data = data[filtered_data.astype(bool)] + else: + if axis: + n_axis = len(np.shape(data)) + for ax in range(n_axis - 1, -1, -1): + if ax != axis: + filtered_data = filtered_data.astype(bool).all(axis=ax) + filtered_data = np.argwhere(filtered_data).flatten() + data = np.take(data, filtered_data, axis=axis) + return data if len(labels) > 0: labels = labels[filtered_data] data = array_create_labeled(data, labels) @@ -347,10 +371,12 @@ def array_apply( raise Exception(f"Could not apply process as it is not callable. ") -def array_interpolate_linear(data: ArrayLike, dim_labels=None): - x, data = get_labels(data) +def array_interpolate_linear(data: ArrayLike, axis=None, dim_labels=None): + return_label = False + x, data = get_labels(data, axis=axis) if len(x) > 0: dim_labels = x + return_label = True if dim_labels: x = np.array(dim_labels) if np.array(x).dtype.type is np.str_: @@ -362,13 +388,37 @@ def array_interpolate_linear(data: ArrayLike, dim_labels=None): except Exception: x = np.arange(len(data)) if len(x) == 0: - x = np.arange(len(data)) - valid = np.isfinite(data) - if len(x[valid]) < 2: + if axis: + x = np.arange(data.shape[axis]) + else: + x = np.arange(len(data)) + + def interp(data): + valid = np.isfinite(data) + if (valid == 1).all(): + return data + if len(x[valid]) < 2: + return data + data[~valid] = np.interp( + x[~valid], x[valid], data[valid], left=np.nan, right=np.nan + ) + return data - data[~valid] = np.interp( - x[~valid], x[valid], data[valid], left=np.nan, right=np.nan - ) + + if axis: + if _is_dask_array(data): + if data.size > 50000000: + raise Exception( + f"Cannot load data of shape: {data.shape} into memory. " + ) + # currently, there seems to be no way around loading the values, + # apply_along_axis cannot handle dask arrays + data = data.compute() + data = np.apply_along_axis(interp, axis=axis, arr=data) + else: + data = interp(data) + if return_label: + return array_create_labeled(data=data, labels=dim_labels) return data diff --git a/tests/test_apply.py b/tests/test_apply.py index 5582705..be1ac74 100644 --- a/tests/test_apply.py +++ b/tests/test_apply.py @@ -232,6 +232,96 @@ def test_apply_dimension_quantile_processes( assert output_cube_quantile.shape == (6, 5, probability - 1, 4) +@pytest.mark.parametrize("size", [(6, 5, 10, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_apply_dimension_interpolate_processes( + temporal_interval, bounding_box, random_raster_data, process_registry +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + input_cube[3, 2, 5, 0] = np.nan + + _process_interpolate = partial( + process_registry["array_interpolate_linear"].implementation, + data=ParameterReference(from_parameter="data"), + ) + + output_cube = apply_dimension( + data=input_cube, + process=_process_interpolate, + dimension="t", + ) + assert not np.isfinite(input_cube[3, 2, 5, 0]) + assert np.isfinite(output_cube[3, 2, 5, 0]) + + +@pytest.mark.parametrize("size", [(6, 5, 10, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_apply_dimension_modify_processes( + temporal_interval, bounding_box, random_raster_data, process_registry +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + _process_modify = partial( + process_registry["array_modify"].implementation, + data=ParameterReference(from_parameter="data"), + values=[2, 3], + index=3, + ) + + output_cube = apply_dimension( + data=input_cube, + process=_process_modify, + dimension="bands", + ) + assert output_cube.shape == (6, 5, 10, 5) + + +@pytest.mark.parametrize("size", [(6, 5, 10, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_apply_dimension_filter_processes( + temporal_interval, bounding_box, random_raster_data, process_registry +): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + _condition = partial( + process_registry["gt"].implementation, + x=ParameterReference(from_parameter="x"), + y=10, + ) + + _process_filter = partial( + process_registry["array_filter"].implementation, + data=ParameterReference(from_parameter="data"), + condition=_condition, + ) + + output_cube = apply_dimension( + data=input_cube, + process=_process_filter, + dimension="bands", + ) + print(output_cube) + assert output_cube.shape <= input_cube.shape + + @pytest.mark.parametrize("size", [(6, 5, 4, 4)]) @pytest.mark.parametrize("dtype", [np.float32]) def test_apply_kernel(temporal_interval, bounding_box, random_raster_data):