Skip to content

Commit

Permalink
Merge branch 'feature/load_stac_dev' of github.com:interTwin-eu/opene…
Browse files Browse the repository at this point in the history
…o-processes-dask into feature/load_stac_dev
  • Loading branch information
clausmichele committed Oct 8, 2024
2 parents 3840c31 + 35fedea commit e7a4ec2
Show file tree
Hide file tree
Showing 18 changed files with 975 additions and 44 deletions.
11 changes: 5 additions & 6 deletions openeo_processes_dask/process_implementations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .inspect import *
from .logic import *
from .math import *
from .text import *

try:
from .ml import *
Expand All @@ -16,12 +17,10 @@
"Did not load machine learning processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, ml]`"
)

# try:
# from .experimental import *
# except ImportError as e:
# logger.warning(
# "Did not experimental processes due to missing dependencies: Install them like this: `pip install openeo-processes-dask[implementations, experimental]`"
# )
try:
from .experimental import *
except ImportError as e:
logger.warning("Did not load experimental processes.")

import rioxarray as rio # Required for the .rio accessor on xarrays.

Expand Down
68 changes: 63 additions & 5 deletions openeo_processes_dask/process_implementations/arrays.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import itertools
import logging
from typing import Any, Optional
from typing import Any, Callable, Optional, Union

import dask.array as da
import numpy as np
Expand All @@ -10,6 +11,7 @@
from openeo_pg_parser_networkx.pg_schema import DateTime
from xarray.core.duck_array_ops import isnull, notnull

from openeo_processes_dask.process_implementations.comparison import is_valid
from openeo_processes_dask.process_implementations.cubes.utils import _is_dask_array
from openeo_processes_dask.process_implementations.exceptions import (
ArrayElementNotAvailable,
Expand All @@ -30,11 +32,14 @@
"array_contains",
"array_find",
"array_labels",
"array_apply",
"array_interpolate_linear",
"first",
"last",
"order",
"rearrange",
"sort",
"count",
]


Expand Down Expand Up @@ -161,10 +166,8 @@ def array_contains(data: ArrayLike, value: Any, axis=None) -> bool:
value_is_valid = True
if len(np.shape(data)) != 1 and axis is None:
return False
if not value_is_valid:
if not value_is_valid or pd.isnull(value):
return False
if pd.isnull(value):
return np.isnan(data).any(axis=axis)
else:
return np.isin(data, value).any(axis=axis)

