From cb32fc798139a9fa53a8c333596a87f4cc0062bd Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Fri, 11 Oct 2024 23:15:36 -0400
Subject: [PATCH] Add stitched phase cosine similarity raster to outputs (#446)

* Modify `io.process_blocks` to accept overlapping blocks

Result is similar to `map_overlaps`, but with an arbitrary number of input
or output rasters

* create and pass through similarity raster

* fix existing similarity file search

* use `nan` for nodata, mask pixels with all nan/zero

* fix sim tests
---
 src/dolphin/io/_process.py                | 64 ++++++++++++++++++-----
 src/dolphin/masking.py                    |  2 +
 src/dolphin/similarity.py                 | 18 ++++---
 src/dolphin/workflows/displacement.py     | 19 +++++--
 src/dolphin/workflows/sequential.py       | 23 +++++++-
 src/dolphin/workflows/stitching_bursts.py | 55 +++++++++++++++----
 src/dolphin/workflows/wrapped_phase.py    | 55 ++++++++++---------
 tests/test_similarity.py                  | 63 ++++++++++++----------
 8 files changed, 215 insertions(+), 84 deletions(-)

diff --git a/src/dolphin/io/_process.py b/src/dolphin/io/_process.py
index a7a76525..0fc9ecce 100644
--- a/src/dolphin/io/_process.py
+++ b/src/dolphin/io/_process.py
@@ -4,9 +4,10 @@
 from numpy.typing import ArrayLike
 from tqdm.auto import tqdm
 
+from dolphin import HalfWindow, Strides
 from dolphin.utils import DummyProcessPoolExecutor
 
-from ._blocks import iter_blocks
+from ._blocks import BlockIndices, StridedBlockManager
 from ._readers import StackReader
 from ._writers import DatasetWriter
 
@@ -21,7 +22,10 @@ class BlockProcessor(Protocol):
     """
 
     def __call__(
-        self, readers: Sequence[StackReader], rows: slice, cols: slice
+        self,
+        readers: Sequence[StackReader],
+        rows: slice,
+        cols: slice,
     ) -> tuple[ArrayLike, slice, slice]: ...
 
 
@@ -30,22 +34,39 @@ def process_blocks(
     writer: DatasetWriter,
     func: BlockProcessor,
     block_shape: tuple[int, int] = (512, 512),
+    overlaps: tuple[int, int] = (0, 0),
     num_threads: int = 5,
 ):
     """Perform block-wise processing over blocks in `readers`, writing to `writer`.
 
-    Used to read and process a stack of rasters in parallel, setting up a queue
-    of results for the `writer` to save.
+    Parameters
+    ----------
+    readers : Sequence[StackReader]
+        Sequence of input readers to read data from.
+    writer : DatasetWriter
+        Output writer to write data to.
+    func : BlockProcessor
+        Function to process each block.
+    block_shape : tuple[int, int], optional
+        Shape of each block to process.
+    overlaps : tuple[int, int], optional
+        Amount of overlap between blocks in (row, col) directions.
+        By default (0, 0).
+    num_threads : int, optional
+        Number of threads to use, by default 5.
 
-    Note that the parallelism happens using a `ThreadPoolExecutor`, so `func` should
-    be a function which releases the GIL during computation (e.g. using numpy).
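+    Examples
+    --------
+    A minimal sketch (the `reader` / `writer` objects are placeholders; any
+    `StackReader` / `DatasetWriter` pair works). With ``overlaps=(2, 2)``,
+    each block is read with a 2-pixel margin on every side, so a 5x5
+    moving-window `func` sees valid neighbors at block edges; the margin is
+    trimmed again before writing:
+
+    >>> import numpy as np
+    >>> from scipy.ndimage import uniform_filter
+    >>> def boxcar_5x5(readers, rows, cols):
+    ...     # 5x5 spatial mean; needs 2 extra pixels of data at each edge
+    ...     block = np.abs(readers[0][:, rows, cols])
+    ...     return uniform_filter(block, size=(1, 5, 5)), rows, cols
+    >>> process_blocks(  # doctest: +SKIP
+    ...     readers=[reader], writer=writer, func=boxcar_5x5, overlaps=(2, 2)
+    ... )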
""" - shape = readers[0].shape[-2:] - slices = list(iter_blocks(shape, block_shape=block_shape)) + block_manager = StridedBlockManager( + arr_shape=readers[0].shape[-2:], + block_shape=block_shape, + # Here we are not using the striding mechanism + strides=Strides(1, 1), + # The "half window" is how much overlap we read in + half_window=HalfWindow(*overlaps), + ) + total_blocks = sum(1 for _ in block_manager.iter_blocks()) + pbar = tqdm(total=total_blocks) - pbar = tqdm(total=len(slices)) - - # Define the callback to write the result to an output DatasetWrite def write_callback(fut: Future): data, rows, cols = fut.result() writer[..., rows, cols] = data @@ -53,9 +74,26 @@ def write_callback(fut: Future): Executor = ThreadPoolExecutor if num_threads > 1 else DummyProcessPoolExecutor futures: set[Future] = set() + + # Create a helper function to perform the trimming after processing + def _run_and_trim( + func, + readers, + out_idxs: BlockIndices, + trim_idxs: BlockIndices, + in_idxs: BlockIndices, + ): + in_rows, in_cols = in_idxs + out_rows, out_cols = out_idxs + trim_rows, trim_cols = trim_idxs + out_data, _, _ = func(readers=readers, rows=in_rows, cols=in_cols) + return out_data[..., trim_rows, trim_cols], out_rows, out_cols + with Executor(num_threads) as exc: - for rows, cols in slices: - future = exc.submit(func, readers=readers, rows=rows, cols=cols) + for out_idxs, trim_idxs, in_idxs, _, _ in block_manager.iter_blocks(): + future = exc.submit( + _run_and_trim, func, readers, out_idxs, trim_idxs, in_idxs + ) future.add_done_callback(write_callback) futures.add(future) diff --git a/src/dolphin/masking.py b/src/dolphin/masking.py index 1a7e58b2..c60e2916 100644 --- a/src/dolphin/masking.py +++ b/src/dolphin/masking.py @@ -53,6 +53,8 @@ def combine_mask_files( ): """Combine multiple mask files into a single mask file. + All `mask_files` must be the same size and projected on the same grid. + Parameters ---------- mask_files : list of Path or str diff --git a/src/dolphin/similarity.py b/src/dolphin/similarity.py index 87d76c12..c28d4b42 100644 --- a/src/dolphin/similarity.py +++ b/src/dolphin/similarity.py @@ -79,7 +79,7 @@ def max_similarity( Returns ------- np.ndarray - 2D array (shape (rows, cols)) of the maximum similarity for any neighrbor + 2D array (shape (rows, cols)) of the maximum similarity for any neighbor at a pixel. 
""" @@ -102,9 +102,12 @@ def _create_loop_and_run( raise ValueError("ifg_stack must be complex") unit_ifgs = np.exp(1j * np.angle(ifg_stack)) - out_similarity = np.zeros((rows, cols), dtype="float32") + out_similarity = np.full((rows, cols), fill_value=np.nan, dtype="float32") if mask is None: mask = np.ones((rows, cols), dtype="bool") + # Mark any nans/all zeros as invalid + invalid_mask = np.nan_to_num(ifg_stack).sum(axis=0) == 0 + mask[invalid_mask] = False if mask.shape != (rows, cols): raise ValueError(f"{ifg_stack.shape = }, but {mask.shape = }") @@ -125,7 +128,7 @@ def _make_loop_function( """ @numba.njit(nogil=True, parallel=True) - def _masked_median_sim_loop( + def _masked_sim_loop( ifg_stack: np.ndarray, idxs: np.ndarray, mask: np.ndarray, @@ -145,6 +148,7 @@ def _masked_median_sim_loop( if not m0: continue x0 = ifg_stack[:, r0, c0] + cur_sim_vec = cur_sim[r0, c0] count = 0 @@ -169,7 +173,7 @@ def _masked_median_sim_loop( out_similarity[r0, c0] = summary_func(cur_sim_vec[:count]) return out_similarity - return _masked_median_sim_loop + return _masked_sim_loop def get_circle_idxs( @@ -289,12 +293,12 @@ def create_similarities( else: raise ValueError(f"Unrecognized {sim_type = }") - zero_block = np.zeros(block_shape, dtype="float32") + nodata_block = np.full(block_shape, fill_value=np.nan, dtype="float32") def calc_sim(readers, rows, cols): block = readers[0][:, rows, cols] if np.sum(block) == 0 or np.isnan(block).all(): - return zero_block[rows, cols], rows, cols + return nodata_block[rows, cols], rows, cols out_avg = sim_function(ifg_stack=block, search_radius=search_radius) logger.debug(f"{rows = }, {cols = }, {block.shape = }, {out_avg.shape = }") @@ -312,12 +316,14 @@ def calc_sim(readers, rows, cols): like_filename=ifg_file_list[0], dtype="float32", driver="GTiff", + nodata=np.nan, ) process_blocks( [reader], writer, func=calc_sim, block_shape=block_shape, + overlaps=(search_radius, search_radius), num_threads=num_threads, ) writer.notify_finished() diff --git a/src/dolphin/workflows/displacement.py b/src/dolphin/workflows/displacement.py index 56c663f6..998fa47b 100755 --- a/src/dolphin/workflows/displacement.py +++ b/src/dolphin/workflows/displacement.py @@ -35,6 +35,7 @@ class OutputPaths(NamedTuple): stitched_ps_file: Path stitched_amp_dispersion_file: Path stitched_shp_count_file: Path + stitched_similarity_file: Path unwrapped_paths: list[Path] | None conncomp_paths: list[Path] | None timeseries_paths: list[Path] | None @@ -128,6 +129,7 @@ def run( ps_file_list: list[Path] = [] amp_dispersion_file_list: list[Path] = [] shp_count_file_list: list[Path] = [] + similarity_file_list: list[Path] = [] # The comp_slc tracking object is a dict, since we'll need to organize # multiple comp slcs by burst (they'll have the same filename) comp_slc_dict: dict[str, list[Path]] = {} @@ -161,15 +163,22 @@ def run( for fut in fut_to_burst: burst = fut_to_burst[fut] - cur_ifg_list, comp_slcs, temp_coh, ps_file, amp_disp_file, shp_count = ( - fut.result() - ) + ( + cur_ifg_list, + comp_slcs, + temp_coh, + ps_file, + amp_disp_file, + shp_count, + similarity, + ) = fut.result() ifg_file_list.extend(cur_ifg_list) comp_slc_dict[burst] = comp_slcs temp_coh_file_list.append(temp_coh) ps_file_list.append(ps_file) amp_dispersion_file_list.append(amp_disp_file) shp_count_file_list.append(shp_count) + similarity_file_list.append(similarity) # ################################### # 2. 
Stitch burst-wise interferograms @@ -186,12 +195,14 @@ def run( stitched_ps_file, stitched_amp_dispersion_file, stitched_shp_count_file, + stitched_similarity_file, ) = stitching_bursts.run( ifg_file_list=ifg_file_list, temp_coh_file_list=temp_coh_file_list, ps_file_list=ps_file_list, amp_dispersion_list=amp_dispersion_file_list, shp_count_file_list=shp_count_file_list, + similarity_file_list=similarity_file_list, stitched_ifg_dir=cfg.interferogram_network._directory, output_options=cfg.output_options, file_date_fmt=cfg.input_options.cslc_date_fmt, @@ -212,6 +223,7 @@ def run( stitched_ps_file=stitched_ps_file, stitched_amp_dispersion_file=stitched_amp_dispersion_file, stitched_shp_count_file=stitched_shp_count_file, + stitched_similarity_file=stitched_similarity_file, unwrapped_paths=None, conncomp_paths=None, timeseries_paths=None, @@ -354,6 +366,7 @@ def run( stitched_ps_file=stitched_ps_file, stitched_amp_dispersion_file=stitched_amp_dispersion_file, stitched_shp_count_file=stitched_shp_count_file, + stitched_similarity_file=stitched_similarity_file, unwrapped_paths=unwrapped_paths, # TODO: Let's keep the unwrapped_paths since all the outputs are # corresponding to those and if we have a network unwrapping, the diff --git a/src/dolphin/workflows/sequential.py b/src/dolphin/workflows/sequential.py index 24c17e87..26fc2354 100644 --- a/src/dolphin/workflows/sequential.py +++ b/src/dolphin/workflows/sequential.py @@ -18,6 +18,7 @@ from dolphin import io from dolphin._types import Filename from dolphin.io import VRTStack +from dolphin.similarity import create_similarities from dolphin.stack import MiniStackPlanner from .config import ShpMethod @@ -51,7 +52,7 @@ def run_wrapped_phase_sequential( block_shape: tuple[int, int] = (512, 512), baseline_lag: Optional[int] = None, **tqdm_kwargs, -) -> tuple[list[Path], list[Path], Path, Path]: +) -> tuple[list[Path], list[Path], Path, Path, Path]: """Estimate wrapped phase using batches of ministacks.""" if strides is None: strides = {"x": 1, "y": 1} @@ -149,6 +150,18 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool: _average_rasters(temp_coh_files, output_temp_coh_file, "Float32") _average_rasters(shp_count_files, output_shp_count_file, "Int16") + # Create one phase similarity raster + output_similarity_file = output_folder / f"similarity_{full_span}.tif" + create_similarities( + ifg_file_list=cur_output_files, + output_file=output_similarity_file, + # TODO: any of these configurable? 
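+        # For now these stay fixed: a median summary (less sensitive to
+        # outlier neighbors than `max`) over an 11 pixel search radius.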
+        search_radius=11,
+        sim_type="median",
+        block_shape=block_shape,
+        num_threads=3,
+    )
+
     # Combine the separate SLC output lists into a single list
     all_slc_files = list(chain.from_iterable(output_slc_files))
     all_comp_slc_files = [ms.get_compressed_slc_info().path for ms in ministacks]
@@ -163,7 +176,13 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool:
             p.rename(output_folder / p.name)
             comp_slc_outputs.append(output_folder / p.name)
 
-    return out_pl_slcs, comp_slc_outputs, output_temp_coh_file, output_shp_count_file
+    return (
+        out_pl_slcs,
+        comp_slc_outputs,
+        output_temp_coh_file,
+        output_shp_count_file,
+        output_similarity_file,
+    )
 
 
 def _get_outputs_from_folder(
diff --git a/src/dolphin/workflows/stitching_bursts.py b/src/dolphin/workflows/stitching_bursts.py
index 67963359..1dae5b02 100644
--- a/src/dolphin/workflows/stitching_bursts.py
+++ b/src/dolphin/workflows/stitching_bursts.py
@@ -4,7 +4,7 @@
 
 import logging
 from pathlib import Path
-from typing import Sequence
+from typing import NamedTuple, Sequence
 
 from dolphin import stitching
 from dolphin._log import log_runtime
@@ -18,6 +18,25 @@
 logger = logging.getLogger(__name__)
 
 
+class StitchedOutputs(NamedTuple):
+    """Output rasters from stitching step."""
+
+    stitched_ifg_paths: list[Path]
+    """List of Paths to the stitched interferograms."""
+    interferometric_corr_paths: list[Path]
+    """List of Paths to interferometric correlation files created."""
+    stitched_temp_coh_file: Path
+    """Path to temporal coherence file created."""
+    stitched_ps_file: Path
+    """Path to PS mask file created."""
+    stitched_amp_disp_file: Path
+    """Path to amplitude dispersion file created."""
+    stitched_shp_count_file: Path
+    """Path to SHP count file created."""
+    stitched_similarity_file: Path
+    """Path to cosine similarity file created."""
+
+
 @log_runtime
 def run(
     ifg_file_list: Sequence[Path],
@@ -25,26 +44,31 @@ def run(
     ps_file_list: Sequence[Path],
     amp_dispersion_list: Sequence[Path],
     shp_count_file_list: Sequence[Path],
+    similarity_file_list: Sequence[Path],
     stitched_ifg_dir: Path,
     output_options: OutputOptions,
     file_date_fmt: str = "%Y%m%d",
     corr_window_size: tuple[int, int] = (11, 11),
     num_workers: int = 3,
-) -> tuple[list[Path], list[Path], Path, Path, Path, Path]:
-    """Run the displacement workflow on a stack of SLCs.
+) -> StitchedOutputs:
+    """Stitch together spatial subsets from phase linking.
+
+    For Sentinel-1 processing, these can be burst-wise interferograms.
 
     Parameters
     ----------
     ifg_file_list : Sequence[Path]
-        Sequence of burst-wise interferograms files to stitch.
+        Sequence of interferogram files to stitch.
     temp_coh_file_list : Sequence[Path]
-        Sequence of paths to the burst-wise temporal coherence files.
+        Sequence of paths to the temporal coherence files.
     ps_file_list : Sequence[Path]
-        Sequence of paths to the (looked) burst-wise ps mask files.
+        Sequence of paths to the (looked) PS mask files.
     amp_dispersion_list : Sequence[Path]
-        Sequence of paths to the (looked) burst-wise amplitude dispersion files.
+        Sequence of paths to the (looked) amplitude dispersion files.
     shp_count_file_list : Sequence[Path]
-        Sequence of paths to the burst-wise SHP counts files.
+        Sequence of paths to the SHP count files.
+    similarity_file_list : Sequence[Path]
+        Sequence of paths to the spatial phase cosine similarity files.
stitched_ifg_dir : Path Location to store the output stitched ifgs and correlations output_options : OutputOptions @@ -145,6 +169,15 @@ def run( out_bounds_epsg=output_options.bounds_epsg, ) + stitched_similarity_file = stitched_ifg_dir / "similarity.tif" + stitching.merge_images( + similarity_file_list, + outfile=stitched_similarity_file, + driver="GTiff", + out_bounds=out_bounds, + out_bounds_epsg=output_options.bounds_epsg, + ) + if output_options.add_overviews: logger.info("Creating overviews for stitched images") create_overviews(stitched_ifg_paths, image_type=ImageType.INTERFEROGRAM) @@ -153,12 +186,16 @@ def run( create_image_overviews(stitched_temp_coh_file, image_type=ImageType.CORRELATION) create_image_overviews(stitched_amp_disp_file, image_type=ImageType.CORRELATION) create_image_overviews(stitched_shp_count_file, image_type=ImageType.PS) + create_image_overviews( + stitched_similarity_file, image_type=ImageType.CORRELATION + ) - return ( + return StitchedOutputs( stitched_ifg_paths, interferometric_corr_paths, stitched_temp_coh_file, stitched_ps_file, stitched_amp_disp_file, stitched_shp_count_file, + stitched_similarity_file, ) diff --git a/src/dolphin/workflows/wrapped_phase.py b/src/dolphin/workflows/wrapped_phase.py index 16b9eb5c..5c992fd9 100644 --- a/src/dolphin/workflows/wrapped_phase.py +++ b/src/dolphin/workflows/wrapped_phase.py @@ -21,7 +21,7 @@ @log_runtime def run( cfg: DisplacementWorkflow, debug: bool = False, tqdm_kwargs=None -) -> tuple[list[Path], list[Path], Path, Path, Path, Path]: +) -> tuple[list[Path], list[Path], Path, Path, Path, Path, Path]: """Run the displacement workflow on a stack of SLCs. Parameters @@ -151,6 +151,7 @@ def run( comp_slc_list = sorted(pl_path.glob("compressed*tif")) temp_coh_file = next(pl_path.glob("temporal_coherence*tif")) shp_count_file = next(pl_path.glob("shp_count*tif")) + similarity_file = next(pl_path.glob("*similarity*tif")) else: logger.info(f"Running sequential EMI step in {pl_path}") kwargs = tqdm_kwargs | {"desc": f"Phase linking ({pl_path})"} @@ -159,29 +160,33 @@ def run( # If we pre-compute it from some big stack, we need to use that for SHP # finding, not use the size of `slc_vrt_file` shp_nslc = None - (phase_linked_slcs, comp_slc_list, temp_coh_file, shp_count_file) = ( - sequential.run_wrapped_phase_sequential( - slc_vrt_stack=vrt_stack, - output_folder=pl_path, - ministack_size=cfg.phase_linking.ministack_size, - output_reference_idx=cfg.phase_linking.output_reference_idx, - new_compressed_reference_idx=new_compressed_slc_reference_idx, - half_window=cfg.phase_linking.half_window.model_dump(), - strides=strides, - use_evd=cfg.phase_linking.use_evd, - beta=cfg.phase_linking.beta, - mask_file=mask_filename, - ps_mask_file=ps_output, - amp_mean_file=cfg.ps_options._amp_mean_file, - amp_dispersion_file=cfg.ps_options._amp_dispersion_file, - shp_method=cfg.phase_linking.shp_method, - shp_alpha=cfg.phase_linking.shp_alpha, - shp_nslc=shp_nslc, - cslc_date_fmt=cfg.input_options.cslc_date_fmt, - block_shape=cfg.worker_settings.block_shape, - baseline_lag=cfg.phase_linking.baseline_lag, - **kwargs, - ) + ( + phase_linked_slcs, + comp_slc_list, + temp_coh_file, + shp_count_file, + similarity_file, + ) = sequential.run_wrapped_phase_sequential( + slc_vrt_stack=vrt_stack, + output_folder=pl_path, + ministack_size=cfg.phase_linking.ministack_size, + output_reference_idx=cfg.phase_linking.output_reference_idx, + new_compressed_reference_idx=new_compressed_slc_reference_idx, + 
half_window=cfg.phase_linking.half_window.model_dump(),
+            strides=strides,
+            use_evd=cfg.phase_linking.use_evd,
+            beta=cfg.phase_linking.beta,
+            mask_file=mask_filename,
+            ps_mask_file=ps_output,
+            amp_mean_file=cfg.ps_options._amp_mean_file,
+            amp_dispersion_file=cfg.ps_options._amp_dispersion_file,
+            shp_method=cfg.phase_linking.shp_method,
+            shp_alpha=cfg.phase_linking.shp_alpha,
+            shp_nslc=shp_nslc,
+            cslc_date_fmt=cfg.input_options.cslc_date_fmt,
+            block_shape=cfg.worker_settings.block_shape,
+            baseline_lag=cfg.phase_linking.baseline_lag,
+            **kwargs,
         )
     # Dump the used options for JSON parsing
     logger.info(
@@ -207,6 +212,7 @@ def run(
         ps_looked_file,
         amp_disp_looked_file,
         shp_count_file,
+        similarity_file,
     )
 
     logger.info(f"Creating virtual interferograms from {len(phase_linked_slcs)} files")
@@ -230,6 +236,7 @@ def run(
         ps_looked_file,
         amp_disp_looked_file,
         shp_count_file,
+        similarity_file,
     )
 
 
diff --git a/tests/test_similarity.py b/tests/test_similarity.py
index f6b90a4d..b9a6d995 100644
--- a/tests/test_similarity.py
+++ b/tests/test_similarity.py
@@ -51,47 +51,56 @@ def test_pixel_similarity():
     assert similarity.phase_similarity(x1, x1) == 1
 
 
-class TestMedianSimilarity:
+def test_pixel_similarity_zero_nan():
+    x1, x2 = np.zeros((2, 10), dtype="complex64")
+    sim = similarity.phase_similarity(x1, x2)
+    assert sim == 0
+
+    x1, x2 = np.full((2, 10), fill_value=np.nan + 1j * np.nan)
+    sim = similarity.phase_similarity(x1, x2)
+    assert np.isnan(sim)
+
+    x1, x2 = np.ones((2, 10), dtype="complex64")
+    sim = similarity.phase_similarity(x1, x2)
+    assert sim == 1
+
+
+def test_block_similarity_zero_nan():
+    block_zeros = np.zeros((10, 4, 5), dtype="complex64")
+    out = similarity.median_similarity(block_zeros, search_radius=2)
+    assert np.isnan(out).all()
+
+    block_nan = block_zeros * np.nan
+    out = similarity.median_similarity(block_nan, search_radius=2)
+    assert np.isnan(out).all()
+
+
+class TestStackSimilarity:
     @pytest.fixture
     def ifg_stack(self, slc_stack):
         return slc_stack * slc_stack[[0]].conj()
 
     @pytest.mark.parametrize("radius", [2, 5, 9])
-    def test_basic(self, ifg_stack, radius):
-        sim = similarity.median_similarity(ifg_stack, search_radius=radius)
+    @pytest.mark.parametrize("func", ["median", "max"])
+    def test_basic(self, ifg_stack, radius, func):
+        sim_func = getattr(similarity, f"{func}_similarity")
+        sim = sim_func(ifg_stack, search_radius=radius)
         assert np.all(sim > -1)
         assert np.all(sim < 1)
 
     @pytest.mark.parametrize("radius", [2, 5, 9])
-    def test_median_similarity_masked(self, ifg_stack, radius):
+    @pytest.mark.parametrize("func", ["median", "max"])
+    def test_similarity_masked(self, ifg_stack, radius, func):
         rows, cols = ifg_stack.shape[-2:]
         mask = np.random.rand(rows, cols).round().astype(bool)
-        sim = similarity.median_similarity(ifg_stack, search_radius=radius, mask=mask)
-        assert np.all(sim > -1)
-        assert np.all(sim < 1)
+        sim_func = getattr(similarity, f"{func}_similarity")
+        sim = sim_func(ifg_stack, search_radius=radius, mask=mask)
+
+        assert ((np.nan_to_num(sim) > -1) & (np.nan_to_num(sim) < 1)).all()
+        assert np.all(np.isnan(sim) == (~mask))
 
     def test_create_similarity(self, tmp_path, slc_file_list):
         outfile = tmp_path / "med_sim.tif"
         similarity.create_similarities(
             slc_file_list, output_file=outfile, num_threads=1, block_shape=(64, 64)
         )
-
-
-class TestMaxSimilarity:
-    @pytest.fixture
-    def ifg_stack(self, slc_stack):
-        return slc_stack * slc_stack[[0]].conj()
-
-    @pytest.mark.parametrize("radius", [2, 5, 9])
-    def test_basic(self, ifg_stack, radius):
-        sim = 
similarity.max_similarity(ifg_stack, search_radius=radius) - assert np.all(sim > -1) - assert np.all(sim < 1) - - @pytest.mark.parametrize("radius", [2, 5, 9]) - def test_max_similarity_masked(self, ifg_stack, radius): - rows, cols = ifg_stack.shape[-2:] - mask = np.random.rand(rows, cols).round().astype(bool) - sim = similarity.max_similarity(ifg_stack, search_radius=radius, mask=mask) - assert np.all(sim >= -1) - assert np.all(sim <= 1)
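+
+
+# Sketch of one more guard (an assumption to verify: NaN is the nodata fill
+# for masked pixels, as the masked tests above imply, so a fully masked
+# input should come back all-NaN).
+def test_similarity_fully_masked_is_nan():
+    rng = np.random.default_rng(1)
+    ifg_stack = np.exp(1j * rng.normal(size=(3, 8, 8))).astype("complex64")
+    mask = np.zeros((8, 8), dtype=bool)
+    sim = similarity.median_similarity(ifg_stack, search_radius=2, mask=mask)
+    assert np.isnan(sim).all()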