From 9fa328bd3a8e4074437b75273a6655ccb39d28ef Mon Sep 17 00:00:00 2001 From: Ishaan Datta Date: Wed, 6 Nov 2024 14:09:24 -0800 Subject: [PATCH] test utilities , partial repair of tools --- python_wip/bbox_display.py | 2 +- tools/benchmarking/engine-perf-tests.py | 536 +----------------- tools/benchmarking/model-perf-tests.py | 91 +++ tools/testing/engine-test.py | 0 tools/testing/inference-results.py | 50 ++ tools/testing/model-unit-tests.py | 100 ++++ tools/testing/model_inference.py | 378 ------------ tools/testing/onnx-test.py | 101 ++++ tools/testing/results-unit-tests.py | 1 + tools/testing/ros-unit-tests.py | 107 ++++ tools/utils/engine-inference.py | 72 +++ tools/utils/model-inference.py | 141 +++++ .../model_post.py => utils/post-process.py} | 341 ++++++----- tools/utils/pre-process.py | 22 + tools/video/mp4-recorder.py | 71 +++ tools/video/svo-recorder.py | 70 +++ tools/video/video-infer.py | 99 ++++ .../python_workspace/extermination_node.py | 24 +- 18 files changed, 1171 insertions(+), 1035 deletions(-) create mode 100644 tools/testing/engine-test.py create mode 100644 tools/testing/inference-results.py delete mode 100644 tools/testing/model_inference.py create mode 100644 tools/testing/onnx-test.py create mode 100644 tools/testing/results-unit-tests.py create mode 100644 tools/utils/engine-inference.py create mode 100644 tools/utils/model-inference.py rename tools/{testing/model_post.py => utils/post-process.py} (56%) create mode 100644 tools/utils/pre-process.py create mode 100644 tools/video/mp4-recorder.py create mode 100644 tools/video/svo-recorder.py create mode 100644 tools/video/video-infer.py diff --git a/python_wip/bbox_display.py b/python_wip/bbox_display.py index c2e8663..636b9f8 100644 --- a/python_wip/bbox_display.py +++ b/python_wip/bbox_display.py @@ -49,6 +49,6 @@ def read_bounding_boxes(txt_file): bboxes.append((class_id, x_center, y_center, bbox_width, bbox_height)) return bboxes -os.chdir("C:/Users/ishaa/Coding Projects/Applied-AI/ROS/assets/maize") +os.chdir("C:/Users/Ishaan/Coding Projects/ROS/assets/maize") boxes = read_bounding_boxes("IMG_2884_18.txt") draw_bounding_boxes("IMG_2884_18.JPG", boxes) \ No newline at end of file diff --git a/tools/benchmarking/engine-perf-tests.py b/tools/benchmarking/engine-perf-tests.py index 32859b1..93939a0 100644 --- a/tools/benchmarking/engine-perf-tests.py +++ b/tools/benchmarking/engine-perf-tests.py @@ -1,379 +1,29 @@ -import argparse -import tensorrt as trt -import pycuda.driver as cuda -import pycuda.autoinit # Automatically initializes CUDA driver -# import numpy as np -import time -import torch - -## need to adapt after finalizing preprocessing/postprocessing steps -# should also do unit testing, but toggle functionality w/ param - -# allocates input/ouput buffers for the TensorRT engine inference -def allocate_buffers(engine): - inputs = [] - outputs = [] - bindings = [] - stream = cuda.Stream() - - for binding in engine: - size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size - dtype = trt.nptype(engine.get_binding_dtype(binding)) - - # Allocate host and device buffers - host_mem = cuda.pagelocked_empty(size, dtype) - device_mem = cuda.mem_alloc(host_mem.nbytes) - - # Append the device buffer to device bindings - bindings.append(int(device_mem)) - - # Append to the appropriate list - if engine.binding_is_input(binding): - inputs.append((host_mem, device_mem)) - else: - outputs.append((host_mem, device_mem)) - - return inputs, outputs, bindings, stream - -# performs inference on the input 
data using the TensorRT engine
-def infer(engine, inputs, outputs, bindings, stream, input_data):
-    # Transfer input data to the device
-    np.copyto(inputs[0][0], input_data.ravel())
-    cuda.memcpy_htod_async(inputs[0][1], inputs[0][0], stream)
-
-    # Execute the model
-    context = engine.create_execution_context()
-    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
-
-    # Transfer predictions back from the GPU
-    cuda.memcpy_dtoh_async(outputs[0][0], outputs[0][1], stream)
+# combines calling the engine inference utility with system_metrics
+# along with the pre-processing and post-processing utilities
+# should plot results cleanly, be able to export a summary; used in GitHub?
+# can also toggle unit-test functionality
+# Update paths for the .trt model, test images, and ground truth file.
+# Ensure the bounding box coordinates are converted between the format of the ground truth and model output if necessary.
+# Expand with more metrics such as precision, recall, or F1 score based on IoU thresholds if relevant.
 
-    # Wait for the stream to complete the operation
-    stream.synchronize()
+import os
+# need imports for inference module; pre-process.py and post-process.py have
+# hyphenated filenames, so a plain "from utils import ..." will not parse;
+# importlib is one workaround:
+import importlib
+pre_process = importlib.import_module("utils.pre-process")
+post_process = importlib.import_module("utils.post-process")
 
-    return outputs[0][0]
+def verify_path(trt_engine_path=None):
+    pass
 
 # tests a TensorRT engine file by performing inference and checking outputs
 def test_trt_engine(trt_engine_path='model_trt.trt', input_shape=(1,3,224,224), input_data=None, expected_output=None):
-    # Load the TensorRT engine from the file
-    with open(trt_engine_path, "rb") as f:
-        runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
-        engine = runtime.deserialize_cuda_engine(f.read())
-
-    # Allocate buffers for inference
+    engine = load_engine(trt_engine_path)
     inputs, outputs, bindings, stream = allocate_buffers(engine)
 
-    # Generate random input data if not provided
-    if input_data is None:
-        input_data = np.random.rand(*input_shape).astype(np.float32)
-
-    # Perform inference using the TensorRT engine
-    output = infer(engine, inputs, outputs, bindings, stream, input_data)
-
-    # Print the inference result
-    print("Inference output:", output)
-
-    # Compare with expected output if provided
-    if expected_output is not None:
-        if np.allclose(output, expected_output, rtol=1e-3, atol=1e-3):
-            print("The inference result matches the expected output.")
-            return True
-        else:
-            print("The inference result does not match the expected output.")
-            return False
-    else:
-        print("No expected output provided. 
Unable to verify accuracy.") - return True # Pass as long as inference ran without errors -if __name__ == "__main__": - print("Usage: python3 TensorRT_test.py ") - print("Example: python3 TensorRT_test.py model.trt (1, 3, 224, 224) None None") - - if len(sys.argv) < 2: - test_trt_engine() - else: - for i in range(len(sys.argv), 5): - sys.argv.append(None) - test_trt_engine(*sys.argv[1:5]) - - -def benchmark_trt_model(trt_engine_path): - TRT_LOGGER = trt.Logger(trt.Logger.WARNING) - with open(trt_engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime: - engine = runtime.deserialize_cuda_engine(f.read()) - - context = engine.create_execution_context() - # Example input input_shape = (1, 3, 224, 224) input_data = torch.randn(input_shape).cuda() - - # Allocate buffers - inputs, outputs, bindings = [], [], [] - for binding in engine: - size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size - dtype = trt.nptype(engine.get_binding_dtype(binding)) - host_mem = torch.empty(size, dtype=torch.float32).cuda() - inputs.append(host_mem) - bindings.append(int(host_mem.data_ptr())) - - # Execute inference - start_time = time.time() - context.execute_v2(bindings=bindings) - end_time = time.time() - - # Report time - latency = end_time - start_time - print(f"Model Inference Time: {latency * 1000:.2f} ms") - -benchmark_trt_model(args.model) - -## new: -import time -import numpy as np -import cv2 -import pycuda.driver as cuda -import pycuda.autoinit -import tensorrt as trt -from onnxruntime import InferenceSession - -# Helper functions -def load_ground_truth(file_path): - """Load ground truth bounding boxes from text file.""" - with open(file_path, 'r') as f: - bboxes = [] - for line in f: - tokens = line.strip().split() - cls, x_center, y_center, width, height = map(float, tokens) - bboxes.append((cls, x_center, y_center, width, height)) - return bboxes - -def iou(boxA, boxB): - """Compute Intersection Over Union (IoU) between two bounding boxes.""" - xA = max(boxA[0], boxB[0]) - yA = max(boxA[1], boxB[1]) - xB = min(boxA[2], boxB[2]) - yB = min(boxA[3], boxB[3]) - - interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1) - boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1) - boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1) - - iou = interArea / float(boxAArea + boxBArea - interArea) - return iou - -def calculate_centroid_offset(pred_box, gt_box): - """Calculate percentage centroid offset between two boxes.""" - pred_center = (pred_box[0] + pred_box[2]) / 2, (pred_box[1] + pred_box[3]) / 2 - gt_center = (gt_box[0] + gt_box[2]) / 2, (gt_box[1] + gt_box[3]) / 2 - offset_x = abs(pred_center[0] - gt_center[0]) / (gt_box[2] - gt_box[0]) - offset_y = abs(pred_center[1] - gt_center[1]) / (gt_box[3] - gt_box[1]) - return (offset_x + offset_y) / 2 * 100 - -# Load TensorRT model -TRT_LOGGER = trt.Logger(trt.Logger.WARNING) - -class TRTInference: - def __init__(self, engine_path): - self.engine = self.load_engine(engine_path) - self.context = self.engine.create_execution_context() - - def load_engine(self, engine_path): - with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime: - return runtime.deserialize_cuda_engine(f.read()) - - def allocate_buffers(self): - inputs = [] - outputs = [] - bindings = [] - stream = cuda.Stream() - - for binding in self.engine: - size = trt.volume(self.engine.get_binding_shape(binding)) * self.engine.max_batch_size - dtype = trt.nptype(self.engine.get_binding_dtype(binding)) - host_mem = cuda.pagelocked_empty(size, dtype) - 
device_mem = cuda.mem_alloc(host_mem.nbytes) - bindings.append(int(device_mem)) - - if self.engine.binding_is_input(binding): - inputs.append({'host': host_mem, 'device': device_mem}) - else: - outputs.append({'host': host_mem, 'device': device_mem}) - - return inputs, outputs, bindings, stream - - def infer(self, image, inputs, outputs, bindings, stream): - np.copyto(inputs[0]['host'], image.ravel()) - - # Transfer input data to the GPU. - cuda.memcpy_htod_async(inputs[0]['device'], inputs[0]['host'], stream) - - # Run inference. - self.context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) - - # Transfer predictions back from GPU. - cuda.memcpy_dtoh_async(outputs[0]['host'], outputs[0]['device'], stream) - stream.synchronize() - - return outputs[0]['host'] - -def preprocess_image(image_path, input_shape): - """Preprocess image for inference.""" - image = cv2.imread(image_path) - image_resized = cv2.resize(image, (input_shape[1], input_shape[0])) - image = np.asarray(image_resized).astype(np.float32) - return np.transpose(image, (2, 0, 1)) / 255.0 # CHW format and normalized - -def run_benchmark(trt_model_path, test_images, ground_truth_path): - """Run benchmark on the model.""" - # Load ground truth - ground_truth_bboxes = load_ground_truth(ground_truth_path) - - # Initialize TensorRT inference - trt_infer = TRTInference(trt_model_path) - inputs, outputs, bindings, stream = trt_infer.allocate_buffers() - - inference_times = [] - iou_scores = [] - centroid_offsets = [] - - for idx, img_path in enumerate(test_images): - # Preprocess image - image = preprocess_image(img_path, (300, 300)) # Adjust size as needed - - # Perform inference and measure time - start_time = time.time() - pred_bbox = trt_infer.infer(image, inputs, outputs, bindings, stream) - inference_time = time.time() - start_time - - # Compute IoU, centroid offset - gt_bbox = ground_truth_bboxes[idx] - iou_score = iou(pred_bbox, gt_bbox) - offset = calculate_centroid_offset(pred_bbox, gt_bbox) - - # Store results - inference_times.append(inference_time) - iou_scores.append(iou_score) - centroid_offsets.append(offset) - - # Summary of benchmark - print(f"Average Inference Time: {np.mean(inference_times):.4f} seconds") - print(f"Average IoU: {np.mean(iou_scores) * 100:.2f}%") - print(f"Average Centroid Offset: {np.mean(centroid_offsets):.2f}%") - -if __name__ == "__main__": - trt_model_path = "model.trt" # Replace with your TensorRT model path - test_images = ["test1.jpg", "test2.jpg"] # Replace with your test images - ground_truth_path = "ground_truth.txt" # Replace with your ground truth file path - - run_benchmark(trt_model_path, test_images, ground_truth_path) - -# pip install pycuda onnxruntime numpy opencv-python - -# Preprocessing: The image is resized and normalized to be input into the model. -# Inference: Uses TensorRT to make predictions on the preprocessed image. -# Metrics: - -# Intersection over Union (IoU) to measure accuracy. -# Centroid offset, which checks the difference in the center of predicted and ground truth bounding boxes. -# Inference time for each image. - -# Ground Truth Parsing: The ground truth bounding boxes are read from the text file provided. - -# Update paths for the .trt model, test images, and ground truth file. -# Ensure the bounding box coordinates are converted between the format of the ground truth and model output if necessary. -# Expand with more metrics such as precision, recall, or F1 score based on IoU thresholds if relevant. 
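# A minimal sketch of the precision/recall/F1 expansion suggested above,
# assuming per-detection IoU scores are already available; the 0.5 threshold
# and the name detection_metrics are illustrative, not part of this patch's code.
import numpy as np

def detection_metrics(iou_scores, num_ground_truth, iou_threshold=0.5):
    """Precision/recall/F1 where a detection is a true positive if its IoU clears the threshold."""
    iou_scores = np.asarray(iou_scores, dtype=np.float32)
    true_positives = int((iou_scores >= iou_threshold).sum())
    false_positives = len(iou_scores) - true_positives
    false_negatives = max(num_ground_truth - true_positives, 0)
    precision = true_positives / max(true_positives + false_positives, 1)
    recall = true_positives / max(true_positives + false_negatives, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)
    return precision, recall, f1

# Example: three detections matched against four ground-truth boxes gives
# detection_metrics([0.82, 0.46, 0.71], num_ground_truth=4) ~= (0.67, 0.50, 0.57)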
- - -import tensorrt as trt -import pycuda.driver as cuda -import pycuda.autoinit -import numpy as np - -def allocate_buffers(engine): - """ - Allocates input/output buffers for TensorRT engine inference. - Args: - engine: The TensorRT engine. - Returns: - inputs: List of input GPU buffers. - outputs: List of output GPU buffers. - bindings: List of bindings for the model. - stream: CUDA stream for the inference. - """ - inputs = [] - outputs = [] - bindings = [] - stream = cuda.Stream() - - for binding in engine: - size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size - dtype = trt.nptype(engine.get_binding_dtype(binding)) - - # Allocate host and device buffers - host_mem = cuda.pagelocked_empty(size, dtype) - device_mem = cuda.mem_alloc(host_mem.nbytes) - - # Append the device buffer to device bindings - bindings.append(int(device_mem)) - - # Append to the appropriate list - if engine.binding_is_input(binding): - inputs.append((host_mem, device_mem)) - else: - outputs.append((host_mem, device_mem)) - - return inputs, outputs, bindings, stream - -def infer(engine, inputs, outputs, bindings, stream, input_data): - """ - Performs inference on the input data using the TensorRT engine. - Args: - engine: The TensorRT engine. - inputs: List of input buffers. - outputs: List of output buffers. - bindings: List of bindings for the model. - stream: CUDA stream for the inference. - input_data: The data to be used as input for the model. - Returns: - output: The model's output. - """ - # Transfer input data to the device - np.copyto(inputs[0][0], input_data.ravel()) - cuda.memcpy_htod_async(inputs[0][1], inputs[0][0], stream) - - # Execute the model - context = engine.create_execution_context() - context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) - - # Transfer predictions back from the GPU - cuda.memcpy_dtoh_async(outputs[0][0], outputs[0][1], stream) - - # Wait for the stream to complete the operation - stream.synchronize() - - return outputs[0][0] - -def test_trt_engine(trt_engine_path, input_shape, input_data=None, expected_output=None): - """ - Tests a TensorRT engine file by performing inference and checking outputs. - Args: - trt_engine_path: Path to the TensorRT engine file. - input_shape: Shape of the input data. - input_data: Optional input data. If None, random data will be generated. - expected_output: Optional expected output. If provided, it will be compared to the TensorRT inference result. - Returns: - True if the engine works and inference results match the expected output (if provided), otherwise False. - """ - # Load the TensorRT engine from the file - with open(trt_engine_path, "rb") as f: - runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING)) - engine = runtime.deserialize_cuda_engine(f.read()) - - # Allocate buffers for inference - inputs, outputs, bindings, stream = allocate_buffers(engine) - - # Generate random input data if not provided + + # Generate random input data if not provided (should be cp) if input_data is None: input_data = np.random.rand(*input_shape).astype(np.float32) @@ -393,158 +43,4 @@ def test_trt_engine(trt_engine_path, input_shape, input_data=None, expected_outp return False else: print("No expected output provided. 
Unable to verify accuracy.") - return True # Pass as long as inference ran without errors - -# Example usage: -# Test TensorRT engine using random input -trt_engine_path = "model.trt" # Path to your TensorRT engine file -input_shape = (1, 3, 224, 224) # Adjust based on your model's input shape - -test_trt_engine(trt_engine_path, input_shape) - -input_data = np.random.rand(1, 3, 224, 224).astype(np.float32) # Example input, replace with actual data -expected_output = np.random.rand(1, 1000).astype(np.float32) # Example expected output (optional) -test_trt_engine("path_to_your_model.trt", (1, 3, 224, 224), input_data=input_data, expected_output=expected_output) - -### new! - -import time -import numpy as np -import cv2 -import pycuda.driver as cuda -import pycuda.autoinit -import tensorrt as trt -from onnxruntime import InferenceSession - -# Helper functions -def load_ground_truth(file_path): - """Load ground truth bounding boxes from text file.""" - with open(file_path, 'r') as f: - bboxes = [] - for line in f: - tokens = line.strip().split() - cls, x_center, y_center, width, height = map(float, tokens) - bboxes.append((cls, x_center, y_center, width, height)) - return bboxes - -def iou(boxA, boxB): - """Compute Intersection Over Union (IoU) between two bounding boxes.""" - xA = max(boxA[0], boxB[0]) - yA = max(boxA[1], boxB[1]) - xB = min(boxA[2], boxB[2]) - yB = min(boxA[3], boxB[3]) - - interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1) - boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1) - boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1) - - iou = interArea / float(boxAArea + boxBArea - interArea) - return iou - -def calculate_centroid_offset(pred_box, gt_box): - """Calculate percentage centroid offset between two boxes.""" - pred_center = (pred_box[0] + pred_box[2]) / 2, (pred_box[1] + pred_box[3]) / 2 - gt_center = (gt_box[0] + gt_box[2]) / 2, (gt_box[1] + gt_box[3]) / 2 - offset_x = abs(pred_center[0] - gt_center[0]) / (gt_box[2] - gt_box[0]) - offset_y = abs(pred_center[1] - gt_center[1]) / (gt_box[3] - gt_box[1]) - return (offset_x + offset_y) / 2 * 100 - -# Load TensorRT model -TRT_LOGGER = trt.Logger(trt.Logger.WARNING) - -class TRTInference: - def __init__(self, engine_path): - self.engine = self.load_engine(engine_path) - self.context = self.engine.create_execution_context() - - def load_engine(self, engine_path): - with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime: - return runtime.deserialize_cuda_engine(f.read()) - - def allocate_buffers(self): - inputs = [] - outputs = [] - bindings = [] - stream = cuda.Stream() - - for binding in self.engine: - size = trt.volume(self.engine.get_binding_shape(binding)) * self.engine.max_batch_size - dtype = trt.nptype(self.engine.get_binding_dtype(binding)) - host_mem = cuda.pagelocked_empty(size, dtype) - device_mem = cuda.mem_alloc(host_mem.nbytes) - bindings.append(int(device_mem)) - - if self.engine.binding_is_input(binding): - inputs.append({'host': host_mem, 'device': device_mem}) - else: - outputs.append({'host': host_mem, 'device': device_mem}) - - return inputs, outputs, bindings, stream - - def infer(self, image, inputs, outputs, bindings, stream): - np.copyto(inputs[0]['host'], image.ravel()) - - # Transfer input data to the GPU. - cuda.memcpy_htod_async(inputs[0]['device'], inputs[0]['host'], stream) - - # Run inference. - self.context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) - - # Transfer predictions back from GPU. 
- cuda.memcpy_dtoh_async(outputs[0]['host'], outputs[0]['device'], stream) - stream.synchronize() - - return outputs[0]['host'] - -def preprocess_image(image_path, input_shape): - """Preprocess image for inference.""" - image = cv2.imread(image_path) - image_resized = cv2.resize(image, (input_shape[1], input_shape[0])) - image = np.asarray(image_resized).astype(np.float32) - return np.transpose(image, (2, 0, 1)) / 255.0 # CHW format and normalized - -def run_benchmark(trt_model_path, test_images, ground_truth_path): - """Run benchmark on the model.""" - # Load ground truth - ground_truth_bboxes = load_ground_truth(ground_truth_path) - - # Initialize TensorRT inference - trt_infer = TRTInference(trt_model_path) - inputs, outputs, bindings, stream = trt_infer.allocate_buffers() - - inference_times = [] - iou_scores = [] - centroid_offsets = [] - - for idx, img_path in enumerate(test_images): - # Preprocess image - image = preprocess_image(img_path, (300, 300)) # Adjust size as needed - - # Perform inference and measure time - start_time = time.time() - pred_bbox = trt_infer.infer(image, inputs, outputs, bindings, stream) - inference_time = time.time() - start_time - - # Compute IoU, centroid offset - gt_bbox = ground_truth_bboxes[idx] - iou_score = iou(pred_bbox, gt_bbox) - offset = calculate_centroid_offset(pred_bbox, gt_bbox) - - # Store results - inference_times.append(inference_time) - iou_scores.append(iou_score) - centroid_offsets.append(offset) - - # Summary of benchmark - print(f"Average Inference Time: {np.mean(inference_times):.4f} seconds") - print(f"Average IoU: {np.mean(iou_scores) * 100:.2f}%") - print(f"Average Centroid Offset: {np.mean(centroid_offsets):.2f}%") - -if __name__ == "__main__": - trt_model_path = "model.trt" # Replace with your TensorRT model path - test_images = ["test1.jpg", "test2.jpg"] # Replace with your test images - ground_truth_path = "ground_truth.txt" # Replace with your ground truth file path - - run_benchmark(trt_model_path, test_images, ground_truth_path) - -# Create performance report based on relative bounding box centroid for sample images (accuracy %, error offset %, etc.) 
\ No newline at end of file + return True # Pass as long as inference ran without errors \ No newline at end of file diff --git a/tools/benchmarking/model-perf-tests.py b/tools/benchmarking/model-perf-tests.py index e69de29..c51444f 100644 --- a/tools/benchmarking/model-perf-tests.py +++ b/tools/benchmarking/model-perf-tests.py @@ -0,0 +1,91 @@ +import time +import numpy as np +import cv2 +import pycuda.driver as cuda +import pycuda.autoinit +import tensorrt as trt +from onnxruntime import InferenceSession + +# Helper functions +def load_ground_truth(file_path): + """Load ground truth bounding boxes from text file.""" + with open(file_path, 'r') as f: + bboxes = [] + for line in f: + tokens = line.strip().split() + cls, x_center, y_center, width, height = map(float, tokens) + bboxes.append((cls, x_center, y_center, width, height)) + return bboxes + +def iou(boxA, boxB): + """Compute Intersection Over Union (IoU) between two bounding boxes.""" + xA = max(boxA[0], boxB[0]) + yA = max(boxA[1], boxB[1]) + xB = min(boxA[2], boxB[2]) + yB = min(boxA[3], boxB[3]) + + interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1) + boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1) + boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1) + + iou = interArea / float(boxAArea + boxBArea - interArea) + return iou + +def calculate_centroid_offset(pred_box, gt_box): + """Calculate percentage centroid offset between two boxes.""" + pred_center = (pred_box[0] + pred_box[2]) / 2, (pred_box[1] + pred_box[3]) / 2 + gt_center = (gt_box[0] + gt_box[2]) / 2, (gt_box[1] + gt_box[3]) / 2 + offset_x = abs(pred_center[0] - gt_center[0]) / (gt_box[2] - gt_box[0]) + offset_y = abs(pred_center[1] - gt_center[1]) / (gt_box[3] - gt_box[1]) + return (offset_x + offset_y) / 2 * 100 + +def preprocess_image(image_path, input_shape): + """Preprocess image for inference.""" + image = cv2.imread(image_path) + image_resized = cv2.resize(image, (input_shape[1], input_shape[0])) + image = np.asarray(image_resized).astype(np.float32) + return np.transpose(image, (2, 0, 1)) / 255.0 # CHW format and normalized + +def run_benchmark(trt_model_path, test_images, ground_truth_path): + """Run benchmark on the model.""" + # Load ground truth + ground_truth_bboxes = load_ground_truth(ground_truth_path) + + # Initialize TensorRT inference + trt_infer = TRTInference(trt_model_path) + inputs, outputs, bindings, stream = trt_infer.allocate_buffers() + + inference_times = [] + iou_scores = [] + centroid_offsets = [] + + for idx, img_path in enumerate(test_images): + # Preprocess image + image = preprocess_image(img_path, (300, 300)) # Adjust size as needed + + # Perform inference and measure time + start_time = time.time() + pred_bbox = trt_infer.infer(image, inputs, outputs, bindings, stream) + inference_time = time.time() - start_time + + # Compute IoU, centroid offset + gt_bbox = ground_truth_bboxes[idx] + iou_score = iou(pred_bbox, gt_bbox) + offset = calculate_centroid_offset(pred_bbox, gt_bbox) + + # Store results + inference_times.append(inference_time) + iou_scores.append(iou_score) + centroid_offsets.append(offset) + + # Summary of benchmark + print(f"Average Inference Time: {np.mean(inference_times):.4f} seconds") + print(f"Average IoU: {np.mean(iou_scores) * 100:.2f}%") + print(f"Average Centroid Offset: {np.mean(centroid_offsets):.2f}%") + +if __name__ == "__main__": + trt_model_path = "model.trt" # Replace with your TensorRT model path + test_images = ["test1.jpg", "test2.jpg"] # Replace with your test images + 
ground_truth_path = "ground_truth.txt" # Replace with your ground truth file path + + run_benchmark(trt_model_path, test_images, ground_truth_path) \ No newline at end of file diff --git a/tools/testing/engine-test.py b/tools/testing/engine-test.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/testing/inference-results.py b/tools/testing/inference-results.py new file mode 100644 index 0000000..048d4bb --- /dev/null +++ b/tools/testing/inference-results.py @@ -0,0 +1,50 @@ +# results should be dict with image, bounding boxes array, inference time +# model name for largest unnested +# should also take a model meta data dict with model name/path, scaling factors + +import os +import cv2 +import numpy as np +# import cupy as cp +import time +import logging +import tqdm + +# does this construct the thing from input? +# maybe has methods to append to appropriate list/dict + +logging.basicConfig(format='%(message)s', level=logging.INFO) + +class Results: + + # class property + # .results + + def __init__(self, results={}, gpu_support=True): + self.results = results + + if results == []: + raise ValueError("No results available") + else: + logging.info(f"{len(self.results)} results available") + # should be able to determine gpu_support based on datatype passed? + # does this construct the results object? + + def model_metadata(self): + pass + + def display_results(self): + pass + + def run_unit_tests(self): + pass + + def save_infer(self, results_path): + pass + + def save_test(self, tests_path): + pass + + def compare(self, other_results=[]): + # plot diffs + pass \ No newline at end of file diff --git a/tools/testing/model-unit-tests.py b/tools/testing/model-unit-tests.py index 5ef22af..560f328 100644 --- a/tools/testing/model-unit-tests.py +++ b/tools/testing/model-unit-tests.py @@ -2,6 +2,8 @@ import numpy as np from my_package.bbox_node import BBoxNode +# class for handling results with class methods for MSE, etc. 
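# One possible shape for the results-handling class mentioned above, sketched
# under the assumption that raw outputs are NumPy-compatible arrays; the name
# ResultsComparison and its methods are illustrative only.
import numpy as np

class ResultsComparison:
    """Holds one set of model outputs and compares it against another."""

    def __init__(self, outputs):
        self.outputs = np.asarray(outputs, dtype=np.float32)

    def mse(self, other):
        other = np.asarray(other, dtype=np.float32)
        return float(np.mean((self.outputs - other) ** 2))

    def max_abs_diff(self, other):
        other = np.asarray(other, dtype=np.float32)
        return float(np.max(np.abs(self.outputs - other)))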
+ # get outputs of ultralytics and assert function diff between ultralytics and module vlaue is less than 0.5% # unit tests for also length of list (object count) @@ -107,3 +109,101 @@ def test_performance_report(self): unittest.main() # colcon test --packages-select my_package + +# given the predictions from the original model and the converted model, check if they are consistent +# shape of predictions_original and converted_results should be the same +# only checks for the predicted class (aka the argmax) +# takes in two 2D arrays: first dimension is the number of samples, second dimension is the number of classes and values correspond to confidence +def checkPredictionConsistency(predictions_original, converted_results): + for n in range(predictions_original.shape[0]): + if np.argmax(predictions_original[n]) != np.argmax(converted_results[n]): + print(f"Original: {np.argmax(predictions_original[n])}, ONNX: {np.argmax(converted_results[n])}") + print(f"{predictions_original[n]}, \n{converted_results[n]}") + print("=====================================") + raise ValueError("Predictions are not consistent") + + print("All predictions are consistent") + +# given the predictions from the original model and the converted model, check if they are consistent +# shape of predictions_original and converted_results should be the same +# only checks for the difference in confidence +# takes in two 2D arrays: first dimension is the number of samples, second dimension is the number of classes and values correspond to confidence +# tolerance: the maximum difference in confidence that is allowed +def checkConfidenceConsistency(predictions_original, converted_results, tolerance=1e-5): + np.testing.assert_allclose(predictions_original, converted_results,atol=tolerance) + # for n in range(predictions_original.shape[0]): + # if not np.allclose(predictions_original[n], converted_results[n], atol=tolerance): + # print(f"Original: \t {predictions_original[n]}, \nONNX: \t{converted_results[n]}") + # print("=====================================") + # return + + print("All confidence percentages are consistent") + +# put random input shape into CUDA if using CUDA provider? 
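# One answer to the question above, sketched with ONNX Runtime's OrtValue /
# IO-binding API (assumes `import numpy as np`, `import onnxruntime as ort`,
# and a `session` / `input_name` matching the ones created in predict_onnx);
# this keeps the random input on the GPU instead of copying a NumPy array
# host-to-device on every run:
#
#   x = np.random.randn(1, 3, 640, 640).astype(np.float32)
#   gpu_x = ort.OrtValue.ortvalue_from_numpy(x, "cuda", 0)
#   binding = session.io_binding()
#   binding.bind_ortvalue_input(input_name, gpu_x)
#   binding.bind_output(session.get_outputs()[0].name, "cuda")
#   session.run_with_iobinding(binding)
#   outputs = binding.copy_outputs_to_cpu()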
+def verify_onnx(model_path, compared_outputs, model_dimensions, fp_16): + print("Verifying the converted model") + onnx_output, onnx_inference = predict_onnx(model_path, fp_16, model_dimensions) + + print("ONNX inference time:", onnx_inference, "ms") + + # Calculate MSE (Mean Squared Error) + mse = np.mean((onnx_output - compared_outputs) ** 2) + print("MSE between ONNX and TensorRT outputs:", mse) + + # Calculate MAE (Mean Absolute Error) + mae = np.mean(np.abs(onnx_output - compared_outputs)) + print("MAE between ONNX and TensorRT outputs:", mae) + return + +def preprocess_image(image_path, input_shape): + """Preprocess image for inference.""" + image = cv2.imread(image_path) + image_resized = cv2.resize(image, (input_shape[1], input_shape[0])) + image = np.asarray(image_resized).astype(np.float32) + return np.transpose(image, (2, 0, 1)) / 255.0 # CHW format and normalized + +def run_benchmark(trt_model_path, test_images, ground_truth_path): + """Run benchmark on the model.""" + # Load ground truth + ground_truth_bboxes = load_ground_truth(ground_truth_path) + + # Initialize TensorRT inference + trt_infer = TRTInference(trt_model_path) + inputs, outputs, bindings, stream = trt_infer.allocate_buffers() + + inference_times = [] + iou_scores = [] + centroid_offsets = [] + + for idx, img_path in enumerate(test_images): + # Preprocess image + image = preprocess_image(img_path, (300, 300)) # Adjust size as needed + + # Perform inference and measure time + start_time = time.time() + pred_bbox = trt_infer.infer(image, inputs, outputs, bindings, stream) + inference_time = time.time() - start_time + + # Compute IoU, centroid offset + gt_bbox = ground_truth_bboxes[idx] + iou_score = iou(pred_bbox, gt_bbox) + offset = calculate_centroid_offset(pred_bbox, gt_bbox) + + # Store results + inference_times.append(inference_time) + iou_scores.append(iou_score) + centroid_offsets.append(offset) + + # Summary of benchmark + print(f"Average Inference Time: {np.mean(inference_times):.4f} seconds") + print(f"Average IoU: {np.mean(iou_scores) * 100:.2f}%") + print(f"Average Centroid Offset: {np.mean(centroid_offsets):.2f}%") + +if __name__ == "__main__": + trt_model_path = "model.trt" # Replace with your TensorRT model path + test_images = ["test1.jpg", "test2.jpg"] # Replace with your test images + ground_truth_path = "ground_truth.txt" # Replace with your ground truth file path + + run_benchmark(trt_model_path, test_images, ground_truth_path) + +# Create performance report based on relative bounding box centroid for sample images (accuracy %, error offset %, etc.) 
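# A minimal sketch of the centroid-based performance report mentioned above,
# assuming the iou_scores and centroid_offsets lists collected in run_benchmark;
# the 0.5 IoU cut-off for counting a detection as "accurate" is illustrative.
import numpy as np

def performance_report(iou_scores, centroid_offsets, iou_threshold=0.5):
    """Summarise detection quality for a batch of sample images."""
    ious = np.asarray(iou_scores, dtype=np.float32)
    offsets = np.asarray(centroid_offsets, dtype=np.float32)
    report = {
        "images": int(len(ious)),
        "accuracy_pct": 100.0 * float((ious >= iou_threshold).mean()) if len(ious) else 0.0,
        "mean_iou_pct": 100.0 * float(ious.mean()) if len(ious) else 0.0,
        "mean_centroid_offset_pct": float(offsets.mean()) if len(offsets) else 0.0,
        "worst_centroid_offset_pct": float(offsets.max()) if len(offsets) else 0.0,
    }
    for key, value in report.items():
        print(f"{key}: {value:.2f}" if isinstance(value, float) else f"{key}: {value}")
    return report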
diff --git a/tools/testing/model_inference.py b/tools/testing/model_inference.py deleted file mode 100644 index 522e929..0000000 --- a/tools/testing/model_inference.py +++ /dev/null @@ -1,378 +0,0 @@ -# stream off: -from ultralytics import YOLO - -# Load a model -model = YOLO("yolov8n.pt") # pretrained YOLOv8n model - -# Run batched inference on a list of images -results = model(["image1.jpg", "image2.jpg"]) # return a list of Results objects - -# Process results list -for result in results: - boxes = result.boxes # Boxes object for bounding box outputs - masks = result.masks # Masks object for segmentation masks outputs - keypoints = result.keypoints # Keypoints object for pose outputs - probs = result.probs # Probs object for classification outputs - obb = result.obb # Oriented boxes object for OBB outputs - result.show() # display to screen - result.save(filename="result.jpg") # save to disk - -# stream on: -from ultralytics import YOLO - -# Load a model -model = YOLO("yolov8n.pt") # pretrained YOLOv8n model - -# Run batched inference on a list of images -results = model(["image1.jpg", "image2.jpg"], stream=True) # return a generator of Results objects - -# Process results generator -for result in results: - boxes = result.boxes # Boxes object for bounding box outputs - masks = result.masks # Masks object for segmentation masks outputs - keypoints = result.keypoints # Keypoints object for pose outputs - probs = result.probs # Probs object for classification outputs - obb = result.obb # Oriented boxes object for OBB outputs - result.show() # display to screen - result.save(filename="result.jpg") # save to disk - - -from ultralytics import YOLO - -# Load a pretrained YOLOv8n model -model = YOLO("yolov8n.pt") - -# Run inference on 'bus.jpg' with arguments -model.predict("bus.jpg", save=True, imgsz=320, conf=0.5) - -from ultralytics import YOLO - -# Load a pretrained YOLOv8n model -model = YOLO("yolov8n.pt") - -# Run inference on an image -results = model("bus.jpg") # results list - -# View results -for r in results: - print(r.boxes) # print the Boxes object containing the detection bounding boxes - -from PIL import Image - -from ultralytics import YOLO - -# Load a pretrained YOLOv8n model -model = YOLO("yolov8n.pt") - -# Run inference on 'bus.jpg' -results = model(["bus.jpg", "zidane.jpg"]) # results list - -# Visualize the results -for i, r in enumerate(results): - # Plot results image - im_bgr = r.plot() # BGR-order numpy array - im_rgb = Image.fromarray(im_bgr[..., ::-1]) # RGB-order PIL image - - # Show results to screen (in supported environments) - r.show() - - # Save results to disk - r.save(filename=f"results{i}.jpg") - -import cv2 -from ultralytics import YOLO -import time - -model = YOLO("/home/user/ROS/models/maize/Maize.engine") -image = cv2.imread("/home/user/ROS/assets/maize/IMG_1822_14.JPG") - -sum = 0 -# stream = True? 
-for _ in range(100): - tic = time.perf_counter_ns() - result = model.predict( - image, # batch=8 of the same image - verbose=False, - device="cuda", - ) - elapsed_time = (time.perf_counter_ns() - tic) / 1e6 - print(f"Elapsed time: {(elapsed_time):.2f} ms") - sum += elapsed_time - annotated_frame = result[0].plot() - cv2.imshow("YOLOv8 Inference", annotated_frame) - if cv2.waitKey(1) & 0xFF == ord("q"): - break - -avearage_time = (sum - 2660) / 100 -print(f"Average time: {avearage_time:.2f} ms") -cv2.destroyAllWindows() - -# source, conf, iou, imgz, half, visualize, agnostic_nms -# visualization arguments: - -import os -import cv2 -import numpy as np -import onnxruntime as ort -import time -import logging - -# Set up logging -logging.basicConfig(format='%(message)s', level=logging.INFO) - -class Model: - def __init__(self, model_path=None, onnx_model=None, input_size=(640, 640)): - """ - Initialize the model by loading the ONNX model with GPU (CUDA) support. - :param model_path: Path to the ONNX model file. - :param onnx_model: ONNX model object if passed directly. - :param input_size: Expected input size for the model (width, height). - """ - self.input_size = input_size # Model's expected input size - - if model_path: - logging.info(f"ONNX: starting from '{model_path}' with input shape (1, 3, {input_size[0]}, {input_size[1]}) BCHW") - self.session = ort.InferenceSession(model_path, providers=['CUDAExecutionProvider']) - elif onnx_model: - self.session = ort.InferenceSession(onnx_model.SerializeToString(), providers=['CUDAExecutionProvider']) - else: - raise ValueError("Either model_path or onnx_model must be provided.") - - # Input and output information from the ONNX model - self.input_name = self.session.get_inputs()[0].name - self.output_names = [output.name for output in self.session.get_outputs()] - - logging.info(f"ONNX: loaded successfully, using CUDA (GPU)") - - def load_images(self, image_dir='images/'): - """ - Load images from the specified directory. - :param image_dir: Directory containing the images to load. - :return: List of loaded image file paths. - """ - image_files = [os.path.join(image_dir, img) for img in os.listdir(image_dir) if img.endswith(('.png', '.jpg', '.jpeg'))] - return image_files - - def preprocess_image(self, image_path): - """ - Preprocess the image: resize, normalize, and convert to the required format for the model. - :param image_path: Path to the image file. - :return: Preprocessed image ready for inference, original image, and scaling factors. - """ - img = cv2.imread(image_path) - original_size = img.shape[:2] # Original size (height, width) - img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - resized = cv2.resize(img, self.input_size) - # Normalize the image (assuming mean=0.5, std=0.5 for demonstration) - normalized = resized / 255.0 - normalized = (normalized - 0.5) / 0.5 - # HWC to CHW format for model input - input_tensor = np.transpose(normalized, (2, 0, 1)).astype(np.float32) - input_tensor = np.expand_dims(input_tensor, axis=0) # Add batch dimension - - # Compute scaling factors to map back to original size - scale_x = original_size[1] / self.input_size[0] - scale_y = original_size[0] / self.input_size[1] - - return input_tensor, img, (scale_x, scale_y) - - def post_process(self, output, original_image, scales, conf_threshold=0.5, iou_threshold=0.4): - """ - Post-process the model output to extract bounding boxes, confidence, and class scores. - Rescale the boxes back to the original image size. - :param output: Raw output from the model. 
- :param original_image: Original image for drawing bounding boxes. - :param scales: Scaling factors to map the boxes back to original size. - :param conf_threshold: Confidence score threshold for filtering detections. - :param iou_threshold: IOU threshold for non-maximum suppression (NMS). - :return: Image with annotated bounding boxes. - """ - scale_x, scale_y = scales - boxes = output[0] - filtered_boxes = [] - - # Iterate over boxes and filter by confidence - for box in boxes: - x1, y1, x2, y2, score, class_id = box - if score >= conf_threshold: - # Rescale box coordinates to the original image size - x1 *= scale_x - x2 *= scale_x - y1 *= scale_y - y2 *= scale_y - filtered_boxes.append([x1, y1, x2, y2, score, class_id]) - - # Apply Non-Maximum Suppression (NMS) - filtered_boxes = self.nms(filtered_boxes, iou_threshold) - - # Annotate the image with bounding boxes - for (x1, y1, x2, y2, score, class_id) in filtered_boxes: - # Draw bounding box - cv2.rectangle(original_image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) - # Put label and score - label = f"Class {int(class_id)}: {score:.2f}" - cv2.putText(original_image, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) - - return original_image, filtered_boxes - - def nms(self, boxes, iou_threshold): - """ - Perform Non-Maximum Suppression (NMS) on the bounding boxes. - :param boxes: List of boxes in the format [x1, y1, x2, y2, score, class_id]. - :param iou_threshold: Intersection-over-Union threshold for filtering overlapping boxes. - :return: Filtered list of bounding boxes after NMS. - """ - if len(boxes) == 0: - return [] - - boxes = sorted(boxes, key=lambda x: x[4], reverse=True) # Sort by confidence score - - keep_boxes = [] - while boxes: - chosen_box = boxes.pop(0) - keep_boxes.append(chosen_box) - boxes = [box for box in boxes if self.iou(chosen_box, box) < iou_threshold] - - return keep_boxes - - def iou(self, box1, box2): - """ - Calculate Intersection over Union (IoU) between two boxes. - :param box1: First box in the format [x1, y1, x2, y2, score, class_id]. - :param box2: Second box in the same format. - :return: IoU score. - """ - x1 = max(box1[0], box2[0]) - y1 = max(box1[1], box2[1]) - x2 = min(box1[2], box2[2]) - y2 = min(box1[3], box2[3]) - - inter_area = max(0, x2 - x1) * max(0, y2 - y1) - box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) - box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) - union_area = box1_area + box2_area - inter_area - - return inter_area / union_area - - def infer(self, image_dir='images/'): - """ - Perform inference on all images in the specified directory and display results with bounding boxes. - :param image_dir: Directory containing images to run inference on. - :return: None. Displays images with bounding boxes. 
- """ - image_files = self.load_images(image_dir) - results = {} - - for image_file in image_files: - input_tensor, original_image, scales = self.preprocess_image(image_file) - - # Log inference start - logging.info(f"Predict: ONNX inference started on {image_file}") - start_time = time.time() - - # Perform inference - outputs = self.session.run(self.output_names, {self.input_name: input_tensor}) - - # Measure inference time - inference_time = time.time() - start_time - logging.info(f"Inference complete in {inference_time:.2f} seconds.") - - # Post-process and display results - annotated_image, boxes = self.post_process(outputs, original_image, scales) - num_boxes = len(boxes) - - # Log number of bounding boxes and confidence - for i, box in enumerate(boxes): - logging.info(f"Box {i+1}: [x1={box[0]:.1f}, y1={box[1]:.1f}, x2={box[2]:.1f}, y2={box[3]:.1f}], score={box[4]:.2f}") - - logging.info(f"Total bounding boxes: {num_boxes}") - - # Show the image with bounding boxes - cv2.imshow('Inference Result', annotated_image) - cv2.waitKey(0) # Press any key to continue - cv2.destroyAllWindows() - -# Example usage: -if __name__ == "__main__": - model_path = 'model.onnx' - model = Model(model_path=model_path) - - # Log ONNX loading details - logging.info(f"ONNX: model loaded from '{model_path}', performing inference...") - - model.infer('test_images/') - -import time -import onnx -import onnxruntime as ort -import numpy as np - -# put random input shape into CUDA if using CUDA provider? -def verify_onnx(model_path, compared_outputs, model_dimensions, fp_16): - print("Verifying the converted model") - onnx_output, onnx_inference = predict_onnx(model_path, fp_16, model_dimensions) - - print("ONNX inference time:", onnx_inference, "ms") - - # Calculate MSE (Mean Squared Error) - mse = np.mean((onnx_output - compared_outputs) ** 2) - print("MSE between ONNX and TensorRT outputs:", mse) - - # Calculate MAE (Mean Absolute Error) - mae = np.mean(np.abs(onnx_output - compared_outputs)) - print("MAE between ONNX and TensorRT outputs:", mae) - return - -# any other chanes for fp_16 to work? 
-def predict_onnx(model_path, fp_16, input_shape): - - # random_input = np.random.randn(*input_shape).astype(np.float32) - - # # Run inference with ONNX - # input_name = onnx_session.get_inputs()[0].name - # onnx_output = onnx_session.run(None, {input_name: random_input}) - onnx_session = ort.InferenceSession(model_path,providers=["CUDAExecutionProvider"]) - - if fp_16: - random_input = np.random.randn(input_shape).astype(np.float16) - else: - random_input = np.random.randn(input_shape).astype(np.float32) - - input_name = onnx_session.get_inputs()[0].name - tic = time.perf_counter_ns() - # results_ort = session.run([out.name for out in session.get_outputs()], {session.get_inputs()[0].name: x_test}) - # results_ort = onnx_session.run([out.name for out in session.get_outputs()], {session.get_inputs()[0].name: model_test}) - onnx_output = onnx_session.run(None, {input_name: random_input}) - toc = time.perf_counter_ns() - onnx_output = onnx_output[0] - # onnx_output= np.array(onnx_output) - return onnx_output, (toc - tic) / 1e6 - -# given the predictions from the original model and the converted model, check if they are consistent -# shape of predictions_original and converted_results should be the same -# only checks for the predicted class (aka the argmax) -# takes in two 2D arrays: first dimension is the number of samples, second dimension is the number of classes and values correspond to confidence -def checkPredictionConsistency(predictions_original, converted_results): - for n in range(predictions_original.shape[0]): - if np.argmax(predictions_original[n]) != np.argmax(converted_results[n]): - print(f"Original: {np.argmax(predictions_original[n])}, ONNX: {np.argmax(converted_results[n])}") - print(f"{predictions_original[n]}, \n{converted_results[n]}") - print("=====================================") - raise ValueError("Predictions are not consistent") - - print("All predictions are consistent") - -# given the predictions from the original model and the converted model, check if they are consistent -# shape of predictions_original and converted_results should be the same -# only checks for the difference in confidence -# takes in two 2D arrays: first dimension is the number of samples, second dimension is the number of classes and values correspond to confidence -# tolerance: the maximum difference in confidence that is allowed -def checkConfidenceConsistency(predictions_original, converted_results, tolerance=1e-5): - np.testing.assert_allclose(predictions_original, converted_results,atol=tolerance) - # for n in range(predictions_original.shape[0]): - # if not np.allclose(predictions_original[n], converted_results[n], atol=tolerance): - # print(f"Original: \t {predictions_original[n]}, \nONNX: \t{converted_results[n]}") - # print("=====================================") - # return - - print("All confidence percentages are consistent") \ No newline at end of file diff --git a/tools/testing/onnx-test.py b/tools/testing/onnx-test.py new file mode 100644 index 0000000..c3bf2bb --- /dev/null +++ b/tools/testing/onnx-test.py @@ -0,0 +1,101 @@ +import os +import numpy as np +import onnxruntime as ort +import time +import logging +import tqdm + +logging.basicConfig(format='%(message)s', level=logging.INFO) + +# add compatibility with float 16? 
+# cupy, cp.float32, 16 +# relative path +# make a dict for each image, with inference time +# add gpu support (conditional imports) + +# should take gpu support in here + +class Model: + def __init__(self, model_path=None, onnx_model=None, input_size=(640, 640)): + """ + Initialize the model by loading the ONNX model with GPU (CUDA) support. + :param model_path: Path to the ONNX model file. + :param onnx_model: ONNX model object if passed directly. + :param input_size: Expected input size for the model (width, height). + """ + self.input_size = input_size # Model's expected input size + + if model_path: + logging.info(f"ONNX: starting from '{model_path}' with input shape (1, 3, {input_size[0]}, {input_size[1]}) BCHW") + self.session = ort.InferenceSession(model_path, providers=['CUDAExecutionProvider']) + elif onnx_model: + self.session = ort.InferenceSession(onnx_model.SerializeToString(), providers=['CUDAExecutionProvider']) + else: + raise ValueError("Either model_path or onnx_model must be provided.") + + # Input and output information from the ONNX model + self.input_name = self.session.get_inputs()[0].name + self.output_names = [output.name for output in self.session.get_outputs()] + + logging.info(f"ONNX: loaded successfully, using CUDA (GPU)") + + def load_images(self, image_dir='images/'): + """ + Load images from the specified directory. + :param image_dir: Directory containing the images to load. + :return: List of loaded image file paths. + """ + image_files = [os.path.join(image_dir, img) for img in os.listdir(image_dir) if img.endswith(('.png', '.jpg', '.jpeg'))] + return image_files + + def infer(self, image_dir='images/'): + """ + Perform inference on all images in the specified directory and display results with bounding boxes. + :param image_dir: Directory containing images to run inference on. + :return: None. Displays images with bounding boxes. 
+ """ + image_files = self.load_images(image_dir) + results = {} + + # make tqdm status bar here and call the func inference call + # from model_inference.py with appropriate args + + + for image_file in image_files: + input_tensor, original_image, scales = self.preprocess_image(image_file) + + # Log inference start + logging.info(f"Predict: ONNX inference started on {image_file}") + start_time = time.time() + + # Perform inference + outputs = self.session.run(self.output_names, {self.input_name: input_tensor}) + + # Measure inference time + inference_time = time.time() - start_time + logging.info(f"Inference complete in {inference_time:.2f} seconds.") + + # Post-process and display results + annotated_image, boxes = self.post_process(outputs, original_image, scales) + num_boxes = len(boxes) + + # Log number of bounding boxes and confidence + for i, box in enumerate(boxes): + logging.info(f"Box {i+1}: [x1={box[0]:.1f}, y1={box[1]:.1f}, x2={box[2]:.1f}, y2={box[3]:.1f}], score={box[4]:.2f}") + + logging.info(f"Total bounding boxes: {num_boxes}") + + # Show the image with bounding boxes + cv2.imshow('Inference Result', annotated_image) + cv2.waitKey(0) # Press any key to continue + cv2.destroyAllWindows() + +# Example usage: +if __name__ == "__main__": + model_path = 'model.onnx' + model = Model(model_path=model_path) + + # Log ONNX loading details + logging.info(f"ONNX: model loaded from '{model_path}', performing inference...") + + model.infer('test_images/') \ No newline at end of file diff --git a/tools/testing/results-unit-tests.py b/tools/testing/results-unit-tests.py new file mode 100644 index 0000000..6c689e8 --- /dev/null +++ b/tools/testing/results-unit-tests.py @@ -0,0 +1 @@ +# should reference the results class? \ No newline at end of file diff --git a/tools/testing/ros-unit-tests.py b/tools/testing/ros-unit-tests.py index e69de29..616dc96 100644 --- a/tools/testing/ros-unit-tests.py +++ b/tools/testing/ros-unit-tests.py @@ -0,0 +1,107 @@ +# test_bbox_accuracy.py + +import unittest +import numpy as np +import rosbag2_py +from some_custom_pkg.msg import BoundingBox # Import your bounding box message type +from datetime import datetime + +class TestBoundingBoxAccuracy(unittest.TestCase): + def setUp(self): + # Load the ground truth data from the .txt file + self.ground_truth = self.load_ground_truth("ground_truth.txt") + + # Initialize rosbag reader + storage_options = rosbag2_py.StorageOptions(uri="path/to/rosbag", storage_id="sqlite3") + converter_options = rosbag2_py.ConverterOptions("", "") + self.reader = rosbag2_py.SequentialReader() + self.reader.open(storage_options, converter_options) + + # Set the topic to read bounding box messages + self.reader.set_filter(rosbag2_py.TopicFilter(topic_name="/bbox_topic")) + + def load_ground_truth(self, filepath): + """Load ground truth bounding boxes from a .txt file.""" + ground_truth_data = [] + with open(filepath, 'r') as file: + for line in file: + parts = line.strip().split() + timestamp = float(parts[0]) + bbox = list(map(int, parts[1:5])) + ground_truth_data.append((timestamp, bbox)) + return ground_truth_data + + def calculate_iou(self, bbox1, bbox2): + """Calculate Intersection over Union (IoU) between two bounding boxes.""" + x_min1, y_min1, x_max1, y_max1 = bbox1 + x_min2, y_min2, x_max2, y_max2 = bbox2 + + # Calculate intersection + x_min_inter = max(x_min1, x_min2) + y_min_inter = max(y_min1, y_min2) + x_max_inter = min(x_max1, x_max2) + y_max_inter = min(y_max1, y_max2) + + if x_min_inter >= x_max_inter or y_min_inter >= 
y_max_inter: + return 0.0 # No overlap + + intersection_area = (x_max_inter - x_min_inter) * (y_max_inter - y_min_inter) + + # Calculate union + area1 = (x_max1 - x_min1) * (y_max1 - y_min1) + area2 = (x_max2 - x_min2) * (y_max2 - y_min2) + union_area = area1 + area2 - intersection_area + + return intersection_area / union_area + + def test_bounding_box_accuracy(self): + """Compare bounding box predictions in ROS bag with ground truth.""" + tolerance = 0.1 # IoU threshold for considering a match + ground_truth_idx = 0 # Index for tracking ground truth entries + + while self.reader.has_next(): + (topic, msg, t) = self.reader.read_next() + timestamp_ros = t / 1e9 # Convert nanoseconds to seconds + bbox_ros = [msg.x_min, msg.y_min, msg.x_max, msg.y_max] + + # Find the closest ground truth bbox based on timestamp + timestamp_gt, bbox_gt = self.ground_truth[ground_truth_idx] + + # If the ROS bag timestamp matches the ground truth timestamp, compare bboxes + if abs(timestamp_ros - timestamp_gt) < 0.05: # 50ms tolerance + iou = self.calculate_iou(bbox_ros, bbox_gt) + self.assertGreaterEqual(iou, tolerance, f"Low IoU ({iou}) at time {timestamp_ros}") + + # Optional: Calculate other metrics + offset = np.linalg.norm(np.array(bbox_ros) - np.array(bbox_gt)) + self.assertLessEqual(offset, 10, f"High offset ({offset}) at time {timestamp_ros}") + + # Move to next ground truth entry + ground_truth_idx += 1 + + elif timestamp_ros < timestamp_gt: + # If ROS bag timestamp is earlier than ground truth, continue to next ROS msg + continue + else: + # If ROS bag timestamp is later, increment ground truth index to catch up + ground_truth_idx += 1 + + # Stop if we run out of ground truth data + if ground_truth_idx >= len(self.ground_truth): + break + + def tearDown(self): + self.reader.close() + +if __name__ == '__main__': + unittest.main() + + +# ground truth format: +# +# + +# metrics for comparison: +# Bounding Box Overlap: Calculate Intersection over Union (IoU) between the ground truth bounding box and the predicted bounding box. +# Coordinate Accuracy: Calculate the offset of each bounding box coordinate. +# FPS Consistency: Check that the timestamps in the bag data follow the expected frequency. \ No newline at end of file diff --git a/tools/utils/engine-inference.py b/tools/utils/engine-inference.py new file mode 100644 index 0000000..5b9278f --- /dev/null +++ b/tools/utils/engine-inference.py @@ -0,0 +1,72 @@ +import tensorrt as trt +import pycuda.driver as cuda +import pycuda.autoinit # Automatically initializes CUDA driver +import numpy as np +import cupy as cp +import time +import torch + +# should add quantized and fp16 + +# Load TensorRT model +TRT_LOGGER = trt.Logger(trt.Logger.WARNING) + +class TRTEngine: + def __init__(self, engine_path): + self.engine = self.load_engine(engine_path) + self.context = self.engine.create_execution_context() + # should also allocate buffers in this stage + + def load_engine(self, engine_path): + with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime: + return runtime.deserialize_cuda_engine(f.read()) + + def allocate_buffers(self): + inputs = [] + outputs = [] + bindings = [] + stream = cuda.Stream() + + for binding in self.engine: + size = trt.volume(self.engine.get_binding_shape(binding)) * self.engine.max_batch_size + dtype = trt.nptype(self.engine.get_binding_dtype(binding)) + # Allocate host and device buffers? 
+ host_mem = cuda.pagelocked_empty(size, dtype) + device_mem = cuda.mem_alloc(host_mem.nbytes) + + # host_mem = torch.empty(size, dtype=torch.float32).cuda() + # inputs.append(host_mem) + # bindings.append(int(host_mem.data_ptr())) + + # Append the device buffer to device binding + bindings.append(int(device_mem)) + + # Append to the appropriate list + if self.engine.binding_is_input(binding): + inputs.append({'host': host_mem, 'device': device_mem}) + else: + outputs.append({'host': host_mem, 'device': device_mem}) + + return inputs, outputs, bindings, stream + + # performs inference on the input data using the TensorRT engine + def infer(engine, inputs, outputs, bindings, stream, input_data): + # should assign buffers in self initialization earlier and reference just self + + # Transfer input data to the device + np.copyto(inputs[0][0], input_data.ravel()) + cuda.memcpy_htod_async(inputs[0][1], inputs[0][0], stream) + + # Execute the model + context = engine.create_execution_context() + start_time = time.perf_counter_ns() + context.execute_async_v2(bindings=bindings, stream_handle=stream.handle) + end_time = time.perf_counter_ns() + + # Transfer predictions back from the GPU + cuda.memcpy_dtoh_async(outputs[0][0], outputs[0][1], stream) + + # Wait for the stream to complete the operation + stream.synchronize() + + return outputs[0][0], (end_time - start_time)/1e6 \ No newline at end of file diff --git a/tools/utils/model-inference.py b/tools/utils/model-inference.py new file mode 100644 index 0000000..4757e2d --- /dev/null +++ b/tools/utils/model-inference.py @@ -0,0 +1,141 @@ +# stream off: +from ultralytics import YOLO + +# Load a model +model = YOLO("yolov8n.pt") # pretrained YOLOv8n model + +# Run batched inference on a list of images +results = model(["image1.jpg", "image2.jpg"]) # return a list of Results objects + +# Process results list +for result in results: + boxes = result.boxes # Boxes object for bounding box outputs + masks = result.masks # Masks object for segmentation masks outputs + keypoints = result.keypoints # Keypoints object for pose outputs + probs = result.probs # Probs object for classification outputs + obb = result.obb # Oriented boxes object for OBB outputs + result.show() # display to screen + result.save(filename="result.jpg") # save to disk + +# stream on: +from ultralytics import YOLO + +# Load a model +model = YOLO("yolov8n.pt") # pretrained YOLOv8n model + +# Run batched inference on a list of images +results = model(["image1.jpg", "image2.jpg"], stream=True) # return a generator of Results objects + +# Process results generator +for result in results: + boxes = result.boxes # Boxes object for bounding box outputs + masks = result.masks # Masks object for segmentation masks outputs + keypoints = result.keypoints # Keypoints object for pose outputs + probs = result.probs # Probs object for classification outputs + obb = result.obb # Oriented boxes object for OBB outputs + result.show() # display to screen + result.save(filename="result.jpg") # save to disk + + +from ultralytics import YOLO + +# Load a pretrained YOLOv8n model +model = YOLO("yolov8n.pt") + +# Run inference on 'bus.jpg' with arguments +model.predict("bus.jpg", save=True, imgsz=320, conf=0.5) + +from ultralytics import YOLO + +# Load a pretrained YOLOv8n model +model = YOLO("yolov8n.pt") + +# Run inference on an image +results = model("bus.jpg") # results list + +# View results +for r in results: + print(r.boxes) # print the Boxes object containing the detection bounding boxes + +from PIL 
import Image + +from ultralytics import YOLO + +# Load a pretrained YOLOv8n model +model = YOLO("yolov8n.pt") + +# Run inference on 'bus.jpg' +results = model(["bus.jpg", "zidane.jpg"]) # results list + +# Visualize the results +for i, r in enumerate(results): + # Plot results image + im_bgr = r.plot() # BGR-order numpy array + im_rgb = Image.fromarray(im_bgr[..., ::-1]) # RGB-order PIL image + + # Show results to screen (in supported environments) + r.show() + + # Save results to disk + r.save(filename=f"results{i}.jpg") + +import cv2 +from ultralytics import YOLO +import time + +model = YOLO("/home/user/ROS/models/maize/Maize.engine") +image = cv2.imread("/home/user/ROS/assets/maize/IMG_1822_14.JPG") + +sum = 0 +# stream = True? +for _ in range(100): + tic = time.perf_counter_ns() + result = model.predict( + image, # batch=8 of the same image + verbose=False, + device="cuda", + ) + elapsed_time = (time.perf_counter_ns() - tic) / 1e6 + print(f"Elapsed time: {(elapsed_time):.2f} ms") + sum += elapsed_time + annotated_frame = result[0].plot() + cv2.imshow("YOLOv8 Inference", annotated_frame) + if cv2.waitKey(1) & 0xFF == ord("q"): + break + +avearage_time = (sum - 2660) / 100 +print(f"Average time: {avearage_time:.2f} ms") +cv2.destroyAllWindows() + +# source, conf, iou, imgz, half, visualize, agnostic_nms +# visualization arguments: + +import time +import onnx +import onnxruntime as ort +import numpy as np + +# any other chanes for fp_16 to work? +def predict_onnx(model_path, fp_16, input_shape): + + # random_input = np.random.randn(*input_shape).astype(np.float32) + + # # Run inference with ONNX + # input_name = onnx_session.get_inputs()[0].name + # onnx_output = onnx_session.run(None, {input_name: random_input}) + onnx_session = ort.InferenceSession(model_path,providers=["CUDAExecutionProvider"]) + + if fp_16: + random_input = np.random.randn(input_shape).astype(np.float16) + else: + random_input = np.random.randn(input_shape).astype(np.float32) + + input_name = onnx_session.get_inputs()[0].name + tic = time.perf_counter_ns() + # results_ort = session.run([out.name for out in session.get_outputs()], {session.get_inputs()[0].name: x_test}) + # results_ort = onnx_session.run([out.name for out in session.get_outputs()], {session.get_inputs()[0].name: model_test}) + onnx_output = onnx_session.run(None, {input_name: random_input}) + toc = time.perf_counter_ns() + onnx_output = onnx_output[0] + # onnx_output= np.array(onnx_output) + return onnx_output, (toc - tic) / 1e6 \ No newline at end of file diff --git a/tools/testing/model_post.py b/tools/utils/post-process.py similarity index 56% rename from tools/testing/model_post.py rename to tools/utils/post-process.py index 2532536..92b7e4a 100644 --- a/tools/testing/model_post.py +++ b/tools/utils/post-process.py @@ -1,130 +1,211 @@ -# Ultralytics YOLO 🚀, AGPL-3.0 license - -import argparse - -import cv2.dnn -import numpy as np - -from ultralytics.utils import ASSETS, yaml_load -from ultralytics.utils.checks import check_yaml - -CLASSES = yaml_load(check_yaml("coco8.yaml"))["names"] -colors = np.random.uniform(0, 255, size=(len(CLASSES), 3)) - - -def draw_bounding_box(img, class_id, confidence, x, y, x_plus_w, y_plus_h): - """ - Draws bounding boxes on the input image based on the provided arguments. - - Args: - img (numpy.ndarray): The input image to draw the bounding box on. - class_id (int): Class ID of the detected object. - confidence (float): Confidence score of the detected object. 
- x (int): X-coordinate of the top-left corner of the bounding box. - y (int): Y-coordinate of the top-left corner of the bounding box. - x_plus_w (int): X-coordinate of the bottom-right corner of the bounding box. - y_plus_h (int): Y-coordinate of the bottom-right corner of the bounding box. - """ - label = f"{CLASSES[class_id]} ({confidence:.2f})" - color = colors[class_id] - cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2) - cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) - - -def main(onnx_model, input_image): - """ - Main function to load ONNX model, perform inference, draw bounding boxes, and display the output image. - - Args: - onnx_model (str): Path to the ONNX model. - input_image (str): Path to the input image. - - Returns: - list: List of dictionaries containing detection information such as class_id, class_name, confidence, etc. - """ - # Load the ONNX model - model: cv2.dnn.Net = cv2.dnn.readNetFromONNX(onnx_model) - - # Read the input image - original_image: np.ndarray = cv2.imread(input_image) - [height, width, _] = original_image.shape - - # Prepare a square image for inference - length = max((height, width)) - image = np.zeros((length, length, 3), np.uint8) - image[0:height, 0:width] = original_image - - # Calculate scale factor - scale = length / 640 - - # Preprocess the image and prepare blob for model - blob = cv2.dnn.blobFromImage(image, scalefactor=1 / 255, size=(640, 640), swapRB=True) - model.setInput(blob) - - # Perform inference - outputs = model.forward() - - # Prepare output array - outputs = np.array([cv2.transpose(outputs[0])]) - rows = outputs.shape[1] - - boxes = [] - scores = [] - class_ids = [] - - # Iterate through output to collect bounding boxes, confidence scores, and class IDs - for i in range(rows): - classes_scores = outputs[0][i][4:] - (minScore, maxScore, minClassLoc, (x, maxClassIndex)) = cv2.minMaxLoc(classes_scores) - if maxScore >= 0.25: - box = [ - outputs[0][i][0] - (0.5 * outputs[0][i][2]), - outputs[0][i][1] - (0.5 * outputs[0][i][3]), - outputs[0][i][2], - outputs[0][i][3], - ] - boxes.append(box) - scores.append(maxScore) - class_ids.append(maxClassIndex) - - # Apply NMS (Non-maximum suppression) - result_boxes = cv2.dnn.NMSBoxes(boxes, scores, 0.25, 0.45, 0.5) - - detections = [] - - # Iterate through NMS results to draw bounding boxes and labels - for i in range(len(result_boxes)): - index = result_boxes[i] - box = boxes[index] - detection = { - "class_id": class_ids[index], - "class_name": CLASSES[class_ids[index]], - "confidence": scores[index], - "box": box, - "scale": scale, - } - detections.append(detection) - draw_bounding_box( - original_image, - class_ids[index], - scores[index], - round(box[0] * scale), - round(box[1] * scale), - round((box[0] + box[2]) * scale), - round((box[1] + box[3]) * scale), - ) - - # Display the image with bounding boxes - cv2.imshow("image", original_image) - cv2.waitKey(0) - cv2.destroyAllWindows() - - return detections - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--model", default="yolov8n.onnx", help="Input your ONNX model.") - parser.add_argument("--img", default=str(ASSETS / "bus.jpg"), help="Path to input image.") - args = parser.parse_args() - main(args.model, args.img) +# Ultralytics YOLO 🚀, AGPL-3.0 license + +# need gpu +# nms + +import argparse + +import cv2.dnn +import numpy as np + +from ultralytics.utils import ASSETS, yaml_load +from ultralytics.utils.checks import check_yaml + +CLASSES = 
yaml_load(check_yaml("coco8.yaml"))["names"] +colors = np.random.uniform(0, 255, size=(len(CLASSES), 3)) + + +def draw_bounding_box(img, class_id, confidence, x, y, x_plus_w, y_plus_h): + """ + Draws bounding boxes on the input image based on the provided arguments. + + Args: + img (numpy.ndarray): The input image to draw the bounding box on. + class_id (int): Class ID of the detected object. + confidence (float): Confidence score of the detected object. + x (int): X-coordinate of the top-left corner of the bounding box. + y (int): Y-coordinate of the top-left corner of the bounding box. + x_plus_w (int): X-coordinate of the bottom-right corner of the bounding box. + y_plus_h (int): Y-coordinate of the bottom-right corner of the bounding box. + """ + label = f"{CLASSES[class_id]} ({confidence:.2f})" + color = colors[class_id] + cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2) + cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) + + +def main(onnx_model, input_image): + """ + Main function to load ONNX model, perform inference, draw bounding boxes, and display the output image. + + Args: + onnx_model (str): Path to the ONNX model. + input_image (str): Path to the input image. + + Returns: + list: List of dictionaries containing detection information such as class_id, class_name, confidence, etc. + """ + # Load the ONNX model + model: cv2.dnn.Net = cv2.dnn.readNetFromONNX(onnx_model) + + # Read the input image + original_image: np.ndarray = cv2.imread(input_image) + [height, width, _] = original_image.shape + + # Prepare a square image for inference + length = max((height, width)) + image = np.zeros((length, length, 3), np.uint8) + image[0:height, 0:width] = original_image + + # Calculate scale factor + scale = length / 640 + + # Preprocess the image and prepare blob for model + blob = cv2.dnn.blobFromImage(image, scalefactor=1 / 255, size=(640, 640), swapRB=True) + model.setInput(blob) + + # Perform inference + outputs = model.forward() + + # Prepare output array + outputs = np.array([cv2.transpose(outputs[0])]) + rows = outputs.shape[1] + + boxes = [] + scores = [] + class_ids = [] + + # Iterate through output to collect bounding boxes, confidence scores, and class IDs + for i in range(rows): + classes_scores = outputs[0][i][4:] + (minScore, maxScore, minClassLoc, (x, maxClassIndex)) = cv2.minMaxLoc(classes_scores) + if maxScore >= 0.25: + box = [ + outputs[0][i][0] - (0.5 * outputs[0][i][2]), + outputs[0][i][1] - (0.5 * outputs[0][i][3]), + outputs[0][i][2], + outputs[0][i][3], + ] + boxes.append(box) + scores.append(maxScore) + class_ids.append(maxClassIndex) + + # Apply NMS (Non-maximum suppression) + result_boxes = cv2.dnn.NMSBoxes(boxes, scores, 0.25, 0.45, 0.5) + + detections = [] + + # Iterate through NMS results to draw bounding boxes and labels + for i in range(len(result_boxes)): + index = result_boxes[i] + box = boxes[index] + detection = { + "class_id": class_ids[index], + "class_name": CLASSES[class_ids[index]], + "confidence": scores[index], + "box": box, + "scale": scale, + } + detections.append(detection) + draw_bounding_box( + original_image, + class_ids[index], + scores[index], + round(box[0] * scale), + round(box[1] * scale), + round((box[0] + box[2]) * scale), + round((box[1] + box[3]) * scale), + ) + + # Display the image with bounding boxes + cv2.imshow("image", original_image) + cv2.waitKey(0) + cv2.destroyAllWindows() + + return detections + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + 
parser.add_argument("--model", default="yolov8n.onnx", help="Input your ONNX model.") + parser.add_argument("--img", default=str(ASSETS / "bus.jpg"), help="Path to input image.") + args = parser.parse_args() + main(args.model, args.img) + + def post_process(self, output, original_image, scales, conf_threshold=0.5, iou_threshold=0.4): + """ + Post-process the model output to extract bounding boxes, confidence, and class scores. + Rescale the boxes back to the original image size. + :param output: Raw output from the model. + :param original_image: Original image for drawing bounding boxes. + :param scales: Scaling factors to map the boxes back to original size. + :param conf_threshold: Confidence score threshold for filtering detections. + :param iou_threshold: IOU threshold for non-maximum suppression (NMS). + :return: Image with annotated bounding boxes. + """ + scale_x, scale_y = scales + boxes = output[0] + filtered_boxes = [] + + # Iterate over boxes and filter by confidence + for box in boxes: + x1, y1, x2, y2, score, class_id = box + if score >= conf_threshold: + # Rescale box coordinates to the original image size + x1 *= scale_x + x2 *= scale_x + y1 *= scale_y + y2 *= scale_y + filtered_boxes.append([x1, y1, x2, y2, score, class_id]) + + # Apply Non-Maximum Suppression (NMS) + filtered_boxes = self.nms(filtered_boxes, iou_threshold) + + # Annotate the image with bounding boxes + for (x1, y1, x2, y2, score, class_id) in filtered_boxes: + # Draw bounding box + cv2.rectangle(original_image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) + # Put label and score + label = f"Class {int(class_id)}: {score:.2f}" + cv2.putText(original_image, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) + + return original_image, filtered_boxes + + def nms(self, boxes, iou_threshold): + """ + Perform Non-Maximum Suppression (NMS) on the bounding boxes. + :param boxes: List of boxes in the format [x1, y1, x2, y2, score, class_id]. + :param iou_threshold: Intersection-over-Union threshold for filtering overlapping boxes. + :return: Filtered list of bounding boxes after NMS. + """ + if len(boxes) == 0: + return [] + + boxes = sorted(boxes, key=lambda x: x[4], reverse=True) # Sort by confidence score + + keep_boxes = [] + while boxes: + chosen_box = boxes.pop(0) + keep_boxes.append(chosen_box) + boxes = [box for box in boxes if self.iou(chosen_box, box) < iou_threshold] + + return keep_boxes + + def iou(self, box1, box2): + """ + Calculate Intersection over Union (IoU) between two boxes. + :param box1: First box in the format [x1, y1, x2, y2, score, class_id]. + :param box2: Second box in the same format. + :return: IoU score. + """ + x1 = max(box1[0], box2[0]) + y1 = max(box1[1], box2[1]) + x2 = min(box1[2], box2[2]) + y2 = min(box1[3], box2[3]) + + inter_area = max(0, x2 - x1) * max(0, y2 - y1) + box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) + box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) + union_area = box1_area + box2_area - inter_area + + return inter_area / union_area \ No newline at end of file diff --git a/tools/utils/pre-process.py b/tools/utils/pre-process.py new file mode 100644 index 0000000..d17264f --- /dev/null +++ b/tools/utils/pre-process.py @@ -0,0 +1,22 @@ +def preprocess_image(self, image_path): + """ + Preprocess the image: resize, normalize, and convert to the required format for the model. + :param image_path: Path to the image file. + :return: Preprocessed image ready for inference, original image, and scaling factors. 
+ """ + img = cv2.imread(image_path) + original_size = img.shape[:2] # Original size (height, width) + img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + resized = cv2.resize(img, self.input_size) + # Normalize the image (assuming mean=0.5, std=0.5 for demonstration) + normalized = resized / 255.0 + normalized = (normalized - 0.5) / 0.5 + # HWC to CHW format for model input + input_tensor = np.transpose(normalized, (2, 0, 1)).astype(np.float32) + input_tensor = np.expand_dims(input_tensor, axis=0) # Add batch dimension + + # Compute scaling factors to map back to original size + scale_x = original_size[1] / self.input_size[0] + scale_y = original_size[0] / self.input_size[1] + + return input_tensor, img, (scale_x, scale_y) \ No newline at end of file diff --git a/tools/video/mp4-recorder.py b/tools/video/mp4-recorder.py new file mode 100644 index 0000000..8d83d99 --- /dev/null +++ b/tools/video/mp4-recorder.py @@ -0,0 +1,71 @@ +import sys +import cv2 +import numpy as np +import pyzed.sl as sl + +def record_zed_to_mp4(output_file="output.mp4", fps=30, duration=10): + # Create a ZED camera object + zed = sl.Camera() + + # Set initialization parameters + init_params = sl.InitParameters() + init_params.camera_resolution = sl.RESOLUTION.HD720 # Set resolution + init_params.camera_fps = fps # Set FPS + + # Open the ZED camera + if zed.open(init_params) != sl.ERROR_CODE.SUCCESS: + print("Failed to open ZED camera") + zed.close() + sys.exit(1) + + # Get the resolution of the ZED camera + image_size = zed.get_camera_information().camera_resolution + width = image_size.width + height = image_size.height + + # Set up the OpenCV video writer + fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for .mp4 + video_writer = cv2.VideoWriter(output_file, fourcc, fps, (width, height)) + + # Check if video writer opened successfully + if not video_writer.isOpened(): + print("Failed to open video writer") + zed.close() + sys.exit(1) + + # Prepare the runtime parameters + runtime_parameters = sl.RuntimeParameters() + + # Main loop + frame_count = int(duration * fps) + for i in range(frame_count): + # Grab the current image + if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS: + # Retrieve left image in RGBA format + zed_image = sl.Mat() + zed.retrieve_image(zed_image, sl.VIEW.LEFT) + + # Convert ZED image to numpy array + frame = zed_image.get_data() + frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB) # Convert RGBA to RGB + + # Write the frame to the video file + video_writer.write(frame) + + # Optional: Display the frame (press 'q' to exit early) + cv2.imshow("ZED Video", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + else: + print("Frame grab failed") + break + + # Release resources + video_writer.release() + zed.close() + cv2.destroyAllWindows() + print(f"Video saved as {output_file}") + +if __name__ == "__main__": + # Parameters: output file name, FPS, duration in seconds + record_zed_to_mp4("output.mp4", fps=30, duration=10) diff --git a/tools/video/svo-recorder.py b/tools/video/svo-recorder.py new file mode 100644 index 0000000..4cd5f20 --- /dev/null +++ b/tools/video/svo-recorder.py @@ -0,0 +1,70 @@ +import sys +import cv2 +import pyzed.sl as sl + +def convert_svo_to_mp4(svo_file, output_file="output.mp4", fps=30): + # Create a ZED camera object for reading the SVO + zed = sl.Camera() + + # Set initialization parameters for reading the SVO file + init_params = sl.InitParameters() + init_params.set_from_svo_file(svo_file) + init_params.svo_real_time_mode = False # Disable real-time mode for faster 
reading + + # Open the SVO file + if zed.open(init_params) != sl.ERROR_CODE.SUCCESS: + print("Failed to open SVO file") + zed.close() + sys.exit(1) + + # Get image size from the SVO file + image_size = zed.get_camera_information().camera_resolution + width = image_size.width + height = image_size.height + + # Set up the OpenCV video writer for the MP4 output + fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for .mp4 + video_writer = cv2.VideoWriter(output_file, fourcc, fps, (width, height)) + + # Check if video writer opened successfully + if not video_writer.isOpened(): + print("Failed to open video writer") + zed.close() + sys.exit(1) + + # Prepare runtime parameters + runtime_parameters = sl.RuntimeParameters() + + # Loop through each frame in the SVO file + while True: + # Grab frame from the SVO file + if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS: + # Retrieve left image + zed_image = sl.Mat() + zed.retrieve_image(zed_image, sl.VIEW.LEFT) + + # Convert ZED image to numpy array for OpenCV + frame = zed_image.get_data() + frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB) # Convert RGBA to RGB + + # Write frame to MP4 file + video_writer.write(frame) + + # Optional: Display frame (press 'q' to exit early) + cv2.imshow("SVO to MP4 Conversion", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + else: + # End of SVO file + break + + # Release resources + video_writer.release() + zed.close() + cv2.destroyAllWindows() + print(f"Conversion complete. Video saved as {output_file}") + +if __name__ == "__main__": + # Specify the SVO file path and output MP4 file + svo_file = "input.svo" + convert_svo_to_mp4(svo_file, output_file="output.mp4", fps=30) diff --git a/tools/video/video-infer.py b/tools/video/video-infer.py new file mode 100644 index 0000000..61b7dd0 --- /dev/null +++ b/tools/video/video-infer.py @@ -0,0 +1,99 @@ +### decord: +import os +import cv2 +import decord +from decord import VideoReader +from decord import cpu + +def extract_frames_from_mp4(video_path, output_dir, frame_rate=1): + # Check if the output directory exists, create if not + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + # Initialize VideoReader with decord + vr = VideoReader(video_path, ctx=cpu(0)) + + # Get the video frame rate and calculate intervals based on target frame_rate + video_fps = vr.get_avg_fps() + interval = int(video_fps / frame_rate) + + print(f"Video FPS: {video_fps}") + print(f"Extracting frames every {interval} frames") + + # Loop through frames and save them as JPEG images + for i in range(0, len(vr), interval): + frame = vr[i].asnumpy() # Get frame as numpy array + output_filename = os.path.join(output_dir, f"frame_{i:06d}.jpg") + + # Save frame as JPEG + cv2.imwrite(output_filename, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)) + print(f"Saved {output_filename}") + + print("Frame extraction complete.") + +if __name__ == "__main__": + # Define video file path and output directory + video_path = "input.mp4" + output_dir = "output_frames" + + # Extract frames at a specified rate (e.g., 1 frame per second) + extract_frames_from_mp4(video_path, output_dir, frame_rate=1) + +# inference + +import cv2 +from ultralytics import YOLO +from tqdm import tqdm + +def yolo_inference_on_video(input_video_path, output_video_path, model_name='yolov8n'): + # Load YOLO model + model = YOLO(model_name) # e.g., yolov8n, yolov8s, etc. 
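+    # Hedged note: YOLO() also accepts exported weights (e.g. an ".onnx" file or
+    # the ".engine" file benchmarked earlier), so this helper can presumably run
+    # the TensorRT export as well; per-call options such as conf, imgsz and
+    # device can be passed to model(frame, ...) just like with model.predict.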
+ + # Open the input video + cap = cv2.VideoCapture(input_video_path) + if not cap.isOpened(): + print("Error: Could not open video.") + return + + # Get video properties + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + print(f"Video loaded: {width}x{height} at {fps} FPS with {total_frames} frames.") + + # Set up video writer for the output video + fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for .mp4 + out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) + + # Process each frame with tqdm progress bar + with tqdm(total=total_frames, desc="Processing video", unit="frame") as pbar: + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + break # End of video + + # Run YOLO inference on the frame + results = model(frame) + + # Annotate the frame with bounding boxes and labels + annotated_frame = results[0].plot() # Plot the detections directly on the frame + + # Write the annotated frame to the output video + out.write(annotated_frame) + + # Update progress bar + pbar.update(1) + + # Release resources + cap.release() + out.release() + print(f"\nInference complete. Output saved to {output_video_path}") + +if __name__ == "__main__": + # Define input and output video file paths + input_video_path = "input.mp4" + output_video_path = "output_annotated.mp4" + + # Perform inference and save the annotated video + yolo_inference_on_video(input_video_path, output_video_path, model_name='yolov8n') diff --git a/workspace_python/ros2_ws/src/python_workspace/python_workspace/extermination_node.py b/workspace_python/ros2_ws/src/python_workspace/python_workspace/extermination_node.py index 7a8bc2f..7740891 100644 --- a/workspace_python/ros2_ws/src/python_workspace/python_workspace/extermination_node.py +++ b/workspace_python/ros2_ws/src/python_workspace/python_workspace/extermination_node.py @@ -1,5 +1,6 @@ import time, os import cv2 +import serial # pip3 install pyserial # import pycuda.driver as cuda # from tracker import * # depth point cloud here... 
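The hunks that follow wire this node to an Arduino over pyserial at 115200 baud, writing '0' or '1' plus a newline once per second. A minimal sketch for smoke-testing that link from the host, outside ROS (the port path comes from the patch; the read-back assumes the Arduino firmware echoes a reply, which this patch does not show):

import time
import serial  # pip3 install pyserial

ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)  # adjust port as needed
time.sleep(2)                 # give the Arduino time to reset after the port opens
ser.write(b'1\n')             # same framing the node uses: value + newline delimiter
print(ser.readline())         # only meaningful if the firmware sends something back
ser.close()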
@@ -14,6 +15,7 @@
 from sensor_msgs.msg import Image
 from custom_interfaces.msg import InferenceOutput # CHANGE
 from .scripts.utils import ModelInference
+from std_msgs.msg import String # Example message type
 
 # cuda.init()
 # device = cuda.Device(0)
@@ -22,7 +24,7 @@ class ExterminationNode(Node):
     def __init__(self):
         super().__init__('extermination_node')
-        
+
         self.declare_parameter('use_display_node', True)
         # self.declare_parameter('lower_range', [78, 158, 124]) #todo: make this a parameter
         # self.declare_parameter('upper_range', [60, 255, 255])
@@ -48,19 +50,23 @@ def __init__(self):
         #     if self.use_display_node:
         #         self.window = "Right Camera"
 
+        self.boxes_present = 0
         self.window = "Left Camera"
-        self.model = ModelInference()
         self.bridge = CvBridge()
-        
+        # Open serial port to Arduino
+        self.ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1) # Adjust USB port as needed
         self.subscription = self.create_subscription(InferenceOutput, 'inference_out', self.inference_callback, 10)
+        # Create a timer that calls timer_callback every second
+        self.timer = self.create_timer(1.0, self.timer_callback)
+
+        time.sleep(2) # Wait for Arduino to reset
 
     def inference_callback(self, msg):
         preprocessed_image = self.bridge.imgmsg_to_cv2(msg.preprocessed_image, desired_encoding='passthrough')
         raw_image = self.bridge.imgmsg_to_cv2(msg.raw_image, desired_encoding='passthrough')
-        
         bounding_boxes = self.model.postprocess(msg.confidences.data,msg.bounding_boxes.data, raw_image,msg.velocity)
         final_image = self.model.draw_boxes(raw_image,bounding_boxes,velocity=msg.velocity)
@@ -71,9 +77,15 @@ def inference_callback(self, msg):
             cv2.waitKey(10)
 
         if len(bounding_boxes) > 0:
-            pass # return 1
+            self.boxes_present = 1
         else:
-            pass # return 0
+            self.boxes_present = 0
+
+    def timer_callback(self):
+        # Serialize and send the message to Arduino
+        serialized_msg = str(self.boxes_present) + '\n' # Add a newline as a delimiter
+        self.ser.write(serialized_msg.encode())
+        self.get_logger().info(f'Sent to Arduino: {self.boxes_present}')
 
 def main(args=None):
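+    # Hedged cleanup note: the serial port opened in __init__ is not released in
+    # the code shown here. One option, assuming the usual rclpy lifecycle, is to
+    # override destroy_node on ExterminationNode:
+    #     def destroy_node(self):
+    #         if self.ser and self.ser.is_open:
+    #             self.ser.close()
+    #         super().destroy_node()
+    # and call node.destroy_node() before rclpy.shutdown() in main().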