Expand All @@ -184,8 +187,14 @@ def array_find(
idxs = (data == value).argmax(axis=axis)

mask = ~np.array((data == value).any(axis=axis))
if np.isnan(value):
if not isinstance(value, str) and np.isnan(value):
mask = True
if reverse:
if axis is None:
size = data.size
else:
size = data.shape[axis]
idxs = size - 1 - idxs

logger.warning(
"array_find: numpy has no sentinel value for missing data in integer arrays, therefore np.masked_array is used to return the indices of found elements. Further operations might fail if not defined for masked arrays."
Expand All @@ -210,6 +219,35 @@ def array_labels(data: ArrayLike) -> ArrayLike:
return np.arange(len(data))


def array_apply(
data: ArrayLike, process: Callable, context: Optional[Any] = None
) -> ArrayLike:
if not context:
context = {}
positional_parameters = {"x": 0}
named_parameters = {"x": data, "context": context}
if callable(process):
process_to_apply = np.vectorize(process)
return process_to_apply(
data,
positional_parameters=positional_parameters,
named_parameters=named_parameters,
)


def array_interpolate_linear(data: ArrayLike):
if isinstance(data, list):
data = np.array(data)
x = np.arange(len(data))
valid = np.isfinite(data)
if len(x[valid]) < 2:
return data
data[~valid] = np.interp(
x[~valid], x[valid], data[valid], left=np.nan, right=np.nan
)
return data


def first(
data: ArrayLike,
ignore_nodata: Optional[bool] = True,
Expand Down Expand Up @@ -337,3 +375,23 @@ def sort(
return data_sorted_flip
elif nodata == True: # default sort behaviour, np.nan values are put last
return data_sorted


def count(
data: ArrayLike,
condition: Optional[Union[Callable, bool]] = None,
context: Any = None,
axis=None,
keepdims=False,
):
if condition is None:
valid = is_valid(data)
return np.nansum(valid, axis=axis, keepdims=keepdims)
if condition is True:
return np.nansum(np.ones_like(data), axis=axis, keepdims=keepdims)
if callable(condition):
if not context:
context = {}
context.pop("x", None)
count = condition(x=data, **context)
return np.nansum(count, axis=axis, keepdims=keepdims)
52 changes: 41 additions & 11 deletions openeo_processes_dask/process_implementations/cubes/_filter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import warnings
from typing import Callable
from typing import Any, Callable, Optional

import dask.array as da
import geopandas as gpd
Expand All @@ -21,6 +21,7 @@
BandFilterParameterMissing,
DimensionMissing,
DimensionNotAvailable,
TemporalExtentEmpty,
TooManyDimensions,
)

Expand Down Expand Up @@ -69,15 +70,33 @@ def filter_temporal(
# https://github.com/numpy/numpy/issues/23904
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
start_time = extent[0]
if start_time is not None:
if isinstance(extent, TemporalInterval):
start_time = extent.start
end_time = extent.end
else:
start_time = extent[0]
end_time = extent[1]

if isinstance(start_time, str):
start_time = np.datetime64(start_time)
elif start_time is not None:
start_time = start_time.to_numpy()
end_time = extent[1]
if end_time is not None:
end_time = extent[1].to_numpy() - np.timedelta64(1, "ms")

if isinstance(end_time, str):
end_time = np.datetime64(end_time)
elif end_time is not None:
end_time = end_time.to_numpy()

# The second element is the end of the temporal interval.
# The specified instance in time is excluded from the interval.
# See https://processes.openeo.org/#filter_temporal
if end_time is not None:
end_time -= np.timedelta64(1, "ms")

if start_time is not None and end_time is not None and end_time < start_time:
raise TemporalExtentEmpty(
"The temporal extent is empty. The second instant in time must always be greater/later than the first instant in time."
)

data = data.where(~np.isnat(data[applicable_temporal_dimension]), drop=True)
filtered = data.loc[
Expand All @@ -87,16 +106,27 @@ def filter_temporal(
return filtered


def filter_labels(data: RasterCube, condition: Callable, dimension: str) -> RasterCube:
def filter_labels(
data: RasterCube, condition: Callable, dimension: str, context: Optional[Any] = None
) -> RasterCube:
if dimension not in data.dims:
raise DimensionNotAvailable(
f"Provided dimension ({dimension}) not found in data.dims: {data.dims}"
)

labels = data[dimension].values
label_mask = condition(x=labels)
label = labels[label_mask]
data = data.sel(**{dimension: label})
labels = np.array(data[dimension].values)
if not context:
context = {}
positional_parameters = {"x": 0}
named_parameters = {"x": labels, "context": context}
filter_condition = np.vectorize(condition)
filtered_labels = filter_condition(
labels,
positional_parameters=positional_parameters,
named_parameters=named_parameters,
)
label = np.argwhere(filtered_labels)
data = data.isel(**{dimension: label[0]})
return data


Expand Down
140 changes: 137 additions & 3 deletions openeo_processes_dask/process_implementations/cubes/general.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Optional
import copy
from typing import Optional, Union

import numpy as np
import xarray as xr
Expand All @@ -11,7 +12,15 @@
DimensionNotAvailable,
)

__all__ = ["create_raster_cube", "drop_dimension", "dimension_labels", "add_dimension"]
__all__ = [
"create_data_cube",
"drop_dimension",
"dimension_labels",
"add_dimension",
"rename_dimension",
"rename_labels",
"trim_cube",
]


def drop_dimension(data: RasterCube, name: str) -> RasterCube:
Expand All @@ -26,10 +35,27 @@ def drop_dimension(data: RasterCube, name: str) -> RasterCube:
return data.drop_vars(name).squeeze(name)


def create_raster_cube() -> RasterCube:
def create_data_cube() -> RasterCube:
return xr.DataArray()


def trim_cube(data) -> RasterCube:
for dim in data.dims:
if (
dim in data.openeo.temporal_dims
or dim in data.openeo.band_dims
or dim in data.openeo.other_dims
):
values = data[dim].values
other_dims = [d for d in data.dims if d != dim]
available_data = values[(np.isnan(data)).all(dim=other_dims) == 0]
if len(available_data) == 0:
raise ValueError(f"Data contains NaN values only. ")
data = data.sel({dim: available_data})

return data


def dimension_labels(data: RasterCube, dimension: str) -> ArrayLike:
if dimension not in data.dims:
raise DimensionNotAvailable(
Expand Down Expand Up @@ -72,3 +98,111 @@ def add_dimension(
# Register dimension in the openeo accessor
data_e.openeo.add_dim_type(name=name, type=type)
return data_e


def rename_dimension(
data: RasterCube,
source: str,
target: str,
):
"""
Parameters
----------
data : xr.DataArray
A data cube.
source : str
The current name of the dimension.
Fails with a DimensionNotAvailable exception if the specified dimension does not exist.
labels : number, str
A new Name for the dimension.
Fails with a DimensionExists exception if a dimension with the specified name exists.
Returns
-------
xr.DataArray :
A data cube with the same dimensions,
but the name of one of the dimensions changes.
The old name can not be referred to any longer.
The dimension properties (name, type, labels, reference system and resolution)
remain unchanged.
"""
if source not in data.dims:
raise DimensionNotAvailable(
f"Provided dimension ({source}) not found in data.dims: {data.dims}"
)
if target in data.dims:
raise Exception(
f"DimensionExists - A dimension with the specified name already exists. The existing dimensions are: {data.dims}"
)
# Register dimension in the openeo accessor
if source in data.openeo.spatial_dims:
dim_type = "spatial"
elif source in data.openeo.temporal_dims:
dim_type = "temporal"
elif source in data.openeo.band_dims:
dim_type = "bands"
else:
dim_type = "other"
data = data.rename({source: target})
data.openeo.add_dim_type(name=target, type=dim_type)
return data


def rename_labels(
data: RasterCube,
dimension: str,
target: list[Union[str, float]],
source: Optional[list[Union[str, float]]] = [],
):
data_rename = copy.deepcopy(data)
if dimension not in data_rename.dims:
raise DimensionNotAvailable(
f"Provided dimension ({dimension}) not found in data.dims: {data_rename.dims}"
)
if source:
if len(source) != len(target):
raise Exception(
f"LabelMismatch - The number of labels in the parameters `source` and `target` don't match."
)

source_labels = data_rename[dimension].values
if isinstance(source_labels, np.ndarray):
source_labels = source_labels.tolist()
if isinstance(target, np.ndarray):
target = target.tolist()

target_values = []

for label in source_labels:
if label in target:
raise Exception(f"LabelExists - A label with the specified name exists.")
if source:
if label in source:
target_values.append(target[source.index(label)])
else:
target_values.append(label)

if not source:
if len(source_labels) == len(target):
data_rename[dimension] = target
elif len(target) < len(source_labels):
if 0 in source_labels:
target_values = target + source_labels[len(target) :]
data_rename[dimension] = target_values
else:
raise Exception(
f"LabelsNotEnumerated - The dimension labels are not enumerated."
)
else:
raise Exception(
f"LabelMismatch - The number of labels in the parameters `source` and `target` don't match."
)

else:
for label in source:
if label not in source_labels:
raise Exception(
f"LabelNotAvailable - A label with the specified name does not exist."
)
data_rename[dimension] = target_values

return data_rename
Loading

0 comments on commit e7a4ec2

Please sign in to comment.