diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2fbc49c79c..b3e8287a7c 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -15,6 +15,35 @@ within the context of an Open Source community.

 Ready to use with state-of-the art Deep Learning models:

+DexiNed edge detection model.
+
+.. code-block:: python
+
+   import kornia
+
+   image = kornia.utils.sample.get_sample_images()[0][None]
+   model = kornia.models.edge_detection.dexined.DexiNedBuilder.build()
+   model.save(image)
+
+RTDETRDetector for object detection.
+
+.. code-block:: python
+
+   import kornia
+
+   image = kornia.utils.sample.get_sample_images()[0][None]
+   model = kornia.models.detection.rtdetr.RTDETRDetectorBuilder.build()
+   model.save(image)
+
+BoxMotTracker for object tracking.
+
+.. code-block:: python
+
+   import kornia
+
+   image = kornia.utils.sample.get_sample_images()[0][None]
+   model = kornia.models.tracking.boxmot_tracker.BoxMotTracker()
+   for i in range(4):
+       model.update(image)
+   model.save(image)
+
+Vision Transformer for image classification.
+
 .. code:: python

 >>> import torch.nn as nn
@@ -66,9 +95,11 @@ Join the community
    io
    image
    losses
+   models
    metrics
    morphology
    nerf
+   onnx
    tracking
    testing
    utils
diff --git a/docs/source/models.rst b/docs/source/models.rst
new file mode 100644
index 0000000000..223453037f
--- /dev/null
+++ b/docs/source/models.rst
@@ -0,0 +1,135 @@
+Models Overview
+===============
+
+This section covers several of Kornia's built-in models for key computer vision tasks. Each model is documented with its respective API and example usage.
+
+.. _RTDETRDetectorBuilder:
+
+RTDETRDetectorBuilder
+---------------------
+
+The `RTDETRDetectorBuilder` class is a builder that constructs a detection model based on the RT-DETR architecture, which is designed for real-time object detection. It can detect multiple objects within an image and provides efficient inference suitable for real-world applications.
+
+**Key Methods:**
+
+- `build`: Constructs and returns an instance of the RTDETR detection model.
+- `save`: Saves the processed image or results after applying the detection model.
+
+.. autoclass:: kornia.models.detection.rtdetr.RTDETRDetectorBuilder
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+    .. rubric:: Example
+
+    The following code demonstrates how to use `RTDETRDetectorBuilder` to detect objects in an image:
+
+    .. code-block:: python
+
+        import kornia
+        image = kornia.utils.sample.get_sample_images()[0][None]
+        model = kornia.models.detection.rtdetr.RTDETRDetectorBuilder.build()
+        model.save(image)
+
+.. _DexiNedBuilder:
+
+DexiNedBuilder
+--------------
+
+The `DexiNedBuilder` class builds a state-of-the-art edge detection model based on DexiNed, which excels at detecting fine-grained edges in images. This model is well-suited for tasks like medical imaging, object contour detection, and more.
+
+**Key Methods:**
+
+- `build`: Builds and returns an instance of the DexiNed edge detection model.
+- `save`: Saves the detected edges for further processing or visualization.
+
+.. autoclass:: kornia.models.edge_detection.dexined.DexiNedBuilder
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
+    .. rubric:: Example
+
+    The following code shows how to use the `DexiNedBuilder` to detect edges in an image:
+
+    .. code-block:: python
+
+        import kornia
+        image = kornia.utils.sample.get_sample_images()[0][None]
+        model = kornia.models.edge_detection.dexined.DexiNedBuilder.build()
+        model.save(image)
+
+.. _SegmentationModels:
+
+SegmentationModels
+------------------
+
+The `SegmentationModels` class offers a flexible API for implementing and running various segmentation models.
It supports a variety of architectures such as UNet, FPN, and others, making it highly adaptable for tasks like semantic segmentation, instance segmentation, and more. + +**Key Methods:** + +- `__init__`: Initializes a segmentation model based on the chosen architecture (e.g., UNet, DeepLabV3, etc.). +- `forward`: Runs inference on an input tensor and returns segmented output. + +**Parameters:** + +- `model_name`: (str) Name of the segmentation architecture to use, e.g., `"Unet"`, `"DeepLabV3"`. +- `classes`: (int) The number of output classes for segmentation. + +.. autoclass:: kornia.models.segmentation.segmentation_models.SegmentationModels + :members: + :undoc-members: + :show-inheritance: + + .. rubric:: Example + + Here's an example of how to use `SegmentationModels` for binary segmentation: + + .. code-block:: python + + import kornia + input_tensor = kornia.utils.sample.get_sample_images()[0][None] + model = kornia.models.segmentation.segmentation_models.SegmentationModels() + segmented_output = model(input_tensor) + print(segmented_output.shape) + +.. _BoxMotTracker: + +BoxMotTracker +------------- + +The `BoxMotTracker` class is used for multi-object tracking in video streams. It is designed to track bounding boxes of objects across multiple frames, supporting various tracking algorithms for object detection and tracking continuity. + +**Key Methods:** + +- `__init__`: Initializes the multi-object tracker. +- `update`: Updates the tracker with a new image frame. +- `save`: Saves the tracked object data or visualization for post-processing. + +**Parameters:** + +- `max_lost`: (int) The maximum number of frames where an object can be lost before it is removed from the tracker. + +.. autoclass:: kornia.models.tracking.boxmot_tracker.BoxMotTracker + :members: + :undoc-members: + :show-inheritance: + + .. rubric:: Example + + The following example demonstrates how to track objects across multiple frames using `BoxMotTracker`: + + .. code-block:: python + + import kornia + image = kornia.utils.sample.get_sample_images()[0][None] + model = kornia.models.tracking.boxmot_tracker.BoxMotTracker() + for i in range(4): + model.update(image) # Update the tracker with new frames + model.save(image) # Save the tracking result + +--- + +.. note:: + + This documentation provides detailed information about each model class, its methods, and usage examples. For further details on individual methods and arguments, refer to the respective code documentation. diff --git a/docs/source/onnx.rst b/docs/source/onnx.rst new file mode 100644 index 0000000000..5ae0ba8787 --- /dev/null +++ b/docs/source/onnx.rst @@ -0,0 +1,137 @@ +ONNXSequential: Chain Multiple ONNX Models with Ease +==================================================== + +The `ONNXSequential` class is a powerful new feature that allows users to effortlessly combine and chain multiple ONNX models together. This is especially useful when you have several pre-trained models or custom ONNX operators that you want to execute sequentially as part of a larger pipeline. + +Whether you're working with models for inference, experimentation, or optimization, `ONNXSequential` makes it easier to manage, combine, and run ONNX models in a streamlined manner. It also supports flexibility in execution environments with ONNXRuntime’s execution providers (CPU, CUDA, etc.). + +Key Features +------------ + +- **Seamless Model Chaining**: Combine multiple ONNX models into a single computational graph. 
+- **Flexible Input/Output Mapping**: Control how the outputs of one model are passed as inputs to the next.
+- **Optimized Execution**: Automatically create optimized `ONNXRuntime` sessions to speed up inference.
+- **Export to ONNX**: Save the combined model into a single ONNX file for easy deployment and sharing.
+- **Execution Providers Support**: Utilize ONNXRuntime's execution providers (e.g., `CUDAExecutionProvider`, `CPUExecutionProvider`) for accelerated inference on different hardware.
+- **PyTorch-like Interface**: Use the `ONNXSequential` class like a PyTorch `nn.Sequential` model, including calling it directly for inference.
+
+Quickstart Guide
+----------------
+
+Here's how you can quickly get started with `ONNXSequential`:
+
+1. **Install ONNX and ONNXRuntime**
+
+   If you haven't already installed `onnx` and `onnxruntime`, you can install them using `pip`:
+
+   .. code-block:: bash
+
+      pip install onnx onnxruntime
+
+2. **Combining ONNX Models**
+
+   You can initialize `ONNXSequential` with a list of ONNX models or file paths. The models will be automatically chained together and optimized for inference.
+
+   .. code-block:: python
+
+      import numpy as np
+      from kornia.onnx import ONNXSequential
+
+      # Initialize ONNXSequential with two models, loading from our online repo
+      onnx_seq = ONNXSequential(
+          "hf://operators/kornia.color.gray.RgbToGrayscale",
+          "hf://operators/kornia.geometry.transform.affwarp.Resize_512x512"
+      )
+
+      # Prepare some input data
+      input_data = np.random.randn(1, 3, 256, 512).astype(np.float32)
+
+      # Perform inference
+      outputs = onnx_seq(input_data)
+
+      # Print the model outputs
+      print(outputs)
+
+   .. note::
+      By default, we assume each ONNX model contains only one input node named "input" and one output node named "output". For complex models, you may need to pass an `io_maps` argument.
+
+3. **Input/Output Mapping Between Models**
+
+   When combining models, you can specify how the outputs of one model are mapped to the inputs of the next. This allows you to chain models in custom ways.
+
+   .. code-block:: python
+
+      io_maps = [("model1_output_0", "model2_input_0"), ("model1_output_1", "model2_input_1")]
+      onnx_seq = ONNXSequential("model1.onnx", "model2.onnx", io_maps=io_maps)
+
+4. **Exporting the Combined Model**
+
+   You can easily export the combined model to an ONNX file:
+
+   .. code-block:: python
+
+      # Export the combined model to a file
+      onnx_seq.export("combined_model.onnx")
+
+5. **Optimizing with Execution Providers**
+
+   Leverage ONNXRuntime's execution providers for optimized inference. For example, to run the model on a GPU:
+
+   .. code-block:: python
+
+      # Initialize with the CUDA execution provider
+      onnx_seq = ONNXSequential(
+          "hf://operators/kornia.geometry.transform.flips.Hflip",
+          # Or you may use a local model with either a filepath "YOUR_OWN_MODEL.onnx" or a loaded ONNX model.
+          "hf://models/kornia.models.detection.rtdetr_r18vd_640x640",
+          providers=['CUDAExecutionProvider']
+      )
+
+      # Run inference
+      outputs = onnx_seq(input_data)
+
+
+Frequently Asked Questions (FAQ)
+--------------------------------
+
+**1. Can I chain models from different sources?**
+
+Yes! You can chain models from different ONNX files or directly from `onnx.ModelProto` objects. `ONNXSequential` handles the integration and merging of their graphs.
+
+**2. What happens if the input/output sizes of models don't match?**
+
+You can use the `io_maps` parameter to control how the outputs of one model are mapped to the inputs of the next.
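+For instance, a minimal sketch with hypothetical file and node names (``backbone.onnx``, ``classifier.onnx``, ``features``, and ``features_in`` below are placeholders, not shipped artifacts) could look like:
+
+.. code-block:: python
+
+   from kornia.onnx import ONNXSequential
+
+   # Suppose "backbone.onnx" exposes an output node named "features" and
+   # "classifier.onnx" expects an input node named "features_in".
+   onnx_seq = ONNXSequential(
+       "backbone.onnx",
+       "classifier.onnx",
+       io_maps=[("features", "features_in")],
+   )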
+This allows for greater flexibility when chaining models with different architectures.
+
+**3. Can I use custom ONNXRuntime session options?**
+
+Absolutely! You can pass your own session options to the `create_session` method to fine-tune performance, memory usage, or logging.
+
+Why Choose ONNXSequential?
+--------------------------
+
+With the increasing adoption of ONNX for model interoperability and deployment, `ONNXSequential` provides a simple yet powerful interface for combining models and operators. By leveraging ONNXRuntime's optimization and execution provider capabilities, it gives you the flexibility to:
+
+- Deploy on different hardware (CPU, GPU).
+- Run complex pipelines in production environments.
+- Combine and experiment with models effortlessly.
+
+Whether you're building an advanced deep learning pipeline or simply trying to chain pre-trained models, `ONNXSequential` makes it easy to manage, optimize, and execute ONNX models at scale.
+
+Get started today and streamline your ONNX workflows!
+
+
+API Documentation
+-----------------
+
+.. autoclass:: kornia.onnx.sequential.ONNXSequential
+    :members:
+
+.. autoclass:: kornia.onnx.utils.ONNXLoader
+    :members:
+
+    .. code-block:: python
+
+        onnx_loader = ONNXLoader()
+        # Load a HuggingFace operator
+        onnx_loader.load_model("hf://operators/kornia.color.gray.GrayscaleToRgb")  # doctest: +SKIP
+        # Load a local converted/downloaded operator
+        onnx_loader.load_model("operators/kornia.color.gray.GrayscaleToRgb")  # doctest: +SKIP
diff --git a/kornia/__init__.py b/kornia/__init__.py
index 84f22f2051..b1b9782747 100644
--- a/kornia/__init__.py
+++ b/kornia/__init__.py
@@ -10,12 +10,15 @@
     color,
     contrib,
     core,
+    config,
     enhance,
     feature,
     io,
     losses,
     metrics,
+    models,
     morphology,
+    onnx,
     tracking,
     utils,
     x,
diff --git a/kornia/augmentation/base.py b/kornia/augmentation/base.py
index e912768373..a4cfa33e2b 100644
--- a/kornia/augmentation/base.py
+++ b/kornia/augmentation/base.py
@@ -53,6 +53,10 @@ class _BasicAugmentationBase(Module):
         the batch form ``False``.
     """

+    # TODO: Hard to support. Much of this code is not ONNX-friendly (lots of if-else blocks, etc.).
+    # Contributions are welcome.
+ ONNX_EXPORTABLE = False + def __init__( self, p: float = 0.5, diff --git a/kornia/color/gray.py b/kornia/color/gray.py index 96223a7a69..f647cfd337 100644 --- a/kornia/color/gray.py +++ b/kornia/color/gray.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Optional +from typing import ClassVar, Optional import torch @@ -125,6 +125,9 @@ class GrayscaleToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, 1, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return grayscale_to_rgb(image) @@ -147,6 +150,9 @@ class RgbToGrayscale(Module): >>> output = gray(input) # 2x1x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, 1, -1, -1] + def __init__(self, rgb_weights: Optional[Tensor] = None) -> None: super().__init__() if rgb_weights is None: @@ -175,5 +181,8 @@ class BgrToGrayscale(Module): >>> output = gray(input) # 2x1x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, 1, -1, -1] + def forward(self, image: Tensor) -> Tensor: return bgr_to_grayscale(image) diff --git a/kornia/color/hls.py b/kornia/color/hls.py index 39c457a7c2..0506762c8c 100644 --- a/kornia/color/hls.py +++ b/kornia/color/hls.py @@ -1,5 +1,5 @@ import math -from typing import Tuple +from typing import ClassVar, List, Tuple import torch @@ -142,6 +142,9 @@ class RgbToHls(Module): >>> output = hls(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgb_to_hls(image) @@ -167,5 +170,8 @@ class HlsToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return hls_to_rgb(image) diff --git a/kornia/color/hsv.py b/kornia/color/hsv.py index ce5cc4c55c..4b82447306 100644 --- a/kornia/color/hsv.py +++ b/kornia/color/hsv.py @@ -1,4 +1,5 @@ import math +from typing import ClassVar, List import torch @@ -116,6 +117,9 @@ class RgbToHsv(Module): >>> output = hsv(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def __init__(self, eps: float = 1e-6) -> None: super().__init__() self.eps = eps @@ -142,5 +146,8 @@ class HsvToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: torch.Tensor) -> torch.Tensor: return hsv_to_rgb(image) diff --git a/kornia/color/lab.py b/kornia/color/lab.py index c9bda065f9..3dedc03e2c 100644 --- a/kornia/color/lab.py +++ b/kornia/color/lab.py @@ -3,6 +3,8 @@ https://github.com/scikit-image/scikit-image/blob/a48bf6774718c64dade4548153ae16065b595ca9/skimage/color/colorconv.py """ +from typing import ClassVar, List + import torch from kornia.core import ImageModule as Module @@ -150,6 +152,9 @@ class RgbToLab(Module): [3] https://github.com/torch/image/blob/dc061b98fb7e946e00034a5fc73e883a299edc7f/generic/image.c#L1467 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, 
-1] + def forward(self, image: torch.Tensor) -> torch.Tensor: return rgb_to_lab(image) @@ -177,5 +182,8 @@ class LabToRgb(Module): [3] https://github.com/torch/image/blob/dc061b98fb7e946e00034a5fc73e883a299edc7f/generic/image.c#L1518 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: torch.Tensor, clip: bool = True) -> torch.Tensor: return lab_to_rgb(image, clip) diff --git a/kornia/color/luv.py b/kornia/color/luv.py index 4af051e4f8..34c00bb746 100644 --- a/kornia/color/luv.py +++ b/kornia/color/luv.py @@ -3,7 +3,7 @@ https://github.com/scikit-image/scikit-image/blob/a48bf6774718c64dade4548153ae16065b595ca9/skimage/color/colorconv.py """ -from typing import Tuple +from typing import ClassVar, List, Tuple import torch @@ -141,6 +141,9 @@ class RgbToLuv(Module): [3] http://www.poynton.com/ColorFAQ.html """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: torch.Tensor) -> torch.Tensor: return rgb_to_luv(image) @@ -168,5 +171,8 @@ class LuvToRgb(Module): [3] http://www.poynton.com/ColorFAQ.html """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: torch.Tensor) -> torch.Tensor: return luv_to_rgb(image) diff --git a/kornia/color/raw.py b/kornia/color/raw.py index 7dca550375..b13459fb3e 100644 --- a/kornia/color/raw.py +++ b/kornia/color/raw.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import ClassVar, List import torch @@ -288,6 +289,9 @@ class RawToRgb(Module): >>> output = rgb(rawinput) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 1, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def __init__(self, cfa: CFA) -> None: super().__init__() self.cfa = cfa @@ -314,6 +318,9 @@ class RgbToRaw(Module): >>> output = raw(rgbinput) # 2x1x4x6 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 1, -1, -1] + def __init__(self, cfa: CFA) -> None: super().__init__() self.cfa = cfa diff --git a/kornia/color/rgb.py b/kornia/color/rgb.py index 1ac9d6761e..068b19f5eb 100644 --- a/kornia/color/rgb.py +++ b/kornia/color/rgb.py @@ -1,4 +1,4 @@ -from typing import Union, cast +from typing import ClassVar, List, Union, cast import torch @@ -248,6 +248,9 @@ class BgrToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return bgr_to_rgb(image) @@ -270,6 +273,9 @@ class RgbToBgr(Module): >>> output = bgr(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgb_to_bgr(image) @@ -298,6 +304,9 @@ class RgbToRgba(Module): >>> output = rgba(input) # 2x4x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 4, -1, -1] + def __init__(self, alpha_val: Union[float, Tensor]) -> None: super().__init__() self.alpha_val = alpha_val @@ -330,6 +339,9 @@ class BgrToRgba(Module): >>> output = rgba(input) # 2x4x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + 
ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 4, -1, -1] + def __init__(self, alpha_val: Union[float, Tensor]) -> None: super().__init__() self.alpha_val = alpha_val @@ -356,6 +368,9 @@ class RgbaToRgb(Module): >>> output = rgba(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 4, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgba_to_rgb(image) @@ -378,6 +393,9 @@ class RgbaToBgr(Module): >>> output = rgba(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 4, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgba_to_bgr(image) @@ -408,6 +426,9 @@ class RgbToLinearRgb(Module): [3] https://en.wikipedia.org/wiki/SRGB """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgb_to_linear_rgb(image) @@ -437,5 +458,8 @@ class LinearRgbToRgb(Module): [3] https://en.wikipedia.org/wiki/SRGB """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return linear_rgb_to_rgb(image) diff --git a/kornia/color/xyz.py b/kornia/color/xyz.py index 8a611e16bf..456f1e8e55 100644 --- a/kornia/color/xyz.py +++ b/kornia/color/xyz.py @@ -1,3 +1,5 @@ +from typing import ClassVar, List + import torch from kornia.core import ImageModule as Module @@ -91,6 +93,9 @@ class RgbToXyz(Module): [1] https://docs.opencv.org/4.0.1/de/d25/imgproc_color_conversions.html """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgb_to_xyz(image) @@ -114,5 +119,8 @@ class XyzToRgb(Module): [1] https://docs.opencv.org/4.0.1/de/d25/imgproc_color_conversions.html """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return xyz_to_rgb(image) diff --git a/kornia/color/ycbcr.py b/kornia/color/ycbcr.py index 918dad1068..014d1fcaba 100644 --- a/kornia/color/ycbcr.py +++ b/kornia/color/ycbcr.py @@ -1,3 +1,5 @@ +from typing import ClassVar, List + import torch from kornia.core import ImageModule as Module @@ -121,6 +123,9 @@ class RgbToYcbcr(Module): >>> output = ycbcr(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return rgb_to_ycbcr(image) @@ -143,5 +148,8 @@ class YcbcrToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, image: Tensor) -> Tensor: return ycbcr_to_rgb(image) diff --git a/kornia/color/yuv.py b/kornia/color/yuv.py index bf620c83ed..88f6f835fe 100644 --- a/kornia/color/yuv.py +++ b/kornia/color/yuv.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import ClassVar, List, Tuple import torch @@ -293,6 +293,9 @@ class RgbToYuv(Module): [1] https://es.wikipedia.org/wiki/YUV#RGB_a_Y'UV """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, 
-1] + def forward(self, input: Tensor) -> Tensor: return rgb_to_yuv(input) @@ -324,6 +327,9 @@ class RgbToYuv420(Module): [1] https://es.wikipedia.org/wiki/YUV#RGB_a_Y'UV """ + # TODO: Handle multiple inputs and outputs models later + ONNX_EXPORTABLE = False + def forward(self, yuvinput: Tensor) -> Tuple[Tensor, Tensor]: # skipcq: PYL-R0201 return rgb_to_yuv420(yuvinput) @@ -355,6 +361,9 @@ class RgbToYuv422(Module): [1] https://es.wikipedia.org/wiki/YUV#RGB_a_Y'UV """ + # TODO: Handle multiple inputs and outputs models later + ONNX_EXPORTABLE = False + def forward(self, yuvinput: Tensor) -> Tuple[Tensor, Tensor]: # skipcq: PYL-R0201 return rgb_to_yuv422(yuvinput) @@ -382,6 +391,9 @@ class YuvToRgb(Module): >>> output = rgb(input) # 2x3x4x5 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def forward(self, input: Tensor) -> Tensor: return yuv_to_rgb(input) @@ -413,6 +425,9 @@ class Yuv420ToRgb(Module): >>> output = rgb(inputy, inputuv) # 2x3x4x6 """ + # TODO: Handle multiple inputs and outputs models later + ONNX_EXPORTABLE = False + def forward(self, inputy: Tensor, inputuv: Tensor) -> Tensor: # skipcq: PYL-R0201 return yuv420_to_rgb(inputy, inputuv) @@ -444,5 +459,8 @@ class Yuv422ToRgb(Module): >>> output = rgb(inputy, inputuv) # 2x3x4x6 """ + # TODO: Handle multiple inputs and outputs models later + ONNX_EXPORTABLE = False + def forward(self, inputy: Tensor, inputuv: Tensor) -> Tensor: # skipcq: PYL-R0201 return yuv422_to_rgb(inputy, inputuv) diff --git a/kornia/config.py b/kornia/config.py new file mode 100644 index 0000000000..c21bc25b25 --- /dev/null +++ b/kornia/config.py @@ -0,0 +1,51 @@ +from dataclasses import dataclass, field +from enum import Enum + +__all__ = ["kornia_config", "InstallationMode"] + + +class InstallationMode(str, Enum): + # Ask the user if to install the dependencies + ASK = "ASK" + # Install the dependencies + AUTO = "AUTO" + # Raise an error if the dependencies are not installed + RAISE = "RAISE" + + def __eq__(self, other: object) -> bool: + if isinstance(other, str): + return self.value.lower() == other.lower() # Case-insensitive comparison + return super().__eq__(other) + + +class LazyLoaderConfig: + _installation_mode: InstallationMode = InstallationMode.ASK + + @property + def installation_mode(self) -> InstallationMode: + return self._installation_mode + + @installation_mode.setter + def installation_mode(self, value: str) -> None: + # Allow setting via string by converting to the Enum + if isinstance(value, str): + try: + self._installation_mode = InstallationMode(value.upper()) + except ValueError: + raise ValueError(f"{value} is not a valid InstallationMode. 
Choose from: {list(InstallationMode)}") + elif isinstance(value, InstallationMode): + self._installation_mode = value + else: + raise TypeError("installation_mode must be a string or InstallationMode Enum.") + + +@dataclass +class KorniaConfig: + output_dir: str = "kornia_outputs" + hub_cache_dir: str = ".kornia_hub" + hub_models_dir: str = ".kornia_hub/models" + hub_onnx_dir: str = ".kornia_hub/onnx_models" + lazyloader: LazyLoaderConfig = field(default_factory=LazyLoaderConfig) + + +kornia_config = KorniaConfig() diff --git a/kornia/constants.py b/kornia/constants.py index 72ac09863f..9bf340d329 100644 --- a/kornia/constants.py +++ b/kornia/constants.py @@ -1,3 +1,4 @@ +import logging from enum import Enum, EnumMeta from typing import Iterator, Type, TypeVar, Union @@ -7,6 +8,9 @@ __all__ = ["pi", "DType", "Resample", "BorderType", "SamplePadding", "TKEnum"] +# NOTE: to remove later +logging.basicConfig(level=logging.INFO) + pi = torch.tensor(3.14159265358979323846) diff --git a/kornia/contrib/models/rt_detr/architecture/rtdetr_head.py b/kornia/contrib/models/rt_detr/architecture/rtdetr_head.py index 6345577e5a..3c731238b5 100644 --- a/kornia/contrib/models/rt_detr/architecture/rtdetr_head.py +++ b/kornia/contrib/models/rt_detr/architecture/rtdetr_head.py @@ -274,6 +274,7 @@ def __init__( num_denoising: int = 100, ) -> None: super().__init__() + self.num_classes = num_classes self.num_queries = num_queries # TODO: verify this is correct if len(in_channels) > num_levels: diff --git a/kornia/contrib/models/rt_detr/post_processor.py b/kornia/contrib/models/rt_detr/post_processor.py index 95b35623df..0df2c1190c 100644 --- a/kornia/contrib/models/rt_detr/post_processor.py +++ b/kornia/contrib/models/rt_detr/post_processor.py @@ -2,11 +2,12 @@ from __future__ import annotations -from typing import Optional +from typing import Optional, Union import torch -from kornia.core import Module, Tensor, concatenate +from kornia.core import Module, Tensor, concatenate, tensor +from kornia.models.detection.utils import BoxFiltering def mod(a: Tensor, b: int) -> Tensor: @@ -37,14 +38,18 @@ def __init__( num_classes: int = 80, num_top_queries: int = 300, confidence_filtering: bool = True, + filter_as_zero: bool = False, ) -> None: super().__init__() self.confidence_threshold = confidence_threshold self.num_classes = num_classes self.confidence_filtering = confidence_filtering self.num_top_queries = num_top_queries + self.box_filtering = BoxFiltering( + tensor(confidence_threshold) if confidence_threshold is not None else None, filter_as_zero=filter_as_zero + ) - def forward(self, logits: Tensor, boxes: Tensor, original_sizes: Tensor) -> Tensor: + def forward(self, logits: Tensor, boxes: Tensor, original_sizes: Tensor) -> Union[Tensor, list[Tensor]]: """Post-process outputs from DETR. 
Args: @@ -88,6 +93,4 @@ def forward(self, logits: Tensor, boxes: Tensor, original_sizes: Tensor) -> Tens if not self.confidence_filtering or self.confidence_threshold == 0: return all_boxes - return all_boxes[(all_boxes[:, :, 1] > self.confidence_threshold).unsqueeze(-1).expand_as(all_boxes)].view( - all_boxes.shape[0], -1, all_boxes.shape[-1] - ) + return self.box_filtering(all_boxes, self.confidence_threshold) diff --git a/kornia/contrib/models/sam/model.py b/kornia/contrib/models/sam/model.py index f9cbbfa140..c528baf365 100644 --- a/kornia/contrib/models/sam/model.py +++ b/kornia/contrib/models/sam/model.py @@ -86,6 +86,25 @@ def __init__( self.prompt_encoder = prompt_encoder self.mask_decoder = mask_decoder + @staticmethod + def from_name(name: str) -> Sam: + """Build/load the SAM model based on it's name. + + Args: + name: The name of the SAM model. Valid names are: + - 'vit_b' + - 'vit_l' + - 'vit_h' + - 'mobile_sam' + + Returns: + The respective SAM model + """ + if name in ["vit_b", "vit_l", "vit_h", "mobile_sam"]: + return Sam.from_config(SamConfig(name)) + else: + raise ValueError(f"Invalid SAM model name: {name}") + @staticmethod def from_config(config: SamConfig) -> Sam: """Build/load the SAM model based on it's config. diff --git a/kornia/contrib/object_detection.py b/kornia/contrib/object_detection.py index 4a4b965dc2..e43d97394d 100644 --- a/kornia/contrib/object_detection.py +++ b/kornia/contrib/object_detection.py @@ -1,21 +1,24 @@ from __future__ import annotations -import datetime -import logging -import os -from dataclasses import dataclass -from enum import Enum -from typing import Optional, Union - -import torch - -from kornia.core import Module, Tensor, concatenate -from kornia.core.check import KORNIA_CHECK_SHAPE -from kornia.core.external import PILImage as Image -from kornia.core.external import numpy as np -from kornia.geometry.transform import resize -from kornia.io import write_image -from kornia.utils.draw import draw_rectangle +import warnings +from typing import Any + +from kornia.models.detection.base import ( + BoundingBox as BoundingBoxBase, +) +from kornia.models.detection.base import ( + BoundingBoxDataFormat, +) +from kornia.models.detection.base import ( + ObjectDetector as ObjectDetectorBase, +) +from kornia.models.detection.base import ( + ObjectDetectorResult as ObjectDetectorResultBase, +) +from kornia.models.detection.base import ( + results_from_detections as results_from_detections_base, +) +from kornia.models.utils import ResizePreProcessor as ResizePreProcessorBase __all__ = [ "BoundingBoxDataFormat", @@ -26,218 +29,51 @@ "ObjectDetectorResult", ] -logger = logging.getLogger(__name__) - - -class BoundingBoxDataFormat(Enum): - """Enum class that maps bounding box data format.""" - - XYWH = 0 - XYXY = 1 - CXCYWH = 2 - CENTER_XYWH = 2 - - -# NOTE: probably we should use a more generic name like BoundingBox2D -# and add a BoundingBox3D class for 3D bounding boxes. Also for serialization -# we should have an explicit class for each format to make it more production ready -# specially to serialize to protobuf and not saturate at a high rates. - - -@dataclass(frozen=True) -class BoundingBox: - """Bounding box data class. - - Useful for representing bounding boxes in different formats for object detection. - - Args: - data: tuple of bounding box data. The length of the tuple depends on the data format. - data_format: bounding box data format. 
- """ - - data: tuple[float, float, float, float] - data_format: BoundingBoxDataFormat - - -@dataclass(frozen=True) -class ObjectDetectorResult: - """Object detection result. - - Args: - class_id: class id of the detected object. - confidence: confidence score of the detected object. - bbox: bounding box of the detected object in xywh format. - """ - - class_id: int - confidence: float - bbox: BoundingBox - -def results_from_detections(detections: Tensor, format: str | BoundingBoxDataFormat) -> list[ObjectDetectorResult]: - """Convert a detection tensor to a list of :py:class:`ObjectDetectorResult`. - - Args: - detections: tensor with shape :math:`(D, 6)`, where :math:`D` is the number of detections in the given image, - :math:`6` represents class id, score, and `xywh` bounding box. - - Returns: - list of :py:class:`ObjectDetectorResult`. - """ - KORNIA_CHECK_SHAPE(detections, ["D", "6"]) - - if isinstance(format, str): - format = BoundingBoxDataFormat[format.upper()] - - results: list[ObjectDetectorResult] = [] - for det in detections: - det = det.squeeze().tolist() - if len(det) != 6: - continue - results.append( - ObjectDetectorResult( - class_id=int(det[0]), - confidence=det[1], - bbox=BoundingBox(data=(det[2], det[3], det[4], det[5]), data_format=format), - ) +class BoundingBox(BoundingBoxBase): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + warnings.warn( + "BoundingBox is deprecated and will be removed in v0.8.0. " + "Use kornia.models.detection.BoundingBox instead.", + DeprecationWarning, ) - return results - - -class ResizePreProcessor(Module): - """This module resizes a list of image tensors to the given size. - - Additionally, also returns the original image sizes for further post-processing. - """ - - def __init__(self, size: tuple[int, int], interpolation_mode: str = "bilinear") -> None: - """ - Args: - size: images will be resized to this value. If a 2-integer tuple is given, it is interpreted as - (height, width). - interpolation_mode: interpolation mode for image resizing. Supported values: ``nearest``, ``bilinear``, - ``bicubic``, ``area``, and ``nearest-exact``. - """ - super().__init__() - self.size = size - self.interpolation_mode = interpolation_mode - def forward(self, imgs: Union[Tensor, list[Tensor]]) -> tuple[Tensor, Tensor]: - """ - Returns: - resized_imgs: resized images in a batch. - original_sizes: the original image sizes of (height, width). - """ - # TODO: support other input formats e.g. file path, numpy - resized_imgs: list[Tensor] = [] - iters = len(imgs) if isinstance(imgs, list) else imgs.shape[0] - original_sizes = imgs[0].new_zeros((iters, 2)) - for i in range(iters): - img = imgs[i] - original_sizes[i, 0] = img.shape[-2] # Height - original_sizes[i, 1] = img.shape[-1] # Width - resized_imgs.append(resize(img[None], size=self.size, interpolation=self.interpolation_mode)) - return concatenate(resized_imgs), original_sizes +def results_from_detections(*args: Any, **kwargs: Any) -> list[ObjectDetectorResultBase]: + warnings.warn( + "results_from_detections is deprecated and will be removed in v0.8.0. 
" + "Use kornia.models.detection.results_from_detections instead.", + DeprecationWarning, + ) + return results_from_detections_base(*args, **kwargs) -# TODO: move this to kornia.models as AlgorithmicModel api -class ObjectDetector(Module): - """This class wraps an object detection model and performs pre-processing and post-processing.""" - - def __init__(self, model: Module, pre_processor: Module, post_processor: Module) -> None: - """Construct an Object Detector object. - - Args: - model: an object detection model. - pre_processor: a pre-processing module - post_processor: a post-processing module. - """ - super().__init__() - self.model = model.eval() - self.pre_processor = pre_processor.eval() - self.post_processor = post_processor.eval() - - @torch.inference_mode() - def forward(self, images: Union[Tensor, list[Tensor]]) -> Tensor: - """Detect objects in a given list of images. - - Args: - images: If list of RGB images. Each image is a Tensor with shape :math:`(3, H, W)`. - If Tensor, a Tensor with shape :math:`(B, 3, H, W)`. - - Returns: - list of detections found in each image. For item in a batch, shape is :math:`(D, 6)`, where :math:`D` is the - number of detections in the given image, :math:`6` represents class id, score, and `xywh` bounding box. - """ - images, images_sizes = self.pre_processor(images) - logits, boxes = self.model(images) - detections = self.post_processor(logits, boxes, images_sizes) - return detections - - def draw( - self, images: Union[Tensor, list[Tensor]], detections: Optional[Tensor] = None, output_type: str = "torch" - ) -> Union[Tensor, list[Tensor], list[Image.Image]]: # type: ignore - """Very simple drawing. +class ResizePreProcessor(ResizePreProcessorBase): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + warnings.warn( + "ResizePreProcessor is deprecated and will be removed in v0.8.0. " + "Use kornia.models.utils.ResizePreProcessor instead.", + DeprecationWarning, + ) - Needs to be more fancy later. - """ - if detections is None: - detections = self.forward(images) - output = [] - for image, detection in zip(images, detections): - out_img = image[None].clone() - for out in detection: - out_img = draw_rectangle( - out_img, - torch.Tensor([[[out[-4], out[-3], out[-4] + out[-2], out[-3] + out[-1]]]]), - ) - if output_type == "torch": - output.append(out_img[0]) - elif output_type == "pil": - output.append(Image.fromarray((out_img[0] * 255).permute(1, 2, 0).numpy().astype(np.uint8))) # type: ignore - else: - raise RuntimeError(f"Unsupported output type `{output_type}`.") - return output - def save( - self, images: Union[Tensor, list[Tensor]], detections: Optional[Tensor] = None, directory: Optional[str] = None - ) -> None: - """Saves the output image(s) to a directory. +class ObjectDetector(ObjectDetectorBase): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + warnings.warn( + "ObjectDetector is deprecated and will be removed in v0.8.0. " + "Use kornia.models.detection.ObjectDetector instead.", + DeprecationWarning, + ) - Args: - name: Directory to save the images. - n_row: Number of images displayed in each row of the grid. 
- """ - if directory is None: - name = f"detection-{datetime.datetime.now(tz=datetime.timezone.utc).strftime('%Y%m%d%H%M%S')!s}" - directory = os.path.join("Kornia_outputs", name) - outputs = self.draw(images, detections) - os.makedirs(directory, exist_ok=True) - for i, out_image in enumerate(outputs): - write_image( - os.path.join(directory, f"{str(i).zfill(6)}.jpg"), - out_image.mul(255.0).byte(), - ) - logger.info(f"Outputs are saved in {directory}") - def compile( - self, - *, - fullgraph: bool = False, - dynamic: bool = False, - backend: str = "inductor", - mode: Optional[str] = None, - options: Optional[dict[str, str | int | bool]] = None, - disable: bool = False, - ) -> None: - """Compile the internal object detection model with :py:func:`torch.compile()`.""" - self.model = torch.compile( # type: ignore - self.model, - fullgraph=fullgraph, - dynamic=dynamic, - backend=backend, - mode=mode, - options=options, - disable=disable, +class ObjectDetectorResult(ObjectDetectorResultBase): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + warnings.warn( + "ObjectDetectorResult is deprecated and will be removed in v0.8.0. " + "Use kornia.models.detection.ObjectDetectorResult instead.", + DeprecationWarning, ) diff --git a/kornia/core/__init__.py b/kornia/core/__init__.py index 2e1ebc2f7c..58ace6afa2 100644 --- a/kornia/core/__init__.py +++ b/kornia/core/__init__.py @@ -32,7 +32,7 @@ zeros, zeros_like, ) -from .module import ImageModule +from .module import ImageModule, ImageModuleMixIn, ONNXExportMixin from .tensor_wrapper import TensorWrapper # type: ignore __all__ = [ @@ -70,4 +70,6 @@ "TensorWrapper", "map_coordinates", "ImageModule", + "ONNXExportMixin", + "ImageModuleMixIn", ] diff --git a/kornia/core/external.py b/kornia/core/external.py index 1e0160035d..2596ad0271 100644 --- a/kornia/core/external.py +++ b/kornia/core/external.py @@ -5,6 +5,8 @@ from types import ModuleType from typing import List, Optional +from kornia.config import InstallationMode, kornia_config + logger = logging.getLogger(__name__) @@ -45,23 +47,36 @@ def _load(self) -> None: try: self.module = importlib.import_module(self.module_name) except ImportError as e: - if self.auto_install: + if kornia_config.lazyloader.installation_mode == InstallationMode.AUTO or self.auto_install: self._install_package(self.module_name) - else: + elif kornia_config.lazyloader.installation_mode == InstallationMode.ASK: + to_ask = True if_install = input( f"Optional dependency '{self.module_name}' is not installed. " + "You may silent this prompt by `kornia_config.lazyloader.installation_mode = 'auto'`. " "Do you wish to install the dependency? [Y]es, [N]o, [A]ll." ) - if if_install.lower() == "y": - self._install_package(self.module_name) - elif if_install.lower() == "a": - self.auto_install = True - self._install_package(self.module_name) - else: - raise ImportError( - f"Optional dependency '{self.module_name}' is not installed. " - f"Please install it to use this functionality." - ) from e + while to_ask: + if if_install.lower() == "y" or if_install.lower() == "yes": + self._install_package(self.module_name) + to_ask = False + elif if_install.lower() == "a" or if_install.lower() == "all": + self.auto_install = True + self._install_package(self.module_name) + to_ask = False + elif if_install.lower() == "n" or if_install.lower() == "no": + raise ImportError( + f"Optional dependency '{self.module_name}' is not installed. " + f"Please install it to use this functionality." 
+                        ) from e
+                    else:
+                        if_install = input("Invalid input. Please enter 'Y', 'N', or 'A'.")
+
+            elif kornia_config.lazyloader.installation_mode == InstallationMode.RAISE:
+                raise ImportError(
+                    f"Optional dependency '{self.module_name}' is not installed. "
+                    f"Please install it to use this functionality."
+                ) from e

     def __getattr__(self, item: str) -> object:
         """Loads the module (if not already loaded) and returns the requested attribute.
@@ -94,3 +109,7 @@ def __dir__(self) -> List[str]:
 numpy = LazyLoader("numpy")
 PILImage = LazyLoader("PIL.Image")
 diffusers = LazyLoader("diffusers")
+onnx = LazyLoader("onnx")
+onnxruntime = LazyLoader("onnxruntime")
+boxmot = LazyLoader("boxmot")
+segmentation_models_pytorch = LazyLoader("segmentation_models_pytorch")
diff --git a/kornia/core/module.py b/kornia/core/module.py
index 97132caf52..12a875df6a 100644
--- a/kornia/core/module.py
+++ b/kornia/core/module.py
@@ -2,13 +2,120 @@
 import math
 import os
 from functools import wraps
-from typing import Any, Callable, List, Optional, Tuple, Union
+from typing import Any, Callable, ClassVar, Dict, List, Optional, Tuple, Union
+
+import torch

 import kornia

-from ._backend import Module, Tensor, from_numpy
+from ._backend import Module, Tensor, from_numpy, rand
 from .external import PILImage as Image
 from .external import numpy as np
+from .external import onnx
+
+
+class ONNXExportMixin:
+    """Mixin class that provides ONNX export functionality for objects that support it.
+
+    Attributes:
+        ONNX_EXPORTABLE:
+            A flag indicating whether the object can be exported to ONNX. Default is True.
+        ONNX_DEFAULT_INPUTSHAPE:
+            Default input shape for the ONNX export. A list of integers where `-1` indicates
+            dynamic dimensions. Default is [-1, -1, -1, -1].
+        ONNX_DEFAULT_OUTPUTSHAPE:
+            Default output shape for the ONNX export. A list of integers where `-1` indicates
+            dynamic dimensions. Default is [-1, -1, -1, -1].
+        ONNX_EXPORT_PSEUDO_SHAPE:
+            This is used to create a dummy input tensor for the ONNX export. Default is [1, 3, 256, 256].
+            Its dimensions must match ONNX_DEFAULT_INPUTSHAPE and ONNX_DEFAULT_OUTPUTSHAPE.
+            Non-image dimensions are allowed.
+
+    Note:
+        - If `ONNX_EXPORTABLE` is False, the object cannot be exported to ONNX.
+    """
+
+    ONNX_EXPORTABLE: bool = True
+    ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, -1, -1, -1]
+    ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, -1, -1, -1]
+    ONNX_EXPORT_PSEUDO_SHAPE: ClassVar[List[int]] = [1, 3, 256, 256]
+    ADDITIONAL_METADATA: ClassVar[List[Tuple[str, str]]] = []
+
+    def to_onnx(
+        self,
+        onnx_name: Optional[str] = None,
+        input_shape: Optional[List[int]] = None,
+        output_shape: Optional[List[int]] = None,
+    ) -> None:
+        """Exports the current object to an ONNX model file.
+
+        Args:
+            onnx_name:
+                The name of the output ONNX file. If not provided, a default name in the
+                format "Kornia-<class_name>.onnx" will be used.
+            input_shape:
+                The input shape for the model as a list of integers. If None,
+                `ONNX_DEFAULT_INPUTSHAPE` will be used. Dynamic dimensions can be indicated by `-1`.
+            output_shape:
+                The output shape for the model as a list of integers. If None,
+                `ONNX_DEFAULT_OUTPUTSHAPE` will be used. Dynamic dimensions can be indicated by `-1`.
+
+        Notes:
+            - A dummy input tensor is created based on the provided or default input shape.
+            - Dynamic axes for input and output tensors are configured where dimensions are marked `-1`.
+            - The model is exported with `torch.onnx.export`, with constant folding enabled and opset version set to 17.
+ """ + if not self.ONNX_EXPORTABLE: + raise RuntimeError("This object cannot be exported to ONNX.") + + if input_shape is None: + input_shape = self.ONNX_DEFAULT_INPUTSHAPE + if output_shape is None: + output_shape = self.ONNX_DEFAULT_OUTPUTSHAPE + + if onnx_name is None: + onnx_name = f"Kornia-{self.__class__.__name__}.onnx" + + dummy_input = self._create_dummy_input(input_shape) + dynamic_axes = self._create_dynamic_axes(input_shape, output_shape) + + torch.onnx.export( + self, # type: ignore + dummy_input, + onnx_name, + export_params=True, + opset_version=17, + do_constant_folding=True, + input_names=["input"], + output_names=["output"], + dynamic_axes=dynamic_axes, + ) + + self._add_metadata(onnx_name) + + def _create_dummy_input(self, input_shape: List[int]) -> Union[Tuple[Any, ...], Tensor]: + return rand(*[(self.ONNX_EXPORT_PSEUDO_SHAPE[i] if dim == -1 else dim) for i, dim in enumerate(input_shape)]) + + def _create_dynamic_axes(self, input_shape: List[int], output_shape: List[int]) -> Dict[str, Dict[int, str]]: + return { + "input": {i: "dim_" + str(i) for i, dim in enumerate(input_shape) if dim == -1}, + "output": {i: "dim_" + str(i) for i, dim in enumerate(output_shape) if dim == -1}, + } + + def _add_metadata(self, onnx_name: str) -> None: + onnx_model = onnx.load(onnx_name) # type: ignore + + for key, value in [ + ("source", "kornia"), + ("version", kornia.__version__), + ("class", self.__class__.__name__), + *self.ADDITIONAL_METADATA, + ]: + metadata_props = onnx_model.metadata_props.add() + metadata_props.key = key + metadata_props.value = str(value) + + onnx.save(onnx_model, onnx_name) # type: ignore class ImageModuleMixIn: @@ -219,7 +326,7 @@ def save(self, name: Optional[str] = None, n_row: Optional[int] = None) -> None: kornia.io.write_image(name, out_image.mul(255.0).byte()) -class ImageModule(Module, ImageModuleMixIn): +class ImageModule(Module, ImageModuleMixIn, ONNXExportMixin): """Handles image-based operations. This modules accepts multiple input and output data types, provides end-to-end diff --git a/kornia/enhance/adjust.py b/kornia/enhance/adjust.py index 82b2bb0d0b..18e79b6eb1 100644 --- a/kornia/enhance/adjust.py +++ b/kornia/enhance/adjust.py @@ -1,5 +1,5 @@ from math import pi -from typing import Optional, Union +from typing import ClassVar, List, Optional, Union import torch @@ -1042,6 +1042,9 @@ class AdjustSaturation(Module): tensor(0.) """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def __init__(self, saturation_factor: Union[float, Tensor]) -> None: super().__init__() self.saturation_factor: Union[float, Tensor] = saturation_factor @@ -1089,6 +1092,9 @@ class AdjustSaturationWithGraySubtraction(Module): tensor(0.) 
""" + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def __init__(self, saturation_factor: Union[float, Tensor]) -> None: super().__init__() self.saturation_factor: Union[float, Tensor] = saturation_factor @@ -1136,6 +1142,9 @@ class AdjustHue(Module): torch.Size([2, 3, 3, 3]) """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + def __init__(self, hue_factor: Union[float, Tensor]) -> None: super().__init__() self.hue_factor: Union[float, Tensor] = hue_factor diff --git a/kornia/enhance/normalize.py b/kornia/enhance/normalize.py index 423d6ee3bd..f46748edaa 100644 --- a/kornia/enhance/normalize.py +++ b/kornia/enhance/normalize.py @@ -54,10 +54,10 @@ def __init__( std = torch.tensor([std]) if isinstance(mean, (tuple, list)): - mean = torch.tensor(mean) + mean = torch.tensor(mean)[None] if isinstance(std, (tuple, list)): - std = torch.tensor(std) + std = torch.tensor(std)[None] self.mean = mean self.std = std @@ -100,28 +100,37 @@ def normalize(data: Tensor, mean: Tensor, std: Tensor) -> Tensor: torch.Size([1, 4, 3, 3]) """ shape = data.shape - if len(mean.shape) == 0 or mean.shape[0] == 1: - mean = mean.expand(shape[1]) - if len(std.shape) == 0 or std.shape[0] == 1: - std = std.expand(shape[1]) - # Allow broadcast on channel dimension - if mean.shape and mean.shape[0] != 1: - if mean.shape[0] != data.shape[1] and mean.shape[:2] != data.shape[:2]: - raise ValueError(f"mean length and number of channels do not match. Got {mean.shape} and {data.shape}.") - - # Allow broadcast on channel dimension - if std.shape and std.shape[0] != 1: - if std.shape[0] != data.shape[1] and std.shape[:2] != data.shape[:2]: - raise ValueError(f"std length and number of channels do not match. Got {std.shape} and {data.shape}.") - - mean = torch.as_tensor(mean, device=data.device, dtype=data.dtype) - std = torch.as_tensor(std, device=data.device, dtype=data.dtype) - - if mean.shape: - mean = mean[..., :, None] - if std.shape: - std = std[..., :, None] + if torch.onnx.is_in_onnx_export(): + if not isinstance(mean, Tensor) or not isinstance(std, Tensor): + raise ValueError("Only tensor is accepted when converting to ONNX.") + if mean.shape[0] != 1 or std.shape[0] != 1: + raise ValueError( + "Batch dimension must be one for broadcasting when converting to ONNX." + f"Try changing mean shape and std shape from ({mean.shape}, {std.shape}) to (1, C) or (1, C, 1, 1)." + ) + else: + if isinstance(mean, float): + mean = torch.tensor([mean] * shape[1], device=data.device, dtype=data.dtype) + + if isinstance(std, float): + std = torch.tensor([std] * shape[1], device=data.device, dtype=data.dtype) + + # Allow broadcast on channel dimension + if mean.shape and mean.shape[0] != 1: + if mean.shape[0] != data.shape[1] and mean.shape[:2] != data.shape[:2]: + raise ValueError(f"mean length and number of channels do not match. Got {mean.shape} and {data.shape}.") + + # Allow broadcast on channel dimension + if std.shape and std.shape[0] != 1: + if std.shape[0] != data.shape[1] and std.shape[:2] != data.shape[:2]: + raise ValueError(f"std length and number of channels do not match. 
Got {std.shape} and {data.shape}.") + + mean = torch.as_tensor(mean, device=data.device, dtype=data.dtype) + std = torch.as_tensor(std, device=data.device, dtype=data.dtype) + + mean = mean[..., None] + std = std[..., None] out: Tensor = (data.view(shape[0], shape[1], -1) - mean) / std @@ -203,38 +212,33 @@ def denormalize(data: Tensor, mean: Union[Tensor, float], std: Union[Tensor, flo """ shape = data.shape - if isinstance(mean, float): - mean = torch.tensor([mean] * shape[1], device=data.device, dtype=data.dtype) - - if isinstance(std, float): - std = torch.tensor([std] * shape[1], device=data.device, dtype=data.dtype) - - if not isinstance(data, Tensor): - raise TypeError(f"data should be a tensor. Got {type(data)}") - - if not isinstance(mean, Tensor): - raise TypeError(f"mean should be a tensor or a float. Got {type(mean)}") - - if not isinstance(std, Tensor): - raise TypeError(f"std should be a tensor or float. Got {type(std)}") - - # Allow broadcast on channel dimension - if mean.shape and mean.shape[0] != 1: - if mean.shape[0] != data.shape[-3] and mean.shape[:2] != data.shape[:2]: - raise ValueError(f"mean length and number of channels do not match. Got {mean.shape} and {data.shape}.") - - # Allow broadcast on channel dimension - if std.shape and std.shape[0] != 1: - if std.shape[0] != data.shape[-3] and std.shape[:2] != data.shape[:2]: - raise ValueError(f"std length and number of channels do not match. Got {std.shape} and {data.shape}.") - - mean = torch.as_tensor(mean, device=data.device, dtype=data.dtype) - std = torch.as_tensor(std, device=data.device, dtype=data.dtype) - - if mean.shape: - mean = mean[..., :, None] - if std.shape: - std = std[..., :, None] + if torch.onnx.is_in_onnx_export(): + if not isinstance(mean, Tensor) or not isinstance(std, Tensor): + raise ValueError("Only tensor is accepted when converting to ONNX.") + if mean.shape[0] != 1 or std.shape[0] != 1: + raise ValueError("Batch dimension must be one for broadcasting when converting to ONNX.") + else: + if isinstance(mean, float): + mean = torch.tensor([mean] * shape[1], device=data.device, dtype=data.dtype) + + if isinstance(std, float): + std = torch.tensor([std] * shape[1], device=data.device, dtype=data.dtype) + + # Allow broadcast on channel dimension + if mean.shape and mean.shape[0] != 1: + if mean.shape[0] != data.shape[-3] and mean.shape[:2] != data.shape[:2]: + raise ValueError(f"mean length and number of channels do not match. Got {mean.shape} and {data.shape}.") + + # Allow broadcast on channel dimension + if std.shape and std.shape[0] != 1: + if std.shape[0] != data.shape[-3] and std.shape[:2] != data.shape[:2]: + raise ValueError(f"std length and number of channels do not match. 
Got {std.shape} and {data.shape}.") + + mean = torch.as_tensor(mean, device=data.device, dtype=data.dtype) + std = torch.as_tensor(std, device=data.device, dtype=data.dtype) + + mean = mean[..., None] + std = std[..., None] out: Tensor = (data.view(shape[0], shape[1], -1) * std) + mean diff --git a/kornia/filters/canny.py b/kornia/filters/canny.py index 35515bfbff..ff4f315b70 100644 --- a/kornia/filters/canny.py +++ b/kornia/filters/canny.py @@ -170,6 +170,9 @@ class Canny(Module): torch.Size([5, 1, 4, 4]) """ + # TODO: Handle multiple inputs and outputs models later + ONNX_EXPORTABLE = False + def __init__( self, low_threshold: float = 0.1, diff --git a/kornia/filters/dexined.py b/kornia/filters/dexined.py index d28d3f6930..adb3d2437c 100644 --- a/kornia/filters/dexined.py +++ b/kornia/filters/dexined.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import OrderedDict -from typing import Optional +from typing import ClassVar, Optional import torch import torch.nn.functional as F @@ -125,8 +125,7 @@ def compute_out_features(self, idx: int, up_scale: int) -> int: def forward(self, x: Tensor, out_shape: list[int]) -> Tensor: out = self.features(x) - if out.shape[-2:] != out_shape: - out = F.interpolate(out, out_shape, mode="bilinear") + out = F.interpolate(out, out_shape, mode="bilinear") return out @@ -176,10 +175,13 @@ class DexiNed(Module): >>> img = torch.rand(1, 3, 320, 320) >>> net = DexiNed(pretrained=False) >>> out = net(img) - >>> out[-1].shape + >>> out.shape torch.Size([1, 1, 320, 320]) """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, 1, -1, -1] + def __init__(self, pretrained: bool) -> None: super().__init__() self.block_1 = DoubleConvBlock(3, 32, 64, stride=2) @@ -225,7 +227,7 @@ def load_from_file(self, path_file: str) -> None: self.load_state_dict(pretrained_dict, strict=True) self.eval() - def forward(self, x: Tensor) -> list[Tensor]: + def get_features(self, x: Tensor) -> list[Tensor]: # Block 1 block_1 = self.block_1(x) block_1_side = self.side_1(block_1) @@ -269,11 +271,13 @@ def forward(self, x: Tensor) -> list[Tensor]: out_5 = self.up_block_5(block_5, out_shape) out_6 = self.up_block_6(block_6, out_shape) results = [out_1, out_2, out_3, out_4, out_5, out_6] + return results + + def forward(self, x: Tensor) -> Tensor: + features = self.get_features(x) # concatenate multiscale outputs - block_cat = concatenate(results, 1) # Bx6xHxW + block_cat = concatenate(features, 1) # Bx6xHxW block_cat = self.block_cat(block_cat) # Bx1xHxW - # return results - results.append(block_cat) - return results + return block_cat diff --git a/kornia/filters/motion.py b/kornia/filters/motion.py index 985004101b..f084036eb8 100644 --- a/kornia/filters/motion.py +++ b/kornia/filters/motion.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import ClassVar + from kornia.core import ImageModule as Module from kornia.core import Tensor from kornia.core.check import KORNIA_CHECK @@ -84,6 +86,10 @@ class MotionBlur3D(Module): >>> output = motion_blur(input) # 2x4x5x7x9 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, -1, -1, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, -1, -1, -1, -1] + ONNX_EXPORT_PSEUDO_SHAPE: ClassVar[list[int]] = [1, 3, 80, 80, 80] + def __init__( self, kernel_size: int, diff --git a/kornia/filters/sobel.py b/kornia/filters/sobel.py index f18e61cb71..6b7a048795 100644 --- a/kornia/filters/sobel.py +++ b/kornia/filters/sobel.py @@ -1,5 +1,7 @@ 
from __future__ import annotations +from typing import ClassVar + import torch import torch.nn.functional as F @@ -172,6 +174,9 @@ class SpatialGradient(Module): >>> output = SpatialGradient()(input) # 1x3x2x4x4 """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, -1, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, -1, 2, -1, -1] + def __init__(self, mode: str = "sobel", order: int = 1, normalized: bool = True) -> None: super().__init__() self.normalized: bool = normalized @@ -206,6 +211,9 @@ class SpatialGradient3d(Module): torch.Size([1, 4, 3, 2, 4, 4]) """ + ONNX_DEFAULT_INPUTSHAPE: ClassVar[list[int]] = [-1, -1, -1, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[list[int]] = [-1, -1, -1, -1, -1, -1] + def __init__(self, mode: str = "diff", order: int = 1) -> None: super().__init__() self.order: int = order diff --git a/kornia/geometry/transform/affwarp.py b/kornia/geometry/transform/affwarp.py index c85c5ddf3d..3abb418ec4 100644 --- a/kornia/geometry/transform/affwarp.py +++ b/kornia/geometry/transform/affwarp.py @@ -2,8 +2,8 @@ from typing import Optional, Tuple, Union import torch -from torch import nn +from kornia.core import ImageModule as Module from kornia.core import ones, ones_like, zeros from kornia.filters import gaussian_blur2d from kornia.utils import _extract_device_dtype @@ -648,7 +648,7 @@ def rescale( return resize(input, size, interpolation=interpolation, align_corners=align_corners, antialias=antialias) -class Resize(nn.Module): +class Resize(Module): r"""Resize the input torch.Tensor to the given size. Args: @@ -704,7 +704,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: ) -class Affine(nn.Module): +class Affine(Module): r"""Apply multiple elementary affine transforms simultaneously. Args: @@ -800,7 +800,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: return affine(input, matrix[..., :2, :3], self.mode, self.padding_mode, self.align_corners) -class Rescale(nn.Module): +class Rescale(Module): r"""Rescale the input torch.Tensor with the given factor. Args: @@ -843,7 +843,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: ) -class Rotate(nn.Module): +class Rotate(Module): r"""Rotate the tensor anti-clockwise about the centre. Args: @@ -888,7 +888,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: return rotate(input, self.angle, self.center, self.mode, self.padding_mode, self.align_corners) -class Translate(nn.Module): +class Translate(Module): r"""Translate the tensor in pixel units. Args: @@ -925,7 +925,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: return translate(input, self.translation, self.mode, self.padding_mode, self.align_corners) -class Scale(nn.Module): +class Scale(Module): r"""Scale the tensor by a factor. Args: @@ -972,7 +972,7 @@ def forward(self, input: torch.Tensor) -> torch.Tensor: return scale(input, self.scale_factor, self.center, self.mode, self.padding_mode, self.align_corners) -class Shear(nn.Module): +class Shear(Module): r"""Shear the tensor. 
Args:
diff --git a/kornia/geometry/transform/flips.py b/kornia/geometry/transform/flips.py
index 6519725dd5..cd0030347d 100644
--- a/kornia/geometry/transform/flips.py
+++ b/kornia/geometry/transform/flips.py
@@ -1,6 +1,7 @@
 import torch

-from kornia.core import Module, Tensor
+from kornia.core import ImageModule as Module
+from kornia.core import Tensor

 __all__ = ["Vflip", "Hflip", "Rot180", "rot180", "hflip", "vflip"]
diff --git a/kornia/models/__init__.py b/kornia/models/__init__.py
index e69de29bb2..3f2f71da48 100644
--- a/kornia/models/__init__.py
+++ b/kornia/models/__init__.py
@@ -0,0 +1,2 @@
+from . import detection, segmentation, tracking
+from .utils import *
diff --git a/kornia/models/base.py b/kornia/models/base.py
new file mode 100644
index 0000000000..20c1b4d9c0
--- /dev/null
+++ b/kornia/models/base.py
@@ -0,0 +1,67 @@
+import datetime
+import logging
+import os
+from typing import List, Optional, Union
+
+from kornia.core import Module, Tensor, stack
+from kornia.core.external import PILImage as Image
+from kornia.core.external import numpy as np
+from kornia.io import write_image
+from kornia.utils.image import tensor_to_image
+
+logger = logging.getLogger(__name__)
+
+
+class ModelBase(Module):
+    """This class wraps a model and performs pre-processing and post-processing."""
+
+    name: str = "model"
+
+    def __init__(
+        self, model: Module, pre_processor: Module, post_processor: Module, name: Optional[str] = None
+    ) -> None:
+        """Construct the model wrapper.
+
+        Args:
+            model: the model to wrap.
+            pre_processor: a pre-processing module.
+            post_processor: a post-processing module.
+        """
+        super().__init__()
+        self.model = model.eval()
+        self.pre_processor = pre_processor.eval()
+        self.post_processor = post_processor.eval()
+        if name is not None:
+            self.name = name
+
+    def _tensor_to_type(
+        self, output: List[Tensor], output_type: str, is_batch: bool = False
+    ) -> Union[Tensor, List[Tensor], List[Image.Image]]:  # type: ignore
+        if output_type == "torch":
+            if is_batch:
+                return stack(output)
+            # A plain list of tensors is a valid "torch" output for unbatched inputs.
+            return output
+        elif output_type == "pil":
+            return [Image.fromarray((tensor_to_image(out_img) * 255).astype(np.uint8)) for out_img in output]  # type: ignore
+
+        raise RuntimeError(f"Unsupported output type `{output_type}`.")
+
+    def _save_outputs(
+        self, outputs: Union[Tensor, List[Tensor]], directory: Optional[str] = None, suffix: str = ""
+    ) -> None:
+        """Save the output image(s) to a directory.
+
+        Args:
+            outputs: output tensor.
+            directory: directory to save the images.
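+            suffix: filename suffix appended to each image index before the ``.jpg`` extension.
+
+        Example:
+            A minimal sketch of how a subclass typically calls this helper; the
+            directory name below is an assumption for illustration:
+
+            .. code-block:: python
+
+                # `outputs` is a list of (3, H, W) tensors with values in [0, 1]
+                self._save_outputs(outputs, directory="kornia_outputs/demo", suffix="_vis")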
+ """ + if directory is None: + name = f"{self.name}_{datetime.datetime.now(tz=datetime.timezone.utc).strftime('%Y%m%d%H%M%S')!s}" + directory = os.path.join("kornia_outputs", name) + + os.makedirs(directory, exist_ok=True) + for i, out_image in enumerate(outputs): + write_image( + os.path.join(directory, f"{str(i).zfill(6)}{suffix}.jpg"), + out_image.mul(255.0).byte(), + ) + logger.info(f"Outputs are saved in {directory}") diff --git a/kornia/models/detection/__init__.py b/kornia/models/detection/__init__.py new file mode 100644 index 0000000000..b511f133c2 --- /dev/null +++ b/kornia/models/detection/__init__.py @@ -0,0 +1,3 @@ +from .base import * +from .rtdetr import * +from .utils import * diff --git a/kornia/models/detection/base.py b/kornia/models/detection/base.py new file mode 100644 index 0000000000..b47025ab7e --- /dev/null +++ b/kornia/models/detection/base.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional, Union + +import torch + +from kornia.core import Tensor, rand +from kornia.core.check import KORNIA_CHECK_SHAPE +from kornia.core.external import PILImage as Image +from kornia.models.base import ModelBase +from kornia.utils.draw import draw_rectangle + +__all__ = [ + "BoundingBoxDataFormat", + "BoundingBox", + "results_from_detections", + "ObjectDetector", + "ObjectDetectorResult", +] + + +class BoundingBoxDataFormat(Enum): + """Enum class that maps bounding box data format.""" + + XYWH = 0 + XYXY = 1 + CXCYWH = 2 + CENTER_XYWH = 2 + + +# NOTE: probably we should use a more generic name like BoundingBox2D +# and add a BoundingBox3D class for 3D bounding boxes. Also for serialization +# we should have an explicit class for each format to make it more production ready +# specially to serialize to protobuf and not saturate at a high rates. + + +@dataclass(frozen=True) +class BoundingBox: + """Bounding box data class. + + Useful for representing bounding boxes in different formats for object detection. + + Args: + data: tuple of bounding box data. The length of the tuple depends on the data format. + data_format: bounding box data format. + """ + + data: tuple[float, float, float, float] + data_format: BoundingBoxDataFormat + + +@dataclass(frozen=True) +class ObjectDetectorResult: + """Object detection result. + + Args: + class_id: class id of the detected object. + confidence: confidence score of the detected object. + bbox: bounding box of the detected object in xywh format. + """ + + class_id: int + confidence: float + bbox: BoundingBox + + +def results_from_detections(detections: Tensor, format: str | BoundingBoxDataFormat) -> list[ObjectDetectorResult]: + """Convert a detection tensor to a list of :py:class:`ObjectDetectorResult`. + + Args: + detections: tensor with shape :math:`(D, 6)`, where :math:`D` is the number of detections in the given image, + :math:`6` represents class id, score, and `xywh` bounding box. + + Returns: + list of :py:class:`ObjectDetectorResult`. 
+ """ + KORNIA_CHECK_SHAPE(detections, ["D", "6"]) + + if isinstance(format, str): + format = BoundingBoxDataFormat[format.upper()] + + results: list[ObjectDetectorResult] = [] + for det in detections: + det = det.squeeze().tolist() + if len(det) != 6: + continue + results.append( + ObjectDetectorResult( + class_id=int(det[0]), + confidence=det[1], + bbox=BoundingBox(data=(det[2], det[3], det[4], det[5]), data_format=format), + ) + ) + return results + + +class ObjectDetector(ModelBase): + """This class wraps an object detection model and performs pre-processing and post-processing.""" + + name: str = "detection" + + @torch.inference_mode() + def forward(self, images: Union[Tensor, list[Tensor]]) -> Union[Tensor, list[Tensor]]: + """Detect objects in a given list of images. + + Args: + images: If list of RGB images. Each image is a Tensor with shape :math:`(3, H, W)`. + If Tensor, a Tensor with shape :math:`(B, 3, H, W)`. + + Returns: + list of detections found in each image. For item in a batch, shape is :math:`(D, 6)`, where :math:`D` is the + number of detections in the given image, :math:`6` represents class id, score, and `xywh` bounding box. + """ + images, images_sizes = self.pre_processor(images) + logits, boxes = self.model(images) + detections = self.post_processor(logits, boxes, images_sizes) + return detections + + def visualize( + self, images: Union[Tensor, list[Tensor]], detections: Optional[Tensor] = None, output_type: str = "torch" + ) -> Union[Tensor, list[Tensor], list[Image.Image]]: # type: ignore + """Very simple drawing. + + Needs to be more fancy later. + """ + dets = detections or self.forward(images) + output = [] + for image, detection in zip(images, dets): + out_img = image[None].clone() + for out in detection: + out_img = draw_rectangle( + out_img, + torch.Tensor([[[out[-4], out[-3], out[-4] + out[-2], out[-3] + out[-1]]]]), + ) + output.append(out_img[0]) + + return self._tensor_to_type(output, output_type, is_batch=isinstance(images, Tensor)) + + def save( + self, images: Union[Tensor, list[Tensor]], detections: Optional[Tensor] = None, directory: Optional[str] = None + ) -> None: + """Saves the output image(s) to a directory. + + Args: + images: input tensor. + detections: detection tensor. + directory: directory to save the images. + """ + outputs = self.visualize(images, detections) + self._save_outputs(outputs, directory) + + def to_onnx( + self, + onnx_name: Optional[str] = None, + image_size: Optional[int] = 640, + include_pre_and_post_processor: bool = True, + ) -> str: + """Exports an RT-DETR object detection model to ONNX format. + + Either `model_name` or `config` must be provided. If neither is provided, + a default pretrained model (`rtdetr_r18vd`) will be built. + + Args: + onnx_name: + The name of the ONNX model. + image_size: + The size to which input images will be resized during preprocessing. + If None, image_size will be dynamic. + For RTDETR, recommended scales include [480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800]. + include_pre_and_post_processor: + Whether to include the pre-processor and post-processor in the exported model. + + Returns: + - The name of the ONNX model. 
+ """ + if onnx_name is None: + onnx_name = f"kornia_{self.name}_{image_size}.onnx" + + if image_size is None: + val_image = rand(1, 3, 640, 640) + dynamic_axes = {"input": {0: "batch_size"}, "output": {0: "batch_size"}} + else: + val_image = rand(1, 3, image_size, image_size) + dynamic_axes = {"input": {0: "batch_size", 2: "height", 3: "width"}, "output": {0: "batch_size"}} + + torch.onnx.export( + self if include_pre_and_post_processor else self.model, + val_image, + onnx_name, + export_params=True, + opset_version=17, + do_constant_folding=True, + input_names=["input"], + output_names=["output"], + dynamic_axes=dynamic_axes, + ) + + return onnx_name + + def compile( + self, + *, + fullgraph: bool = False, + dynamic: bool = False, + backend: str = "inductor", + mode: Optional[str] = None, + options: Optional[dict[str, str | int | bool]] = None, + disable: bool = False, + ) -> None: + """Compile the internal object detection model with :py:func:`torch.compile()`.""" + self.model = torch.compile( # type: ignore + self.model, + fullgraph=fullgraph, + dynamic=dynamic, + backend=backend, + mode=mode, + options=options, + disable=disable, + ) diff --git a/kornia/models/detection/rtdetr.py b/kornia/models/detection/rtdetr.py new file mode 100644 index 0000000000..27c6c51be0 --- /dev/null +++ b/kornia/models/detection/rtdetr.py @@ -0,0 +1,86 @@ +import warnings +from typing import Optional + +import torch +from torch import nn + +from kornia.contrib.models.rt_detr import DETRPostProcessor +from kornia.contrib.models.rt_detr.model import RTDETR, RTDETRConfig +from kornia.models.detection.base import ObjectDetector +from kornia.models.utils import ResizePreProcessor + +__all__ = ["RTDETRDetectorBuilder"] + + +class RTDETRDetectorBuilder: + """A builder class for constructing RT-DETR object detection models. + + This class provides static methods to: + - Build an object detection model from a model name or configuration. + - Export the model to ONNX format for inference. + + .. code-block:: python + + image = kornia.utils.sample.get_sample_images()[0][None] + model = RTDETRDetectorBuilder.build() + model.save(image) + """ + + @staticmethod + def build( + model_name: Optional[str] = None, + config: Optional[RTDETRConfig] = None, + pretrained: bool = True, + image_size: Optional[int] = 640, + confidence_threshold: Optional[float] = None, + confidence_filtering: Optional[bool] = None, + ) -> ObjectDetector: + """Builds and returns an RT-DETR object detector model. + + Either `model_name` or `config` must be provided. If neither is provided, + a default pretrained model (`rtdetr_r18vd`) will be built. + + Args: + model_name: + Name of the RT-DETR model to load. Can be one of the available pretrained models. + Including 'rtdetr_r18vd', 'rtdetr_r34vd', 'rtdetr_r50vd_m', 'rtdetr_r50vd', 'rtdetr_r101vd'. + config: + A custom configuration object for building the RT-DETR model. + pretrained: + Whether to load a pretrained version of the model (applies when `model_name` is provided). + image_size: + The size to which input images will be resized during preprocessing. + If None, no resizing will be performed before passing to the model. Recommended scales include + [480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800]. + + Returns: + ObjectDetector + An object detector instance initialized with the specified model, preprocessor, and post-processor. 
+ """ + if model_name is not None and config is not None: + raise ValueError("Either `model_name` or `config` should be `None`.") + + if config is not None: + model = RTDETR.from_config(config) + elif model_name is not None: + if pretrained: + model = RTDETR.from_pretrained(model_name) + else: + model = RTDETR.from_name(model_name) + else: + warnings.warn("No `model_name` or `config` found. Will build pretrained `rtdetr_r18vd`.") + model = RTDETR.from_pretrained("rtdetr_r18vd") + + if confidence_threshold is None: + confidence_threshold = config.confidence_threshold if config is not None else 0.3 + + return ObjectDetector( + model, + ResizePreProcessor(image_size, image_size) if image_size is not None else nn.Identity(), + DETRPostProcessor( + confidence_threshold=confidence_threshold, + confidence_filtering=confidence_filtering or not torch.onnx.is_in_onnx_export(), + num_classes=model.decoder.num_classes, + num_top_queries=model.decoder.num_queries, + ), + ) diff --git a/kornia/models/detection/utils.py b/kornia/models/detection/utils.py new file mode 100644 index 0000000000..8b23686246 --- /dev/null +++ b/kornia/models/detection/utils.py @@ -0,0 +1,96 @@ +from typing import Any, ClassVar, List, Optional, Tuple, Union + +from kornia.core import Module, ONNXExportMixin, Tensor, rand, tensor + +__all__ = ["BoxFiltering"] + + +class BoxFiltering(Module, ONNXExportMixin): + """Filter boxes according to the desired threshold. + + Args: + confidence_threshold: an 0-d scalar that represents the desired threshold. + classes_to_keep: a 1-d list of classes to keep. If None, keep all classes. + filter_as_zero: whether to filter boxes as zero. + """ + + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, -1, 6] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, -1, 6] + ONNX_EXPORT_PSEUDO_SHAPE: ClassVar[List[int]] = [5, 20, 6] + + def __init__( + self, + confidence_threshold: Optional[Union[Tensor, float]] = None, + classes_to_keep: Optional[Union[Tensor, List[int]]] = None, + filter_as_zero: bool = False, + ) -> None: + super().__init__() + self.filter_as_zero = filter_as_zero + self.classes_to_keep = None + self.confidence_threshold = None + if classes_to_keep is not None: + self.classes_to_keep = classes_to_keep if isinstance(classes_to_keep, Tensor) else tensor(classes_to_keep) + if confidence_threshold is not None: + self.confidence_threshold = ( + confidence_threshold or confidence_threshold + if isinstance(confidence_threshold, Tensor) + else tensor(confidence_threshold) + ) + + def forward( + self, boxes: Tensor, confidence_threshold: Optional[Tensor] = None, classes_to_keep: Optional[Tensor] = None + ) -> Union[Tensor, List[Tensor]]: + """Filter boxes according to the desired threshold. + + To be ONNX-friendly, the inputs for direct forwarding need to be all tensors. + + Args: + boxes: [B, D, 6], where B is the batchsize, D is the number of detections in the image, + 6 represent (class_id, confidence_score, x, y, w, h). + confidence_threshold: an 0-d scalar that represents the desired threshold. + classes_to_keep: a 1-d tensor of classes to keep. If None, keep all classes. + + Returns: + Union[Tensor, List[Tensor]] + If `filter_as_zero` is True, return a tensor of shape [D, 6], where D is the total number of + detections as input. + If `filter_as_zero` is False, return a list of tensors of shape [D, 6], where D is the number of + valid detections for each element in the batch. 
+ """ + # Apply confidence filtering + confidence_threshold = confidence_threshold or self.confidence_threshold or 0.0 # If None, use 0 as threshold + confidence_mask = boxes[:, :, 1] > confidence_threshold # [B, D] + + # Apply class filtering + classes_to_keep = classes_to_keep or self.classes_to_keep + if classes_to_keep is not None: + class_ids = boxes[:, :, 0:1] # [B, D, 1] + classes_to_keep = classes_to_keep.view(1, 1, -1) # [1, 1, C] for broadcasting + class_mask = (class_ids == classes_to_keep).any(dim=-1) # [B, D] + else: + # If no class filtering is needed, just use a mask of all `True` + class_mask = (confidence_mask * 0 + 1).bool() + + # Combine the confidence and class masks + combined_mask = confidence_mask & class_mask # [B, D] + + if self.filter_as_zero: + filtered_boxes = boxes * combined_mask[:, :, None] + return filtered_boxes + + filtered_boxes_list = [] + for i in range(boxes.shape[0]): + box = boxes[i] + mask = combined_mask[i] # [D] + valid_boxes = box[mask] + filtered_boxes_list.append(valid_boxes) + + return filtered_boxes_list + + def _create_dummy_input(self, input_shape: List[int]) -> Union[Tuple[Any, ...], Tensor]: + pseudo_input = rand( + *[(self.ONNX_EXPORT_PSEUDO_SHAPE[i] if dim == -1 else dim) for i, dim in enumerate(input_shape)] + ) + if self.confidence_threshold is None: + return pseudo_input, 0.1 + return pseudo_input diff --git a/kornia/models/detector/__init__.py b/kornia/models/detector/__init__.py deleted file mode 100644 index 55a62efc22..0000000000 --- a/kornia/models/detector/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .rtdetr import * diff --git a/kornia/models/detector/rtdetr.py b/kornia/models/detector/rtdetr.py deleted file mode 100644 index 4bc1c6bd2d..0000000000 --- a/kornia/models/detector/rtdetr.py +++ /dev/null @@ -1,154 +0,0 @@ -import warnings -from typing import Optional - -import torch -from torch import nn - -from kornia.contrib.models.rt_detr import DETRPostProcessor -from kornia.contrib.models.rt_detr.model import RTDETR, RTDETRConfig -from kornia.contrib.object_detection import ObjectDetector, ResizePreProcessor -from kornia.core import rand - -__all__ = ["RTDETRDetectorBuilder"] - - -class RTDETRDetectorBuilder: - """A builder class for constructing RT-DETR object detection models. - - This class provides static methods to: - - Build an object detection model from a model name or configuration. - - Export the model to ONNX format for inference. - """ - - @staticmethod - def build( - model_name: Optional[str] = None, - config: Optional[RTDETRConfig] = None, - pretrained: bool = True, - image_size: Optional[int] = 640, - confidence_threshold: float = 0.5, - confidence_filtering: Optional[bool] = None, - ) -> ObjectDetector: - """Builds and returns an RT-DETR object detector model. - - Either `model_name` or `config` must be provided. If neither is provided, - a default pretrained model (`rtdetr_r18vd`) will be built. - - Args: - model_name: - Name of the RT-DETR model to load. Can be one of the available pretrained models. - Including 'rtdetr_r18vd', 'rtdetr_r34vd', 'rtdetr_r50vd_m', 'rtdetr_r50vd', 'rtdetr_r101vd'. - config: - A custom configuration object for building the RT-DETR model. - pretrained: - Whether to load a pretrained version of the model (applies when `model_name` is provided). - image_size: - The size to which input images will be resized during preprocessing. - If None, no resizing will be performed before passing to the model. Recommended scales include - [480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800]. 
- confidence_threshold: - The confidence threshold used during post-processing to filter detections. - confidence_filtering: - If to perform filtering on resulting boxes. If None, the filtering will be blocked when exporting - to ONNX, while it would perform as per confidence_threshold when build the model. - - Returns: - ObjectDetector - An object detector instance initialized with the specified model, preprocessor, and post-processor. - """ - if model_name is not None and config is not None: - raise ValueError("Either `model_name` or `config` should be `None`.") - - if config is not None: - model = RTDETR.from_config(config) - elif model_name is not None: - if pretrained: - model = RTDETR.from_pretrained(model_name) - else: - model = RTDETR.from_name(model_name) - else: - warnings.warn("No `model_name` or `config` found. Will build pretrained `rtdetr_r18vd`.") - model = RTDETR.from_pretrained("rtdetr_r18vd") - - return ObjectDetector( - model, - ResizePreProcessor((image_size, image_size)) if image_size is not None else nn.Identity(), - DETRPostProcessor( - confidence_threshold, - num_classes=config.num_classes if config is not None else 80, - confidence_filtering=confidence_filtering or not torch.onnx.is_in_onnx_export, - ), - ) - - @staticmethod - def to_onnx( - model_name: Optional[str] = None, - onnx_name: Optional[str] = None, - config: Optional[RTDETRConfig] = None, - pretrained: bool = True, - image_size: Optional[int] = 640, - confidence_threshold: float = 0.5, - confidence_filtering: Optional[bool] = None, - ) -> tuple[str, ObjectDetector]: - """Exports an RT-DETR object detection model to ONNX format. - - Either `model_name` or `config` must be provided. If neither is provided, - a default pretrained model (`rtdetr_r18vd`) will be built. - - Args: - model_name: - Name of the RT-DETR model to load. Can be one of the available pretrained models. - config: - A custom configuration object for building the RT-DETR model. - pretrained: - Whether to load a pretrained version of the model (applies when `model_name` is provided). - image_size: - The size to which input images will be resized during preprocessing. - If None, image_size will be dynamic. Recommended scales include - [480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800]. - confidence_threshold: - The confidence threshold used during post-processing to filter detections. - confidence_filtering: - If to perform filtering on resulting boxes. If None, the filtering will be blocked when exporting - to ONNX, while it would perform as per confidence_threshold when build the model. - - Returns: - - The name of the ONNX model. - - The exported torch model. 
- """ - - detector = RTDETRDetectorBuilder.build( - model_name=model_name, - config=config, - pretrained=pretrained, - image_size=image_size, - confidence_threshold=confidence_threshold, - confidence_filtering=confidence_filtering, - ) - if onnx_name is None: - _model_name = model_name - if model_name is None and config is not None: - _model_name = "rtdetr-customized" - elif model_name is None and config is None: - _model_name = "rtdetr_r18vd" - onnx_name = f"Kornia-RTDETR-{_model_name}-{image_size}.onnx" - - if image_size is None: - val_image = rand(1, 3, 640, 640) - else: - val_image = rand(1, 3, image_size, image_size) - - dynamic_axes = {"input": {0: "batch_size", 2: "height", 3: "width"}, "output": {0: "batch_size"}} - torch.onnx.export( - detector, - val_image, - onnx_name, - export_params=True, - opset_version=17, - do_constant_folding=True, - input_names=["input"], - output_names=["output"], - dynamic_axes=dynamic_axes, - ) - - return onnx_name, detector diff --git a/kornia/models/edge_detection/__init__.py b/kornia/models/edge_detection/__init__.py new file mode 100644 index 0000000000..c9359ff78d --- /dev/null +++ b/kornia/models/edge_detection/__init__.py @@ -0,0 +1,2 @@ +from .base import * +from .dexined import * diff --git a/kornia/models/edge_detection/base.py b/kornia/models/edge_detection/base.py new file mode 100644 index 0000000000..17b8cf3a7b --- /dev/null +++ b/kornia/models/edge_detection/base.py @@ -0,0 +1,73 @@ +from typing import Optional, Union + +from kornia.color.gray import grayscale_to_rgb +from kornia.core import Tensor +from kornia.core.external import PILImage as Image +from kornia.models.base import ModelBase + +__all__ = ["EdgeDetector"] + + +class EdgeDetector(ModelBase): + """EdgeDetector is a module that wraps an edge detection model. + + This module uses EdgeDetectionModel library for edge detection. + """ + + name: str = "edge_detection" + + def forward(self, images: Union[Tensor, list[Tensor]]) -> Union[Tensor, list[Tensor]]: + """Forward pass of the semantic segmentation model. + + Args: + images: input tensor. + + Returns: + output tensor. + """ + images, image_sizes = self.pre_processor(images) + out_images = self.model(images) + return self.post_processor(out_images, image_sizes) + + def visualize( + self, + images: Union[Tensor, list[Tensor]], + edge_maps: Optional[Union[Tensor, list[Tensor]]] = None, + output_type: str = "torch", + ) -> Union[Tensor, list[Tensor], list[Image.Image]]: # type: ignore + """Draw the segmentation results. + + Args: + images: input tensor. + output_type: type of the output. + + Returns: + output tensor. + """ + if edge_maps is None: + edge_maps = self.forward(images) + output = [] + for edge_map in edge_maps: + output.append(grayscale_to_rgb(edge_map)[0]) + + return self._tensor_to_type(output, output_type, is_batch=isinstance(images, Tensor)) + + def save( + self, + images: Union[Tensor, list[Tensor]], + edge_maps: Optional[Union[Tensor, list[Tensor]]] = None, + directory: Optional[str] = None, + output_type: str = "torch", + ) -> None: + """Save the segmentation results. + + Args: + images: input tensor. + output_type: type of the output. + + Returns: + output tensor. 
+ """ + outputs = self.visualize(images, edge_maps, output_type) + self._save_outputs(images, directory, suffix="_src") + self._save_outputs(outputs, directory, suffix="_edge") diff --git a/kornia/models/edge_detection/dexined.py b/kornia/models/edge_detection/dexined.py new file mode 100644 index 0000000000..f56904847b --- /dev/null +++ b/kornia/models/edge_detection/dexined.py @@ -0,0 +1,67 @@ +from typing import Optional, Tuple + +import torch +from torch import nn + +from kornia.core import rand, tensor +from kornia.enhance.normalize import Normalize +from kornia.filters.dexined import DexiNed +from kornia.models.edge_detection.base import EdgeDetector +from kornia.models.utils import ResizePostProcessor, ResizePreProcessor + + +class DexiNedBuilder: + """DexiNedBuilder is a class that builds a DexiNed model. + + .. code-block:: python + + image = kornia.utils.sample.get_sample_images()[0][None] + model = DexiNedBuilder.build() + model.save(image) + """ + + @staticmethod + def build(model_name: str = "dexined", pretrained: bool = True, image_size: Optional[int] = 352) -> EdgeDetector: + if model_name.lower() == "dexined": + # Normalize then scale to [0, 255] + norm = Normalize(mean=tensor([[0.485, 0.456, 0.406]]), std=tensor([[1.0 / 255.0] * 3])) + model = nn.Sequential(norm, DexiNed(pretrained=pretrained), nn.Sigmoid()) + else: + raise ValueError(f"Model {model_name} not found. Please choose from 'DexiNed'.") + + return EdgeDetector( + model, + ResizePreProcessor(image_size, image_size) if image_size is not None else nn.Identity(), + ResizePostProcessor() if image_size is not None else nn.Identity(), + ) + + @staticmethod + def to_onnx( + model_name: str = "dexined", + onnx_name: Optional[str] = None, + pretrained: bool = True, + image_size: Optional[int] = 352, + ) -> Tuple[str, EdgeDetector]: + edge_detector = DexiNedBuilder.build(model_name, pretrained, image_size) + if onnx_name is None: + onnx_name = f"kornia_{model_name.lower()}_{image_size}.onnx" + + if image_size is None: + val_image = rand(1, 3, 352, 352) + else: + val_image = rand(1, 3, image_size, image_size) + + dynamic_axes = {"input": {0: "batch_size", 2: "height", 3: "width"}, "output": {0: "batch_size"}} + torch.onnx.export( + edge_detector, + val_image, + onnx_name, + export_params=True, + opset_version=17, + do_constant_folding=True, + input_names=["input"], + output_names=["output"], + dynamic_axes=dynamic_axes, + ) + + return onnx_name, edge_detector diff --git a/kornia/models/segmentation/__init__.py b/kornia/models/segmentation/__init__.py new file mode 100644 index 0000000000..1bc6249871 --- /dev/null +++ b/kornia/models/segmentation/__init__.py @@ -0,0 +1 @@ +from .segmentation_models import * diff --git a/kornia/models/segmentation/base.py b/kornia/models/segmentation/base.py new file mode 100644 index 0000000000..053eb5c42b --- /dev/null +++ b/kornia/models/segmentation/base.py @@ -0,0 +1,51 @@ +from typing import Union + +from kornia.core import Tensor +from kornia.core.external import PILImage as Image +from kornia.models.base import ModelBase + + +class SemanticSegmentation(ModelBase): + """Semantic Segmentation is a module that wraps a semantic segmentation model. + + This module uses SegmentationModel library for semantic segmentation. + """ + + def forward(self, images: Union[Tensor, list[Tensor]]) -> Union[Tensor, list[Tensor]]: + """Forward pass of the semantic segmentation model. + + Args: + x: input tensor. + + Returns: + output tensor. 
+ """ + images = self.pre_processor(images) + output = self.model(images) + return self.post_processor(output) + + def visualize( + self, images: Union[Tensor, list[Tensor]], output_type: str = "torch" + ) -> Union[Tensor, list[Tensor], list[Image.Image]]: # type: ignore + """Draw the segmentation results. + + Args: + images: input tensor. + output_type: type of the output. + + Returns: + output tensor. + """ + raise NotImplementedError("Visualization is not implemented for this model.") + + def save(self, images: Union[Tensor, list[Tensor]], output_type: str = "torch") -> None: + """Save the segmentation results. + + Args: + images: input tensor. + output_type: type of the output. + + Returns: + output tensor. + """ + raise NotImplementedError("Saving is not implemented for this model.") diff --git a/kornia/models/segmentation/segmentation_models.py b/kornia/models/segmentation/segmentation_models.py new file mode 100644 index 0000000000..78693e84a4 --- /dev/null +++ b/kornia/models/segmentation/segmentation_models.py @@ -0,0 +1,87 @@ +from typing import Any, ClassVar, List, Optional + +import kornia +from kornia.core import Module, Tensor, ones_like, tensor, zeros_like +from kornia.core.external import segmentation_models_pytorch as smp +from kornia.core.module import ONNXExportMixin + + +class SegmentationModels(Module, ONNXExportMixin): + """SegmentationModel is a module that wraps a segmentation model. + + This module uses SegmentationModel library for segmentation. + + Args: + model_name: Name of the model to use. Valid options are: + "Unet", "UnetPlusPlus", "MAnet", "LinkNet", "FPN", "PSPNet", "PAN", "DeepLabV3", "DeepLabV3Plus". + encoder_name: Name of the encoder to use. + encoder_depth: Depth of the encoder. + encoder_weights: Weights of the encoder. + decoder_channels: Number of channels in the decoder. + in_channels: Number of channels in the input. + classes: Number of classes to predict. + **kwargs: Additional arguments to pass to the model. Detailed arguments can be found at: + https://github.com/qubvel-org/segmentation_models.pytorch/tree/main/segmentation_models_pytorch/decoders + + Note: + Only encoder weights are available. + Pretrained weights for the whole model are not available. 
+ """ + + ONNX_DEFAULT_INPUTSHAPE: ClassVar[List[int]] = [-1, 3, -1, -1] + ONNX_DEFAULT_OUTPUTSHAPE: ClassVar[List[int]] = [-1, -1, -1, -1] + + def __init__( + self, + model_name: str = "Unet", + encoder_name: str = "resnet34", + encoder_weights: Optional[str] = "imagenet", + in_channels: int = 3, + classes: int = 1, + **kwargs: Any, + ) -> None: + super().__init__() + self.preproc_params = smp.encoders.get_preprocessing_params(encoder_name) # type: ignore + self.segmentation_model = getattr(smp, model_name)( + encoder_name=encoder_name, + encoder_weights=encoder_weights, + in_channels=in_channels, + classes=classes, + **kwargs, + ) + + def preprocessing(self, input: Tensor) -> Tensor: + # Ensure the color space transformation is ONNX-friendly + input_space = self.preproc_params["input_space"] + if input_space == "BGR": + input = kornia.color.rgb.bgr_to_rgb(input) + elif input_space == "RGB": + pass + else: + raise ValueError(f"Unsupported input space: {input_space}") + + # Normalize input range if needed + input_range = self.preproc_params["input_range"] + if input_range[1] == 255: + input = input * 255.0 + elif input_range[1] == 1: + pass + else: + raise ValueError(f"Unsupported input range: {input_range}") + + # Handle mean and std normalization + if self.preproc_params["mean"] is not None: + mean = tensor([self.preproc_params["mean"]], device=input.device) + else: + mean = zeros_like(input) + + if self.preproc_params["std"] is not None: + std = tensor([self.preproc_params["std"]], device=input.device) + else: + std = ones_like(input) + + return kornia.enhance.normalize(input, mean, std) + + def forward(self, input: Tensor) -> Tensor: + input = self.preprocessing(input) + return self.segmentation_model(input) diff --git a/kornia/models/tracking/__init__.py b/kornia/models/tracking/__init__.py new file mode 100644 index 0000000000..a68c9dce90 --- /dev/null +++ b/kornia/models/tracking/__init__.py @@ -0,0 +1 @@ +from .boxmot_tracker import * diff --git a/kornia/models/tracking/boxmot_tracker.py b/kornia/models/tracking/boxmot_tracker.py new file mode 100644 index 0000000000..254063fd08 --- /dev/null +++ b/kornia/models/tracking/boxmot_tracker.py @@ -0,0 +1,163 @@ +import datetime +import logging +import os +from pathlib import Path +from typing import Any, Optional, Union + +from kornia.config import kornia_config +from kornia.core import Tensor, tensor +from kornia.core.external import boxmot +from kornia.core.external import numpy as np +from kornia.io import write_image +from kornia.models.detection.base import ObjectDetector +from kornia.models.detection.rtdetr import RTDETRDetectorBuilder +from kornia.utils.image import tensor_to_image + +__all__ = ["BoxMotTracker"] + +logger = logging.getLogger(__name__) + + +class BoxMotTracker: + """BoxMotTracker is a module that wraps a detector and a tracker model. + + This module uses BoxMot library for tracking. + + Args: + detector: ObjectDetector: The detector model. + tracker_model_name: The name of the tracker model. Valid options are: + - "BoTSORT" + - "DeepOCSORT" + - "OCSORT" + - "HybridSORT" + - "ByteTrack" + - "StrongSORT" + - "ImprAssoc" + tracker_model_weights: Path to the model weights for ReID (Re-Identification). + device: Device on which to run the model (e.g., 'cpu' or 'cuda'). + fp16: Whether to use half-precision (fp16) for faster inference on compatible devices. + per_class: Whether to perform per-class tracking + track_high_thresh: High threshold for detection confidence. 
+ Detections above this threshold are used in the first association round. + track_low_thresh: Low threshold for detection confidence. + Detections below this threshold are ignored. + new_track_thresh: Threshold for creating a new track. + Detections above this threshold will be considered as potential new tracks. + track_buffer: Number of frames to keep a track alive after it was last detected. + match_thresh: Threshold for the matching step in data association. + proximity_thresh: Threshold for IoU (Intersection over Union) distance in first-round association. + appearance_thresh: Threshold for appearance embedding distance in the ReID module. + cmc_method: Method for correcting camera motion. Options include "sof" (simple optical flow). + frame_rate: Frame rate of the video being processed. Used to scale the track buffer size. + fuse_first_associate: Whether to fuse appearance and motion information during the first association step. + with_reid: Whether to use ReID (Re-Identification) features for association. + + .. code-block:: python + + import kornia + image = kornia.utils.sample.get_sample_images()[0][None] + model = BoxMotTracker() + for i in range(4): # At least 4 frames are needed to initialize the tracking position + model.update(image) + model.save(image) + + .. note:: + At least 4 frames are needed to initialize the tracking position. + """ + + name: str = "boxmot_tracker" + + def __init__( + self, + detector: Union[ObjectDetector, str] = "rtdetr_r18vd", + tracker_model_name: str = "DeepOCSORT", + tracker_model_weights: str = "osnet_x0_25_msmt17.pt", + device: str = "cpu", + fp16: bool = False, + **kwargs: Any, + ) -> None: + super().__init__() + if isinstance(detector, str): + if detector.startswith("rtdetr"): + detector = RTDETRDetectorBuilder.build(model_name=detector) + else: + raise ValueError( + f"Detector `{detector}` not available. You may pass an ObjectDetector instance instead." + ) + self.detector = detector + os.makedirs(f"{kornia_config.hub_models_dir}/boxmot", exist_ok=True) + self.tracker = getattr(boxmot, tracker_model_name)( + model_weights=Path(os.path.join(f"{kornia_config.hub_models_dir}/boxmot", tracker_model_weights)), + device=device, + fp16=fp16, + **kwargs, + ) + + def update(self, image: Tensor) -> None: + """Update the tracker with a new image. + + Args: + image: The input image. + """ + + if not (image.ndim == 4 and image.shape[0] == 1) and not image.ndim == 3: + raise ValueError(f"Input tensor must be of shape (1, 3, H, W) or (3, H, W). Got {image.shape}") + + if image.ndim == 3: + image = image.unsqueeze(0) + + detections_raw: Union[Tensor, list[Tensor]] = self.detector(image) + + detections = detections_raw[0].cpu().numpy() # Batch size is 1 + + detections = np.array( # type: ignore + [ + detections[:, 2], + detections[:, 3], + detections[:, 2] + detections[:, 4], + detections[:, 3] + detections[:, 5], + detections[:, 1], + detections[:, 0], + ] + ).T + + if detections.shape[0] == 0: + # empty N X (x, y, x, y, conf, cls) + detections = np.empty((0, 6)) # type: ignore + + frame_raw = (tensor_to_image(image) * 255).astype(np.uint8) + # --> M X (x, y, x, y, id, conf, cls, ind) + return self.tracker.update(detections, frame_raw) + + def visualize(self, image: Tensor, show_trajectories: bool = True) -> Tensor: + """Visualize the results of the tracker. + + Args: + image: The input image. + show_trajectories: Whether to show the trajectories. + + Returns: + The image with the results of the tracker. 
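+
+        Example:
+            A minimal sketch; ``tracker`` is assumed to have been updated with a
+            few frames already:
+
+            .. code-block:: python
+
+                frame = tracker.visualize(image, show_trajectories=True)  # (3, H, W) tensor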
+ """ + frame_raw = (tensor_to_image(image) * 255).astype(np.uint8) + self.tracker.plot_results(frame_raw, show_trajectories=show_trajectories) + + return tensor(frame_raw).permute(2, 0, 1) + + def save(self, image: Tensor, show_trajectories: bool = True, directory: Optional[str] = None) -> None: + """Save the model to ONNX format. + + Args: + image: The input image. + """ + if directory is None: + name = f"{self.name}_{datetime.datetime.now(tz=datetime.timezone.utc).strftime('%Y%m%d%H%M%S')!s}" + directory = os.path.join("kornia_outputs", name) + output = self.visualize(image, show_trajectories=show_trajectories) + + os.makedirs(directory, exist_ok=True) + write_image( + os.path.join(directory, f"{str(0).zfill(6)}.jpg"), + output.byte(), + ) + logger.info(f"Outputs are saved in {directory}") diff --git a/kornia/models/utils.py b/kornia/models/utils.py new file mode 100644 index 0000000000..34e95b87e7 --- /dev/null +++ b/kornia/models/utils.py @@ -0,0 +1,78 @@ +import warnings +from typing import List, Tuple, Union + +import torch +from torch import Tensor + +from kornia.core import Module, concatenate +from kornia.geometry.transform import resize + +__all__ = ["ResizePreProcessor", "ResizePostProcessor"] + + +class ResizePreProcessor(Module): + """This module resizes a list of image tensors to the given size. + + Additionally, also returns the original image sizes for further post-processing. + """ + + def __init__(self, height: int, width: int, interpolation_mode: str = "bilinear") -> None: + """ + Args: + height: height of the resized image. + width: width of the resized image. + interpolation_mode: interpolation mode for image resizing. Supported values: ``nearest``, ``bilinear``, + ``bicubic``, ``area``, and ``nearest-exact``. + """ + super().__init__() + self.size = (height, width) + self.interpolation_mode = interpolation_mode + + def forward(self, imgs: Union[Tensor, List[Tensor]]) -> Tuple[Tensor, Tensor]: + """ + Returns: + resized_imgs: resized images in a batch. + original_sizes: the original image sizes of (height, width). + """ + # TODO: support other input formats e.g. file path, numpy + resized_imgs: list[Tensor] = [] + + iters = len(imgs) if isinstance(imgs, list) else imgs.shape[0] + original_sizes = imgs[0].new_zeros((iters, 2)) + for i in range(iters): + img = imgs[i] + original_sizes[i, 0] = img.shape[-2] # Height + original_sizes[i, 1] = img.shape[-1] # Width + resized_imgs.append(resize(img[None], size=self.size, interpolation=self.interpolation_mode)) + return concatenate(resized_imgs), original_sizes + + +class ResizePostProcessor(Module): + def __init__(self, interpolation_mode: str = "bilinear") -> None: + super().__init__() + self.interpolation_mode = interpolation_mode + + def forward(self, imgs: Union[Tensor, List[Tensor]], original_sizes: Tensor) -> Union[Tensor, List[Tensor]]: + """ + Returns: + resized_imgs: resized images in a batch. + original_sizes: the original image sizes of (height, width). + """ + # TODO: support other input formats e.g. file path, numpy + resized_imgs: list[Tensor] = [] + + if torch.onnx.is_in_onnx_export(): + warnings.warn( + "ResizePostProcessor is not supported in ONNX export. " + "The output will not be resized back to the original size." 
+            )
+            return imgs
+
+        iters = len(imgs) if isinstance(imgs, list) else imgs.shape[0]
+        for i in range(iters):
+            img = imgs[i]
+            size = original_sizes[i]
+            resized_imgs.append(
+                resize(img[None], size=size.cpu().long().numpy().tolist(), interpolation=self.interpolation_mode)
+            )
+        return resized_imgs
diff --git a/kornia/onnx/__init__.py b/kornia/onnx/__init__.py
new file mode 100644
index 0000000000..40358f85b2
--- /dev/null
+++ b/kornia/onnx/__init__.py
@@ -0,0 +1,2 @@
+from .sequential import *
+from .utils import *
diff --git a/kornia/onnx/sequential.py b/kornia/onnx/sequential.py
new file mode 100644
index 0000000000..52a3806d83
--- /dev/null
+++ b/kornia/onnx/sequential.py
@@ -0,0 +1,212 @@
+from typing import Any, List, Optional, Tuple, Union
+
+from kornia.config import kornia_config
+from kornia.core.external import numpy as np
+from kornia.core.external import onnx
+from kornia.core.external import onnxruntime as ort
+
+from .utils import ONNXLoader
+
+__all__ = ["ONNXSequential", "load"]
+
+
+class ONNXSequential:
+    f"""ONNXSequential to chain multiple ONNX operators together.
+
+    Args:
+        *args: A variable number of ONNX models (either ONNX ModelProto objects or file paths).
+            For Hugging Face-hosted models, use the format 'hf://model_name'. Valid `model_name` can be found on
+            https://huggingface.co/kornia/ONNX_models.
+        providers: A list of execution providers for ONNXRuntime
+            (e.g., ['CUDAExecutionProvider', 'CPUExecutionProvider']).
+        session_options: Optional ONNXRuntime session options for optimizing the session.
+        io_maps: An optional list of lists of tuples specifying input-output mappings for combining models.
+            If None, we assume the default input and output names are "input" and "output" respectively, and
+            that each graph has exactly one input and output node.
+            If not None, `io_maps[0]` shall represent the `io_map` for combining the first and second ONNX models.
+        cache_dir: The directory where ONNX models are cached locally (only for downloading from HuggingFace).
+            Defaults to None, which will use a default `{kornia_config.hub_onnx_dir}` directory.
+    """
+
+    def __init__(
+        self,
+        *args: Union["onnx.ModelProto", str],  # type:ignore
+        providers: Optional[List[str]] = None,
+        session_options: Optional["ort.SessionOptions"] = None,  # type:ignore
+        io_maps: Optional[List[List[Tuple[str, str]]]] = None,
+        cache_dir: Optional[str] = None,
+    ) -> None:
+        self.onnx_loader = ONNXLoader(cache_dir)
+        self.operators = args
+        self._combined_op = self._combine(io_maps)
+        self._session = self.create_session(providers=providers, session_options=session_options)
+
+    def _load_op(self, arg: Union["onnx.ModelProto", str]) -> "onnx.ModelProto":  # type:ignore
+        """Loads an ONNX model from a file path, or returns the provided ONNX ModelProto as-is.
+
+        Args:
+            arg: Either an ONNX ModelProto object or a file path to an ONNX model.
+
+        Returns:
+            onnx.ModelProto: The loaded ONNX model.
+        """
+        if isinstance(arg, str):
+            return self.onnx_loader.load_model(arg)
+        return arg
+
+    def _combine(self, io_maps: Optional[List[List[Tuple[str, str]]]] = None) -> "onnx.ModelProto":  # type:ignore
+        """Combine the provided ONNX models into a single ONNX graph.
+
+        Optionally, map inputs and outputs between operators using the `io_map`.
+
+        Args:
+            io_maps:
+                A list of lists of tuples representing input-output mappings for combining the models.
+                Example: [[(model1_output_name, model2_input_name)], [(model2_output_name, model3_input_name)]].
+
+        Returns:
+            onnx.ModelProto: The combined ONNX model as a single ONNX graph.
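+
+        Example:
+            A minimal sketch of the expected ``io_maps`` layout for three chained
+            models (all names below are hypothetical):
+
+            .. code-block:: python
+
+                io_maps = [
+                    [("pred", "image")],  # model1.pred -> model2.image
+                    [("mask", "input")],  # model2.mask -> model3.input
+                ]
+                seq = ONNXSequential("op1.onnx", "op2.onnx", "op3.onnx", io_maps=io_maps)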
+ """ + if len(self.operators) == 0: + raise ValueError("No operators found.") + + combined_op = self._load_op(self.operators[0]) + combined_op = onnx.compose.add_prefix(combined_op, prefix=f"K{str(0).zfill(2)}-") # type:ignore + + for i, op in enumerate(self.operators[1:]): + next_op = onnx.compose.add_prefix(self._load_op(op), prefix=f"K{str(i + 1).zfill(2)}-") # type:ignore + if io_maps is None: + io_map = [(f"K{str(i).zfill(2)}-output", f"K{str(i + 1).zfill(2)}-input")] + else: + io_map = [(f"K{str(i).zfill(2)}-{it[0]}", f"K{str(i + 1).zfill(2)}-{it[1]}") for it in io_maps[i]] + combined_op = onnx.compose.merge_models(combined_op, next_op, io_map=io_map) # type:ignore + + return combined_op + + def export(self, file_path: str) -> None: + """Export the combined ONNX model to a file. + + Args: + file_path: + The file path to export the combined ONNX model. + """ + onnx.save(self._combined_op, file_path) # type:ignore + + def create_session( + self, + providers: Optional[List[str]] = None, + session_options: Optional["ort.SessionOptions"] = None, # type:ignore + ) -> "ort.InferenceSession": # type:ignore + """Create an optimized ONNXRuntime InferenceSession for the combined model. + + Args: + providers: + Execution providers for ONNXRuntime (e.g., ['CUDAExecutionProvider', 'CPUExecutionProvider']). + session_options: + Optional ONNXRuntime session options for session configuration and optimizations. + + Returns: + ort.InferenceSession: The ONNXRuntime session optimized for inference. + """ + if session_options is None: + sess_options = ort.SessionOptions() # type:ignore + sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_EXTENDED # type:ignore + session = ort.InferenceSession( # type:ignore + self._combined_op.SerializeToString(), + sess_options=sess_options, + providers=providers or ["CPUExecutionProvider"], + ) + return session + + def set_session(self, session: "ort.InferenceSession") -> None: # type: ignore + """Set a custom ONNXRuntime InferenceSession. + + Args: + session: ort.InferenceSession + The custom ONNXRuntime session to be set for inference. + """ + self._session = session + + def get_session(self) -> "ort.InferenceSession": # type: ignore + """Get the current ONNXRuntime InferenceSession. + + Returns: + ort.InferenceSession: The current ONNXRuntime session. + """ + return self._session + + def as_cpu(self) -> None: + """Set the session to run on CPU.""" + self._session.set_providers(["CPUExecutionProvider"]) + + def as_cuda(self, device_id: int = 0, **kwargs: Any) -> None: + """Set the session to run on CUDA. + + We set the ONNX runtime session to use CUDAExecutionProvider. For other CUDAExecutionProvider configurations, + or CUDA/cuDNN/ONNX version issues, + you may refer to https://onnxruntime.ai/docs/execution-providers/CUDA-ExecutionProvider.html. + + Args: + device_id: Select GPU to execute. + """ + self._session.set_providers(["CUDAExecutionProvider"], provider_options=[{"device_id": device_id, **kwargs}]) + + def as_tensorrt(self, device_id: int = 0, **kwargs: Any) -> None: + """Set the session to run on TensorRT. + + We set the ONNX runtime session to use TensorrtExecutionProvider. For other TensorrtExecutionProvider configurations, + or CUDA/cuDNN/ONNX/TensorRT version issues, + you may refer to https://onnxruntime.ai/docs/execution-providers/TensorRT-ExecutionProvider.html. + + Args: + device_id: select GPU to execute. 
+ """ + self._session.set_providers( + ["TensorrtExecutionProvider"], provider_options=[{"device_id": device_id, **kwargs}] + ) + + def as_openvino(self, device_type: str = "GPU", **kwargs: Any) -> None: + """Set the session to run on TensorRT. + + We set the ONNX runtime session to use OpenVINOExecutionProvider. For other OpenVINOExecutionProvider configurations, + or CUDA/cuDNN/ONNX/TensorRT version issues, + you may refer to https://onnxruntime.ai/docs/execution-providers/OpenVINO-ExecutionProvider.html. + + Args: + device_type: CPU, NPU, GPU, GPU.0, GPU.1 based on the avaialable GPUs, NPU, Any valid Hetero combination, + Any valid Multi or Auto devices combination. + """ + self._session.set_providers( + ["OpenVINOExecutionProvider"], provider_options=[{"device_type": device_type, **kwargs}] + ) + + def __call__(self, *inputs: "np.ndarray") -> List["np.ndarray"]: # type:ignore + """Perform inference using the combined ONNX model. + + Args: + *inputs: Inputs to the ONNX model. The number of inputs must match the expected inputs of the session. + + Returns: + List: The outputs from the ONNX model inference. + """ + ort_inputs = self._session.get_inputs() + if len(ort_inputs) != len(inputs): + raise ValueError(f"Expected {len(ort_inputs)} for the session while only {len(inputs)} received.") + + ort_input_values = {ort_inputs[i].name: inputs[i] for i in range(len(ort_inputs))} + outputs = self._session.run(None, ort_input_values) + + return outputs + + +def load(model_name: str) -> "ONNXSequential": + """Load an ONNX model from either a file path or HuggingFace. + + The loaded model is an ONNXSequential object, of which you may run the model with + the `__call__` method, with less boilerplate. + + Args: + model_name: The name of the model to load. For Hugging Face-hosted models, + use the format 'hf://model_name'. Valid `model_name` can be found on + https://huggingface.co/kornia/ONNX_models. + """ + return ONNXSequential(model_name) diff --git a/kornia/onnx/utils.py b/kornia/onnx/utils.py new file mode 100644 index 0000000000..33386c4fdc --- /dev/null +++ b/kornia/onnx/utils.py @@ -0,0 +1,146 @@ +import logging +import os +import pprint +import urllib.request +from typing import Any, Dict, List, Optional + +import requests + +from kornia.config import kornia_config +from kornia.core.external import onnx + +__all__ = ["ONNXLoader"] + +logger = logging.getLogger(__name__) + + +class ONNXLoader: + f"""Manages ONNX models, handling local caching, downloading from Hugging Face, and loading models. + + Attributes: + cache_dir: The directory where ONNX models are cached locally. + Defaults to None, which will use a default `{kornia_config.hub_onnx_dir}` directory. + """ + + def __init__(self, cache_dir: Optional[str] = None): + self.cache_dir = cache_dir + + def _get_file_path(self, model_name: str, cache_dir: Optional[str]) -> str: + """Constructs the file path for the ONNX model based on the model name and cache directory. + + Args: + model_name: The name of the model or operator, typically in the format 'operators/model_name'. + cache_dir: The directory where the model should be cached. + + Returns: + str: The full local path where the model should be stored or loaded from. 
+ """ + # Determine the local file path + if cache_dir is None: + if self.cache_dir is not None: + cache_dir = self.cache_dir + else: + cache_dir = kornia_config.hub_onnx_dir + + # The filename is the model name (without directory path) + file_name = f"{os.path.split(model_name)[-1]}.onnx" + file_path = os.path.join(*os.path.split(cache_dir), *os.path.split(model_name)[:-1], file_name) + return file_path + + def load_model(self, model_name: str, download: bool = True, **kwargs) -> "onnx.ModelProto": # type:ignore + """Loads an ONNX model from the local cache or downloads it from Hugging Face if necessary. + + Args: + model_name: The name of the ONNX model or operator. For Hugging Face-hosted models, + use the format 'hf://model_name'. Valid `model_name` can be found on + https://huggingface.co/kornia/ONNX_models. + Or a URL to the ONNX model. + download: If True, the model will be downloaded from Hugging Face if it's not already in the local cache. + **kwargs: Additional arguments to pass to the download method, if needed. + + Returns: + onnx.ModelProto: The loaded ONNX model. + """ + if model_name.startswith("hf://"): + model_name = model_name[len("hf://") :] + cache_dir = kwargs.get(kornia_config.hub_onnx_dir, None) or self.cache_dir + file_path = self._get_file_path(model_name, cache_dir) + if not os.path.exists(file_path): + # Construct the raw URL for the ONNX file + if download: + url = f"https://huggingface.co/kornia/ONNX_models/resolve/main/{model_name}.onnx" + self.download(url, file_path) + else: + raise ValueError(f"`{model_name}` is not found in `{file_path}`. You may set `download=True`.") + return onnx.load(file_path) # type:ignore + elif model_name.startswith("https://"): + cache_dir = kwargs.get(kornia_config.hub_onnx_dir, None) or self.cache_dir + file_path = self._get_file_path(model_name, cache_dir) + self.download(model_name, file_path) + return onnx.load(file_path) # type:ignore + + if os.path.exists(model_name): + return onnx.load(model_name) # type:ignore + + raise ValueError(f"File {model_name} not found") + + def download( + self, + url: str, + file_path: str, + ) -> None: + """Downloads an ONNX model from the specified URL and saves it to the specified file path. + + Args: + url: The URL of the ONNX model to download. + file_path: The local path where the downloaded model should be saved. + cache_dir: The directory to use for caching the file, defaults to the instance cache + directory if not provided. + """ + + os.makedirs(os.path.dirname(file_path), exist_ok=True) # Create the cache directory if it doesn't exist + + if url.startswith(("http:", "https:")): + try: + logger.info(f"Downloading `{url}` to `{file_path}`.") + urllib.request.urlretrieve(url, file_path) # noqa: S310 + except urllib.error.HTTPError as e: + raise ValueError(f"Error in resolving `{url}`. {e}.") + else: + raise ValueError("URL must start with 'http:' or 'https:'") + + @staticmethod + def _fetch_repo_contents(folder: str) -> List[Dict[str, Any]]: + """Fetches the contents of the Hugging Face repository using the Hugging Face API. + + Returns: + List[dict]: A list of all files in the repository as dictionaries containing file details. 
+ """ + url = f"https://huggingface.co/api/models/kornia/ONNX_models/tree/main/{folder}" + + response = requests.get(url, timeout=10) + + if response.status_code == 200: + return response.json() # Returns the JSON content of the repo + else: + raise ValueError(f"Failed to fetch repository contents: {response.status_code}") + + @staticmethod + def list_operators() -> None: + """Lists all available ONNX operators in the 'operators' folder of the Hugging Face repository.""" + repo_contents = ONNXLoader._fetch_repo_contents("operators") + + # Filter for operators in the 'operators' directory + operators = [file["path"] for file in repo_contents] + + pprint.pp(operators) + + @staticmethod + def list_models() -> None: + """Lists all available ONNX models in the 'models' folder of the Hugging Face repository.""" + repo_contents = ONNXLoader._fetch_repo_contents("models") + + # Filter for models in the 'models' directory + models = [file["path"] for file in repo_contents] + + pprint.pp(models) diff --git a/kornia/utils/__init__.py b/kornia/utils/__init__.py index 8fc674f6fb..61806c28f8 100644 --- a/kornia/utils/__init__.py +++ b/kornia/utils/__init__.py @@ -28,6 +28,7 @@ ) from .one_hot import one_hot from .pointcloud_io import load_pointcloud_ply, save_pointcloud_ply +from .sample import get_sample_images __all__ = [ "batched_forward", @@ -62,4 +63,5 @@ "is_mps_tensor_safe", "dataclass_to_dict", "dict_to_dataclass", + "get_sample_images", ] diff --git a/kornia/utils/sample.py b/kornia/utils/sample.py new file mode 100644 index 0000000000..4fedfa239b --- /dev/null +++ b/kornia/utils/sample.py @@ -0,0 +1,86 @@ +import logging +import os +from typing import List, Optional, Tuple, Union + +import requests + +import kornia +from kornia.core import Tensor, stack +from kornia.core.external import PILImage as Image +from kornia.io import load_image + +__all__ = [ + "get_sample_images", +] + +IMAGE_URLS: List[str] = [ + "https://raw.githubusercontent.com/kornia/data/main/panda.jpg", + "https://raw.githubusercontent.com/kornia/data/main/simba.png", + "https://raw.githubusercontent.com/kornia/data/main/girona.png", + "https://raw.githubusercontent.com/kornia/data/main/baby_giraffe.png", + "https://raw.githubusercontent.com/kornia/data/main/persistencia_memoria.jpg", + "https://raw.githubusercontent.com/kornia/data/main/delorean.png", +] + + +def download_image(url: str, save_to: str) -> None: + """Download an image from a given URL and save it to a specified file path. + + Args: + url: The URL of the image to download. + save_to: The file path where the downloaded image will be saved. + """ + im = Image.open(requests.get(url, stream=True, timeout=30).raw) # type:ignore + im.save(save_to) + + +def get_sample_images( + resize: Optional[Tuple[int, int]] = None, + paths: List[str] = IMAGE_URLS, + download: bool = True, + cache_dir: Optional[str] = None, +) -> Union[Tensor, List[Tensor]]: + """Loads multiple images from the given URLs. + + Optionally download them, resize them if specified, and return them as a batch of tensors or a list of tensors. + + Args: + paths: A list of path or URL from which to load or download images. + Defaults to a pre-defined constant `IMAGE_URLS` if not provided. + resize: Optional target size for resizing all images as a tuple (height, width). + If not provided, the images will not be resized, and their original sizes will be retained. + download (bool): Whether to download the images if they are not already cached. Defaults to True. 
diff --git a/kornia/utils/__init__.py b/kornia/utils/__init__.py index 8fc674f6fb..61806c28f8 100644 --- a/kornia/utils/__init__.py +++ b/kornia/utils/__init__.py @@ -28,6 +28,7 @@
 )
 from .one_hot import one_hot
 from .pointcloud_io import load_pointcloud_ply, save_pointcloud_ply
+from .sample import get_sample_images

 __all__ = [
     "batched_forward",
@@ -62,4 +63,5 @@
     "is_mps_tensor_safe",
     "dataclass_to_dict",
     "dict_to_dataclass",
+    "get_sample_images",
 ]
diff --git a/kornia/utils/sample.py b/kornia/utils/sample.py new file mode 100644 index 0000000000..4fedfa239b --- /dev/null +++ b/kornia/utils/sample.py @@ -0,0 +1,86 @@
+import logging
+import os
+from typing import List, Optional, Tuple, Union
+
+import requests
+
+import kornia
+from kornia.core import Tensor, stack
+from kornia.core.external import PILImage as Image
+from kornia.io import load_image
+
+__all__ = [
+    "get_sample_images",
+]
+
+IMAGE_URLS: List[str] = [
+    "https://raw.githubusercontent.com/kornia/data/main/panda.jpg",
+    "https://raw.githubusercontent.com/kornia/data/main/simba.png",
+    "https://raw.githubusercontent.com/kornia/data/main/girona.png",
+    "https://raw.githubusercontent.com/kornia/data/main/baby_giraffe.png",
+    "https://raw.githubusercontent.com/kornia/data/main/persistencia_memoria.jpg",
+    "https://raw.githubusercontent.com/kornia/data/main/delorean.png",
+]
+
+
+def download_image(url: str, save_to: str) -> None:
+    """Download an image from a given URL and save it to a specified file path.
+
+    Args:
+        url: The URL of the image to download.
+        save_to: The file path where the downloaded image will be saved.
+    """
+    im = Image.open(requests.get(url, stream=True, timeout=30).raw)  # type:ignore
+    im.save(save_to)
+
+
+def get_sample_images(
+    resize: Optional[Tuple[int, int]] = None,
+    paths: List[str] = IMAGE_URLS,
+    download: bool = True,
+    cache_dir: Optional[str] = None,
+) -> Union[Tensor, List[Tensor]]:
+    """Loads multiple images from the given URLs.
+
+    Optionally download them, resize them if specified, and return them as a batch of
+    tensors or a list of tensors.
+
+    Args:
+        resize: Optional target size for resizing all images as a tuple (height, width).
+            If not provided, the images will not be resized, and their original sizes will be retained.
+        paths: A list of paths or URLs from which to load or download images.
+            Defaults to the pre-defined constant `IMAGE_URLS` if not provided.
+        download: Whether to download the images if they are not already cached. Defaults to True.
+        cache_dir: The directory where the downloaded images will be cached.
+            Defaults to ".kornia_hub/images".
+
+    Returns:
+        torch.Tensor | list[torch.Tensor]:
+            If `resize` is provided, returns a single stacked tensor with shape (B, C, H, W).
+            Otherwise, returns a list of tensors, each with its original shape (C, H, W).
+    """
+    if cache_dir is None:
+        cache_dir = ".kornia_hub/images"
+    os.makedirs(cache_dir, exist_ok=True)
+    tensors = []
+    for path in paths:
+        if path.startswith("http"):
+            name = os.path.basename(path)
+            fname = os.path.join(cache_dir, name)
+            if not os.path.exists(fname) and download:
+                logging.info(f"Downloading `{path}` to `{fname}`.")
+                download_image(path, fname)
+            elif not os.path.exists(fname) and not download:
+                raise FileNotFoundError(
+                    f"Image `{path}` not found at `{fname}`. You may want to set `download=True` to download it."
+                )
+        else:
+            fname = path
+        img_tensor = load_image(fname)
+        if resize is not None:
+            img_tensor = kornia.geometry.resize(img_tensor, resize)
+        tensors.append(img_tensor)
+
+    if resize is not None:
+        return stack(tensors)
+    else:
+        return tensors
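+
+
+if __name__ == "__main__":
+    # Illustrative only: fetch the six bundled sample images (network access is
+    # required on the first call) and stack them into a single batch via `resize`.
+    batch = get_sample_images(resize=(224, 224))
+    print(batch.shape)  # torch.Size([6, 3, 224, 224])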
diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 6d5dac3c63..21866c5bfb 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -2,11 +2,13 @@
 accelerate
 coverage
 diffusers
 mypy
-numpy<2
+numpy<3
 onnx
+onnxruntime
 pillow
 pre-commit>=2
 pytest==8.3.2
 pytest-timeout
 requests
 transformers
+types-requests
diff --git a/requirements/requirements-docs.txt b/requirements/requirements-docs.txt index 770406e842..87d2805a71 100644 --- a/requirements/requirements-docs.txt +++ b/requirements/requirements-docs.txt @@ -1,6 +1,8 @@
 furo
 kornia_moons
 matplotlib
+onnx
+onnxruntime
 opencv-python
 PyYAML>=5.1
 sphinx
diff --git a/tests/contrib/test_object_detector.py b/tests/contrib/test_object_detector.py index 00eee98b81..072a56635f 100644 --- a/tests/contrib/test_object_detector.py +++ b/tests/contrib/test_object_detector.py @@ -16,9 +16,9 @@
     def test_smoke(self, device, dtype):
         confidence = 0.3
         config = RTDETRConfig("resnet50d", 10, head_num_queries=10)
         model = RTDETR.from_config(config).to(device, dtype).eval()
-        pre_processor = kornia.contrib.object_detection.ResizePreProcessor((32, 32))
+        pre_processor = kornia.models.utils.ResizePreProcessor(32, 32)
         post_processor = DETRPostProcessor(confidence, num_top_queries=3).to(device, dtype).eval()
-        detector = kornia.contrib.ObjectDetector(model, pre_processor, post_processor)
+        detector = kornia.models.detection.ObjectDetector(model, pre_processor, post_processor)

         sizes = torch.randint(5, 10, (batch_size, 2)) * 32
         imgs = [torch.randn(3, h, w, device=device, dtype=dtype) for h, w in sizes]
@@ -29,7 +29,7 @@
         assert pre_processor_out[0].shape[-2] == 32
         assert len(detections) == batch_size
         for dets in detections:
-            assert dets.shape[1] == 6
+            assert dets.shape[1] == 6, dets.shape
             assert torch.all(dets[:, 0].int() == dets[:, 0])
             assert torch.all(dets[:, 1] >= 0.3)
@@ -39,9 +39,9 @@
     def test_onnx(self, device, dtype, tmp_path: Path, variant: str):
         config = RTDETRConfig(variant, 1)
         model = RTDETR.from_config(config).to(device=device, dtype=dtype).eval()
-        pre_processor = kornia.contrib.object_detection.ResizePreProcessor((640, 640))
+        pre_processor = kornia.models.utils.ResizePreProcessor(640, 640)
         post_processor = DETRPostProcessor(0.3, num_top_queries=3)
-        detector = kornia.contrib.ObjectDetector(model, pre_processor, post_processor)
+        detector = kornia.models.detection.ObjectDetector(model, pre_processor, post_processor)

         data = torch.rand(3, 400, 640, device=device, dtype=dtype)
diff --git a/tests/enhance/test_normalize.py b/tests/enhance/test_normalize.py index 51dc469f8f..d6f6deeaca 100644 --- a/tests/enhance/test_normalize.py +++ b/tests/enhance/test_normalize.py @@ -10,7 +10,7 @@ class TestNormalize(BaseTester):
     def test_smoke(self, device, dtype):
         mean = [0.5]
         std = [0.1]
-        repr = "Normalize(mean=tensor([0.5000]), std=tensor([0.1000]))"
+        repr = "Normalize(mean=tensor([[0.5000]]), std=tensor([[0.1000]]))"
         assert str(kornia.enhance.Normalize(mean, std)) == repr

     def test_normalize(self, device, dtype):
@@ -137,7 +137,7 @@ def test_random_normalize_different_parameter_types(self, mean, std):
     def test_random_normalize_invalid_parameter_shape(self, mean, std):
         f = kornia.enhance.Normalize(mean=mean, std=std)
         inputs = torch.arange(0.0, 16.0, step=1).reshape(1, 4, 4).unsqueeze(0)
-        with pytest.raises(ValueError):
+        with pytest.raises((ValueError, RuntimeError)):
             f(inputs)

     @pytest.mark.skip(reason="not implemented yet")
diff --git a/tests/filters/test_filters.py b/tests/filters/test_filters.py index 77e669d50d..7482bffa34 100644 --- a/tests/filters/test_filters.py +++ b/tests/filters/test_filters.py @@ -729,9 +729,10 @@ class TestDexiNed(BaseTester):
     def test_smoke(self, device, dtype):
         img = torch.rand(2, 3, 32, 32, device=device, dtype=dtype)
         net = DexiNed(pretrained=False).to(device, dtype)
+        feat = net.get_features(img)
+        assert len(feat) == 6
         out = net(img)
-        assert len(out) == 7
-        assert out[-1].shape == (2, 1, 32, 32)
+        assert out.shape == (2, 1, 32, 32)
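+
+    # Behavior sketch (illustrative) for the change above: DexiNed.forward now
+    # returns the single fused edge map, while the per-stage maps are exposed via
+    # get_features, which returns 6 intermediate tensors:
+    #
+    #     net = DexiNed(pretrained=False)
+    #     edges = net(torch.rand(1, 3, 64, 64))               # -> shape (1, 1, 64, 64)
+    #     feats = net.get_features(torch.rand(1, 3, 64, 64))  # -> list of 6 feature maps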

     @pytest.mark.slow
     @pytest.mark.parametrize("data", ["dexined"], indirect=True)
diff --git a/tests/models/box_filtering.py b/tests/models/box_filtering.py new file mode 100644 index 0000000000..6d489d27a6 --- /dev/null +++ b/tests/models/box_filtering.py @@ -0,0 +1,100 @@
+import pytest
+import torch
+from numpy.testing import assert_almost_equal
+
+from kornia.core import tensor
+from kornia.models.detection.utils import BoxFiltering
+
+
+class TestBoxFiltering:
+    @pytest.fixture
+    def sample_boxes(self):
+        # Sample boxes in the format [class_id, confidence_score, x, y, w, h]
+        return tensor(
+            [
+                [
+                    [1, 0.9, 10, 10, 20, 20],  # High confidence, class 1
+                    [2, 0.7, 15, 15, 25, 25],  # Medium confidence, class 2
+                    [3, 0.7, 15, 15, 25, 25],  # Medium confidence, class 3
+                    [4, 0.3, 5, 5, 10, 10],  # Low confidence, class 4
+                ],
+                [
+                    [1, 0.95, 12, 12, 18, 18],  # High confidence, class 1
+                    [2, 0.5, 13, 13, 20, 20],  # Low confidence, class 2
+                    [3, 0.5, 13, 13, 20, 20],  # Low confidence, class 3
+                    [4, 0.2, 7, 7, 14, 14],  # Very low confidence, class 4
+                ],
+                [
+                    [1, 0.1, 12, 12, 18, 18],  # Very low confidence, class 1
+                    [2, 0.1, 13, 13, 20, 20],  # Very low confidence, class 2
+                    [3, 0.1, 13, 13, 20, 20],  # Very low confidence, class 3
+                    [4, 0.1, 7, 7, 14, 14],  # Very low confidence, class 4
+                ],
+            ]
+        )  # Shape: [3, 4, 6], i.e., [B, D, 6]
+
+    def test_confidence_filtering(self, sample_boxes):
+        """Test filtering based on confidence threshold."""
+        # Set a confidence threshold of 0.7
+        box_filter = BoxFiltering(confidence_threshold=0.7)
+        filtered_boxes = box_filter(sample_boxes)
+
+        # Expected output: only boxes with confidence > 0.7 should be kept
+        assert len(filtered_boxes[0]) == 1  # Only one box in the first batch
+        assert_almost_equal(filtered_boxes[0][0][1].item(), 0.9)  # Box with confidence 0.9
+        assert len(filtered_boxes[1]) == 1  # Only one box in the second batch
+        assert_almost_equal(filtered_boxes[1][0][1].item(), 0.95)  # Box with confidence 0.95
+        assert len(filtered_boxes[2]) == 0  # No boxes in the third batch
+
+    def test_class_filtering(self, sample_boxes):
+        """Test filtering based on class IDs."""
+        # Set classes_to_keep to [1, 2]
+        box_filter = BoxFiltering(classes_to_keep=tensor([1, 2]))
+        filtered_boxes = box_filter(sample_boxes)
+
+        # Expected output: only boxes with class_id 1 and 2 should be kept
+        assert len(filtered_boxes[0]) == 2  # Two boxes in the first batch
+        assert filtered_boxes[0][0][0].item() == 1  # Box with class_id 1
+        assert filtered_boxes[0][1][0].item() == 2  # Box with class_id 2
+        assert len(filtered_boxes[1]) == 2  # Two boxes in the second batch
+        assert filtered_boxes[1][0][0].item() == 1  # Box with class_id 1
+        assert filtered_boxes[1][1][0].item() == 2  # Box with class_id 2
+        assert len(filtered_boxes[2]) == 2  # Two boxes in the third batch
+        assert filtered_boxes[2][0][0].item() == 1  # Box with class_id 1
+        assert filtered_boxes[2][1][0].item() == 2  # Box with class_id 2
+
+    def test_combined_confidence_and_class_filtering(self, sample_boxes):
+        """Test filtering based on both confidence and class IDs."""
+        # Set confidence threshold to 0.6 and classes_to_keep to [1, 3]
+        box_filter = BoxFiltering(confidence_threshold=0.6, classes_to_keep=tensor([1, 3]))
+        filtered_boxes = box_filter(sample_boxes)
+
+        # Expected output: only boxes with confidence > 0.6 and class_id in [1, 3] should be kept
+        assert len(filtered_boxes[0]) == 2  # Two boxes in the first batch
+        assert filtered_boxes[0][0][0].item() == 1  # Class_id 1
+        assert filtered_boxes[0][1][0].item() == 3  # Class_id 3
+        assert len(filtered_boxes[1]) == 1  # One box in the second batch
+        assert filtered_boxes[1][0][0].item() == 1  # Class_id 1
+        assert len(filtered_boxes[2]) == 0  # No boxes in the third batch
+
+    def test_filter_as_zero(self, sample_boxes):
+        """Test filtering boxes as zero when filter_as_zero is True."""
+        box_filter = BoxFiltering(confidence_threshold=0.8, filter_as_zero=True)
+        filtered_boxes = box_filter(sample_boxes)
+
+        # Expected output: boxes with confidence <= 0.8 should be zeroed out
+        assert torch.all(filtered_boxes[0][0] != 0)  # Box with confidence 0.9 should remain
+        assert torch.all(filtered_boxes[0][1:] == 0)  # Remaining boxes should be zeroed
+        assert torch.all(filtered_boxes[1][0] != 0)  # Box with confidence 0.95 should remain
+        assert torch.all(filtered_boxes[1][1:] == 0)  # Remaining boxes should be zeroed
+        assert torch.all(filtered_boxes[2] == 0)  # All boxes in the third batch should be zeroed
+
+    def test_no_class_or_confidence_filtering(self, sample_boxes):
+        """Test when no class or confidence filtering is applied."""
+        box_filter = BoxFiltering()  # No thresholds set
+        filtered_boxes = box_filter(sample_boxes)
+
+        # Expected output: all boxes should be returned as-is
+        assert len(filtered_boxes[0]) == 4  # All boxes in the first batch should be kept
+        assert len(filtered_boxes[1]) == 4  # All boxes in the second batch should be kept
+        assert len(filtered_boxes[2]) == 4  # All boxes in the third batch should be kept
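+
+
+# Semantics sketch (illustrative), matching the expectations exercised above:
+#
+#     boxes = torch.rand(2, 5, 6)  # [B, D, 6] boxes as [class_id, confidence, x, y, w, h]
+#     box_filter = BoxFiltering(confidence_threshold=0.7, classes_to_keep=tensor([1, 2]))
+#     kept = box_filter(boxes)  # per-image boxes with confidence > 0.7 and class in {1, 2}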
diff --git a/tests/onnx/test_sequential.py b/tests/onnx/test_sequential.py new file mode 100644 index 0000000000..630d7fb70e --- /dev/null +++ b/tests/onnx/test_sequential.py @@ -0,0 +1,65 @@
+from unittest.mock import MagicMock, patch
+
+import onnx
+import onnxruntime as ort
+import pytest
+from onnx.helper import make_graph, make_model, make_node, make_tensor_value_info
+
+from kornia.onnx.sequential import ONNXSequential
+
+
+class TestONNXSequential:
+    @pytest.fixture
+    def mock_model_proto(self):
+        # Create a minimal ONNX model with an input and output
+        input_info = make_tensor_value_info("input", onnx.TensorProto.FLOAT, [1, 2])
+        output_info = make_tensor_value_info("output", onnx.TensorProto.FLOAT, [1, 2])
+        node = make_node("Identity", ["input"], ["output"])
+        graph = make_graph([node], "test_graph", [input_info], [output_info])
+        model = make_model(graph)
+        return model
+
+    @pytest.fixture
+    def onnx_sequential(self, mock_model_proto):
+        # Return an ONNXSequential instance built from two minimal identity models
+        return ONNXSequential(mock_model_proto, mock_model_proto)
+
+    def test_load_op_from_proto(self, mock_model_proto, onnx_sequential):
+        # Test loading a model from an ONNX ModelProto object
+        model = onnx_sequential._load_op(mock_model_proto)
+        assert model == mock_model_proto
+
+    @patch("onnx.compose.merge_models")
+    def test_combine_models(self, mock_merge_models, mock_model_proto):
+        # Create a small ONNX model as the return value of merge_models
+        input_info = make_tensor_value_info("input", onnx.TensorProto.FLOAT, [1, 2])
+        output_info = make_tensor_value_info("output", onnx.TensorProto.FLOAT, [1, 2])
+        node = make_node("Identity", ["input"], ["output"])
+        graph = make_graph([node], "combined_graph", [input_info], [output_info])
+        combined_model = make_model(graph)
+
+        mock_merge_models.return_value = combined_model
+
+        # Test combining multiple ONNX models with io_maps
+        onnx_sequential = ONNXSequential(mock_model_proto, mock_model_proto)
+        combined_op = onnx_sequential._combine([("output1", "input2")])
+
+        assert isinstance(combined_op, onnx.ModelProto)
+
+    @patch("onnx.save")
+    def test_export_combined_model(self, mock_save, onnx_sequential):
+        # Test exporting the combined ONNX model
+        onnx_sequential.export("exported_model.onnx")
+        mock_save.assert_called_once_with(onnx_sequential._combined_op, "exported_model.onnx")
+
+    @patch("onnxruntime.InferenceSession")
+    def test_create_session(self, mock_inference_session, onnx_sequential):
+        # Test creating an ONNXRuntime session
+        session = onnx_sequential.create_session()
+        assert session == mock_inference_session()
+
+    def test_set_get_session(self, onnx_sequential):
+        # Test setting and getting a custom session
+        mock_session = MagicMock(spec=ort.InferenceSession)
+        onnx_sequential.set_session(mock_session)
+        assert onnx_sequential.get_session() == mock_session
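+
+
+# API sketch (illustrative), mirroring the calls exercised above:
+#
+#     seq = ONNXSequential(model_a, model_b)  # onnx.ModelProto objects
+#     seq.export("combined.onnx")             # persists the merged graph via onnx.save
+#     session = seq.create_session()          # ONNXRuntime InferenceSession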
diff --git a/tests/onnx/test_utils.py b/tests/onnx/test_utils.py new file mode 100644 index 0000000000..82be0c1629 --- /dev/null +++ b/tests/onnx/test_utils.py @@ -0,0 +1,126 @@
+import os
+import urllib.error
+from unittest import mock
+
+import pytest
+from onnx import ModelProto
+
+from kornia.onnx.utils import ONNXLoader
+
+
+class TestONNXLoader:
+    @pytest.fixture
+    def loader(self):
+        return ONNXLoader(cache_dir=".test_cache")
+
+    def test_get_file_path_with_custom_cache_dir(self, loader):
+        model_name = os.path.join("operators", "some_model")
+        expected_path = os.path.join(".test_cache", "operators", "some_model.onnx")
+        assert loader._get_file_path(model_name, loader.cache_dir) == expected_path
+
+    def test_get_file_path_with_default_cache_dir(self):
+        loader = ONNXLoader()
+        model_name = os.path.join("operators", "some_model")
+        expected_path = os.path.join(".kornia_hub", "onnx_models", "operators", "some_model.onnx")
+        assert loader._get_file_path(model_name, None) == expected_path
+
+    @mock.patch("onnx.load")
+    @mock.patch("os.path.exists")
+    def test_load_model_local(self, mock_exists, mock_onnx_load, loader):
+        model_name = "local_model.onnx"
+        mock_exists.return_value = True
+
+        # Simulate onnx.load returning a dummy ModelProto
+        mock_model = mock.Mock(spec=ModelProto)
+        mock_onnx_load.return_value = mock_model
+
+        model = loader.load_model(model_name)
+        assert model == mock_model
+        mock_onnx_load.assert_called_once_with(model_name)
+
+    @mock.patch("urllib.request.urlretrieve")
+    @mock.patch("os.path.exists")
+    def test_load_model_download(self, mock_exists, mock_urlretrieve, loader):
+        model_name = "hf://operators/some_model"
+        mock_exists.return_value = False
+        mock_urlretrieve.return_value = None  # Simulate a successful download
+
+        with mock.patch("onnx.load") as mock_onnx_load:
+            mock_model = mock.Mock(spec=ModelProto)
+            mock_onnx_load.return_value = mock_model
+
+            model = loader.load_model(model_name)
+            assert model == mock_model
+            mock_urlretrieve.assert_called_once_with(
+                "https://huggingface.co/kornia/ONNX_models/resolve/main/operators/some_model.onnx",
+                os.path.join(".test_cache", "operators", "some_model.onnx"),
+            )
+
+    def test_load_model_file_not_found(self, loader):
+        model_name = "non_existent_model.onnx"
+
+        with pytest.raises(ValueError, match=f"File {model_name} not found"):
+            loader.load_model(model_name)
+
+    @mock.patch("urllib.request.urlretrieve")
+    @mock.patch("os.makedirs")
+    def test_download_success(self, mock_makedirs, mock_urlretrieve, loader):
+        url = "https://huggingface.co/some_model.onnx"
+        file_path = os.path.join(".test_cache", "some_model.onnx")
+
+        loader.download(url, file_path)
+
+        mock_makedirs.assert_called_once_with(os.path.dirname(file_path), exist_ok=True)
+        mock_urlretrieve.assert_called_once_with(url, file_path)
+
+    @mock.patch(
+        "urllib.request.urlretrieve",
+        side_effect=urllib.error.HTTPError(url=None, code=404, msg="Not Found", hdrs=None, fp=None),
+    )
+    def test_download_failure(self, mock_urlretrieve, loader):
+        url = "https://huggingface.co/non_existent_model.onnx"
+        file_path = os.path.join(".test_cache", "non_existent_model.onnx")
+
+        with pytest.raises(ValueError, match="Error in resolving"):
+            loader.download(url, file_path)
+
+    @mock.patch("requests.get")
+    def test_fetch_repo_contents_success(self, mock_get):
+        mock_response = mock.Mock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = [{"path": os.path.join("operators", "model.onnx")}]
+        mock_get.return_value = mock_response
+
+        contents = ONNXLoader._fetch_repo_contents("operators")
+        assert contents == [{"path": os.path.join("operators", "model.onnx")}]
+
+    @mock.patch("requests.get")
+    def test_fetch_repo_contents_failure(self, mock_get):
+        mock_response = mock.Mock()
+        mock_response.status_code = 404
+        mock_get.return_value = mock_response
+
+        with pytest.raises(ValueError, match="Failed to fetch repository contents"):
+            ONNXLoader._fetch_repo_contents("operators")
+
+    @mock.patch("kornia.onnx.utils.ONNXLoader._fetch_repo_contents")
+    def test_list_operators(self, mock_fetch_repo_contents, capsys):
+        mock_fetch_repo_contents.return_value = [{"path": os.path.join("operators", "some_model.onnx")}]
+
+        ONNXLoader.list_operators()
+
+        captured = capsys.readouterr()
+        assert (
+            os.path.join("operators", "some_model.onnx").replace("\\", "\\\\") in captured.out
+        )  # pprint escapes backslashes in Windows paths
+
+    @mock.patch("kornia.onnx.utils.ONNXLoader._fetch_repo_contents")
+    def test_list_models(self, mock_fetch_repo_contents, capsys):
+        mock_fetch_repo_contents.return_value = [{"path": os.path.join("models", "some_model.onnx")}]
+
+        ONNXLoader.list_models()
+
+        captured = capsys.readouterr()
+        assert (
+            os.path.join("models", "some_model.onnx").replace("\\", "\\\\") in captured.out
+        )  # pprint escapes backslashes in Windows paths