Skip to content

Commit

Permalink
update labeled array, add function for given axis (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinaHutter authored Nov 18, 2024
1 parent afa06cc commit 84c24dd
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 24 deletions.
98 changes: 74 additions & 24 deletions openeo_processes_dask/process_implementations/arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
]


def get_labels(data, dimension="labels"):
def get_labels(data, dimension="labels", axis=0):
if isinstance(data, xr.DataArray):
dimension = data.dims[0] if len(data.dims) == 1 else dimension
if axis:
dimension = data.dims[axis]
labels = data[dimension].values
data = data.values
else:
Expand Down Expand Up @@ -82,7 +84,7 @@ def array_element(
)

if isinstance(data, xr.DataArray):
dim_labels, data = get_labels(data)
dim_labels, data = get_labels(data, axis=axis)

if label is not None:
if len(dim_labels) == 0:
Expand Down Expand Up @@ -143,13 +145,10 @@ def array_create_labeled(data: ArrayLike, labels: ArrayLike) -> ArrayLike:


def array_modify(
data: ArrayLike,
values: ArrayLike,
index: int,
length: Optional[int] = 1,
data: ArrayLike, values: ArrayLike, index: int, length: Optional[int] = 1, axis=None
) -> ArrayLike:
labels, data = get_labels(data)
values_labels, values = get_labels(values)
labels, data = get_labels(data, axis=axis)
values_labels, values = get_labels(values, axis=axis)

