diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index 0f92dd7..80caf00 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -29,7 +29,7 @@ jobs: - name: run unit tests run: | echo "running unit tests" - python -m pytest --cov=src --cov-report=xml --cov-report=term-missing tests/ + python -m pytest --cov=src --cov-report=xml --cov-report=term-missing -m "not cuda_required" - name: upload coverage to codecov uses: codecov/codecov-action@v4 with: diff --git a/.gitignore b/.gitignore index 68bc17f..8dc3858 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +.envrc +src/bm3dornl/_version.py diff --git a/environment.yml b/environment.yml index 318ba37..19ccf62 100644 --- a/environment.yml +++ b/environment.yml @@ -1,39 +1,38 @@ name: bm3dornl channels: - conda-forge + - nvidia dependencies: - # -- Runtime dependencies - # base: list all base dependencies here - - python>=3.8 # please specify the mimimum version of python here + # base + - python>=3.10 + - pip - versioningit - # compute: list all compute dependencies here - - numpy - - pandas - # plot: list all plot dependencies here, if applicable + # compute + - cupy + - numba + - scipy<1.13 # avoid a bug in 1.13 + - scikit-image + # [Optional]visualization - matplotlib - # jupyter: list all jupyter dependencies here, if applicable + # [Optional]jupyter - jupyterlab - - ipympl - # -- Development dependencies - # utils: + # utils - pre-commit - # pacakge building: - - libmamba - - libarchive + - line_profiler # useful for development + - memory_profiler # useful for development + # packaging - anaconda-client - boa - - conda-build < 4 # conda-build 24.x has a bug, missing update_index from conda_build.index + - conda-build < 4 - conda-verify + - libmamba + - libarchive - python-build - # test: list all test dependencies here + # test - pytest - pytest-cov + - pytest-mock - pytest-xdist - # -------------------------------------------------- - # add additional sections such as Qt, etc. if needed - # -------------------------------------------------- - # if pakcages are not available on conda, list them here - - pip + # pip packages - pip: - - bm3d-streak-removal # example - - pytest-playwright + - bm3d-streak-removal # this is our reference package diff --git a/notebooks/example.ipynb b/notebooks/example.ipynb index 7fa8701..ef4fb06 100644 --- a/notebooks/example.ipynb +++ b/notebooks/example.ipynb @@ -6,20 +6,408 @@ "source": [ "# Overview\n", "\n", - "This folder is used to store notebooks that demonstrate how to use the library in an interactive environment like Jupyter." + "This notebook provides an example usage for using bm3dornl to remove streaks from single sinogram." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "when on a multi-gpu system, make sure specify the card you are using to avoid affecting other people's job" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# Set the GPU device ID to 0 for this notebook session\n", + "os.environ['CUDA_VISIBLE_DEVICES'] = '0'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prepare synthetic noisy sinogram" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from bm3dornl.phantom import (\n", + " shepp_logan_phantom,\n", + " generate_sinogram,\n", + " simulate_detector_gain_error,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# define image size\n", + "image_size = 512 # smaller size runs faster on local machine, large number means wider image\n", + "scan_step = 0.5 # deg, smaller number means taller image\n", + "detector_gain_range=(0.98, 1.02) # variation along detector width\n", + "detector_gain_error=0.01 # variation along time/rotation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 3.16 s, sys: 21.4 ms, total: 3.18 s\n", + "Wall time: 3.22 s\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "# make shepp_logan 2D phantom\n", + "shepp_logan_2d = shepp_logan_phantom(\n", + " size=image_size,\n", + " contrast_factor=8,\n", + " )\n", + "\n", + "# transform to sinogram\n", + "sino_org, thetas_deg = generate_sinogram(\n", + " input_img=shepp_logan_2d,\n", + " scan_step=scan_step,\n", + " )\n", + "\n", + "# add detector gain error\n", + "sino_noisy, detector_gain = simulate_detector_gain_error(\n", + " sinogram=sino_org,\n", + " detector_gain_range=detector_gain_range,\n", + " detector_gain_error=detector_gain_error,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "visualization" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(512, 512) (720, 512) (720, 512)\n", + "1e-08 1.0\n" + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "fig, ax = plt.subplots(1, 3, figsize=(12, 4))\n", + "ax[0].imshow(shepp_logan_2d, cmap='gray')\n", + "ax[0].set_title('Original Shepp-Logan Phantom')\n", + "ax[1].imshow(sino_org, cmap='gray')\n", + "ax[1].set_title('Sinogram')\n", + "ax[2].imshow(sino_noisy, cmap='gray')\n", + "ax[2].set_title('Sinogram with Detector Gain Error')\n", + "\n", + "print(shepp_logan_2d.shape, sino_org.shape, sino_noisy.shape)\n", + "print(sino_noisy.min(), sino_noisy.max())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "estimate background" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "bg_estimate = 1e-1\n", + "\n", + "bg = np.array(sino_noisy)\n", + "bg[sino_noisy >= bg_estimate] = np.nan\n", + "#\n", + "plt.imshow(bg, cmap=\"jet\")\n", + "plt.colorbar()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## BM3D close source version" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "import bm3d_streak_removal as bm3dsr" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Median filtering, iteration 0\n", + "Median filtering, iteration 1\n", + "Median filtering, iteration 2\n", + "CPU times: user 3.5 s, sys: 6.6 ms, total: 3.5 s\n", + "Wall time: 3.57 s\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "sion_bm3d_attenuated = bm3dsr.extreme_streak_attenuation(\n", + " data=sino_noisy,\n", + " extreme_streak_iterations=3,\n", + " extreme_detect_lambda=4.0,\n", + " extreme_detect_size=9,\n", + " extreme_replace_size=2,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Denoising sinogram 0\n", + "k: 4\n", + "k: 3\n", + "k: 2\n", + "k: 1\n", + "k: 0\n", + "CPU times: user 5min 21s, sys: 2min 5s, total: 7min 26s\n", + "Wall time: 1min 49s\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "sino_bm3d = bm3dsr.multiscale_streak_removal(\n", + " data=sion_bm3d_attenuated,\n", + " max_bin_iter_horizontal=4,\n", + " bin_vertical=0,\n", + " filter_strength=1.0,\n", + " use_slices=True,\n", + " slice_sizes=None,\n", + " slice_step_sizes=None,\n", + " denoise_indices=None,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "sion_bm3d_attenuated = sion_bm3d_attenuated[:, 0, :]\n", + "sino_bm3d = sino_bm3d[:, 0, :]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "visualization" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "fig, axs = plt.subplots(1, 3, figsize=(12, 4))\n", + "axs[0].imshow(sino_noisy, cmap='gray')\n", + "axs[0].set_title('Noisy sinogram')\n", + "axs[1].imshow(sion_bm3d_attenuated, cmap='gray')\n", + "axs[1].set_title('BM3D extreme streak attenuation')\n", + "axs[2].imshow(sino_bm3d, cmap='gray')\n", + "axs[2].set_title('BM3D denoised sinogram')\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## BM3DRONL" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "from bm3dornl.denoiser import bm3d_streak_removal" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 1min 42s, sys: 13.3 s, total: 1min 56s\n", + "Wall time: 1min 53s\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "sino_bm3dornl = bm3d_streak_removal(\n", + " sinogram=sino_noisy,\n", + " background_threshold=0.1,\n", + " patch_size=(8, 8),\n", + " stride=3,\n", + " cut_off_distance=(64, 64),\n", + " intensity_diff_threshold=0.2,\n", + " num_patches_per_group=512,\n", + " shrinkage_threshold=0.1,\n", + " k=4,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# visualize noisy, org, sino_bm3d, and sino_bm3dornl\n", + "fig, axs = plt.subplots(1, 4, figsize=(16, 4))\n", + "axs[0].imshow(sino_noisy, cmap='gray')\n", + "axs[0].set_title('Noisy sinogram')\n", + "axs[0].axis('off')\n", + "axs[1].imshow(sino_org, cmap='gray')\n", + "axs[1].set_title('Original sinogram')\n", + "axs[1].axis('off')\n", + "axs[2].imshow(sino_bm3d, cmap='gray')\n", + "axs[2].set_title('BM3D denoised sinogram')\n", + "axs[2].axis('off')\n", + "axs[3].imshow(sino_bm3dornl, cmap='gray')\n", + "axs[3].set_title('BM3D ORNL denoised sinogram')\n", + "axs[3].axis('off')\n", + "plt.show()" + ] } ], "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, "language_info": { - "name": "python" + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" } }, "nbformat": 4, diff --git a/pyproject.toml b/pyproject.toml index 091cbb5..5c32e29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ testpaths = ["tests"] python_files = ["test*.py"] norecursedirs = [".git", "tmp*", "_tmp*", "__pycache__", "*dataset*", "*data_set*"] markers = [ - "mymarker: example markers goes here" + "cuda_required: test requires cuda to run." ] [tool.pylint] diff --git a/src/bm3dornl/__init__.py b/src/bm3dornl/__init__.py index 3ab36bc..6db246b 100644 --- a/src/bm3dornl/__init__.py +++ b/src/bm3dornl/__init__.py @@ -6,10 +6,3 @@ from ._version import __version__ # noqa: F401 except ImportError: __version__ = "0.0.1" - - -def PackageName(): # pylint: disable=invalid-name - """This is needed for backward compatibility because mantid workbench does "from shiver import Shiver" """ - from .packagenamepy import PackageName as packagename # pylint: disable=import-outside-toplevel - - return packagename() diff --git a/src/bm3dornl/aggregation.py b/src/bm3dornl/aggregation.py new file mode 100644 index 0000000..9738087 --- /dev/null +++ b/src/bm3dornl/aggregation.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +"""Functions for aggregating hyper patch block into a single image.""" + +import numpy as np +from numba import jit, prange + + +@jit(nopython=True, parallel=True) +def aggregate_patches( + estimate_denoised_image: np.ndarray, + weights: np.ndarray, + hyper_block: np.ndarray, + hyper_block_index: np.ndarray, +): + """ + Aggregate patches into the denoised image matrix and update the corresponding weights matrix. + + Parameters + ---------- + estimate_denoised_image : np.ndarray + The 2D numpy array where the aggregate result of the denoised patches will be stored. + weights : np.ndarray + The 2D numpy array that counts the contributions of the patches to the cells of the `estimate_denoised_image`. + hyper_block : np.ndarray + A 4D numpy array of patches to be aggregated. Shape is (num_blocks, num_patches_per_block, patch_height, patch_width). + hyper_block_index : np.ndarray + A 3D numpy array containing the top-left indices (row, column) for each patch in the `hyper_block`. + Shape is (num_blocks, num_patches_per_block, 2). + + Notes + ----- + This function uses Numba's JIT compilation with parallel execution to speed up the aggregation of image patches. + Each thread handles a block of patches independently, reducing computational time significantly on multi-core processors. + """ + num_blocks, num_patches, ph, pw = hyper_block.shape + for i in prange(num_blocks): + for p in range(num_patches): + patch = hyper_block[i, p] + i_pos, j_pos = hyper_block_index[i, p] + for ii in range(ph): + for jj in range(pw): + estimate_denoised_image[i_pos + ii, j_pos + jj] += patch[ii, jj] + weights[i_pos + ii, j_pos + jj] += 1 diff --git a/src/bm3dornl/block_matching.py b/src/bm3dornl/block_matching.py new file mode 100644 index 0000000..c46a516 --- /dev/null +++ b/src/bm3dornl/block_matching.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +"""Block matching to build hyper block from single sinogram.""" + +import numpy as np +from typing import Tuple, Optional +from bm3dornl.utils import ( + get_signal_patch_positions, + find_candidate_patch_ids, + is_within_threshold, + pad_patch_ids, +) + + +class PatchManager: + def __init__( + self, + image: np.ndarray, + patch_size: Tuple[int, int] = (8, 8), + stride: int = 1, + background_threshold: float = 0.1, + ): + """ + Initialize the PatchManager with an image, patch configuration, and background threshold + for distinguishing between signal and background patches. + + Parameters + ---------- + image : np.ndarray + The image from which patches will be managed. + patch_size : tuple + Dimensions (height, width) of each patch. Default is (8, 8). + stride : int + The stride with which to slide the window across the image. Default is 1. + background_threshold : float + The mean intensity threshold below which a patch is considered a background patch. + """ + self._image = image + self.patch_size = patch_size + self.stride = stride + self.background_threshold = background_threshold + self.signal_patches_pos = [] + self.signal_blocks_matrix = [] + self._generate_patch_positions() + + def _generate_patch_positions(self): + """Generate the positions of signal and background patches in the image.""" + self.signal_patches_pos = get_signal_patch_positions( + self._image, self.patch_size, self.stride, self.background_threshold + ) + + @property + def image(self): + return self._image + + @image.setter + def image(self, value): + self._image = value + self._generate_patch_positions() + + def get_patch( + self, position: tuple, source_image: Optional[np.ndarray] = None + ) -> np.ndarray: + """Retreive a patch from the image at the specified position. + + Parameters: + ---------- + position : tuple + The row and column indices of the top-left corner of the patch. + source_image : np.ndarray + + Returns: + ------- + np.ndarray + The patch extracted from the image. + """ + source_image = self._image if source_image is None else source_image + i, j = position + return source_image[i : i + self.patch_size[0], j : j + self.patch_size[1]] + + def group_signal_patches( + self, cut_off_distance: tuple, intensity_diff_threshold: float + ): + """ + Group signal patches into blocks based on spatial and intensity distance thresholds. + + Parameters: + ---------- + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block, Manhattan distance (taxi cab distance). + intensity_diff_threshold : float + Maximum Euclidean distance in intensity for patches to be considered similar. + """ + num_patches = len(self.signal_patches_pos) + self.signal_blocks_matrix = np.eye(num_patches, dtype=bool) + + # Cache patches as views + cached_patches = [self.get_patch(pos) for pos in self.signal_patches_pos] + + for ref_patch_id in range(num_patches): + ref_patch = cached_patches[ref_patch_id] + candidate_patch_ids = find_candidate_patch_ids( + self.signal_patches_pos, ref_patch_id, cut_off_distance + ) + # iterate over the candidate patches + for neightbor_patch_id in candidate_patch_ids: + if is_within_threshold( + ref_patch, + cached_patches[neightbor_patch_id], + intensity_diff_threshold, + ): + self.signal_blocks_matrix[ref_patch_id, neightbor_patch_id] = True + self.signal_blocks_matrix[neightbor_patch_id, ref_patch_id] = True + + def get_hyper_block( + self, + num_patches_per_group: int, + padding_mode="circular", + alternative_source: np.ndarray = None, + ): + """ + Return groups of similar patches as 4D arrays with each group having a fixed number of patches. + + Parameters: + ---------- + num_patches_per_group : int + Number of patches in each group. + padding_mode : str + Mode for padding the patch IDs when the number of candidates is less than `num_patches_per_group`. + Options are 'first', 'repeat_sequence', 'circular', 'mirror', 'random'. + alternative_source : cp.ndarray + An alternative source image to extract patches from. Default is None. + + Returns: + ------- + tuple + A tuple containing the 4D array of patch groups and the corresponding positions. + + TODO: + ----- + - use multi-processing to further improve the speed of block building + """ + group_size = len(self.signal_blocks_matrix) + block = np.empty( + (group_size, num_patches_per_group, *self.patch_size), dtype=np.float32 + ) + positions = np.empty((group_size, num_patches_per_group, 2), dtype=np.int32) + + for i, row in enumerate(self.signal_blocks_matrix): + candidate_patch_ids = np.where(row)[0] + padded_patch_ids = pad_patch_ids( + candidate_patch_ids, num_patches_per_group, mode=padding_mode + ) + # update block and positions + block[i] = np.array( + [ + self.get_patch(self.signal_patches_pos[idx], alternative_source) + for idx in padded_patch_ids + ] + ) + positions[i] = np.array(self.signal_patches_pos[padded_patch_ids]) + + return block, positions diff --git a/src/bm3dornl/bm3dornl.py b/src/bm3dornl/bm3dornl.py deleted file mode 100644 index d30dde6..0000000 --- a/src/bm3dornl/bm3dornl.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -Main Qt application -""" - -import sys -from qtpy.QtWidgets import QApplication, QMainWindow - -from mantid.kernel import Logger -from mantidqt.gui_helper import set_matplotlib_backend - -# make sure matplotlib is correctly set before we import shiver -set_matplotlib_backend() - -# make sure the algorithms have been loaded so they are available to the AlgorithmManager -import mantid.simpleapi # noqa: F401, E402 pylint: disable=unused-import, wrong-import-position - -from packagenamepy.configuration import Configuration # noqa: E402 pylint: disable=wrong-import-position -from packagenamepy.version import __version__ # noqa: E402 pylint: disable=wrong-import-position -from packagenamepy.mainwindow import MainWindow # noqa: E402 pylint: disable=wrong-import-position - -logger = Logger("PACKAGENAME") - - -class PackageName(QMainWindow): - """Main Package window""" - - __instance = None - - def __new__(cls): - if PackageName.__instance is None: - PackageName.__instance = QMainWindow.__new__(cls) # pylint: disable=no-value-for-parameter - return PackageName.__instance - - def __init__(self, parent=None): - super().__init__(parent) - logger.information(f"PackageName version: {__version__}") - config = Configuration() - - if not config.is_valid(): - msg = ( - "Error with configuration settings!", - f"Check and update your file: {config.config_file_path}", - "with the latest settings found here:", - f"{config.template_file_path} and start the application again.", - ) - - print(" ".join(msg)) - sys.exit(-1) - self.setWindowTitle(f"PACKAGENAME - {__version__}") - self.main_window = MainWindow(self) - self.setCentralWidget(self.main_window) - - -def gui(): - """ - Main entry point for Qt application - """ - input_flags = sys.argv[1::] - if "--v" in input_flags or "--version" in input_flags: - print(__version__) - sys.exit() - else: - app = QApplication(sys.argv) - window = PackageName() - window.show() - sys.exit(app.exec_()) diff --git a/src/bm3dornl/configuration.py b/src/bm3dornl/configuration.py deleted file mode 100644 index 86c68ba..0000000 --- a/src/bm3dornl/configuration.py +++ /dev/null @@ -1,107 +0,0 @@ -"""Module to load the the settings from SHOME/.packagename/configuration.ini file - -Will fall back to a default""" - -import os -import shutil - -from configparser import ConfigParser -from pathlib import Path -from mantid.kernel import Logger - -logger = Logger("PACKAGENAME") - -# configuration settings file path -CONFIG_PATH_FILE = os.path.join(Path.home(), ".packagename", "configuration.ini") - - -class Configuration: - """Load and validate Configuration Data""" - - def __init__(self): - """initialization of configuration mechanism""" - # capture the current state - self.valid = False - - # locate the template configuration file - project_directory = Path(__file__).resolve().parent - self.template_file_path = os.path.join( - project_directory, "configuration_template.ini" - ) - - # retrieve the file path of the file - self.config_file_path = CONFIG_PATH_FILE - logger.information(f"{self.config_file_path} will be used") - - # if template conf file path exists - if os.path.exists(self.template_file_path): - # file does not exist create it from template - if not os.path.exists(self.config_file_path): - # if directory structure does not exist create it - if not os.path.exists(os.path.dirname(self.config_file_path)): - os.makedirs(os.path.dirname(self.config_file_path)) - shutil.copy2(self.template_file_path, self.config_file_path) - - self.config = ConfigParser(allow_no_value=True, comment_prefixes="/") - # parse the file - try: - self.config.read(self.config_file_path) - # validate the file has the all the latest variables - self.validate() - except ValueError as err: - logger.error(str(err)) - logger.error(f"Problem with the file: {self.config_file_path}") - else: - logger.error( - f"Template configuration file: {self.template_file_path} is missing!" - ) - - def validate(self): - """validates that the fields exist at the config_file_path and writes any missing fields/data - using the template configuration file: configuration_template.ini as a guide""" - template_config = ConfigParser(allow_no_value=True, comment_prefixes="/") - template_config.read(self.template_file_path) - for section in template_config.sections(): - # if section is missing - if section not in self.config.sections(): - # copy the whole section - self.config.add_section(section) - - for item in template_config.items(section): - field, _ = item - if field not in self.config[section]: - # copy the field - self.config[section][field] = template_config[section][field] - with open(self.config_file_path, "w", encoding="utf8") as config_file: - self.config.write(config_file) - self.valid = True - - def is_valid(self): - """returns the configuration state""" - return self.valid - - -def get_data(section, name=None): - """retrieves the configuration data for a variable with name""" - # default file path location - config_file_path = CONFIG_PATH_FILE - if os.path.exists(config_file_path): - config = ConfigParser() - # parse the file - config.read(config_file_path) - try: - if name: - value = config[section][name] - # in case of boolean string value cast it to bool - if value in ("True", "False"): - return value == "True" - # in case of None - if value == "None": - return None - return value - return config[section] - except KeyError as err: - # requested section/field do not exist - logger.error(str(err)) - return None - return None diff --git a/src/bm3dornl/configuration_template.ini b/src/bm3dornl/configuration_template.ini deleted file mode 100644 index c8ea100..0000000 --- a/src/bm3dornl/configuration_template.ini +++ /dev/null @@ -1,2 +0,0 @@ -[global.other] -help_url = https://github.com/neutrons/python_project_template/blob/main/README.md diff --git a/src/bm3dornl/denoiser.py b/src/bm3dornl/denoiser.py new file mode 100644 index 0000000..3347c79 --- /dev/null +++ b/src/bm3dornl/denoiser.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python +"""Denoiser module for BM3D-ORNL.""" + +import logging +import numpy as np +from typing import Tuple +from scipy.signal import medfilt2d +from bm3dornl.aggregation import aggregate_patches +from bm3dornl.block_matching import PatchManager +from bm3dornl.gpu_utils import ( + hard_thresholding, + wiener_hadamard, + memory_cleanup, +) +from bm3dornl.utils import ( + horizontal_binning, + horizontal_debinning, +) + + +class BM3D: + def __init__( + self, + image: np.ndarray, + patch_size: Tuple[int, int] = (8, 8), + stride: int = 3, + background_threshold: float = 0.1, + ): + """ + Initialize the BM3D class with an image and configuration parameters for patch management and denoising. + + Parameters + ---------- + image : np.ndarray + The sinogram or image to be denoised. + patch_size : tuple + Dimensions (height, width) of each patch. Default is (8, 8). + stride : int + The stride with which to slide the window across the image. Default is 3. + background_threshold : float + The mean intensity threshold below which a patch is considered a background patch. + """ + self.image = np.asarray(image) + self.estimate_denoised_image = np.zeros_like(self.image, dtype=float) + self.final_denoised_image = np.zeros_like(self.image, dtype=float) + # Initialize the PatchManager + self.patch_manager = PatchManager( + self.image, + patch_size=patch_size, + stride=stride, + background_threshold=background_threshold, + ) + # record input parameters + self.patch_size = patch_size + self.stride = stride + self.background_threshold = background_threshold + + def group_signal_patches( + self, cut_off_distance: Tuple[int, int], intensity_diff_threshold: float + ): + """ + Group signal patches into blocks based on spatial and intensity distance thresholds. + + Parameters: + ---------- + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block. + intensity_diff_threshold : float + Maximum Euclidean distance in intensity for patches to be considered similar. + """ + self.patch_manager.group_signal_patches( + cut_off_distance, intensity_diff_threshold + ) + logging.info( + f"Total number of signal patches: {len(self.patch_manager.signal_patches_pos)}" + ) + + def thresholding( + self, + cut_off_distance: Tuple[int, int], + intensity_diff_threshold: float, + num_patches_per_group: int, + threshold: float, + ) -> np.ndarray: + """ + Perform the denoising process using the specified configuration. + + Parameters: + ---------- + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block. + intensity_diff_threshold : float + Maximum Euclidean distance in intensity for patches to be considered similar. + num_patches_per_group : int + The number of patchs in each block. + threshold : float + The threshold value for hard thresholding during the first pass. + + Returns: + ------- + np.ndarray + The denoised image estimate. + """ + self.group_signal_patches(cut_off_distance, intensity_diff_threshold) + + weights = np.zeros_like(self.image, dtype=float) + + logging.info("Block matching for 1st pass...") + block, positions = self.patch_manager.get_hyper_block( + num_patches_per_group=num_patches_per_group, padding_mode="circular" + ) + + logging.info("Applying shrinkage...") + block = hard_thresholding(block, threshold) + + # manual release of memory + memory_cleanup() + + # Aggreation + # NOTE: this part needs optimization (numba or parallel or both) + logging.info("Aggregating...") + aggregate_patches( + estimate_denoised_image=self.estimate_denoised_image, + weights=weights, + hyper_block=block, + hyper_block_index=positions, + ) + + # Normalize by the weights to compute the average + self.estimate_denoised_image /= np.maximum(weights, 1) + + # update the patch manager with the new estimate + self.patch_manager.background_threshold *= ( + 0.5 # reduce the threshold for background threshold further + ) + self.patch_manager.image = self.estimate_denoised_image + + def re_filtering( + self, + cut_off_distance: Tuple[int, int], + intensity_diff_threshold: float, + num_patches_per_group: int, + ): + """ + Perform the second step for BM3D, re-filter using estimates as reference noisy free image. + + Parameters + ---------- + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block. + intensity_diff_threshold : float + Maximum Euclidean distance in intensity for patches to be considered similar. + num_patches_per_group : int + The number of patch in each block. + """ + # assume the patch manager has been update to use the estimate_denoised_image + # NOTE: this should give us better blocks as we are using a noise reduced image as reference + self.group_signal_patches(cut_off_distance, intensity_diff_threshold) + + weights = np.zeros_like(self.image, dtype=np.float64) + + # estimate the noise + noise = np.asarray(self.image) - self.estimate_denoised_image + sigma_squared = np.mean(noise**2) + + logging.info("Block matching for 2nd pass...") + block, positions = self.patch_manager.get_hyper_block( + num_patches_per_group=num_patches_per_group, + padding_mode="circular", + alternative_source=self.image, # use the original image + ) + + logging.info("Wiener-Hadamard filtering...") + block = wiener_hadamard(block, sigma_squared) + + # manual release of memory + memory_cleanup() + + # Aggreation + # NOTE: this part needs optimization (numba or parallel or both) + logging.info("Aggregating...") + aggregate_patches( + estimate_denoised_image=self.final_denoised_image, + weights=weights, + hyper_block=block, + hyper_block_index=positions, + ) + + # Normalize by the weights to compute the average + self.final_denoised_image /= np.maximum(weights, 1) + + def denoise( + self, + cut_off_distance: Tuple[int, int], + intensity_diff_threshold: float, + num_patches_per_group: int, + threshold: float, + ): + """ + Perform the BM3D denoising process on the input image. + + Parameters: + ---------- + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block. + intensity_diff_threshold : float + Maximum Euclidean distance in intensity for patches to be considered similar. + num_patches_per_group : int + The number of patch in each block. + threshold : float + The threshold value for hard thresholding during the first pass. + """ + logging.info("First pass: Hard thresholding") + self.thresholding( + cut_off_distance, intensity_diff_threshold, num_patches_per_group, threshold + ) + + logging.info("Second pass: Re-filtering") + self.re_filtering( + cut_off_distance, intensity_diff_threshold, num_patches_per_group + ) + + +def bm3d_streak_removal( + sinogram: np.ndarray, + background_threshold: float = 0.1, + patch_size: Tuple[int, int] = (8, 8), + stride: int = 3, + cut_off_distance: Tuple[int, int] = (64, 64), + intensity_diff_threshold: float = 0.1, + num_patches_per_group: int = 400, + shrinkage_threshold: float = 0.1, + k: int = 4, +) -> np.ndarray: + """Multiscale BM3D for streak removal + + Parameters + ---------- + sinogram : np.ndarray + The input sinogram to be denoised. + background_threshold: float + Estimated background intensity threshold, default to 0.1. + patch_size : tuple[int, int], optional + The size of the patches, by default (8, 8) + stride: + Steps when generating blocks with sliding window. + cut_off_distance : tuple + Maximum spatial distance in terms of row and column indices for patches in the same block. + intensity_diff_threshold : float, optional + The threshold for patch similarity, by default 0.01 + num_patches_per_group : int + The number of patch in each block. + shrinkage_threshold : float, optional + The threshold for hard thresholding, by default 0.2 + k : int, optional + The number of iterations for horizontal binning, by default 3 + + Returns + ------- + np.ndarray + The denoised sinogram. + + References + ---------- + [1] ref: `Collaborative Filtering of Correlated Noise `_ + [2] ref: `Ring artifact reduction via multiscale nonlocal collaborative filtering of spatially correlated noise `_ + """ + # step 0: median filter the sinogram + sinogram = medfilt2d(sinogram, kernel_size=3) + sino_star = sinogram + + # step 1: create a list of binned sinograms + binned_sinos = horizontal_binning(sinogram, k=k) + # reverse the list + binned_sinos = binned_sinos[::-1] + + # step 2: estimate the noise level from the coarsest sinogram, then working back to the original sinogram + noise_estimate = None + for i in range(len(binned_sinos)): + logging.info(f"Processing binned sinogram {i+1} of {len(binned_sinos)}") + sino = binned_sinos[i] + sino_star = ( + sino if i == 0 else sino - horizontal_debinning(noise_estimate, sino) + ) + + if i < len(binned_sinos) - 1: + worker = BM3D( + image=sino_star, + patch_size=patch_size, + stride=stride, + background_threshold=background_threshold, + ) + worker.denoise( + cut_off_distance=cut_off_distance, + intensity_diff_threshold=intensity_diff_threshold, + num_patches_per_group=num_patches_per_group, + threshold=shrinkage_threshold, + ) + noise_estimate = sino - worker.final_denoised_image + + return sino_star diff --git a/src/bm3dornl/gpu_utils.py b/src/bm3dornl/gpu_utils.py new file mode 100644 index 0000000..1b301e8 --- /dev/null +++ b/src/bm3dornl/gpu_utils.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +"""CuPy utility functions for GPU acceleration.""" + +import numpy as np +import cupy as cp +from cupyx.scipy.linalg import hadamard + + +def hard_thresholding( + hyper_block: np.ndarray, + threshold: float, +) -> np.ndarray: + """ + Apply shrinkage operation to a block of image patches on GPU using CuPy. + + This function transforms the block of patches into the frequency domain using FFT, + applies hard thresholding to attenuate small coefficients, and then transforms the + patches back to the spatial domain to acquire a noise-free estimate. + + Parameters + ---------- + hyper_block : cp.ndarray + A 4D CuPy array containing groups of stack of 2D image patches. + The shape of `hyper_block` should be (group, n_patches, patch_height, patch_width). + threshold : float + The threshold value for hard thresholding. Coefficients with absolute values below + this threshold will be set to zero. + + Returns + ------- + denoised_block : np.ndarray + A 4D CuPy array of the same shape as `hyper_block`, containing the denoised patches. + + Notes + ----- + 1. This function uses GPU acceleration to improve the performance of the FFT-based denoising process. + 2. FFT cache are manually cleared to release memory after each iteration, avoid potential CUDA out of memory error. + """ + # Send data to the GPU + hyper_block = cp.asarray(hyper_block) + + # Transform the patch block to the frequency domain using rfft + hyper_block = cp.fft.rfft2(hyper_block, axes=(1, 2, 3)) + + # Apply hard thresholding + hyper_block[cp.abs(hyper_block) < threshold] = 0 + + # Transform the block back to the spatial domain using irFFT + hyper_block = cp.fft.irfft2(hyper_block, axes=(1, 2, 3)) + + # Send data back to the CPU + denoised_block = hyper_block.get() + del hyper_block + + return denoised_block + + +def wiener_hadamard(hyper_block: np.ndarray, sigma_squared: float): + """ + Wiener filter using the Hadamard transform, implemented with CuPy for GPU acceleration. + + This function handles both 3D and 4D inputs where patches are square and of size 2^n x 2^n. + + Parameters + ---------- + hyper_block : cp.ndarray + A 3D or 4D array containing groups of image patches in the **spatial** domain. + sigma_squared : float + The noise variance. + + Returns + ------- + np.ndarray + An array of the same shape as `patch_block`, containing the denoised patches. + """ + # Send data to the GPU + hyper_block = cp.asarray(hyper_block) + + # Get the size of the patches + n = hyper_block.shape[-1] # Assuming square patches + H = hadamard(n) + + # Flatten 4D to 3D if necessary + original_shape = hyper_block.shape + if hyper_block.ndim == 4: + hyper_block = hyper_block.reshape(-1, n, n) + + # Hadamard transform + hyper_block = cp.einsum("ij,kjl->kil", H, hyper_block) + hyper_block = cp.einsum("ijk,kl->ijl", hyper_block, H) + + # Calculate mean and variance across the patches dimension + local_mean = cp.mean(hyper_block, axis=0, keepdims=True) + local_variance = cp.var(hyper_block, axis=0, keepdims=True) + + # Apply Wiener filter + hyper_block = (1 - sigma_squared / (local_variance + 1e-8)) * ( + hyper_block - local_mean + ) + local_mean + mask = cp.broadcast_to(local_variance < sigma_squared, hyper_block.shape) + hyper_block[mask] = 0 + + # Inverse Hadamard transform + hyper_block = cp.einsum("ij,kjl->kil", H, hyper_block) + hyper_block = cp.einsum("ijk,kl->ijl", hyper_block, H) / (n * n) + + # Reshape back if it was 4D + if original_shape != hyper_block.shape: + hyper_block = hyper_block.reshape(original_shape) + + # Send data back to the CPU + denoised_block = hyper_block.get() + + # release memory + del hyper_block + + return denoised_block + + +def memory_cleanup(): + """Clear the memory cache for CuPy and synchronize the default stream.""" + cp.get_default_memory_pool().free_all_blocks() + cp.get_default_pinned_memory_pool().free_all_blocks() + cp.cuda.Stream.null.synchronize() diff --git a/src/bm3dornl/phantom.py b/src/bm3dornl/phantom.py new file mode 100644 index 0000000..b2522a5 --- /dev/null +++ b/src/bm3dornl/phantom.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +"""Sinogram generation for phantom data.""" + +import numpy as np +from skimage.transform import radon +from typing import Tuple + + +def shepp_logan_phantom(size: int = 256, contrast_factor: float = 2.0) -> np.ndarray: + """ + Generate a high-contrast Shepp-Logan phantom with intensity values normalized between 0 and 1. + + Parameters + ---------- + size : int, optional + The width and height of the square image, by default 256. + contrast_factor : float, optional + Factor by which to multiply the intensities to increase contrast, by default 2.0. + + Returns + ------- + np.ndarray + A 2D array representing the high-contrast phantom image. + """ + ellipses = [ + [0.69, 0.92, 0, 0, 0, 2], # Outer ellipse + [0.6624, 0.874, 0, -0.0184, 0, -0.98], + [0.21, 0.25, 0.22, 0, -18, -0.2], + [0.16, 0.41, -0.22, 0, 18, -0.2], + [0.21, 0.25, 0, 0.35, 0, 0.1], + [0.046, 0.046, 0, 0.1, 0, 0.2], + [0.046, 0.046, 0, -0.1, 0, 0.2], + [0.046, 0.023, -0.08, -0.605, 0, 0.2], + [0.023, 0.023, 0, -0.606, 0, 0.2], + [0.023, 0.046, 0.06, -0.605, 0, 0.2], + ] + + phantom = np.zeros((size, size)) + + for ellipse in ellipses: + a, b, x0, y0, phi, intensity = ellipse + intensity *= contrast_factor + y, x = np.ogrid[-1 : 1 : size * 1j, -1 : 1 : size * 1j] + phi = np.deg2rad(phi) + x_rot = x * np.cos(phi) + y * np.sin(phi) + y_rot = -x * np.sin(phi) + y * np.cos(phi) + mask = ((x_rot - x0) ** 2 / a**2) + ((y_rot - y0) ** 2 / b**2) <= 1 + phantom += mask * intensity + + min_val = phantom.min() + max_val = phantom.max() + phantom = (phantom - min_val) / (max_val - min_val) + + return phantom + + +def generate_sinogram( + input_img: np.ndarray, + scan_step: float, +) -> Tuple[np.ndarray, np.ndarray]: + """Simulate sinogram from input image. + + Parameters + ---------- + input_img : np.ndarray + Input image. + scan_step : float + Scan step in degrees. + + Returns + ------- + sinogram : np.ndarray + Generated sinogram. + theta : np.ndarray + Projection angles in degrees. + + Example + ------- + >>> img = np.random.rand(256, 256) + >>> sinogram, thetas_deg = generate_sinogram(img, 1) + >>> print(sinogram.shape, thetas_deg.shape) + (360, 256) (360,) + """ + # prepare thetas_deg + thetas_deg = np.arange(-180, 180, scan_step) + + # prepare sinogram + # perform virtual projection via radon transform + sinogram = radon( + input_img, + theta=thetas_deg, + circle=True, + ).T # transpose to get the sinogram in the correct orientation for tomopy + + return sinogram, thetas_deg + + +def simulate_detector_gain_error( + sinogram: np.ndarray, + detector_gain_range: Tuple[float, float], + detector_gain_error: float, +) -> Tuple[np.ndarray, np.ndarray]: + """Simulate detector gain error. + + Parameters + ---------- + sinogram : np.ndarray + Input sinogram. + detector_gain_range : Tuple[float, float] + Detector gain range. + detector_gain_error : float + Detector gain error, along time axis. + + Returns + ------- + sinogram : np.ndarray + Sinogram with detector gain error. + detector_gain : np.ndarray + Detector gain. + + Example + ------- + >>> img = np.random.rand(256, 256) + >>> sinogram, thetas_deg = generate_sinogram(img, 1) + >>> sinogram, detector_gain = simulate_detector_gain_error( + ... sinogram, + ... (0.9, 1.1), + ... 0.1, + ... ) + >>> print(sinogram.shape, detector_gain.shape) + (360, 256) (360, 256) + """ + # prepare detector_gain + detector_gain = np.random.uniform( + detector_gain_range[0], + detector_gain_range[1], + sinogram.shape[1], + ) + detector_gain = np.ones(sinogram.shape) * detector_gain + + # simulate detector gain vary slightly along time axis + if detector_gain_error != 0.0: + detector_gain = np.random.normal( + detector_gain, + detector_gain * detector_gain_error, + ) + + # apply detector_gain + sinogram = sinogram * detector_gain + + # rescale sinogram to [0, 1] + sinogram = (sinogram - sinogram.min()) / (sinogram.max() - sinogram.min()) + 1e-8 + + # convert to float32 + sinogram = sinogram.astype(np.float32) + detector_gain = detector_gain.astype(np.float32) + + return sinogram, detector_gain diff --git a/src/bm3dornl/utils.py b/src/bm3dornl/utils.py new file mode 100644 index 0000000..383dcca --- /dev/null +++ b/src/bm3dornl/utils.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +"""Utility functions for BM3DORNL.""" + +import numpy as np +from scipy.interpolate import RectBivariateSpline +from numba import jit +from typing import Tuple, List + + +@jit(nopython=True) +def find_candidate_patch_ids( + signal_patches: np.ndarray, ref_index: int, cut_off_distance: Tuple +) -> List: + """ + Identify candidate patch indices that are within the specified Manhattan distance from a reference patch. + + This function computes a list of indices for patches that are within a given row and column distance from + the reference patch specified by `ref_index`. It only considers patches that have not been compared previously + (i.e., patches that are ahead of the reference patch in the list, ensuring the upper triangle of the comparison matrix). + + Parameters + ---------- + signal_patches : np.ndarray + Array containing the positions of all signal patches. Each position is represented as (row_index, column_index). + ref_index : int + Index of the reference patch in `signal_patches` for which candidates are being sought. + cut_off_distance : tuple + A tuple (row_dist, col_dist) specifying the maximum allowed distances in the row and column directions. + + Returns + ------- + list + A list of integers representing the indices of the candidate patches in `signal_patches` that are within + the `cut_off_distance` from the reference patch and are not previously compared (ensuring upper triangle). + """ + num_patches = signal_patches.shape[0] + ref_pos = signal_patches[ref_index] + candidate_patch_ids = [] + + for i in range(ref_index + 1, num_patches): # Ensure only checking upper triangle + if ( + np.abs(signal_patches[i, 0] - ref_pos[0]) <= cut_off_distance[0] + and np.abs(signal_patches[i, 1] - ref_pos[1]) <= cut_off_distance[1] + ): + candidate_patch_ids.append(i) + + return candidate_patch_ids + + +@jit(nopython=True) +def is_within_threshold( + ref_patch: np.ndarray, cmp_patch: np.ndarray, intensity_diff_threshold: float +) -> bool: + """ + Determine if the Euclidean distance between two patches is less than a specified threshold. + + This function computes the Euclidean distance between two patches and checks if it is less than the provided + intensity difference threshold. It is optimized with Numba's JIT in nopython mode to ensure high performance. + + Parameters + ---------- + ref_patch : np.ndarray + The reference patch as a flattened array of intensities. + cmp_patch : np.ndarray + The comparison patch as a flattened array of intensities. + intensity_diff_threshold : float + The threshold below which the Euclidean distance between the patches is considered sufficiently small + for the patches to be deemed similar. + + Returns + ------- + bool + True if the Euclidean distance between `ref_patch` and `cmp_patch` is less than `intensity_diff_threshold`; + otherwise, False. + + Example: + -------- + >>> ref_patch = np.array([1, 2, 3]) + >>> cmp_patch = np.array([1, 2, 5]) + >>> threshold = 2.5 + >>> is_within_threshold(ref_patch, cmp_patch, threshold) + False + """ + return np.linalg.norm(ref_patch - cmp_patch) <= intensity_diff_threshold + + +@jit(nopython=True) +def get_signal_patch_positions( + image: np.ndarray, + patch_size: Tuple[int, int] = (8, 8), + stride: int = 3, + background_threshold: float = 0.1, +) -> np.ndarray: + """Segment an image into signal patches. + + Parameters + ---------- + image : np.ndarray + The input image to be segmented into patches. + patch_size : Tuple[int, int] + The size of the patches to be extracted. + stride : int + The stride for patch extraction. + background_threshold : float + The threshold for determining background patches. + + Returns + ------- + signal_patches : np.ndarray + An array of positions of signal patches. + + NOTE + ---- + Numba has issues with return a Tuple of np.ndarray, and since we only operates on signal patches, + we will ignore the background patches for now. + """ + i_height, i_width = image.shape + p_height, p_width = patch_size + + signal_patches = [] + + for r in range(0, i_height - p_height + 1, stride): + for c in range(0, i_width - p_width + 1, stride): + patch = image[r : r + p_height, c : c + p_width] + patch_max = np.max(patch) + if patch_max >= background_threshold: + signal_patches.append((r, c)) + + # deal with empty list + # Note: raise error when couldn't find a single signal patch from the entire + # sinogram, which usually indicating a bad background estimation. + if len(signal_patches) == 0: + raise ValueError( + "Couldn't find any signal patches in the image! Please check the background threshold." + ) + + return np.array(signal_patches) + + +def pad_patch_ids( + candidate_patch_ids: np.ndarray, + num_patches: int, + mode: str = "circular", +) -> np.ndarray: + """ + Pad the array of patch IDs to reach a specified length using different strategies. + + Parameters + ---------- + candidate_patch_ids : np.ndarray + Array of patch indices identified as candidates. + num_patches : int + Desired number of patches in the padded list. + mode : str + Padding mode, options are 'first', 'repeat_sequence', 'circular', 'mirror', 'random'. + + Returns + ------- + np.ndarray + Padded array of patch indices. + """ + current_length = len(candidate_patch_ids) + if current_length >= num_patches: + return candidate_patch_ids[:num_patches] + + if mode == "first": + padding = np.full((num_patches - current_length,), candidate_patch_ids[0]) + elif mode == "repeat_sequence": + repeats = (num_patches // current_length) + 1 + padded = np.tile(candidate_patch_ids, repeats)[:num_patches] + return padded + elif mode == "circular": + extended = np.tile(candidate_patch_ids, ((num_patches // current_length) + 1))[ + :num_patches + ] + return extended + elif mode == "mirror": + mirror_length = min(current_length, num_patches - current_length) + mirrored_part = candidate_patch_ids[:mirror_length][::-1] + return np.concatenate([candidate_patch_ids, mirrored_part]) + elif mode == "random": + random_padded = np.random.choice(candidate_patch_ids, num_patches, replace=True) + return random_padded + else: + raise ValueError("Unknown padding mode specified.") + + return np.concatenate([candidate_patch_ids, padding]) + + +def horizontal_binning(Z: np.ndarray, k: int = 0) -> list[np.ndarray]: + """ + Horizontal binning of the image Z into a list of k images. + + Parameters + ---------- + Z : np.ndarray + The image to be binned. + k : int + Number of iterations to bin the image by half. + + Returns + ------- + list[np.ndarray] + List of k images. + + Example + ------- + >>> Z = np.random.rand(64, 64) + >>> binned_zs = horizontal_binning(Z, 3) + >>> len(binned_zs) + 4 + """ + binned_zs = [Z] + for _ in range(k): + sub_z0 = Z[:, ::2] + sub_z1 = Z[:, 1::2] + # make sure z0 and z1 have the same shape + if sub_z0.shape[1] > sub_z1.shape[1]: + sub_z0 = sub_z0[:, :-1] + elif sub_z0.shape[1] < sub_z1.shape[1]: + sub_z1 = sub_z1[:, :-1] + # average z0 and z1 + Z = (sub_z0 + sub_z1) * 0.5 + binned_zs.append(Z) + return binned_zs + + +def horizontal_debinning(original_image: np.ndarray, target: np.ndarray) -> np.ndarray: + """ + Horizontal debinning of the image Z into the same shape as Z_target. + + Parameters + ---------- + original_image : np.ndarray + The image to be debinned. + target : np.ndarray + The target image to match the shape. + + Returns + ------- + np.ndarray + The debinned image. + + Example + ------- + >>> Z = np.random.rand(64, 64) + >>> target = np.random.rand(64, 128) + >>> debinned_z = horizontal_debinning(Z, target) + >>> debinned_z.shape + (64, 128) + """ + # Original dimensions + original_height, original_width = original_image.shape + # Target dimensions + new_height, new_width = target.shape + + # Original grid + original_x = np.arange(original_width) + original_y = np.arange(original_height) + + # Target grid + new_x = np.linspace(0, original_width - 1, new_width) + new_y = np.linspace(0, original_height - 1, new_height) + + # Spline interpolation + spline = RectBivariateSpline(original_y, original_x, original_image) + interpolated_image = spline(new_y, new_x) + + return interpolated_image diff --git a/src/bm3dornl/version.py b/src/bm3dornl/version.py deleted file mode 100644 index 6f674ca..0000000 --- a/src/bm3dornl/version.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Module to load the version created by versioningit - -Will fall back to a default packagename is not installed""" - -try: - from ._version import __version__ -except ModuleNotFoundError: - __version__ = "0.0.1" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..fcc77cf --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ +# standard imports +from pathlib import Path +import pytest +from shutil import rmtree +from tempfile import mkdtemp + + +# NOTE: pytest fixtures tmp_path and tmp_path_factory are NOT deleting the temporary directory, hence this fixture +@pytest.fixture(scope="function") +def tmpdir(): + r"""Create directory, then delete the directory and its contents upon test exit""" + try: + temporary_dir = Path(mkdtemp()) + yield temporary_dir + finally: + rmtree(temporary_dir) diff --git a/tests/data/readme.md b/tests/data/readme.md new file mode 100644 index 0000000..5decf5f --- /dev/null +++ b/tests/data/readme.md @@ -0,0 +1,3 @@ +# Readme + +This folder contains testing data for unit, system and integration tests. diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/readme.md b/tests/integration/readme.md new file mode 100644 index 0000000..047a100 --- /dev/null +++ b/tests/integration/readme.md @@ -0,0 +1,3 @@ +# Readme + +This folder is for integration test. diff --git a/tests/test_version.py b/tests/test_version.py deleted file mode 100644 index aa6aec4..0000000 --- a/tests/test_version.py +++ /dev/null @@ -1,5 +0,0 @@ -from bm3dornl import __version__ - - -def test_version(): - assert __version__ == "0.0.1" diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/bm3dornl/test_aggregation.py b/tests/unit/bm3dornl/test_aggregation.py new file mode 100644 index 0000000..658764b --- /dev/null +++ b/tests/unit/bm3dornl/test_aggregation.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 + +"""Unit test for patch aggregation functions.""" + +import pytest +import numpy as np +from bm3dornl.aggregation import aggregate_patches + + +def test_aggregate_patches(): + # Setup + # ph, pw = 2, 2 # patch height and width + # num_blocks = 1 + # num_patches_per_block = 2 + + # Create a simple hyper block with known values + hyper_block = np.array([[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]]) + + # Index positions where patches will be placed + hyper_block_index = np.array( + [ + [ + [0, 0], # First patch at top-left corner + [0, 0], # Second patch also starts at top-left for overlap + ] + ] + ) + + # Initial image and weights matrices sized 2x2 + estimate_denoised_image = np.zeros((2, 2), dtype=float) + weights = np.zeros((2, 2), dtype=float) + + # Expected outputs + expected_image = np.array( + [ + [6, 8], # Both patches contribute to the first row + [10, 12], # Both patches contribute to the second row + ] + ) + expected_weights = np.array( + [ + [2, 2], # Both patches contribute to each position + [2, 2], + ] + ) + + # Invoke the function under test + aggregate_patches(estimate_denoised_image, weights, hyper_block, hyper_block_index) + + # Assertions + np.testing.assert_array_almost_equal( + estimate_denoised_image, + expected_image, + err_msg="Image aggregation did not match expected", + ) + np.testing.assert_array_equal( + weights, expected_weights, err_msg="Weights update did not match expected" + ) + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/bm3dornl/test_block_matching.py b/tests/unit/bm3dornl/test_block_matching.py new file mode 100644 index 0000000..56ea0f3 --- /dev/null +++ b/tests/unit/bm3dornl/test_block_matching.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +"""Unit test for block matching functions.""" + +import pytest +import numpy as np +from bm3dornl.block_matching import PatchManager + + +@pytest.fixture +def patch_manager(): + image = np.ones(400).reshape(20, 20).astype(float) # Simple uniform image + patch_size = (5, 5) + stride = 5 + background_threshold = ( + 0.1 # All patches are signal since threshold is lower than image values + ) + manager = PatchManager(image, patch_size, stride, background_threshold) + return manager + + +def test_generate_patch_positions(patch_manager): + expected_number_of_patches = (20 // 5) * ( + 20 // 5 + ) # As stride equals the patch size + assert ( + len(patch_manager.signal_patches_pos) == expected_number_of_patches + ), "Incorrect number of signal patches generated." + + +def test_get_patch(patch_manager): + expected_patch = patch_manager.image[0:5, 0:5] + retrieved_patch = patch_manager.get_patch((0, 0)) + np.testing.assert_array_equal( + retrieved_patch, expected_patch, "Patch retrieved incorrectly." + ) + + +def test_group_signal_patches_geometric(patch_manager): + cut_off_distance = (100, 100) # Larger than image dimensions + intensity_diff_threshold = 0.5 # Irrelevant due to uniform image + patch_manager.group_signal_patches(cut_off_distance, intensity_diff_threshold) + expected_blocks = np.ones( + (len(patch_manager.signal_patches_pos), len(patch_manager.signal_patches_pos)), + dtype=bool, + ) + np.testing.assert_array_equal( + patch_manager.signal_blocks_matrix, + expected_blocks, + "Signal blocks grouped incorrectly.", + ) + + +def test_get_hyper_block(patch_manager): + cut_off_distance = (100, 100) # Larger than image dimensions + intensity_diff_threshold = 0.5 # Irrelevant due to uniform image + patch_manager.group_signal_patches(cut_off_distance, intensity_diff_threshold) + num_patches_per_group = 4 + padding_mode = "circular" + patch_groups, positions = patch_manager.get_hyper_block( + num_patches_per_group, padding_mode + ) + assert patch_groups.shape == ( + len(patch_manager.signal_patches_pos), + num_patches_per_group, + 5, + 5, + ), "Incorrect shape of patch groups." + assert positions.shape == ( + len(patch_manager.signal_patches_pos), + num_patches_per_group, + 2, + ), "Incorrect shape of positions." + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/bm3dornl/test_denoiser.py b/tests/unit/bm3dornl/test_denoiser.py new file mode 100644 index 0000000..876508b --- /dev/null +++ b/tests/unit/bm3dornl/test_denoiser.py @@ -0,0 +1,122 @@ +#!/usr/env/bin python3 + +"""Unit test for denoiser module.""" + +import pytest +import numpy as np +from unittest.mock import patch +from bm3dornl.denoiser import ( + BM3D, + bm3d_streak_removal, +) + + +def test_bm3d_initialization(): + image = np.random.rand(64, 64) + bm3d_instance = BM3D(image) + assert np.array_equal( + bm3d_instance.image, image + ), "The images should be identical after initialization." + assert ( + bm3d_instance.patch_manager is not None + ), "PatchManager should be initialized." + + +def test_group_signal_patches(): + image = np.random.rand(64, 64) + + bm3d_instance = BM3D(image) + with patch.object( + bm3d_instance.patch_manager, "group_signal_patches" + ) as mock_method: + bm3d_instance.group_signal_patches((5, 5), 0.1) + mock_method.assert_called_once_with((5, 5), 0.1) + + +def test_thresholding(): + image = np.random.rand(64, 64) + bm3d_instance = BM3D(image) + + # Patching the PatchManager instance method get_hyper_block + with patch.object( + bm3d_instance.patch_manager, "get_hyper_block", autospec=True + ) as mock_get_hyper_block, patch.object( + bm3d_instance.patch_manager, "_generate_patch_positions", autospec=True + ) as mock_generate_patch_positions, patch( + "bm3dornl.denoiser.hard_thresholding" + ) as mock_hard_thresholding, patch( + "bm3dornl.denoiser.aggregate_patches" + ) as mock_aggregate_patches, patch( + "bm3dornl.denoiser.memory_cleanup" + ) as mock_memory_cleanup: + # Configure the mock to return specific values + mock_get_hyper_block.return_value = ( + np.random.rand(10, 8, 8), + np.random.randint(0, 64, (10, 2)), + ) + mock_hard_thresholding.return_value = np.random.rand(10, 8, 8) + + # Call the method to be tested + bm3d_instance.thresholding((5, 5), 0.1, 10, 0.1) + + # Assertions to check if each function was called correctly + mock_generate_patch_positions.assert_called_once_with() + mock_get_hyper_block.assert_called_once_with(10, padding_mode="circular") + mock_hard_thresholding.assert_called_once() + mock_aggregate_patches.assert_called_once() + mock_memory_cleanup.assert_called_once() + + +@patch("bm3dornl.denoiser.wiener_hadamard") +@patch("bm3dornl.denoiser.aggregate_patches") +@patch("bm3dornl.denoiser.memory_cleanup") +@patch("bm3dornl.denoiser.PatchManager", autospec=True) +def test_re_filtering( + mock_patch_manager, + mock_memory_cleanup, + mock_aggregate_patches, + mock_wiener_hadamard, +): + image = np.random.rand(64, 64) + bm3d_instance = BM3D(image) + mock_patch_manager.return_value.get_hyper_block.return_value = ( + np.random.rand(10, 8, 8), + np.random.randint(0, 64, (10, 2)), + ) + mock_wiener_hadamard.return_value = np.random.rand(10, 8, 8) + + with patch.object( + bm3d_instance, "group_signal_patches" + ) as mock_group_signal_patches: + bm3d_instance.re_filtering((5, 5), 0.1, 10) + mock_group_signal_patches.assert_called_once_with((5, 5), 0.1) + + mock_wiener_hadamard.assert_called_once() + mock_aggregate_patches.assert_called_once() + mock_memory_cleanup.assert_called_once() + + +@patch("bm3dornl.denoiser.BM3D", autospec=True) +@patch("bm3dornl.denoiser.horizontal_binning", return_value=np.random.rand(64, 64)) +@patch("bm3dornl.denoiser.horizontal_debinning", return_value=np.random.rand(64, 64)) +@patch("bm3dornl.denoiser.medfilt2d", return_value=np.random.rand(64, 64)) +def test_bm3d_streak_removal( + mock_medfilt2d, mock_horizontal_debinning, mock_horizontal_binning, mock_bm3d +): + sinogram = np.random.rand(64, 64) + mock_bm3d_instance = mock_bm3d.return_value + mock_bm3d_instance.final_denoised_image = np.random.rand( + 64, 64 + ) # Set the final_denoised_image attribute + + result = bm3d_streak_removal(sinogram, k=1) + + assert result.shape == (64, 64), "The output should maintain the input dimensions." + mock_medfilt2d.assert_called_once_with(sinogram, kernel_size=3) + mock_horizontal_binning.assert_called() + mock_bm3d.assert_called() + mock_horizontal_debinning.assert_called() + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/bm3dornl/test_gpu_utils.py b/tests/unit/bm3dornl/test_gpu_utils.py new file mode 100644 index 0000000..dd09ae3 --- /dev/null +++ b/tests/unit/bm3dornl/test_gpu_utils.py @@ -0,0 +1,131 @@ +#!/usr/env/bin python3 + +"""Unit test for cupy utility module.""" + +import pytest +import numpy as np +import cupy as cp +from bm3dornl.gpu_utils import ( + hard_thresholding, + wiener_hadamard, + memory_cleanup, +) + + +@pytest.mark.cuda_required +def test_hard_thresholding(): + # Setup the patch block + patch_block = np.random.rand(2, 5, 8, 8) # Random block of patches on GPU + threshold = 0.5 # Threshold for hard thresholding + + # Apply shrinkage + denoised_block = hard_thresholding(patch_block, threshold) + + # Convert back to frequency domain to check thresholding + dct_block_check = cp.fft.rfft2(cp.asarray(denoised_block), axes=(1, 2, 3)).get() + + # Test if all values in the DCT domain are either zero or above the threshold + # Allow a small tolerance for floating point arithmetic issues + tolerance = 1e-5 + assert np.all( + (np.abs(dct_block_check) >= threshold - tolerance) + | (np.abs(dct_block_check) < tolerance) + ), "DCT coefficients are not correctly thresholded" + + # Check the shape is maintained + assert ( + patch_block.shape == denoised_block.shape + ), "Output shape does not match input shape" + + # Check for any values that should not have been zeroed out + original_dct_block = cp.fft.rfft2(cp.asarray(patch_block), axes=(1, 2, 3)).get() + should_not_change = np.abs(original_dct_block) >= threshold + assert np.allclose( + dct_block_check[should_not_change], + original_dct_block[should_not_change], + atol=tolerance, + ), "Values that should not have been zeroed out have changed" + + # Cleanup GPU memory + memory_cleanup() + + +@pytest.mark.cuda_required +def test_wiener_hadamard_3d_input(): + # Prepare a 3D patch block + patch_block = np.random.rand(1000, 8, 8) # 1000 patches of 8x8 pixels + sigma_squared = 0.1 + + # Apply the Wiener-Hadamard filter + denoised_block = wiener_hadamard(patch_block, sigma_squared) + + # Check if the output dimensions match the input + assert ( + patch_block.shape == denoised_block.shape + ), "Output dimensions should match input dimensions" + + # Ensure changes were made to the patch block + assert not cp.allclose( + patch_block, denoised_block, atol=1e-3 + ), "No changes detected in the patch block after filtering" + + # Cleanup GPU memory + memory_cleanup() + + +@pytest.mark.cuda_required +def test_wiener_hadamard_4d_input(): + # Prepare a 4D patch block + patch_block = np.random.rand( + 4, 1000, 8, 8 + ) # 4 batches, 1000 patches each, of 8x8 pixels + sigma_squared = 0.1 + + # Apply the Wiener-Hadamard filter + denoised_block = wiener_hadamard(patch_block, sigma_squared) + + # Check if the output dimensions match the input + assert ( + patch_block.shape == denoised_block.shape + ), "Output dimensions should match input dimensions" + + # Ensure changes were made to the patch block + assert not np.allclose( + patch_block, denoised_block, atol=1e-3 + ), "No changes detected in the patch block after filtering" + + # Cleanup GPU memory + memory_cleanup() + + +@pytest.mark.cuda_required +def test_memory_cleanup(mocker): + # Create mock objects for the method chains + mock_free_all_blocks = mocker.Mock() + mock_free_all_blocks_pinned = mocker.Mock() + mock_synchronize = mocker.Mock() + + # Mock the chain calls + mock_memory_pool = mocker.patch( + "cupy.get_default_memory_pool", return_value=mock_free_all_blocks + ) + mock_memory_pool().free_all_blocks = mock_free_all_blocks + + mock_pinned_memory_pool = mocker.patch( + "cupy.get_default_pinned_memory_pool", return_value=mock_free_all_blocks_pinned + ) + mock_pinned_memory_pool().free_all_blocks = mock_free_all_blocks_pinned + + mocker.patch("cupy.cuda.Stream.null.synchronize", mock_synchronize) + + # Call the function + memory_cleanup() + + # Check if the functions were called + mock_free_all_blocks.assert_called_once() + mock_free_all_blocks_pinned.assert_called_once() + mock_synchronize.assert_called_once() + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/bm3dornl/test_phantom.py b/tests/unit/bm3dornl/test_phantom.py new file mode 100644 index 0000000..800cc07 --- /dev/null +++ b/tests/unit/bm3dornl/test_phantom.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 + +"""Unit test for phantom module.""" + +import pytest +import numpy as np +from bm3dornl.phantom import ( + shepp_logan_phantom, + generate_sinogram, + simulate_detector_gain_error, +) + + +def test_shepp_logan_phantom(): + """Test the shepp_logan_phantom function.""" + size = 256 + contrast_factor = 2.0 + phantom = shepp_logan_phantom(size=size, contrast_factor=contrast_factor) + + # Check the shape + assert phantom.shape == (size, size), "Phantom shape mismatch" + + # Check that all values are between 0 and 1 + assert phantom.min() >= 0, "Phantom values should be >= 0" + assert phantom.max() <= 1, "Phantom values should be <= 1" + + # Check that the phantom contains meaningful non-zero values + assert np.any(phantom > 0), "Phantom should have non-zero values" + + +def test_generate_sinogram(): + """Test the generate_sinogram function.""" + input_size = 256 + scan_step = 1.0 + input_img = np.random.rand(input_size, input_size) + + sinogram, thetas_deg = generate_sinogram(input_img, scan_step) + + # Verify the shape of the sinogram + expected_num_projections = int(360 / scan_step) + assert sinogram.shape == ( + expected_num_projections, + input_size, + ), f"Sinogram shape mismatch, expected: {(expected_num_projections, input_size)}" + + # Verify the length of the angles array + assert thetas_deg.shape == ( + expected_num_projections, + ), f"Theta shape mismatch, expected: {(expected_num_projections,)}" + + # Ensure that the theta array spans the correct range + assert thetas_deg.min() >= -180, "Minimum theta value should be -180 degrees" + assert thetas_deg.max() < 180, "Maximum theta value should be less than 180 degrees" + + # Check for non-zero sinogram + assert np.any(sinogram > 0), "The sinogram should contain non-zero values" + + +def test_simulate_detector_gain_error(): + """Test the simulate_detector_gain_error function.""" + # Define the parameters for the test + sinogram_shape = (360, 256) + detector_gain_range = (0.9, 1.1) + detector_gain_error = 0.1 + + # Create a random sinogram for testing + sinogram = np.random.rand(*sinogram_shape) + + # Call the function to simulate gain error + modified_sinogram, detector_gain = simulate_detector_gain_error( + sinogram, detector_gain_range, detector_gain_error + ) + + # Ensure the output sinogram and detector gain have the same shape as the input + assert ( + modified_sinogram.shape == sinogram_shape + ), f"Output sinogram shape mismatch, expected: {sinogram_shape}" + assert ( + detector_gain.shape == sinogram_shape + ), f"Detector gain shape mismatch, expected: {sinogram_shape}" + + # Check that the sinogram is normalized to [0, 1] + assert modified_sinogram.min() >= 0, "Sinogram values should be >= 0" + assert modified_sinogram.max() <= 1, "Sinogram values should be <= 1" + + # Ensure that the output is of type float32 + assert ( + modified_sinogram.dtype == np.float32 + ), "Output sinogram should be of type float32" + assert detector_gain.dtype == np.float32, "Detector gain should be of type float32" + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/bm3dornl/test_utils.py b/tests/unit/bm3dornl/test_utils.py new file mode 100644 index 0000000..27445f8 --- /dev/null +++ b/tests/unit/bm3dornl/test_utils.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 + +"""Unit tests for the utility module.""" + +import pytest +import numpy as np +from bm3dornl.utils import ( + find_candidate_patch_ids, + is_within_threshold, + get_signal_patch_positions, + pad_patch_ids, + horizontal_binning, + horizontal_debinning, +) + + +def test_find_candidate_patch_ids(): + # Setup the signal patches and test various reference indices and cut-off distances + signal_patches = np.array([[0, 0], [0, 1], [0, 2], [1, 0], [1, 1], [2, 2], [3, 3]]) + + # Test case 1 + ref_index = 0 + cut_off_distance = (1, 1) + expected = [ + 1, + 3, + 4, + ] # Only patches within 1 unit from (0, 0) in both dimensions and are after index 0 + result = find_candidate_patch_ids(signal_patches, ref_index, cut_off_distance) + assert result == expected, "Test case 1 failed" + + # Test case 2 + ref_index = 2 + cut_off_distance = (2, 2) + expected = [3, 4, 5] # Indices that are within 2 units from (0, 2) + result = find_candidate_patch_ids(signal_patches, ref_index, cut_off_distance) + assert result == expected, "Test case 2 failed" + + # Test case 3 + ref_index = 4 + cut_off_distance = (3, 3) + expected = [5, 6] # Indices that are within 3 units from (1, 1) + result = find_candidate_patch_ids(signal_patches, ref_index, cut_off_distance) + assert result == expected, "Test case 3 failed" + + # Test case 4 + ref_index = 0 + cut_off_distance = (0, 0) + expected = [] # No patch within 0 distance from (0, 0) except itself + result = find_candidate_patch_ids(signal_patches, ref_index, cut_off_distance) + assert result == expected, "Test case 4 failed" + + +def test_is_within_threshold(): + # Setup the patches + ref_patch = np.array([1, 2, 3], dtype=float) + cmp_patch_same = np.array([1, 2, 3], dtype=float) + cmp_patch_different = np.array([4, 5, 6], dtype=float) + cmp_patch_close = np.array([1, 2, 4], dtype=float) + + # Test case 1: Same patches, zero distance + threshold = 0 + result = is_within_threshold(ref_patch, cmp_patch_same, threshold) + assert result, "Failed: Same patches should be within zero distance" + + # Test case 2: Different patches, threshold less than actual distance + threshold = 2 + result = is_within_threshold(ref_patch, cmp_patch_different, threshold) + assert not result, "Failed: Different patches should not be within distance of 2" + + # Test case 3: Different patches, threshold greater than actual distance + threshold = 6 + result = is_within_threshold(ref_patch, cmp_patch_different, threshold) + assert result, "Failed: Different patches should be within distance of 6" + + # Test case 4: Slightly different patches, small threshold + threshold = 2 + result = is_within_threshold(ref_patch, cmp_patch_close, threshold) + assert result, "Failed: Slightly different patches should be within distance of 2" + + # Test case 5: Slightly different patches, very small threshold + threshold = 0.1 + result = is_within_threshold(ref_patch, cmp_patch_close, threshold) + assert not result, "Failed: Slightly different patches should not be within very small distance of 0.1" + + +def test_get_signal_patch_positions(): + # Create a synthetic image with a signal patch in the center + image = np.zeros((10, 10), dtype=float) + image[4:6, 4:6] = 1.0 # Making the center bright + + # Define the patch size, stride, and background threshold + patch_size = (3, 3) + stride = 1 + background_threshold = 0.5 + + # Call the function + result = get_signal_patch_positions( + image=image, + patch_size=patch_size, + stride=stride, + background_threshold=background_threshold, + ) + + # Check that the function correctly identified the signal patch + assert ( + [4, 4] in result.tolist() + ), "The signal patch at position (4, 4) was not identified correctly" + + +def test_get_signal_patch_positions_no_signal_error(): + # Create an image with all values below the threshold + image = np.zeros((10, 10), dtype=float) + patch_size = (3, 3) + stride = 1 + background_threshold = 0.5 + + # Check for ValueError when no signal patches are found + with pytest.raises(ValueError) as excinfo: + get_signal_patch_positions( + image=image, + patch_size=patch_size, + stride=stride, + background_threshold=background_threshold, + ) + assert "Couldn't find any signal patches in the image" in str( + excinfo.value + ), "Expected ValueError for no signal patches was not raised" + + +def test_pad_patch_ids_first(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 5 + padded = pad_patch_ids(candidate_patch_ids, num_patches, mode="first") + assert np.array_equal( + padded, np.array([1, 2, 3, 1, 1]) + ), "Padding with the first element failed" + + +def test_pad_patch_ids_repeat_sequence(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 7 + padded = pad_patch_ids(candidate_patch_ids, num_patches, mode="repeat_sequence") + assert np.array_equal( + padded, np.array([1, 2, 3, 1, 2, 3, 1]) + ), "Repeating sequence padding failed" + + +def test_pad_patch_ids_circular(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 6 + padded = pad_patch_ids(candidate_patch_ids, num_patches, mode="circular") + assert np.array_equal( + padded, np.array([1, 2, 3, 1, 2, 3]) + ), "Circular padding failed" + + +def test_pad_patch_ids_mirror(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 6 + padded = pad_patch_ids(candidate_patch_ids, num_patches, mode="mirror") + assert np.array_equal(padded, np.array([1, 2, 3, 3, 2, 1])), "Mirror padding failed" + + +def test_pad_patch_ids_random(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 5 + padded = pad_patch_ids(candidate_patch_ids, num_patches, mode="random") + # Check that all elements in padded are from candidate_patch_ids + assert all(item in candidate_patch_ids for item in padded), "Random padding failed" + + +def test_pad_patch_ids_unknown_mode(): + candidate_patch_ids = np.array([1, 2, 3]) + num_patches = 5 + with pytest.raises(ValueError) as excinfo: + pad_patch_ids(candidate_patch_ids, num_patches, mode="unknown") + assert "Unknown padding mode specified" in str( + excinfo.value + ), "Error not raised for unknown mode" + + +def test_horizontal_binning(): + # Initial setup: Create a test image + Z = np.random.rand(64, 64) + + # Number of binning iterations + k = 3 + + # Perform the binning + binned_images = horizontal_binning(Z, k) + + # Assert we have the correct number of images + assert len(binned_images) == k + 1, "Incorrect number of binned images returned" + + # Assert that each image has the correct dimensions + expected_width = 64 + for i, img in enumerate(binned_images): + assert img.shape[0] == 64, f"Height of image {i} is incorrect" + assert img.shape[1] == expected_width, f"Width of image {i} is incorrect" + expected_width = (expected_width + 1) // 2 # Calculate the next expected width + + +def test_horizontal_binning_k_zero(): + Z = np.random.rand(64, 64) + binned_images = horizontal_binning(Z, 0) + assert len(binned_images) == 1 and np.array_equal( + binned_images[0], Z + ), "Binning with k=0 should return only the original image" + + +def test_horizontal_binning_large_k(): + Z = np.random.rand(64, 64) + binned_images = horizontal_binning(Z, 6) + assert len(binned_images) == 7, "Incorrect number of images for large k" + assert binned_images[-1].shape[1] == 1, "Final image width should be 1 for large k" + + +@pytest.mark.parametrize( + "original_width, target_width", [(32, 64), (64, 128), (128, 256)] +) +def test_horizontal_debinning_scaling(original_width, target_width): + original_image = np.random.rand(64, original_width) + target_shape = (64, target_width) + debinned_image = horizontal_debinning(original_image, np.empty(target_shape)) + assert ( + debinned_image.shape == target_shape + ), f"Failed to scale from {original_width} to {target_width}" + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/readme.md b/tests/unit/readme.md new file mode 100644 index 0000000..b53e7a2 --- /dev/null +++ b/tests/unit/readme.md @@ -0,0 +1,3 @@ +# Readme + +This folder is for unit test.