diff --git a/doc/source/conf.py b/doc/source/conf.py index 15146ac9..12578468 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -62,7 +62,8 @@ # For myst-nb to find the Jupyter kernel (=environment) to run from nb_kernel_rgx_aliases = {".*xdem.*": "python3"} -nb_execution_raise_on_error = True +nb_execution_raise_on_error = True # To fail documentation build on notebook execution error +nb_execution_show_tb = True # To show full traceback on notebook execution error # autosummary_generate = True diff --git a/doc/source/coregistration.md b/doc/source/coregistration.md index 998c2d93..02f83a47 100644 --- a/doc/source/coregistration.md +++ b/doc/source/coregistration.md @@ -1,5 +1,7 @@ --- file_format: mystnb +mystnb: + execution_timeout: 90 jupytext: formats: md:myst text_representation: diff --git a/tests/test_coreg/test_base.py b/tests/test_coreg/test_base.py index ecffe363..624a2f99 100644 --- a/tests/test_coreg/test_base.py +++ b/tests/test_coreg/test_base.py @@ -12,24 +12,26 @@ import numpy as np import pandas as pd import pytest +import pytransform3d.rotations import rasterio as rio from geoutils import Raster, Vector from geoutils.raster import RasterType +from scipy.ndimage import binary_dilation import xdem from xdem import coreg, examples, misc, spatialstats from xdem._typing import NDArrayf -from xdem.coreg.base import Coreg, _apply_matrix_rst +from xdem.coreg.base import Coreg, apply_matrix def load_examples() -> tuple[RasterType, RasterType, Vector]: """Load example files to try coregistration methods with.""" - reference_raster = Raster(examples.get_path("longyearbyen_ref_dem")) - to_be_aligned_raster = Raster(examples.get_path("longyearbyen_tba_dem")) + reference_dem = Raster(examples.get_path("longyearbyen_ref_dem")) + to_be_aligned_dem = Raster(examples.get_path("longyearbyen_tba_dem")) glacier_mask = Vector(examples.get_path("longyearbyen_glacier_outlines")) - return reference_raster, to_be_aligned_raster, glacier_mask + return 
reference_dem, to_be_aligned_dem, glacier_mask def assert_coreg_meta_equal(input1: Any, input2: Any) -> bool: @@ -889,116 +891,169 @@ def test_blockwise_coreg_large_gaps(self) -> None: assert np.nanstd(ddem_pre) > np.nanstd(ddem_post) -def test_apply_matrix() -> None: +class TestAffineManipulation: ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask. - ref_arr = gu.raster.get_array_and_mask(ref)[0] - - # Test only vertical shift (it should just apply the vertical shift and not make anything else) - vshift = 5 - matrix = np.diag(np.ones(4, float)) - matrix[2, 3] = vshift - transformed_dem = _apply_matrix_rst(ref_arr, ref.transform, matrix) - reverted_dem = transformed_dem - vshift - - # Check that the reverted DEM has the exact same values as the initial one - # (resampling is not an exact science, so this will only apply for vertical shift corrections) - assert np.nanmedian(reverted_dem) == np.nanmedian(np.asarray(ref.data)) - - # Synthesize a shifted and vertically offset DEM - pixel_shift = 11 - vshift = 5 - shifted_dem = ref_arr.copy() - shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift] - shifted_dem[:, :pixel_shift] = np.nan - shifted_dem += vshift - - matrix = np.diag(np.ones(4, dtype=float)) - matrix[0, 3] = pixel_shift * tba.res[0] - matrix[2, 3] = -vshift - - transformed_dem = _apply_matrix_rst(shifted_dem, ref.transform, matrix, resampling="bilinear") - diff = np.asarray(ref_arr - transformed_dem) - - # Check that the median is very close to zero - assert np.abs(np.nanmedian(diff)) < 0.01 - # Check that the NMAD is low - assert spatialstats.nmad(diff) < 0.01 - - def rotation_matrix(rotation: float = 30) -> NDArrayf: - rotation = np.deg2rad(rotation) - matrix = np.array( - [ - [1, 0, 0, 0], - [0, np.cos(rotation), -np.sin(rotation), 0], - [0, np.sin(rotation), np.cos(rotation), 0], - [0, 0, 0, 1], - ] - ) - return matrix - rotation = 4 - centroid = ( - np.mean([ref.bounds.left, ref.bounds.right]), - 
np.mean([ref.bounds.top, ref.bounds.bottom]), - ref.data.mean(), - ) - rotated_dem = _apply_matrix_rst(ref.data.squeeze(), ref.transform, rotation_matrix(rotation), centroid=centroid) - # Make sure that the rotated DEM is way off, but is centered around the same approximate point. - assert np.abs(np.nanmedian(rotated_dem - ref.data.data)) < 1 - assert spatialstats.nmad(rotated_dem - ref.data.data) > 500 - - # Apply a rotation in the opposite direction - unrotated_dem = ( - _apply_matrix_rst(rotated_dem, ref.transform, rotation_matrix(-rotation * 0.99), centroid=centroid) + 4.0 - ) # TODO: Check why the 0.99 rotation and +4 vertical shift were introduced. - - diff = np.asarray(ref.data.squeeze() - unrotated_dem) - - # if False: - # import matplotlib.pyplot as plt - # - # vmin = 0 - # vmax = 1500 - # extent = (ref.bounds.left, ref.bounds.right, ref.bounds.bottom, ref.bounds.top) - # plot_params = dict( - # extent=extent, - # vmin=vmin, - # vmax=vmax - # ) - # plt.figure(figsize=(22, 4), dpi=100) - # plt.subplot(151) - # plt.title("Original") - # plt.imshow(ref.data.squeeze(), **plot_params) - # plt.xlim(*extent[:2]) - # plt.ylim(*extent[2:]) - # plt.subplot(152) - # plt.title(f"Rotated {rotation} degrees") - # plt.imshow(rotated_dem, **plot_params) - # plt.xlim(*extent[:2]) - # plt.ylim(*extent[2:]) - # plt.subplot(153) - # plt.title(f"De-rotated {-rotation} degrees") - # plt.imshow(unrotated_dem, **plot_params) - # plt.xlim(*extent[:2]) - # plt.ylim(*extent[2:]) - # plt.subplot(154) - # plt.title("Original vs. de-rotated") - # plt.imshow(diff, extent=extent, vmin=-10, vmax=10, cmap="coolwarm_r") - # plt.colorbar() - # plt.xlim(*extent[:2]) - # plt.ylim(*extent[2:]) - # plt.subplot(155) - # plt.title("Original vs. 
de-rotated") - # plt.hist(diff[np.isfinite(diff)], bins=np.linspace(-10, 10, 100)) - # plt.tight_layout(w_pad=0.05) - # plt.show() - - # Check that the median is very close to zero - assert np.abs(np.nanmedian(diff)) < 0.5 - # Check that the NMAD is low - assert spatialstats.nmad(diff) < 5 - print(np.nanmedian(diff), spatialstats.nmad(diff)) + # Identity transformation + matrix_identity = np.diag(np.ones(4, float)) + + # Vertical shift + matrix_vertical = matrix_identity.copy() + matrix_vertical[2, 3] = 1 + + # Vertical and horizontal shifts + matrix_translations = matrix_identity.copy() + matrix_translations[:3, 3] = [0.5, 1, 1.5] + + # Single rotation + rotation = np.deg2rad(5) + matrix_rotations = matrix_identity.copy() + matrix_rotations[1, 1] = np.cos(rotation) + matrix_rotations[2, 2] = np.cos(rotation) + matrix_rotations[2, 1] = -np.sin(rotation) + matrix_rotations[1, 2] = np.sin(rotation) + + # Mix of translations and rotations in all axes (X, Y, Z) simultaneously + rotation_x = 5 + rotation_y = 10 + rotation_z = 3 + e = np.deg2rad(np.array([rotation_x, rotation_y, rotation_z])) + # This is a 3x3 rotation matrix + rot_matrix = pytransform3d.rotations.matrix_from_euler(e=e, i=0, j=1, k=2, extrinsic=True) + matrix_all = matrix_rotations.copy() + matrix_all[0:3, 0:3] = rot_matrix + matrix_all[:3, 3] = [0.5, 1, 1.5] + + list_matrices = [matrix_identity, matrix_vertical, matrix_translations, matrix_rotations, matrix_all] + + @pytest.mark.parametrize("matrix", list_matrices) # type: ignore + def test_apply_matrix__points_opencv(self, matrix: NDArrayf) -> None: + """ + Test that apply matrix's exact transformation for points (implemented with NumPy matrix multiplication) + is exactly the same as the one of OpenCV (optional dependency). 
+ """ + + # Create random points + points = np.random.default_rng(42).normal(size=(10, 3)) + + # Convert to a geodataframe and use apply_matrix for the point cloud + epc = gpd.GeoDataFrame(data={"z": points[:, 2]}, geometry=gpd.points_from_xy(x=points[:, 0], y=points[:, 1])) + trans_epc = apply_matrix(epc, matrix=matrix) + + # Run the same operation with openCV + import cv2 + + trans_cv2_arr = cv2.perspectiveTransform(points[:, :].reshape(1, -1, 3), matrix)[0, :, :] + + # Transform point cloud back to array + trans_numpy = np.array([trans_epc.geometry.x.values, trans_epc.geometry.y.values, trans_epc["z"].values]).T + assert np.allclose(trans_numpy, trans_cv2_arr) + + @pytest.mark.parametrize("regrid_method", [None, "iterative", "griddata"]) # type: ignore + @pytest.mark.parametrize("matrix", list_matrices) # type: ignore + def test_apply_matrix__raster(self, regrid_method: None | str, matrix: NDArrayf) -> None: + """Test that apply matrix gives consistent results between points and rasters (thus validating raster + implementation, as point implementation is validated above), for all possible regridding methods.""" + + # Create a synthetic raster and convert to point cloud + # dem = gu.Raster(self.ref) + dem_arr = np.linspace(0, 2, 25).reshape(5, 5) + transform = rio.transform.from_origin(0, 5, 1, 1) + dem = gu.Raster.from_array(dem_arr, transform=transform, crs=4326, nodata=100) + epc = dem.to_pointcloud(data_column_name="z").ds + + # If a centroid was not given, default to the center of the DEM (at Z=0). 
+ centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0) + + # Apply affine transformation to both datasets + trans_dem = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method=regrid_method) + trans_epc = apply_matrix(epc, matrix=matrix, centroid=centroid) + + # Interpolate transformed DEM at coordinates of the transformed point cloud + # Because the raster created as a constant slope (plan-like), the interpolated values should be very close + z_points = trans_dem.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values)) + valids = np.isfinite(z_points) + assert np.count_nonzero(valids) > 0 + assert np.allclose(z_points[valids], trans_epc.z.values[valids], rtol=10e-5) + + def test_apply_matrix__raster_nodata(self) -> None: + """Test the nodatas created by apply_matrix are consistent between methods""" + + # Use matrix with all transformations + matrix = self.matrix_all + + # Create a synthetic raster, add NaNs, and convert to point cloud + dem_arr = np.linspace(0, 2, 400).reshape(20, 20) + dem_arr[10:14, 10:14] = np.nan + dem_arr[5, 5] = np.nan + dem_arr[:2, :] = np.nan + transform = rio.transform.from_origin(0, 5, 1, 1) + dem = gu.Raster.from_array(dem_arr, transform=transform, crs=4326, nodata=100) + epc = dem.to_pointcloud(data_column_name="z").ds + + centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0) + + trans_dem_it = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="iterative") + trans_dem_gd = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="griddata") + + # Get nodata mask + mask_nodata_it = trans_dem_it.data.mask + mask_nodata_gd = trans_dem_gd.data.mask + + # The iterative mask should be larger and contain the other (as griddata interpolates up to 1 pixel away) + assert np.array_equal(np.logical_or(mask_nodata_gd, mask_nodata_it), mask_nodata_it) + + # Verify nodata masks are located within two pixels of each 
other (1 pixel can be added by griddata, + # and 1 removed by regular-grid interpolation by the iterative method) + smallest_mask = ~binary_dilation( + ~mask_nodata_it, iterations=2 + ) # Invert before dilate to avoid spreading at the edges + # All smallest mask value should exist in the mask of griddata + assert np.array_equal(np.logical_or(smallest_mask, mask_nodata_gd), mask_nodata_gd) + + def test_apply_matrix__raster_realdata(self) -> None: + """Testing real data no complex matrix only to avoid all loops""" + + # Use real data + dem = self.ref + dem.crop((dem.bounds.left, dem.bounds.bottom, dem.bounds.left + 2000, dem.bounds.bottom + 2000)) + epc = dem.to_pointcloud(data_column_name="z").ds + + # Only testing complex matrices for speed + matrix = self.matrix_all + + # If a centroid was not given, default to the center of the DEM (at Z=0). + centroid = (np.mean(epc.geometry.x.values), np.mean(epc.geometry.y.values), 0.0) + + # Apply affine transformation to both datasets + trans_dem_it = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="iterative") + trans_dem_gd = apply_matrix(dem, matrix=matrix, centroid=centroid, force_regrid_method="griddata") + trans_epc = apply_matrix(epc, matrix=matrix, centroid=centroid) + + # Interpolate transformed DEM at coordinates of the transformed point cloud, and check values are very close + z_points_it = trans_dem_it.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values)) + z_points_gd = trans_dem_gd.interp_points(points=(trans_epc.geometry.x.values, trans_epc.geometry.y.values)) + + valids = np.logical_and(np.isfinite(z_points_it), np.isfinite(z_points_gd)) + assert np.count_nonzero(valids) > 0 + + diff_it = z_points_it[valids] - trans_epc.z.values[valids] + diff_gd = z_points_gd[valids] - trans_epc.z.values[valids] + + # Because of outliers, noise and slope near 90°, several solutions can exist + # Additionally, contrary to the check in the __raster test which uses a constant 
slope DEM, the slopes vary + # here so the interpolation check is less accurate so all values can vary a bit + assert np.percentile(np.abs(diff_it), 90) < 1 + assert np.percentile(np.abs(diff_it), 50) < 0.2 + assert np.percentile(np.abs(diff_gd), 90) < 1 + assert np.percentile(np.abs(diff_gd), 50) < 0.2 + + # But, between themselves, the two re-gridding methods should yield much closer results + # (no errors due to 2D interpolation for checking) + diff_it_gd = z_points_gd[valids] - z_points_it[valids] + assert np.percentile(np.abs(diff_it_gd), 99) < 1 # 99% of values are within a meter (instead of 90%) + assert np.percentile(np.abs(diff_it_gd), 50) < 0.02 # 10 times more precise than above def test_warp_dem() -> None: diff --git a/xdem/coreg/base.py b/xdem/coreg/base.py index 365edd8b..94b1ab3f 100644 --- a/xdem/coreg/base.py +++ b/xdem/coreg/base.py @@ -18,13 +18,6 @@ ) import affine - -try: - import cv2 - - _has_cv2 = True -except ImportError: - _has_cv2 = False import fiona import geopandas as gpd import geoutils as gu @@ -47,12 +40,14 @@ subdivide_array, subsample_array, ) +from geoutils.raster.raster import _shift_transform from tqdm import tqdm from xdem._typing import MArrayf, NDArrayb, NDArrayf from xdem.spatialstats import nmad try: + import pytransform3d.rotations import pytransform3d.transformations from pytransform3d.transform_manager import TransformManager @@ -737,6 +732,11 @@ def fit_ramp(x: NDArrayf, y: NDArrayf) -> NDArrayf: return fit_ramp, coefs +############################################### +# Affine matrix manipulation and transformation +############################################### + + def invert_matrix(matrix: NDArrayf) -> NDArrayf: """Invert a transformation matrix.""" with warnings.catch_warnings(): @@ -748,47 +748,224 @@ def invert_matrix(matrix: NDArrayf) -> NDArrayf: return pytransform3d.transformations.invert_transform(checked_matrix) -def apply_matrix( - elev: gu.Raster | NDArrayf | gpd.GeoDataFrame, +def _apply_matrix_pts_arr( + 
x: NDArrayf, + y: NDArrayf, + z: NDArrayf, + matrix: NDArrayf, + centroid: tuple[float, float, float] | None = None, + invert: bool = False, +) -> tuple[NDArrayf, NDArrayf, NDArrayf]: + """Apply matrix to points as arrays with array outputs (to improve speed in some functions).""" + + # Invert matrix if required + if invert: + matrix = invert_matrix(matrix) + + # First, get 4xN array, adding a column of ones for translations during matrix multiplication + points = np.vstack([x, y, z, np.ones(len(x))]) + + # Temporarily subtract centroid coordinates + if centroid is not None: + points[:3, :] -= np.array(centroid)[:, None] + + # Transform using matrix multiplication, and get only the first three columns + transformed_points = (matrix @ points)[:3, :] + + # Add back centroid coordinates + if centroid is not None: + transformed_points += np.array(centroid)[:, None] + + return transformed_points[0, :], transformed_points[1, :], transformed_points[2, :] + + +def _apply_matrix_pts( + epc: gpd.GeoDataFrame, matrix: NDArrayf, invert: bool = False, centroid: tuple[float, float, float] | None = None, - resampling: int | str = "bilinear", - transform: rio.transform.Affine = None, z_name: str = "z", -) -> NDArrayf | gu.Raster | gpd.GeoDataFrame: +) -> gpd.GeoDataFrame: """ - Apply a 3D affine transformation matrix to a 3D elevation point cloud or 2.5D DEM. + Apply a 3D affine transformation matrix to a 3D elevation point cloud. - :param elev: Elevation point cloud or DEM to transform, either a 2D array (requires transform) or - geodataframe (requires z_name). + :param epc: Elevation point cloud. :param matrix: Affine (4x4) transformation matrix to apply to the DEM. :param invert: Whether to invert the transformation matrix. :param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations. Defaults to the midpoint (Z=0). - :param resampling: The resampling method to use, only for DEM 2.5D transformation. 
Can be `nearest`, `bilinear`, - `cubic` or an integer from 0-5. - :param transform: Geotransform of the DEM, only for DEM passed as 2D array. :param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe. - :return: + + :return: Transformed elevation point cloud. """ - if isinstance(elev, gpd.GeoDataFrame): - return _apply_matrix_pts(epc=elev, matrix=matrix, invert=invert, centroid=centroid, z_name=z_name) - else: - if isinstance(elev, gu.Raster): - transform = elev.transform - dem = elev.data - else: - dem = elev + # Apply transformation to X/Y/Z arrays + tx, ty, tz = _apply_matrix_pts_arr( + x=epc.geometry.x.values, + y=epc.geometry.y.values, + z=epc[z_name].values, + matrix=matrix, + centroid=centroid, + invert=invert, + ) - # TODO: Add exception for translation to update only geotransform, maybe directly in apply_matrix? - applied_dem = _apply_matrix_rst( - dem=dem, transform=transform, matrix=matrix, invert=invert, centroid=centroid, resampling=resampling - ) - if isinstance(elev, gu.Raster): - applied_dem = gu.Raster.from_array(applied_dem, transform, elev.crs, elev.nodata) - return applied_dem + # Finally, transform back to a new GeoDataFrame + transformed_epc = gpd.GeoDataFrame( + geometry=gpd.points_from_xy(x=tx, y=ty, crs=epc.crs), + data={z_name: tz}, + ) + + return transformed_epc + + +def _iterate_affine_regrid_small_rotations( + dem: NDArrayf, + transform: rio.transform.Affine, + matrix: NDArrayf, + centroid: tuple[float, float, float] | None = None, + resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear", + verbose: bool = True, +) -> tuple[NDArrayf, rio.transform.Affine]: + """ + Iterative process to find the best reprojection of affine transformation for small rotations. + + Faster than regridding point cloud by triangulation of points (for instance with scipy.interpolate.griddata). 
+ """ + + # Convert DEM to elevation point cloud, keeping all exact grid coordinates X/Y even for NaNs + dem_rst = gu.Raster.from_array(dem, transform=transform, crs=None, nodata=99999) + epc = dem_rst.to_pointcloud(data_column_name="z", skip_nodata=False).ds + + # Exact affine transform of elevation point cloud (which yields irregular coordinates in 2D) + tz0 = _apply_matrix_pts_arr( + x=epc.geometry.x.values, y=epc.geometry.y.values, z=epc.z.values, matrix=matrix, centroid=centroid + )[2] + + # We need to find the elevation Z of a transformed DEM at the exact grid coordinates X,Y + # Which means we need to find coordinates X',Y',Z' of the original DEM that, after the exact affine transform, + # fall exactly on regular X,Y coordinates + + # 1/ The elevation of the original DEM, Z', is simply a 2D interpolator function of X',Y' (bilinear, typically) + # (We create the interpolator only once here for computational speed, instead of using Raster.interp_points) + xycoords = dem_rst.coords(grid=False) + z_interp = scipy.interpolate.RegularGridInterpolator( + points=(np.flip(xycoords[1], axis=0), xycoords[0]), values=dem, method=resampling, bounds_error=False + ) + + # 2/ As a first guess of a transformed DEM elevation Z near the grid coordinates, we initialize with the elevations + # of the nearest point from the transformed elevation point cloud + + # OLD METHOD + # (Longest step computationally) + # with warnings.catch_warnings(): + # warnings.filterwarnings("ignore", category=UserWarning, message="Geometry is in a geographic CRS.*") + # nearest = gpd.sjoin_nearest(epc, trans_epc) + # + # # In case several points are found at exactly the same distance, take the mean of their elevations + # new_z = nearest.groupby(by=nearest.index)["z_left"].mean().values + + # NEW METHOD: Use the transformed elevation instead of searching for a nearest neighbour, + # is close enough for small rotations! 
(and only creates a couple more iterations instead of a full search) + new_z = tz0 + + # 3/ We then iterate between two steps until convergence: + # a/ Use the Z guess to derive invert affine transform X',Y' coordinates for the original DEM, + # b/ Interpolate Z' at new coordinates X',Y' on the original DEM, and apply affine transform to get updated Z guess + + # Start with full array of X/Y regular coordinates (subset during iterations to improve computational efficiency) + x = epc.geometry.x.values + y = epc.geometry.y.values + + # Initialize output z array, and array to store points that have converged + zfinal = np.ones(len(x), dtype=dem.dtype) + ind_converged = np.zeros(len(x), dtype=bool) + + # For small rotations, and large DEMs (elevation range smaller than the DEM extent), this converges fast + max_niter = 20 # Maximum iteration number + niter_check = 5 # Number of iterations between residual checks + tolerance = 10 ** (-4) # Tolerance in X/Y relative to resolution of X/Y + res_x = dem_rst.res[0] # Resolution in X + res_y = dem_rst.res[1] # Resolution in Y + niter = 1 # Starting iteration + + while niter < max_niter: + + # Invert X,Y (exact grid coordinates) with Z guess to find X',Y' coordinates on original DEM + tx, ty = _apply_matrix_pts_arr(x=x, y=y, z=new_z, matrix=matrix, invert=True, centroid=centroid)[:2] + + # Interpolate original DEM at X', Y' to get Z', and convert to point cloud + tz = z_interp((ty, tx)) + + # Transform to see if we fall back on our feet (on the regular grid), or if we need to iterate more + x0, y0, z0 = _apply_matrix_pts_arr(x=tx, y=ty, z=tz, matrix=matrix, centroid=centroid) + + # Only check residuals after first iteration (to remove NaNs) then every 5 iterations to reduce computing time + if niter == 1 or niter == niter_check: + + # Compute difference between exact grid coordinates and current coordinates, and stop if tolerance reached + diff_x = x0 - x + diff_y = y0 - y + + if verbose: + print( + f"Residual check at 
iteration number {niter}:" + f"\n Mean diff x: {np.nanmean(np.abs(diff_x))}" + f"\n Mean diff y: {np.nanmean(np.abs(diff_y))}" + ) + + # Get index of points below tolerance in both X/Y for this subsample (all points before convergence update) + # Nodata values are considered having converged + subind_diff_x = np.logical_or(np.abs(diff_x) < (tolerance * res_x), ~np.isfinite(diff_x)) + subind_diff_y = np.logical_or(np.abs(diff_y) < (tolerance * res_y), ~np.isfinite(diff_y)) + subind_converged = np.logical_and(subind_diff_x, subind_diff_y) + + if verbose: + print( + f" Points not within tolerance: " + f"{np.count_nonzero(~subind_diff_x)} for X; " + f"{np.count_nonzero(~subind_diff_y)} for Y" + ) + + # If all points left are below convergence, update Z one final time and stop here + if all(subind_converged): + zfinal[~ind_converged] = z0 + break + # Otherwise, save Z for new converged points and keep only not converged in next iterations (for speed) + else: + zfinal[~ind_converged] = z0 + x = x[~subind_converged] + y = y[~subind_converged] + z0 = z0[~subind_converged] + + # Otherwise, for this check, update convergence status for points not having converged yet + ind_converged[~ind_converged] = subind_converged + + # If another iteration is required, update Z guess and increment + new_z = z0 + niter += 1 + + # 4/ Write the regular-grid point cloud back into a raster + epc.z = zfinal # We just replace the Z of the original grid to ensure exact coordinates + transformed_dem = dem_rst.from_pointcloud_regular( + epc, transform=transform, shape=dem.shape, data_column_name="z", nodata=-99999 + ) + + return transformed_dem.data.filled(np.nan), transform + + +def _get_rotations_from_matrix(matrix: NDArrayf) -> tuple[float, float, float]: + """ + Get rotation angles along each axis from the 4x4 affine matrix, derived as Euler extrinsic angles in degrees. + + :param matrix: Affine matrix. + + :return: Euler extrinsic rotation angles along X, Y and Z (degrees). 
+ """ + + # The rotation matrix is composed of the first 3 rows/columns + rot_matrix = matrix[0:3, 0:3] + angles = pytransform3d.rotations.euler_from_matrix(R=rot_matrix, i=0, j=1, k=2, extrinsic=True) + return np.rad2deg(angles) def _apply_matrix_rst( @@ -797,21 +974,14 @@ def _apply_matrix_rst( matrix: NDArrayf, invert: bool = False, centroid: tuple[float, float, float] | None = None, - resampling: int | str = "bilinear", - fill_max_search: int = 0, -) -> NDArrayf: + resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear", + verbose: bool = True, + force_regrid_method: Literal["iterative", "griddata"] | None = None, +) -> tuple[NDArrayf, rio.transform.Affine]: """ Apply a 3D affine transformation matrix to a 2.5D DEM. - The transformation is applied as a value correction using linear deramping, and 2D image warping. - - 1. Convert the DEM into a point cloud (not for gridding; for estimating the DEM shifts). - 2. Transform the point cloud in 3D using the 4x4 matrix. - 3. Measure the difference in elevation between the original and transformed points. - 4. Estimate a linear deramp from the elevation difference, and apply the correction to the DEM values. - 5. Convert the horizontal coordinates of the transformed points to pixel index coordinates. - 6. Apply the pixel-wise displacement in 2D using the new pixel coordinates. - 7. Apply the same displacement to a nodata-mask to exclude previous and/or new nans. + See details in description of apply_matrix(). :param dem: DEM to transform. :param transform: Geotransform of the DEM. @@ -819,161 +989,152 @@ def _apply_matrix_rst( :param invert: Whether to invert the transformation matrix. :param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations. Defaults to the midpoint (Z=0). - :param resampling: The resampling method to use. Can be `nearest`, `bilinear`, `cubic` or an integer from 0-5. 
- :param fill_max_search: Set to > 0 value to fill the DEM before applying the transformation, to avoid spreading\ - gaps. The DEM will be filled with rasterio.fill.fillnodata with max_search_distance set to fill_max_search.\ - This is experimental, use at your own risk ! + :param resampling: Point interpolation method, one of 'nearest', 'linear', 'cubic', or 'quintic'. For more + information, see scipy.ndimage.map_coordinates and scipy.interpolate.interpn. Default is linear. + :param force_regrid_method: Force re-gridding method to convert 3D point cloud to 2.5 DEM, only for testing. - :returns: Transformed DEM with NaNs as nodata values (replaces a potential mask of the input `dem`). + :returns: Transformed DEM, Transform. """ - # Parse the resampling argument given. - if isinstance(resampling, (int, np.integer)): - resampling_order = resampling - elif resampling == "cubic": - resampling_order = 3 - elif resampling == "bilinear": - resampling_order = 1 - elif resampling == "nearest": - resampling_order = 0 - else: - raise ValueError( - f"`{resampling}` is not a valid resampling mode." - " Choices: [`nearest`, `bilinear`, `cubic`] or an integer." - ) - # Copy the DEM to make sure the original is not modified, and convert it into an ndarray - demc = np.array(dem) - - # Check if the matrix only contains a Z correction. In that case, only shift the DEM values by the vertical shift. - empty_matrix = np.diag(np.ones(4, float)) - empty_matrix[2, 3] = matrix[2, 3] - if np.mean(np.abs(empty_matrix - matrix)) == 0.0: - return demc + matrix[2, 3] - - # Opencv is required down from here - if not _has_cv2: - raise ValueError("Optional dependency needed. Install 'opencv'") - - nan_mask = ~np.isfinite(dem) - assert np.count_nonzero(~nan_mask) > 0, "Given DEM had all nans." 
- # Optionally, fill DEM around gaps to reduce spread of gaps - if fill_max_search > 0: - filled_dem = rio.fill.fillnodata(demc, mask=(~nan_mask).astype("uint8"), max_search_distance=fill_max_search) - else: - filled_dem = demc # np.where(~nan_mask, demc, np.nan) # I don't know why this was needed - to delete - # Get the centre coordinates of the DEM pixels. - x_coords, y_coords = _get_x_and_y_coords(demc.shape, transform) + # Invert matrix if required + if invert: + matrix = invert_matrix(matrix) - bounds, resolution = _transform_to_bounds_and_res(dem.shape, transform) + # Check DEM has valid values + if np.count_nonzero(np.isfinite(dem)) == 0: + raise ValueError("Input DEM has all nans.") - # If a centroid was not given, default to the center of the DEM (at Z=0). - if centroid is None: - centroid = (np.mean([bounds.left, bounds.right]), np.mean([bounds.bottom, bounds.top]), 0.0) - else: - assert len(centroid) == 3, f"Expected centroid to be 3D X/Y/Z coordinate. Got shape of {len(centroid)}" + shift_z_only_matrix = np.diag(np.ones(4, float)) + shift_z_only_matrix[2, 3] = matrix[2, 3] - # Shift the coordinates to centre around the centroid. - x_coords -= centroid[0] - y_coords -= centroid[1] + shift_only_matrix = np.diag(np.ones(4, float)) + shift_only_matrix[:3, 3] = matrix[:3, 3] - # Create a point cloud of X/Y/Z coordinates - point_cloud = np.dstack((x_coords, y_coords, filled_dem)) + # 1/ Check if the matrix only contains a Z correction, in that case only shift the DEM values by the vertical shift + if np.array_equal(shift_z_only_matrix, matrix) and force_regrid_method is None: + return dem + matrix[2, 3], transform - # Shift the Z components by the centroid. 
- point_cloud[:, 2] -= centroid[2] + # 2/ Check if the matrix contains only translations, in that case only shift by the DEM only by translation + if np.array_equal(shift_only_matrix, matrix) and force_regrid_method is None: + new_transform = _shift_transform(transform, xoff=matrix[0, 3], yoff=matrix[1, 3]) + return dem + matrix[2, 3], new_transform - if invert: - matrix = invert_matrix(matrix) + # 3/ If matrix contains only small rotations (less than 20 degrees), use the fast iterative reprojection + rotations = _get_rotations_from_matrix(matrix) + if all(np.abs(rot) < 20 for rot in rotations) and force_regrid_method is None or force_regrid_method == "iterative": + new_dem, transform = _iterate_affine_regrid_small_rotations( + dem=dem, transform=transform, matrix=matrix, centroid=centroid, resampling=resampling, verbose=verbose + ) + return new_dem, transform + + # 4/ Otherwise, use a delauney triangulation interpolation of the transformed point cloud + # Convert DEM to elevation point cloud, keeping all exact grid coordinates X/Y even for NaNs + dem_rst = gu.Raster.from_array(dem, transform=transform, crs=None, nodata=99999) + epc = dem_rst.to_pointcloud(data_column_name="z").ds + trans_epc = _apply_matrix_pts(epc, matrix=matrix, centroid=centroid) + from geoutils.pointcloud import _grid_pointcloud - # Transform the point cloud using the matrix. - transformed_points = cv2.perspectiveTransform( - point_cloud.reshape((1, -1, 3)), - matrix, - ).reshape(point_cloud.shape) - - # Estimate the vertical difference of old and new point cloud elevations. - deramp, coeffs = deramping( - (point_cloud[:, :, 2] - transformed_points[:, :, 2])[~nan_mask].flatten(), - point_cloud[:, :, 0][~nan_mask].flatten(), - point_cloud[:, :, 1][~nan_mask].flatten(), - degree=1, + new_dem = _grid_pointcloud( + trans_epc, grid_coords=dem_rst.coords(grid=False), data_column_name="z", resampling=resampling ) - # Shift the elevation values of the soon-to-be-warped DEM. 
- filled_dem -= deramp(x_coords, y_coords) - - # Create arrays of x and y coordinates to be converted into index coordinates. - x_inds = transformed_points[:, :, 0].copy() - x_inds[x_inds == 0] = np.nan - y_inds = transformed_points[:, :, 1].copy() - y_inds[y_inds == 0] = np.nan - - # Divide the coordinates by the resolution to create index coordinates. - x_inds /= resolution - y_inds /= resolution - # Shift the x coords so that bounds.left is equivalent to xindex -0.5 - x_inds -= x_coords.min() / resolution - # Shift the y coords so that bounds.top is equivalent to yindex -0.5 - y_inds = (y_coords.max() / resolution) - y_inds - - # Create a skimage-compatible array of the new index coordinates that the pixels shall have after warping. - inds = np.vstack((y_inds.reshape((1,) + y_inds.shape), x_inds.reshape((1,) + x_inds.shape))) - with warnings.catch_warnings(): - # An skimage warning that will hopefully be fixed soon. (2021-07-30) - warnings.filterwarnings("ignore", message="Passing `np.nan` to mean no clipping in np.clip") - # Warp the DEM - transformed_dem = skimage.transform.warp( - filled_dem, inds, order=resampling_order, mode="constant", cval=np.nan, preserve_range=True - ) + return new_dem, transform - assert np.count_nonzero(~np.isnan(transformed_dem)) > 0, "Transformed DEM has all nans." - return transformed_dem +@overload +def apply_matrix( + elev: NDArrayf, + matrix: NDArrayf, + invert: bool = False, + centroid: tuple[float, float, float] | None = None, + resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear", + transform: rio.transform.Affine = None, + z_name: str = "z", + **kwargs: Any, +) -> tuple[NDArrayf, affine.Affine]: + ... 
-def _apply_matrix_pts( - epc: gpd.GeoDataFrame, +@overload +def apply_matrix( + elev: gu.Raster | gpd.GeoDataFrame, matrix: NDArrayf, invert: bool = False, centroid: tuple[float, float, float] | None = None, + resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear", + transform: rio.transform.Affine = None, z_name: str = "z", -) -> gpd.GeoDataFrame: + **kwargs: Any, +) -> gu.Raster | gpd.GeoDataFrame: + ... + + +def apply_matrix( + elev: gu.Raster | NDArrayf | gpd.GeoDataFrame, + matrix: NDArrayf, + invert: bool = False, + centroid: tuple[float, float, float] | None = None, + resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear", + transform: rio.transform.Affine = None, + z_name: str = "z", + **kwargs: Any, +) -> tuple[NDArrayf, affine.Affine] | gu.Raster | gpd.GeoDataFrame: """ - Apply a 3D affine transformation matrix to a 3D elevation point cloud. + Apply a 3D affine transformation matrix to a 3D elevation point cloud or 2.5D DEM. - :param epc: Elevation point cloud. + For an elevation point cloud, the transformation is exact. + + For a DEM, it requires re-gridding because the affine-transformed point cloud of the DEM does not fall onto a + regular grid anymore (except if the affine transformation only has translations). For this, this function uses the + three following methods: + + 1. For transformations with only translations, the transform is updated and vertical shift added to the array, + + 2. For transformations with a small rotation (20 degrees or less for all axes), this function maps which 2D + point coordinates will fall back exactly onto the original DEM grid coordinates after affine transformation by + searching iteratively using the invert affine transformation and 2D point regular-grid interpolation on the + original DEM (see geoutils.Raster.interp_points, or scipy.interpolate.interpn), + + 3. 
For transformations with large rotations (20 degrees or more), scipy.interpolate.griddata is used to
+ re-grid the irregular affine-transformed 3D point cloud using Delaunay triangulation interpolation (slower).
+
+ :param elev: Elevation point cloud or DEM to transform, either a 2D array (requires transform) or
+ geodataframe (requires z_name).
:param matrix: Affine (4x4) transformation matrix to apply to the DEM.
:param invert: Whether to invert the transformation matrix.
:param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations. Defaults to the midpoint (Z=0).
+ :param resampling: Point interpolation method, one of 'nearest', 'linear', 'cubic', or 'quintic'. For more
+ information, see scipy.ndimage.map_coordinates and scipy.interpolate.interpn. Default is linear.
+ :param transform: Geotransform of the DEM, only for DEM passed as 2D array.
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
+ :param kwargs: Keywords passed to _apply_matrix_rst for testing.
- :return: Transformed elevation point cloud.
+ :return: Affine transformed elevation point cloud or DEM.
"""
- # Invert matrix if required
- if invert:
- matrix = invert_matrix(matrix)
-
- # First, get Nx3 array to pass to opencv
- points = np.array([epc.geometry.x.values, epc.geometry.y.values, epc[z_name].values]).T
-
- # Transform the points (around the centroid if it exists). 
- if centroid is not None: - points -= centroid - transformed_points = cv2.perspectiveTransform(points.reshape(1, -1, 3), matrix)[ - 0, :, : - ] # Select the first dimension that is one - if centroid is not None: - transformed_points += centroid - - # Finally, transform back to a new GeoDataFrame - transformed_epc = gpd.GeoDataFrame( - geometry=gpd.points_from_xy(x=transformed_points[:, 0], y=transformed_points[:, 1], crs=epc.crs), - data={"z": transformed_points[:, 2]}, - ) + if isinstance(elev, gpd.GeoDataFrame): + return _apply_matrix_pts(epc=elev, matrix=matrix, invert=invert, centroid=centroid, z_name=z_name) + else: + if isinstance(elev, gu.Raster): + transform = elev.transform + dem = elev.data.filled(np.nan) + else: + dem = elev - return transformed_epc + applied_dem, out_transform = _apply_matrix_rst( + dem=dem, + transform=transform, + matrix=matrix, + invert=invert, + centroid=centroid, + resampling=resampling, + **kwargs, + ) + if isinstance(elev, gu.Raster): + applied_dem = gu.Raster.from_array(applied_dem, out_transform, elev.crs, elev.nodata) + return applied_dem + return applied_dem, out_transform ########################################### @@ -1734,13 +1895,12 @@ def _apply_func(self, **kwargs: Any) -> tuple[NDArrayf | gpd.GeoDataFrame, affin # Apply the matrix around the centroid (if defined, otherwise just from the center). transform = kwargs.pop("transform") - applied_elev = _apply_matrix_rst( + applied_elev, out_transform = _apply_matrix_rst( dem=kwargs.pop("elev"), transform=transform, matrix=self.to_matrix(), centroid=self._meta.get("centroid"), ) - out_transform = transform else: raise ValueError("Cannot transform, Coreg method is non-affine and has no implemented _apply_rst.")