if index > len(data):
raise ArrayElementNotAvailable(
Expand All @@ -160,10 +159,25 @@ def array_modify(
"At least one label exists in both arrays and the conflict must be resolved before."
)

first = data[:index]
modified = np.append(first, values)
if index + length < len(data):
modified = np.append(modified, data[index + length :])
def modify(data):
first = data[:index]
modified = np.append(first, values)
if index + length < len(data):
modified = np.append(modified, data[index + length :])
return modified

if axis:
if _is_dask_array(data):
if data.size > 50000000:
raise Exception(
f"Cannot load data of shape: {data.shape} into memory. "
)
# currently, there seems to be no way around loading the values,
# apply_along_axis cannot handle dask arrays
data = data.compute()
modified = np.apply_along_axis(modify, axis=axis, arr=data)
else:
modified = modify(data)

if len(labels) > 0:
first = labels[:index]
Expand Down Expand Up @@ -250,7 +264,7 @@ def array_find(
reverse: Optional[bool] = False,
axis: Optional[int] = None,
) -> np.number:
labels, data = get_labels(data)
labels, data = get_labels(data, axis)

if reverse:
data = np.flip(data, axis=axis)
Expand Down Expand Up @@ -290,9 +304,9 @@ def array_find_label(data: ArrayLike, label: Union[str, int, float], dim_labels=


def array_filter(
data: ArrayLike, condition: Callable, context: Optional[Any] = None
data: ArrayLike, condition: Callable, context: Optional[Any] = None, axis=None
) -> ArrayLike:
labels, data = get_labels(data)
labels, data = get_labels(data, axis=axis)
if not context:
context = {}
positional_parameters = {"x": 0}
Expand All @@ -304,7 +318,17 @@ def array_filter(
positional_parameters=positional_parameters,
named_parameters=named_parameters,
)
data = data[filtered_data.astype(bool)]
if len(np.shape(data)) == 1:
data = data[filtered_data.astype(bool)]
else:
if axis:
n_axis = len(np.shape(data))
for ax in range(n_axis - 1, -1, -1):
if ax != axis:
filtered_data = filtered_data.astype(bool).all(axis=ax)
filtered_data = np.argwhere(filtered_data).flatten()
data = np.take(data, filtered_data, axis=axis)
return data
if len(labels) > 0:
labels = labels[filtered_data]
data = array_create_labeled(data, labels)
Expand Down Expand Up @@ -347,10 +371,12 @@ def array_apply(
raise Exception(f"Could not apply process as it is not callable. ")


def array_interpolate_linear(data: ArrayLike, dim_labels=None):
x, data = get_labels(data)
def array_interpolate_linear(data: ArrayLike, axis=None, dim_labels=None):
return_label = False
x, data = get_labels(data, axis=axis)
if len(x) > 0:
dim_labels = x
return_label = True
if dim_labels:
x = np.array(dim_labels)
if np.array(x).dtype.type is np.str_:
Expand All @@ -362,13 +388,37 @@ def array_interpolate_linear(data: ArrayLike, dim_labels=None):
except Exception:
x = np.arange(len(data))
if len(x) == 0:
x = np.arange(len(data))
valid = np.isfinite(data)
if len(x[valid]) < 2:
if axis:
x = np.arange(data.shape[axis])
else:
x = np.arange(len(data))

def interp(data):
valid = np.isfinite(data)
if (valid == 1).all():
return data
if len(x[valid]) < 2:
return data
data[~valid] = np.interp(
x[~valid], x[valid], data[valid], left=np.nan, right=np.nan
)

return data
data[~valid] = np.interp(
x[~valid], x[valid], data[valid], left=np.nan, right=np.nan
)

if axis:
if _is_dask_array(data):
if data.size > 50000000:
raise Exception(
f"Cannot load data of shape: {data.shape} into memory. "
)
# currently, there seems to be no way around loading the values,
# apply_along_axis cannot handle dask arrays
data = data.compute()
data = np.apply_along_axis(interp, axis=axis, arr=data)
else:
data = interp(data)
if return_label:
return array_create_labeled(data=data, labels=dim_labels)
return data


Expand Down
90 changes: 90 additions & 0 deletions tests/test_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,96 @@ def test_apply_dimension_quantile_processes(
assert output_cube_quantile.shape == (6, 5, probability - 1, 4)


@pytest.mark.parametrize("size", [(6, 5, 10, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply_dimension_interpolate_processes(
    temporal_interval, bounding_box, random_raster_data, process_registry
):
    """Linear interpolation along 't' fills a NaN planted in the input cube."""
    cube = create_fake_rastercube(
        data=random_raster_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
        backend="dask",
    )
    # Plant a single NaN so the interpolation has a gap to fill.
    cube[3, 2, 5, 0] = np.nan

    interpolate = partial(
        process_registry["array_interpolate_linear"].implementation,
        data=ParameterReference(from_parameter="data"),
    )

    result = apply_dimension(
        data=cube,
        process=interpolate,
        dimension="t",
    )
    # The input keeps its NaN; the interpolated output replaces it with a finite value.
    assert not np.isfinite(cube[3, 2, 5, 0])
    assert np.isfinite(result[3, 2, 5, 0])


@pytest.mark.parametrize("size", [(6, 5, 10, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply_dimension_modify_processes(
    temporal_interval, bounding_box, random_raster_data, process_registry
):
    """array_modify along 'bands': inserting two values at index 3 (with the
    default length of 1 replaced) grows the bands axis from 4 to 5."""
    cube = create_fake_rastercube(
        data=random_raster_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
        backend="dask",
    )

    modify = partial(
        process_registry["array_modify"].implementation,
        data=ParameterReference(from_parameter="data"),
        values=[2, 3],
        index=3,
    )

    result = apply_dimension(
        data=cube,
        process=modify,
        dimension="bands",
    )
    assert result.shape == (6, 5, 10, 5)


@pytest.mark.parametrize("size", [(6, 5, 10, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply_dimension_filter_processes(
    temporal_interval, bounding_box, random_raster_data, process_registry
):
    """array_filter along 'bands' keeps only values where gt(x, 10) holds,
    so every axis of the output may only shrink, never grow."""
    input_cube = create_fake_rastercube(
        data=random_raster_data,
        spatial_extent=bounding_box,
        temporal_extent=temporal_interval,
        bands=["B02", "B03", "B04", "B08"],
        backend="dask",
    )

    _condition = partial(
        process_registry["gt"].implementation,
        x=ParameterReference(from_parameter="x"),
        y=10,
    )

    _process_filter = partial(
        process_registry["array_filter"].implementation,
        data=ParameterReference(from_parameter="data"),
        condition=_condition,
    )

    output_cube = apply_dimension(
        data=input_cube,
        process=_process_filter,
        dimension="bands",
    )
    # Filtering can only drop elements. Compare shapes element-wise:
    # plain tuple `<=` is lexicographic and could pass even if a later
    # axis grew, so check rank and each axis explicitly.
    assert len(output_cube.shape) == len(input_cube.shape)
    assert all(
        out_len <= in_len
        for out_len, in_len in zip(output_cube.shape, input_cube.shape)
    )


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply_kernel(temporal_interval, bounding_box, random_raster_data):
Expand Down

0 comments on commit 84c24dd

Please sign in to comment.