diff --git a/simpa/utils/__init__.py b/simpa/utils/__init__.py index 2393f806..0ecfa5d5 100644 --- a/simpa/utils/__init__.py +++ b/simpa/utils/__init__.py @@ -16,6 +16,8 @@ from .libraries.heterogeneity_generator import RandomHeterogeneity from .libraries.heterogeneity_generator import BlobHeterogeneity from .libraries.heterogeneity_generator import ImageHeterogeneity +from .libraries.heterogeneity_generator import OxygenationDiffusionMap3D +from .libraries.heterogeneity_generator import OxygenationDiffusionMap2D # Then load classes and methods with an increasing amount of internal dependencies. # If there are import errors in the tests, it is probably due to an incorrect diff --git a/simpa/utils/libraries/heterogeneity_generator.py b/simpa/utils/libraries/heterogeneity_generator.py index 8d01b3f1..13e1fb57 100644 --- a/simpa/utils/libraries/heterogeneity_generator.py +++ b/simpa/utils/libraries/heterogeneity_generator.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: 2021 Janek Groehl # SPDX-License-Identifier: MIT +from abc import abstractmethod, ABC import numpy as np from sklearn.datasets import make_blobs from scipy.ndimage.filters import gaussian_filter @@ -9,6 +10,7 @@ from simpa.utils import Tags from typing import Union, Optional from simpa.log import Logger +from simpa.utils.calculate import calculate_oxygenation class HeterogeneityGeneratorBase(object): @@ -280,6 +282,9 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray, crop_horizontal = round((image_width_pixels - crop_width) / 2) elif isinstance(crop_placement[0], int): crop_horizontal = crop_placement[0] + else: + raise ValueError(f'Unrecognised crop placement {crop_placement[0]}, please see the available options in' + f'the documentation') if crop_placement[1] == Tags.CROP_POSITION_TOP: crop_vertical = 0 @@ -289,6 +294,9 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray, crop_vertical = round((image_height_pixels - crop_height) / 2) elif isinstance(crop_placement[1], int): crop_vertical = crop_placement[1] + else: + raise ValueError(f'Unrecognised crop placement {crop_placement[1]}, please see the available options in' + f'the documentation') elif isinstance(crop_placement, str): if crop_placement == Tags.CROP_POSITION_CENTRE: @@ -301,6 +309,12 @@ def crop_image(self, xdim: int, zdim: int, image: np.ndarray, crop_vertical = image_height_pixels - crop_height if crop_vertical != 0: crop_vertical = np.random.randint(0, crop_vertical) + else: + raise ValueError(f'Unrecognised crop placement {crop_placement}, please see the available options in' + f'the documentation') + else: + raise ValueError(f'Unrecognised crop placement {crop_placement}, please see the available options in' + f'the documentation') cropped_image = image[crop_horizontal: crop_horizontal + crop_width, crop_vertical: crop_vertical + crop_height] @@ -325,3 +339,177 @@ def change_resolution(self, image: np.ndarray, spacing_mm: Union[int, float], self.logger.warning( "The input image has changed pixel spacing to {} to match the simulation volume".format(spacing_mm)) return transform.resize(image, (new_image_pixel_width, new_image_pixel_height)) + + +class OxygenationDiffusionMap(object): + """ + Here the aim is to create diffusion maps for oxygenation based on the positions and values of pre-set of the + sources/sinks. This predefined oxygenation map must be 2D. The user should understand that this technique requires + knowing when the diffusion equations have approached a stable point. This works using a finite difference method on + the diffusion equations. + + Some suggestions for how to optimise the time of convergence: + - We recommend that the user try a number of time steps to begin with to gauge reasonable values for your + simulation. + - We recommend that the user amends the diffusion constant. At its essence, this value determines the size of + the steps in this numerical method. Large diffusion constant will lead to faster convergence, but will reduce + the accuracy + - We recommend caution around the value decide for the background_oxygenation. This value should represent what + you expect the long term oxygenation to look like, as this will reduce the amount each value needs to change in + order to reach ots converged value. In addition, this value will be used as the boundary. It represents + condition which must be fulfilled throughout the simulation and will therefore have a large baring on the final + results + + Attributes: + map: The final oxygenation map + """ + + def __init__(self, segmentation_array: np.ndarray, segmentation_mapping: dict, + segments_to_include: list, diffusion_const: Union[int, float], dt: Union[int, float], + ds2: Union[int, float], n_steps: int, background_oxygenation: Union[int, float]): + """ + Upon initialisation of this class, the diffusion map will be created. + :param segmentation_array: An array segmenting the medium + :param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels + :param segments_to_include: Which segments to include in the initial oxygenation map + :param diffusion_const: The diffusion constant of the oxygenation + :param n_steps: The number of steps the simulation should use + :param background_oxygenation: The oxygenation that will be used as the starting value everywhere where the + oxygenation is not defined, and as the edge value + """ + initial_conditions = np.zeros(segmentation_array.shape) + for segment in segments_to_include: + initial_conditions[np.where(segmentation_array == segment) + ] = calculate_oxygenation(segmentation_mapping[segment]) + + oxygenation = initial_conditions.copy() + oxygenation[np.where(initial_conditions == 0)] = background_oxygenation + + for m in range(n_steps): + oxygenation = self.diffusion_timestep(oxygenation, initial_conditions, diffusion_const, dt, ds2) + + self.map = oxygenation + + @abstractmethod + def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2): + """ + Here, a diffusion equation can be applied to determine how the oxygenation should evolve. + :param u: The initial map to perform a time step of the diffusion model + :param boundary_conditions: The conditions that you wish to remain constant throughout the simulation + :param diffusion_const: The diffusion constant of the oxygenation + :param dt: The time step + :param ds2: The second order spatial step + :return: The map after a time step of the diffusion model + """ + pass + + def get_map(self): + return self.map + + +class OxygenationDiffusionMap2D(OxygenationDiffusionMap): + """ + This uses the OxygenationDiffusionMap class and applies it to a volume with a constant cross-sectoin. The predefined + oxygenation map must be 2D. + + Attributes: + map: The final oxygenation map + """ + + def __init__(self, spacing_mm, segmentation_area: np.ndarray, segmentation_mapping, segments_to_include, + diffusion_const, ydim: int, n_steps=10000, background_oxygenation=0.8): + """ + Upon initialisation of this class, the diffusion map will be created. + :param spacing_mm: the spacing of the volume in mm + :param segmentation_area: An array segmenting the cross-sectional area of the medium + :param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels + :param segments_to_include: Which segments to include in the initial oxygenation map + :param diffusion_const: The diffusion constant of the oxygenation + :param ydim: the dimension to scale the image to make it into a volume + :param n_steps: The number of steps the simulation should use + :param background_oxygenation: The oxygenation that will be used as the starting value everywhere where the + oxygenation is not defined, and as the edge value + """ + ds2 = spacing_mm**2 + dt = ds2 ** 2 / (4 * diffusion_const * ds2) + super().__init__(segmentation_area, segmentation_mapping, segments_to_include, diffusion_const, dt, ds2, + n_steps, background_oxygenation) + + self.map = np.repeat(self.map[:, np.newaxis, :], ydim, axis=1) + + def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2): + """ + :param u: The initial map to perform a time step of the diffusion model + :param boundary_conditions: The conditions that you wish to remain constant throughout the simulation + :param diffusion_const: The diffusion constant of the oxygenation + :param dt: The time step + :param ds2: The second order spatial step + :return: The map after a time step of the diffusion model + """ + # Propagate with forward-difference in time, central-difference in space + u[1:-1, 1:-1] = u[1:-1, 1:-1] + diffusion_const * dt * ( + (u[2:, 1:-1] - 2 * u[1:-1, 1:-1] + u[:-2, 1:-1]) / ds2 + + (u[1:-1, 2:] - 2 * u[1:-1, 1:-1] + u[1:-1, :-2]) / ds2) + u[np.where(boundary_conditions != 0)] = boundary_conditions[np.where(boundary_conditions != 0)] + return u + + +class OxygenationDiffusionMap3D(OxygenationDiffusionMap): + """ + Here the aim is to create diffusion maps for oxygenation based on the positions and values of pre-set of the + sources/sinks. This predefined oxygenation map must be 3D. The user should understand that this technique requires + knowing when the diffusion equations have approached a stable point. This works using a finite difference method on + the diffusion equations. + + Some suggestions for how to optimise the time of convergence: + - We recommend that the user try a number of time steps to begin with to gauge reasonable values for your + simulation. + - We recommend that the user amends the diffusion constant. At its essence, this value determines the size of + the steps in this numerical method. Large diffusion constant will lead to faster convergence, but will reduce + the accuracy + - We recommend caution around the value decide for the background_oxygenation. This value should represent what + you expect the long term oxygenation to look like, as this will reduce the amount each value needs to change in + order to reach ots converged value. In addition, this value will be used as the boundary. It represents + condition which must be fulfilled throughout the simulation and will therefore have a large baring on the final + results + + Attributes: + map: The final oxygenation map + """ + + def __init__(self, spacing_mm: Union[int, float], segmentation_volume: np.ndarray, segmentation_mapping: dict, + segments_to_include: list, diffusion_const: Union[int, float], + n_steps: int = 10000, background_oxygenation: Union[int, float] = 0.8): + """ + Upon initialisation of this class, the diffusion map will be created. + :param spacing_mm: the spacing of the volume in mm + :param segmentation_volume: An array segmenting the volume of the medium + :param segmentation_mapping: A mapping from the segmentations to the oxygenations of the blood vessels + :param segments_to_include: Which segments to include in the initial oxygenation map + :param diffusion_const: The diffusion constant of the oxygenation + :param n_steps: The number of steps the simulation should use + :param background_oxygenation: The oxygenation that will be used: as the starting value everywhere where the + oxygenation is not defined, and as the edge value + """ + ds2 = spacing_mm**2 + dt = ds2 ** 2 / (6 * diffusion_const * ds2) + + super().__init__(segmentation_volume, segmentation_mapping, segments_to_include, diffusion_const, dt, + ds2, n_steps, background_oxygenation) + + def diffusion_timestep(self, u, boundary_conditions, diffusion_const, dt, ds2): + """ + :param u: The initial map to perform a time step of the diffusion model + :param boundary_conditions: The conditions that you wish to remain constant throughout the simulation + :param diffusion_const: The diffusion constant of the oxygenation + :param dt: The time step + :param ds2: The second order spatial step + :return: The map after a time step of the diffusion model + """ + # Propagate with forward-difference in time, central-difference in space + u[1:-1, 1:-1, 1:-1] = u[1:-1, 1:-1, 1:-1] + diffusion_const * dt * ( + (u[2:, 1:-1, 1:-1] - 2 * u[1:-1, 1:-1, 1:-1] + u[:-2, 1:-1, 1:-1]) / ds2 + + (u[1:-1, 2:, 1:-1] - 2 * u[1:-1, 1:-1, 1:-1] + u[1:-1, :-2, 1:-1]) / ds2 + + (u[1:-1, 1:-1, 2:] - 2 * u[1:-1, 1:-1, 1:-1] + u[1:-1, 1:-1, :-2]) / ds2) + u[np.where(boundary_conditions != 0)] = boundary_conditions[np.where(boundary_conditions != 0)] + return u