Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 33b660e
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 20:58:24 2024 +0000

    Fix XSD

commit 4ee6112
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 20:53:14 2024 +0000

    Add `labels` attribute for `has_image_labels` and `has_image_mean_object_size`

commit 4ff401c
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 20:32:57 2024 +0000

    Replace `image_has_labels` by `has_image_labels` and `has_image_mean_object_size`

commit 09149d1
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 19:51:56 2024 +0000

    Fix bug

commit 28fedb9
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 19:44:12 2024 +0000

    Replace `image_has_intensities` by `has_image_mean_intensity` and `has_image_center_of_mass`

commit fe96916
Author: Leonid Kostrykin <[email protected]>
Date:   Tue Mar 5 19:10:38 2024 +0000

    Replace `image_has_metadata` by `has_image_width`, etc.
  • Loading branch information
kostrykin committed Mar 5, 2024
1 parent 7aebd26 commit 00485cc
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 135 deletions.
255 changes: 184 additions & 71 deletions lib/galaxy/tool_util/verify/asserts/image.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import io
from typing import (
Any,
List,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)

from ._util import _assert_number

try:
import numpy
except ImportError:
Expand Down Expand Up @@ -44,28 +47,79 @@ def _assert_float(
assert actual <= float(range_max), f"Wrong {label}: {actual} (must be {range_max} or smaller)"


def assert_image_has_metadata(
def assert_has_image_width(
output_bytes: bytes,
width: Optional[Union[int, str]] = None,
height: Optional[Union[int, str]] = None,
channels: Optional[Union[int, str]] = None,
value: Optional[Union[int, str]] = None,
delta: Union[int, str] = 0,
min: Optional[Union[int, str]] = None,
max: Optional[Union[int, str]] = None,
negate: Union[bool, str] = False,
) -> None:
"""
Assert the image output has specific metadata.
Asserts the specified output is an image and has a width of the specified value.
"""
buf = io.BytesIO(output_bytes)
with Image.open(buf) as im:
_assert_number(
im.size[0],
value,
delta,
min,
max,
negate,
"{expected} width {n}+-{delta}",
"{expected} width to be in [{min}:{max}]",
)


assert width is None or im.size[0] == int(width), f"Image has wrong width: {im.size[0]} (expected {int(width)})"
def assert_has_image_height(
output_bytes: bytes,
value: Optional[Union[int, str]] = None,
delta: Union[int, str] = 0,
min: Optional[Union[int, str]] = None,
max: Optional[Union[int, str]] = None,
negate: Union[bool, str] = False,
) -> None:
"""
Asserts the specified output is an image and has a height of the specified value.
"""
buf = io.BytesIO(output_bytes)
with Image.open(buf) as im:
_assert_number(
im.size[1],
value,
delta,
min,
max,
negate,
"{expected} height {n}+-{delta}",
"{expected} height to be in [{min}:{max}]",
)

assert height is None or im.size[1] == int(
height
), f"Image has wrong height: {im.size[1]} (expected {int(height)})"

actual_channels = len(im.getbands())
assert channels is None or actual_channels == int(
channels
), f"Image has wrong number of channels: {actual_channels} (expected {int(channels)})"
def assert_has_image_channels(
output_bytes: bytes,
value: Optional[Union[int, str]] = None,
delta: Union[int, str] = 0,
min: Optional[Union[int, str]] = None,
max: Optional[Union[int, str]] = None,
negate: Union[bool, str] = False,
) -> None:
"""
Asserts the specified output is an image and has the specified number of channels.
"""
buf = io.BytesIO(output_bytes)
with Image.open(buf) as im:
_assert_number(
len(im.getbands()),
value,
delta,
min,
max,
negate,
"{expected} image channels {n}+-{delta}",
"{expected} image channels to be in [{min}:{max}]",
)


def _compute_center_of_mass(im_arr: "numpy.typing.NDArray") -> Tuple[float, float]:
Expand All @@ -79,17 +133,12 @@ def _compute_center_of_mass(im_arr: "numpy.typing.NDArray") -> Tuple[float, floa
return (im_arr * xx).sum(), (im_arr * yy).sum()


def assert_image_has_intensities(
def _get_image(
output_bytes: bytes,
channel: Optional[Union[int, str]] = None,
mean_intensity: Optional[Union[float, str]] = None,
mean_intensity_min: Optional[Union[float, str]] = None,
mean_intensity_max: Optional[Union[float, str]] = None,
center_of_mass: Optional[Union[Tuple[float, float], str]] = None,
eps: Union[float, str] = 0.01,
) -> None:
) -> "numpy.typing.NDArray":
"""
Assert the image output has specific intensity content.
Returns the output image or a specific channel.
"""
buf = io.BytesIO(output_bytes)
with Image.open(buf) as im:
Expand All @@ -99,79 +148,143 @@ def assert_image_has_intensities(
if channel is not None:
im_arr = im_arr[:, :, int(channel)]

# Perform `mean_intensity` assertions.
# Return the image
return im_arr


def assert_has_image_mean_intensity(
output_bytes: bytes,
channel: Optional[Union[int, str]] = None,
value: Optional[Union[float, str]] = None,
delta: Union[float, str] = 0.01,
min: Optional[Union[float, str]] = None,
max: Optional[Union[float, str]] = None,
) -> None:
"""
Asserts the specified output is an image and has the specified mean intensity value.
"""
im_arr = _get_image(output_bytes, channel)
_assert_float(
actual=im_arr.mean(),
label="mean intensity",
tolerance=eps,
expected=mean_intensity,
range_min=mean_intensity_min,
range_max=mean_intensity_max,
tolerance=delta,
expected=value,
range_min=min,
range_max=max,
)

# Perform `center_of_mass` assertion.
if center_of_mass is not None:
if isinstance(center_of_mass, str):
center_of_mass_parts = [c.strip() for c in center_of_mass.split(",")]
assert len(center_of_mass_parts) == 2
center_of_mass = (float(center_of_mass_parts[0]), float(center_of_mass_parts[1]))
assert len(center_of_mass) == 2, "center_of_mass must have two components"

def assert_has_image_center_of_mass(
output_bytes: bytes,
channel: Optional[Union[int, str]] = None,
point: Optional[Union[Tuple[float, float], str]] = None,
delta: Union[float, str] = 0.01,
) -> None:
"""
Asserts the specified output is an image and has the specified center of mass.
"""
im_arr = _get_image(output_bytes, channel)
if point is not None:
if isinstance(point, str):
point_parts = [c.strip() for c in point.split(",")]
assert len(point_parts) == 2
point = (float(point_parts[0]), float(point_parts[1]))
assert len(point) == 2, "point must have two components"
actual_center_of_mass = _compute_center_of_mass(im_arr)
distance = numpy.linalg.norm(numpy.subtract(center_of_mass, actual_center_of_mass))
distance = numpy.linalg.norm(numpy.subtract(point, actual_center_of_mass))
assert distance <= float(
eps
), f"Wrong center of mass: {actual_center_of_mass} (expected {center_of_mass}, distance: {distance}, eps: {eps})"
delta
), f"Wrong center of mass: {actual_center_of_mass} (expected {point}, distance: {distance}, delta: {delta})"


def assert_image_has_labels(
def _get_image_labels(
output_bytes: bytes,
number_of_objects: Optional[Union[int, str]] = None,
mean_object_size: Optional[Union[float, str]] = None,
mean_object_size_min: Optional[Union[float, str]] = None,
mean_object_size_max: Optional[Union[float, str]] = None,
channel: Optional[Union[int, str]] = None,
labels: Optional[Union[str, List[int]]] = None,
exclude_labels: Optional[Union[str, List[int]]] = None,
eps: Union[float, str] = 0.01,
) -> None:
) -> Tuple["numpy.typing.NDArray", List[Any]]:
"""
Assert the image output has specific label content.
Determines the unique labels in the output image or a specific channel.
"""
buf = io.BytesIO(output_bytes)
with Image.open(buf) as im:
im_arr = numpy.array(im)
assert labels is None or exclude_labels is None
im_arr = _get_image(output_bytes, channel)

def cast_label(label):
label = label.strip()
if numpy.issubdtype(im_arr.dtype, numpy.integer):
return int(label)
if numpy.issubdtype(im_arr.dtype, float):
return float(label)
raise AssertionError(f'Unsupported image label type: "{im_arr.dtype}"')

# Determine labels present in the image.
labels = numpy.unique(im_arr)
present_labels = numpy.unique(im_arr)

# Apply filtering due to `labels` (keep only those).
if labels is None:
labels = list()
if isinstance(labels, str):
labels = [cast_label(label) for label in labels.split(",") if len(label) > 0]
if len(labels) > 0:
present_labels = [label for label in present_labels if label in labels]

# Apply filtering due to `exclude_labels`.
if exclude_labels is None:
exclude_labels = list()
if isinstance(exclude_labels, str):
exclude_labels = [cast_label(label) for label in exclude_labels.split(",") if len(label) > 0]
present_labels = [label for label in present_labels if label not in exclude_labels]

def cast_label(label):
if numpy.issubdtype(im_arr.dtype, numpy.integer):
return int(label)
if numpy.issubdtype(im_arr.dtype, float):
return float(label)
raise AssertionError(f'Unsupported image label type: "{im_arr.dtype}"')
# Return the image data and the labels.
return im_arr, present_labels

exclude_labels = [cast_label(label) for label in exclude_labels.split(",") if len(label) > 0]
labels = [label for label in labels if label not in exclude_labels]

# Perform `number_of_objects` assertion.
if number_of_objects is not None:
actual_number_of_objects = len(labels)
expected_number_of_objects = int(number_of_objects)
assert (
actual_number_of_objects == expected_number_of_objects
), f"Wrong number of objects: {actual_number_of_objects} (expected {expected_number_of_objects})"

# Perform `mean_object_size` assertion.
actual_mean_object_size = sum((im_arr == label).sum() for label in labels) / len(labels)

def assert_has_image_labels(
output_bytes: bytes,
channel: Optional[Union[int, str]] = None,
exclude_labels: Optional[Union[str, List[int]]] = None,
value: Optional[Union[int, str]] = None,
delta: Union[int, str] = 0,
min: Optional[Union[int, str]] = None,
max: Optional[Union[int, str]] = None,
negate: Union[bool, str] = False,
) -> None:
"""
Asserts the specified output is an image and has the specified number of unique values (e.g., uniquely labeled objects).
"""
present_labels = _get_image_labels(output_bytes, channel, exclude_labels)[1]
_assert_number(
len(present_labels),
value,
delta,
min,
max,
negate,
"{expected} labels {n}+-{delta}",
"{expected} labels to be in [{min}:{max}]",
)


def assert_has_image_mean_object_size(
output_bytes: bytes,
channel: Optional[Union[int, str]] = None,
labels: Optional[Union[str, List[int]]] = None,
exclude_labels: Optional[Union[str, List[int]]] = None,
value: Optional[Union[float, str]] = None,
delta: Union[float, str] = 0.01,
min: Optional[Union[float, str]] = None,
max: Optional[Union[float, str]] = None,
) -> None:
"""
Asserts the specified output is an image with labeled objects which have the specified mean size (number of pixels).
"""
im_arr, present_labels = _get_image_labels(output_bytes, channel, labels, exclude_labels)
actual_mean_object_size = sum((im_arr == label).sum() for label in present_labels) / len(present_labels)
_assert_float(
actual=actual_mean_object_size,
label="mean object size",
tolerance=eps,
expected=mean_object_size,
range_min=mean_object_size_min,
range_max=mean_object_size_max,
tolerance=delta,
expected=value,
range_min=min,
range_max=max,
)
Loading

0 comments on commit 00485cc

Please sign in to comment.