From 25d4362aeaae98c3e2094233c7f872e589872bc1 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Tue, 9 Sep 2025 13:17:08 -0400
Subject: [PATCH 1/7] Remove global costmap `scan` expected_update_rate to
 account for delays

---
 src/go2_robot_sdk/config/nav2_params.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/go2_robot_sdk/config/nav2_params.yaml b/src/go2_robot_sdk/config/nav2_params.yaml
index 1aa6319..3b9a2af 100644
--- a/src/go2_robot_sdk/config/nav2_params.yaml
+++ b/src/go2_robot_sdk/config/nav2_params.yaml
@@ -1763,7 +1763,7 @@ global_costmap:
           # Expected: Sensor's actual update rate (should match increased processing rate)
           # Impact: Used for timeout detection of stale sensor data
           # 10.0Hz matches increased global costmap processing for fast obstacle clearing
-          expected_update_rate: 10.0
+          # expected_update_rate: 10.0
 
           # min_obstacle_height: Minimum height to consider as obstacle (meters)
           # Expected: 0.0-0.3 (depends on what robot should avoid)

From 97b14b6abaacf2b468181c1238072b5c20c8e305 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Tue, 9 Sep 2025 19:44:07 -0400
Subject: [PATCH 2/7] Add 640x480 camera config

---
 .../calibration/front_camera_480.yaml         | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)
 create mode 100644 src/go2_robot_sdk/calibration/front_camera_480.yaml

diff --git a/src/go2_robot_sdk/calibration/front_camera_480.yaml b/src/go2_robot_sdk/calibration/front_camera_480.yaml
new file mode 100644
index 0000000..b2a97d9
--- /dev/null
+++ b/src/go2_robot_sdk/calibration/front_camera_480.yaml
@@ -0,0 +1,26 @@
+image_width: 640
+image_height: 480
+camera_name: front_camera
+camera_matrix:
+  rows: 3
+  cols: 3
+  data: [436.92609,   0.     , 339.57048,
+           0.     , 586.78253, 283.05632,
+           0.     ,   0.     ,   1.     ]
+distortion_model: plumb_bob
+distortion_coefficients:
+  rows: 1
+  cols: 5
+  data: [-0.415971, 0.158898, -0.015395, -0.008031, 0.000000]
+rectification_matrix:
+  rows: 3
+  cols: 3
+  data: [1., 0., 0.,
+         0., 1., 0.,
+         0., 0., 1.]
+projection_matrix:
+  rows: 3
+  cols: 4
+  data: [330.75336,   0.     , 331.36332,   0.     ,
+           0.     , 542.10351, 284.02454,   0.     ,
+           0.     ,   0.     ,   1.     ,   0.     ]
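Note (not part of the patch): a minimal sketch of how a calibration file in the
camera_calibration_parsers-style layout above can be loaded into a ROS 2
sensor_msgs CameraInfo message. The function name and path are illustrative
assumptions, not code from this series.

    import yaml
    from sensor_msgs.msg import CameraInfo

    def load_camera_info(path: str) -> CameraInfo:
        # Parse the YAML layout shown in front_camera_480.yaml above
        with open(path) as f:
            calib = yaml.safe_load(f)
        info = CameraInfo()
        info.width = calib['image_width']
        info.height = calib['image_height']
        info.distortion_model = calib['distortion_model']
        info.k = [float(v) for v in calib['camera_matrix']['data']]          # 3x3 intrinsics
        info.d = [float(v) for v in calib['distortion_coefficients']['data']]
        info.r = [float(v) for v in calib['rectification_matrix']['data']]   # 3x3 rectification
        info.p = [float(v) for v in calib['projection_matrix']['data']]      # 3x4 projection
        return info

    # e.g. info = load_camera_info('src/go2_robot_sdk/calibration/front_camera_480.yaml')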
From 6a83a42cea866882529c7720c4ae7a99d47577f6 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Tue, 9 Sep 2025 22:58:48 -0400
Subject: [PATCH 3/7] Minimize logging when video errors occur, add back
 pressure, and cap FPS at 15

---
 .../presentation/go2_driver_node.py           | 165 +++++++++++++++++-
 1 file changed, 160 insertions(+), 5 deletions(-)

diff --git a/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py b/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py
index 71851b3..4a50575 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py
@@ -1,9 +1,17 @@
 # Copyright (c) 2024, RoboVerse community
 # SPDX-License-Identifier: BSD-3-Clause
 
+# Phase 2A+: Configure FFmpeg environment variables before any imports
+import os
+os.environ['AV_LOG_LEVEL'] = 'fatal'  # Suppress FFmpeg log messages
+os.environ['FFMPEG_LOG_LEVEL'] = 'fatal'  # Alternative FFmpeg log control
+os.environ['AV_CODEC_FLAG_ERROR_CONCEALMENT'] = '1'  # Enable error concealment
+
 import asyncio
+import collections
 import logging
 import os
+import time
 from typing import Dict, Any
 
 from aiortc import MediaStreamTrack
@@ -31,6 +39,44 @@
 logger.setLevel(logging.INFO)
 
 
+class VideoErrorTracker:
+    """Track H.264 error rates for adaptive behavior"""
+
+    def __init__(self, window_size=50):
+        self.error_window = collections.deque(maxlen=window_size)
+        self.total_errors = 0
+        self.last_error_time = 0
+
+    def record_error(self):
+        """Record a video processing error"""
+        current_time = time.time()
+        self.error_window.append(1)
+        self.total_errors += 1
+        self.last_error_time = current_time
+
+    def record_success(self):
+        """Record successful video processing"""
+        self.error_window.append(0)
+
+    def get_error_rate(self) -> float:
+        """Get current error rate (0.0 to 1.0)"""
+        if len(self.error_window) < 10:
+            return 0.0
+        return sum(self.error_window) / len(self.error_window)
+
+    def is_error_storm(self) -> bool:
+        """Check if we're in an error storm (>50% error rate)"""
+        return self.get_error_rate() > 0.5
+
+    def get_stats(self) -> dict:
+        """Get error statistics for logging"""
+        return {
+            'error_rate': self.get_error_rate(),
+            'total_errors': self.total_errors,
+            'window_size': len(self.error_window)
+        }
+
+
 class Go2DriverNode(Node):
     """Main Go2 driver node - entry point to the application"""
 
@@ -68,6 +114,12 @@ def __init__(self, event_loop=None):
         # Set callback for data
         self.webrtc_adapter.set_data_callback(self._on_robot_data_received)
 
+        # Phase 2A: Configure aiortc logging to reduce H.264 error spam
+        self._configure_aiortc_logging()
+
+        # Phase 2A+: Configure FFmpeg error concealment for robust H.264 decoding
+        self._configure_ffmpeg_error_concealment()
+
         # Subscribers initialization
         self._setup_subscribers()
 
@@ -289,14 +341,84 @@ def _on_robot_data_received(self, msg: Dict[str, Any], robot_id: str) -> None:
         """Callback for receiving data from robot"""
         self.robot_data_service.process_webrtc_message(msg, robot_id)
 
+    def _configure_aiortc_logging(self) -> None:
+        """Configure aiortc logging to reduce H.264 error spam"""
+        logging.getLogger('aiortc.codecs.h264').setLevel(logging.ERROR)
+        logging.getLogger('aiortc.codecs').setLevel(logging.ERROR)
+        logging.getLogger('aiortc').setLevel(logging.WARNING)
+        logger.info("Configured aiortc logging to reduce H.264 error spam")
+
+    def _configure_ffmpeg_error_concealment(self) -> None:
+        """Configure FFmpeg error concealment and log suppression for robust H.264 decoding"""
+        try:
+            import av
+            import av.logging
+
+            # Primary approach: PyAV log level control (most reliable)
+            av.logging.set_level(av.logging.QUIET)  # Complete suppression of FFmpeg messages
+            logger.info("Configured PyAV to completely suppress FFmpeg H.264 decoder messages")
+
+            # Secondary approach: Try to configure codec options (may not work in all versions)
+            try:
+                # Attempt to set global decoder options for error concealment
+                h264_codec = av.codec.Codec('h264', 'r')
+                if hasattr(h264_codec, 'options'):
+                    h264_codec.options.update({
+                        'error_concealment': '3',  # All error concealment methods
+                        'ec': '3',  # Alternative parameter name
+                    })
+                    logger.info("Configured H.264 decoder error concealment options")
+            except Exception as codec_error:
+                logger.debug(f"Could not configure codec options (this is normal): {codec_error}")
+
+        except ImportError:
+            logger.warning("PyAV not available - cannot suppress FFmpeg logging")
+        except Exception as e:
+            logger.warning(f"Could not configure PyAV logging: {e}")
+
+        # Environment variables were set at module import time
+        logger.info("FFmpeg environment variables configured at module import (AV_LOG_LEVEL=fatal)")
+
     async def _on_video_frame(self, track: MediaStreamTrack, robot_id: str) -> None:
         """Callback for processing video frames"""
-        # logger.info(f"Video frame received for robot {robot_id}")
+        # Phase 1 optimizations: Frame rate limiting, resolution downscaling,
+        # subscription checking, and enhanced error handling
+        max_fps = 15  # Limit to 15 FPS for CPU efficiency
+        frame_interval = 1.0 / max_fps
+        last_frame_time = 0
+        consecutive_errors = 0
+        max_consecutive_errors = 5
+
+        # Phase 2A: Error tracking for adaptive behavior and diagnostics
+        error_tracker = VideoErrorTracker()
+        last_stats_log = 0
 
         while True:
             try:
-                frame = await track.recv()
-                img = frame.to_ndarray(format="rgb24")
+                # Phase 2A: Add timeout to prevent indefinite blocking during H.264 decoder recovery
+                frame = await asyncio.wait_for(track.recv(), timeout=0.1)
+
+                # Phase 1: Frame rate limiting - skip frames if processing too fast
+                current_time = time.time()
+                if current_time - last_frame_time < frame_interval:
+                    continue  # Skip frame to maintain target FPS
+
+                # Phase 1: Enhanced frame skipping - more aggressive under load
+                if consecutive_errors > 3:  # High stress - skip 75% of frames
+                    if int(current_time * 10) % 4 != 0:
+                        continue
+                elif consecutive_errors > 1:  # Moderate stress - skip 50% of frames
+                    if int(current_time * 10) % 2 == 0:
+                        continue
+
+                last_frame_time = current_time
+
+                # Phase 1: Subscription checking - skip processing if no subscribers
+                if not self.ros2_publisher.has_camera_subscribers(robot_id):
+                    continue  # Skip frame processing when no one is listening
+
+                # Phase 1: Resolution downscaling - 85% data reduction (1920x1080 -> 640x480)
+                img = frame.to_ndarray(format="rgb24", width=640, height=480)
 
                 # Create camera data
                 camera_data = CameraData(
@@ -314,11 +436,44 @@ async def _on_video_frame(self, track: MediaStreamTrack, robot_id: str) -> None
 
                 # Publish via ROS2Publisher
                 self.ros2_publisher.publish_camera_data(robot_data)
+
+                # Reset error counter on successful processing
+                consecutive_errors = 0
+
+                # Phase 2A: Record successful frame processing
+                error_tracker.record_success()
+
+                # Phase 2A: Periodic error statistics logging (every 30 seconds)
+                current_time = time.time()
+                if current_time - last_stats_log > 30:
+                    stats = error_tracker.get_stats()
+                    if stats['total_errors'] > 0:
+                        logger.info(f"Video error stats: {stats['error_rate']:.1%} error rate, "
+                                    f"{stats['total_errors']} total errors")
+                    last_stats_log = current_time
+
                 await asyncio.sleep(0)
 
+            except asyncio.TimeoutError:
+                # Phase 2A: Handle frame receive timeout - skip frame without counting as error
+                continue
+
             except Exception as e:
-                logger.error(f"Error processing video frame: {e}")
-                break
+                # Phase 1: Enhanced error handling for video failures
+                consecutive_errors += 1
+
+                # Phase 2A: Record error for tracking and statistics
+                error_tracker.record_error()
+
+                logger.error(f"Error processing video frame (attempt {consecutive_errors}): {e}")
+
+                # Phase 1: Graceful degradation - break after too many consecutive errors
+                if consecutive_errors >= max_consecutive_errors:
+                    logger.error(f"Too many consecutive video errors ({consecutive_errors}), stopping video processing for robot {robot_id}")
+                    break
+
+                # Brief pause before retry to avoid tight error loop
+                await asyncio.sleep(0.1)
 
     # CycloneDDS callbacks
     def _on_cyclonedds_low_state(self, msg: LowState) -> None:
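Note (not part of the patch): the pacing and error-storm logic above can be read
in isolation. Below is a self-contained, synchronous sketch of the same
back-pressure idea — drop frames above 15 FPS, reset the error counter on
success, stop after repeated failures. recv_frame and process are hypothetical
stand-ins for track.recv() and the convert/publish path.

    import time

    MAX_FPS = 15
    FRAME_INTERVAL = 1.0 / MAX_FPS

    def paced_loop(recv_frame, process, tracker, max_consecutive_errors=5):
        # tracker is a VideoErrorTracker as defined in the patch above
        last_frame_time = 0.0
        consecutive_errors = 0
        while True:
            frame = recv_frame()              # stand-in for: await track.recv()
            now = time.time()
            if now - last_frame_time < FRAME_INTERVAL:
                continue                      # drop frame: stay at or below 15 FPS
            last_frame_time = now
            try:
                process(frame)                # stand-in for convert + publish
                consecutive_errors = 0
                tracker.record_success()
            except Exception:
                consecutive_errors += 1
                tracker.record_error()
                if consecutive_errors >= max_consecutive_errors or tracker.is_error_storm():
                    break                     # graceful degradation, as in the patch
                time.sleep(0.1)               # brief pause to avoid a tight error loop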
f"{stats['total_errors']} total errors") + last_stats_log = current_time + await asyncio.sleep(0) + except asyncio.TimeoutError: + # Phase 2A: Handle frame receive timeout - skip frame without counting as error + continue + except Exception as e: - logger.error(f"Error processing video frame: {e}") - break + # Phase 1: Enhanced error handling for video failures + consecutive_errors += 1 + + # Phase 2A: Record error for tracking and statistics + error_tracker.record_error() + + logger.error(f"Error processing video frame (attempt {consecutive_errors}): {e}") + + # Phase 1: Graceful degradation - break after too many consecutive errors + if consecutive_errors >= max_consecutive_errors: + logger.error(f"Too many consecutive video errors ({consecutive_errors}), stopping video processing for robot {robot_id}") + break + + # Brief pause before retry to avoid tight error loop + await asyncio.sleep(0.1) # CycloneDDS callbacks def _on_cyclonedds_low_state(self, msg: LowState) -> None: From 06ada7718698a4db23f5481b9951f6dd88485d42 Mon Sep 17 00:00:00 2001 From: Irvin | OpenDive Date: Tue, 9 Sep 2025 23:00:28 -0400 Subject: [PATCH 4/7] Optimize clock calls and begin commenting out LiDAR processing --- .../infrastructure/ros2/ros2_publisher.py | 115 +++++++++++------- 1 file changed, 74 insertions(+), 41 deletions(-) diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py index 4da379b..9ba3ce9 100644 --- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py +++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py @@ -42,19 +42,22 @@ def publish_odometry(self, robot_data: RobotData) -> None: try: robot_idx = int(robot_data.robot_id) + # Single timestamp for both transform and odometry to ensure logical consistency + current_time = self.node.get_clock().now().to_msg() + # Publish transform - self._publish_transform(robot_data, robot_idx) + self._publish_transform(robot_data, robot_idx, current_time) # Publish odometry topic - self._publish_odometry_topic(robot_data, robot_idx) + self._publish_odometry_topic(robot_data, robot_idx, current_time) except Exception as e: logger.error(f"Error publishing odometry: {e}") - def _publish_transform(self, robot_data: RobotData, robot_idx: int) -> None: + def _publish_transform(self, robot_data: RobotData, robot_idx: int, current_time) -> None: """Publish TF transform""" odom_trans = TransformStamped() - odom_trans.header.stamp = self.node.get_clock().now().to_msg() + odom_trans.header.stamp = current_time odom_trans.header.frame_id = 'odom' if self.config.conn_mode == 'single': @@ -76,10 +79,10 @@ def _publish_transform(self, robot_data: RobotData, robot_idx: int) -> None: self.broadcaster.sendTransform(odom_trans) - def _publish_odometry_topic(self, robot_data: RobotData, robot_idx: int) -> None: + def _publish_odometry_topic(self, robot_data: RobotData, robot_idx: int, current_time) -> None: """Publish Odometry topic""" odom_msg = Odometry() - odom_msg.header.stamp = self.node.get_clock().now().to_msg() + odom_msg.header.stamp = current_time odom_msg.header.frame_id = 'odom' if self.config.conn_mode == 'single': @@ -108,8 +111,11 @@ def publish_joint_state(self, robot_data: RobotData) -> None: try: robot_idx = int(robot_data.robot_id) + # Phase 1 optimization: single timestamp call per message to reduce system call overhead + current_time = self.node.get_clock().now().to_msg() + joint_state = JointState() - 
From 0278cede0aeb335e4fcf81c3d5a2875ca67a5497 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Tue, 9 Sep 2025 23:01:06 -0400
Subject: [PATCH 5/7] Optimize conditional checks and start commenting out
 LiDAR processing

---
 .../services/robot_data_service.py            | 65 +++++++++++++------
 1 file changed, 44 insertions(+), 21 deletions(-)

diff --git a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
index 93894cd..a7957c4 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
@@ -24,12 +24,13 @@ def process_webrtc_message(self, msg: Dict[str, Any], robot_id: str) -> None:
         topic = msg.get('topic')
         robot_data = RobotData(robot_id=robot_id, timestamp=0.0)
 
-        if topic == RTC_TOPIC["ULIDAR_ARRAY"]:
-            self._process_lidar_data(msg, robot_data)
-            self.publisher.publish_lidar_data(robot_data)
-            self.publisher.publish_voxel_data(robot_data)
+        # COMMENTED OUT FOR CPU SPIKE TESTING - LiDAR processing suspected cause of blocking
+        # if topic == RTC_TOPIC["ULIDAR_ARRAY"]:
+        #     self._process_lidar_data(msg, robot_data)
+        #     self.publisher.publish_lidar_data(robot_data)
+        #     self.publisher.publish_voxel_data(robot_data)
 
-        elif topic == RTC_TOPIC["ROBOTODOM"]:
+        if topic == RTC_TOPIC["ROBOTODOM"]:
             self._process_odometry_data(msg, robot_data)
             self.publisher.publish_odometry(robot_data)
 
@@ -70,12 +71,19 @@ def _process_odometry_data(self, msg: Dict[str, Any], robot_data: RobotData) ->
             position = pose_data['position']
             orientation = pose_data['orientation']
 
-            # Data validation
+            # Data validation - Phase 1 optimization: type-check first for faster rejection
             pos_vals = [position['x'], position['y'], position['z']]
             rot_vals = [orientation['x'], orientation['y'], orientation['z'], orientation['w']]
+            all_vals = pos_vals + rot_vals
 
-            if not all(isinstance(v, (int, float)) and math.isfinite(v) for v in pos_vals + rot_vals):
-                logger.warning("Invalid odometry data - skipping")
+            # Fast path: check types first (cheaper operation)
+            if not all(isinstance(v, (int, float)) for v in all_vals):
+                logger.warning("Invalid odometry data types - skipping")
+                return
+
+            # Only check finite values if types are correct (more expensive operation)
+            if not all(math.isfinite(v) for v in all_vals):
+                logger.warning("Invalid odometry data values - skipping")
                 return
 
             robot_data.odometry_data = OdometryData(
@@ -90,16 +98,23 @@ def _process_sport_mode_state(self, msg: Dict[str, Any], robot_data: RobotData)
         try:
             data = msg["data"]
 
-            # Data validation
-            if not self._validate_float_list(data.get("position", [])):
-                return
-            if not self._validate_float_list(data.get("range_obstacle", [])):
-                return
-            if not self._validate_float_list(data.get("foot_position_body", [])):
-                return
-            if not self._validate_float_list(data.get("foot_speed_body", [])):
-                return
+            # Data validation - Phase 1 optimization: consolidated validation to reduce function call overhead
+            validation_fields = [
+                ("position", data.get("position", [])),
+                ("range_obstacle", data.get("range_obstacle", [])),
+                ("foot_position_body", data.get("foot_position_body", [])),
+                ("foot_speed_body", data.get("foot_speed_body", []))
+            ]
+
+            # Validate all float lists in one pass
+            for field_name, field_data in validation_fields:
+                if not self._validate_float_list(field_data):
+                    logger.warning(f"Invalid sport mode state data in field '{field_name}' - skipping")
+                    return
+
+            # Validate single float value
             if not self._validate_float(data.get("body_height")):
+                logger.warning("Invalid sport mode state body_height - skipping")
                 return
 
             robot_data.robot_state = RobotState(
@@ -144,9 +159,17 @@ def _process_low_state(self, msg: Dict[str, Any], robot_data: RobotData) -> None
             logger.error(f"Error processing low state: {e}")
 
     def _validate_float_list(self, data: list) -> bool:
-        """Validate a list of float values"""
-        return all(isinstance(x, (int, float)) and math.isfinite(x) for x in data)
+        """Validate a list of float values - Phase 1 optimization: type-check first"""
+        # Fast path: check types first (cheaper operation)
+        if not all(isinstance(x, (int, float)) for x in data):
+            return False
+        # Only check finite values if types are correct (more expensive operation)
+        return all(math.isfinite(x) for x in data)
 
     def _validate_float(self, value: Any) -> bool:
-        """Validate a float value"""
-        return isinstance(value, (int, float)) and math.isfinite(value)
\ No newline at end of file
+        """Validate a float value - Phase 1 optimization: type-check first"""
+        # Fast path: check type first (cheaper operation)
+        if not isinstance(value, (int, float)):
+            return False
+        # Only check finite if type is correct (more expensive operation)
+        return math.isfinite(value)
\ No newline at end of file

From 8b92f0d184f57e711654b802cd861dfd4fc5c7e0 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Tue, 9 Sep 2025 23:01:27 -0400
Subject: [PATCH 6/7] Comment out LiDAR processing

---
 .../infrastructure/sensors/lidar_decoder.py   | 90 ++++++++++---------
 .../infrastructure/webrtc/data_decoder.py     | 56 +++++++-----
 2 files changed, 86 insertions(+), 60 deletions(-)

diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
index 3301e0d..3220439 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
@@ -157,47 +157,57 @@ def add_value_arr(self, start, value):
 
     def decode(self, compressed_data, data):
         """Original decode method that actually works with the WASM module"""
-        self.add_value_arr(self.input, compressed_data)
-
-        some_v = math.floor(data["origin"][2] / data["resolution"])
-
-        self.generate(
-            self.store,
-            self.input,
-            len(compressed_data),
-            self.decompressBufferSize,
-            self.decompressBuffer,
-            self.decompressedSize,
-            self.positions,
-            self.uvs,
-            self.indices,
-            self.faceCount,
-            self.pointCount,
-            some_v
-        )
-
-        self.get_value(self.decompressedSize, "i32")
-        c = self.get_value(self.pointCount, "i32")
-        u = self.get_value(self.faceCount, "i32")
-
-        positions_slice = self.HEAPU8[self.positions:self.positions + u * 12]
-        positions_copy = bytearray(positions_slice)
-        p = np.frombuffer(positions_copy, dtype=np.uint8)
-
-        uvs_slice = self.HEAPU8[self.uvs:self.uvs + u * 8]
-        uvs_copy = bytearray(uvs_slice)
-        r = np.frombuffer(uvs_copy, dtype=np.uint8)
-
-        indices_slice = self.HEAPU8[self.indices:self.indices + u * 24]
-        indices_copy = bytearray(indices_slice)
-        o = np.frombuffer(indices_copy, dtype=np.uint32)
-
+        # COMMENTED OUT FOR CPU SPIKE TESTING - WASM decoder is suspected cause of 500ms blocking
+        # self.add_value_arr(self.input, compressed_data)
+
+        # some_v = math.floor(data["origin"][2] / data["resolution"])
+
+        # self.generate(
+        #     self.store,
+        #     self.input,
+        #     len(compressed_data),
+        #     self.decompressBufferSize,
+        #     self.decompressBuffer,
+        #     self.decompressedSize,
+        #     self.positions,
+        #     self.uvs,
+        #     self.indices,
+        #     self.faceCount,
+        #     self.pointCount,
+        #     some_v
+        # )
+
+        # self.get_value(self.decompressedSize, "i32")
+        # c = self.get_value(self.pointCount, "i32")
+        # u = self.get_value(self.faceCount, "i32")
+
+        # positions_slice = self.HEAPU8[self.positions:self.positions + u * 12]
+        # positions_copy = bytearray(positions_slice)
+        # p = np.frombuffer(positions_copy, dtype=np.uint8)
+
+        # uvs_slice = self.HEAPU8[self.uvs:self.uvs + u * 8]
+        # uvs_copy = bytearray(uvs_slice)
+        # r = np.frombuffer(uvs_copy, dtype=np.uint8)
+
+        # indices_slice = self.HEAPU8[self.indices:self.indices + u * 24]
+        # indices_copy = bytearray(indices_slice)
+        # o = np.frombuffer(indices_copy, dtype=np.uint32)
+
+        # return {
+        #     "point_count": c,
+        #     "face_count": u,
+        #     "positions": p,
+        #     "uvs": r,
+        #     "indices": o
+        # }
+
+        # Return empty result to avoid breaking downstream code
         return {
-            "point_count": c,
-            "face_count": u,
-            "positions": p,
-            "uvs": r,
-            "indices": o
+            "point_count": 0,
+            "face_count": 0,
+            "positions": np.array([], dtype=np.uint8),
+            "uvs": np.array([], dtype=np.uint8),
+            "indices": np.array([], dtype=np.uint32)
         }

diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
index 5dfd0cc..522b0f9 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
@@ -218,27 +218,43 @@ def deal_array_buffer(buffer: bytes, perform_decode: bool = True) -> Optional[Di
         return None
 
     try:
+        # COMMENTED OUT FOR CPU SPIKE TESTING - LiDAR processing suspected cause of blocking
         # Use original implementation for full compatibility
-        if _global_lidar_decoder and perform_decode:
-            import struct
-            import json
-
-            length = struct.unpack("H", buffer[:2])[0]
-            json_segment = buffer[4: 4 + length]
-            compressed_data = buffer[4 + length:]
-            json_str = json_segment.decode("utf-8")
-            obj = json.loads(json_str)
-
-            if compressed_data:
-                decoded_data = _global_lidar_decoder.decode(compressed_data, obj['data'])
-                obj["decoded_data"] = decoded_data
-            else:
-                obj["compressed_data"] = compressed_data
-            return obj
-        else:
-            # Fallback to new decoder
-            decoder = get_data_decoder(enable_lidar=perform_decode)
-            return decoder.decode_array_buffer(buffer)
+        # if _global_lidar_decoder and perform_decode:
+        #     import struct
+        #     import json
+        #
+        #     length = struct.unpack("H", buffer[:2])[0]
+        #     json_segment = buffer[4: 4 + length]
+        #     compressed_data = buffer[4 + length:]
+        #     json_str = json_segment.decode("utf-8")
+        #     obj = json.loads(json_str)
+        #
+        #     if compressed_data:
+        #         decoded_data = _global_lidar_decoder.decode(compressed_data, obj['data'])
+        #         obj["decoded_data"] = decoded_data
+        #     else:
+        #         obj["compressed_data"] = compressed_data
+        #     return obj
+        # else:
+        #     # Fallback to new decoder
+        #     decoder = get_data_decoder(enable_lidar=perform_decode)
+        #     return decoder.decode_array_buffer(buffer)
+
+        # Return minimal data structure to avoid breaking downstream code
+        return {
+            "data": {
+                "origin": [0, 0, 0],
+                "resolution": 0.1
+            },
+            "decoded_data": {
+                "point_count": 0,
+                "face_count": 0,
+                "positions": [],
+                "uvs": [],
+                "indices": []
+            }
+        }
 
     except Exception as e:
         logger.error(f"Error in deal_array_buffer: {e}")

From 833643da8436de80d43cf7791fe10a5832dc3385 Mon Sep 17 00:00:00 2001
From: Irvin | OpenDive
Date: Wed, 10 Sep 2025 01:04:06 -0400
Subject: [PATCH 7/7] Uncomment LiDAR scan processing

---
 .../services/robot_data_service.py            | 11 ++-
 .../infrastructure/ros2/ros2_publisher.py     | 72 +++++++--------
 .../infrastructure/sensors/lidar_decoder.py   | 90 +++++++++----------
 .../infrastructure/webrtc/data_decoder.py     | 56 +++++-------
 4 files changed, 99 insertions(+), 130 deletions(-)

diff --git a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
index a7957c4..ff21095 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py
@@ -24,13 +24,12 @@ def process_webrtc_message(self, msg: Dict[str, Any], robot_id: str) -> None:
         topic = msg.get('topic')
         robot_data = RobotData(robot_id=robot_id, timestamp=0.0)
 
-        # COMMENTED OUT FOR CPU SPIKE TESTING - LiDAR processing suspected cause of blocking
-        # if topic == RTC_TOPIC["ULIDAR_ARRAY"]:
-        #     self._process_lidar_data(msg, robot_data)
-        #     self.publisher.publish_lidar_data(robot_data)
-        #     self.publisher.publish_voxel_data(robot_data)
+        if topic == RTC_TOPIC["ULIDAR_ARRAY"]:
+            self._process_lidar_data(msg, robot_data)
+            self.publisher.publish_lidar_data(robot_data)
+            self.publisher.publish_voxel_data(robot_data)
 
-        if topic == RTC_TOPIC["ROBOTODOM"]:
+        elif topic == RTC_TOPIC["ROBOTODOM"]:
             self._process_odometry_data(msg, robot_data)
             self.publisher.publish_odometry(robot_data)
 
diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py
index 9ba3ce9..b6b67f4 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py
@@ -187,44 +187,40 @@ def publish_robot_state(self, robot_data: RobotData) -> None:
 
     def publish_lidar_data(self, robot_data: RobotData) -> None:
         """Publish lidar data"""
-        # COMMENTED OUT FOR CPU SPIKE TESTING - Point cloud processing suspected cause of high CPU usage
-        # if not robot_data.lidar_data or not self.config.decode_lidar:
-        #     return
-
-        # try:
-        #     robot_idx = int(robot_data.robot_id)
-        #     lidar = robot_data.lidar_data
-
-        #     points = update_meshes_for_cloud2(
-        #         lidar.positions,
-        #         lidar.uvs,
-        #         lidar.resolution,
-        #         lidar.origin,
-        #         0
-        #     )
-
-        #     # Phase 1 optimization: single timestamp call per message to reduce system call overhead
-        #     current_time = self.node.get_clock().now().to_msg()
-        #
-        #     point_cloud = PointCloud2()
-        #     point_cloud.header = Header(frame_id="odom")
-        #     point_cloud.header.stamp = current_time
-        #
-        #     fields = [
-        #         PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
-        #         PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
-        #         PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
-        #         PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
-        #     ]
-        #
-        #     point_cloud = point_cloud2.create_cloud(point_cloud.header, fields, points)
-        #     self.publishers['lidar'][robot_idx].publish(point_cloud)
-
-        # except Exception as e:
-        #     logger.error(f"Error publishing lidar data: {e}")
-
-        # Skip LiDAR publishing during CPU spike testing
-        return
+        if not robot_data.lidar_data or not self.config.decode_lidar:
+            return
+
+        try:
+            robot_idx = int(robot_data.robot_id)
+            lidar = robot_data.lidar_data
+
+            points = update_meshes_for_cloud2(
+                lidar.positions,
+                lidar.uvs,
+                lidar.resolution,
+                lidar.origin,
+                0
+            )
+
+            # Phase 1 optimization: single timestamp call per message to reduce system call overhead
+            current_time = self.node.get_clock().now().to_msg()
+
+            point_cloud = PointCloud2()
+            point_cloud.header = Header(frame_id="odom")
+            point_cloud.header.stamp = current_time
+
+            fields = [
+                PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
+                PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
+                PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
+                PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
+            ]
+
+            point_cloud = point_cloud2.create_cloud(point_cloud.header, fields, points)
+            self.publishers['lidar'][robot_idx].publish(point_cloud)
+
+        except Exception as e:
+            logger.error(f"Error publishing lidar data: {e}")
 
     def has_camera_subscribers(self, robot_id: str) -> bool:
         """Check if there are active subscribers to camera topics"""
diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
index 3220439..3301e0d 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/sensors/lidar_decoder.py
@@ -157,57 +157,47 @@ def add_value_arr(self, start, value):
 
     def decode(self, compressed_data, data):
         """Original decode method that actually works with the WASM module"""
-        # COMMENTED OUT FOR CPU SPIKE TESTING - WASM decoder is suspected cause of 500ms blocking
-        # self.add_value_arr(self.input, compressed_data)
-
-        # some_v = math.floor(data["origin"][2] / data["resolution"])
-
-        # self.generate(
-        #     self.store,
-        #     self.input,
-        #     len(compressed_data),
-        #     self.decompressBufferSize,
-        #     self.decompressBuffer,
-        #     self.decompressedSize,
-        #     self.positions,
-        #     self.uvs,
-        #     self.indices,
-        #     self.faceCount,
-        #     self.pointCount,
-        #     some_v
-        # )
-
-        # self.get_value(self.decompressedSize, "i32")
-        # c = self.get_value(self.pointCount, "i32")
-        # u = self.get_value(self.faceCount, "i32")
-
-        # positions_slice = self.HEAPU8[self.positions:self.positions + u * 12]
-        # positions_copy = bytearray(positions_slice)
-        # p = np.frombuffer(positions_copy, dtype=np.uint8)
-
-        # uvs_slice = self.HEAPU8[self.uvs:self.uvs + u * 8]
-        # uvs_copy = bytearray(uvs_slice)
-        # r = np.frombuffer(uvs_copy, dtype=np.uint8)
-
-        # indices_slice = self.HEAPU8[self.indices:self.indices + u * 24]
-        # indices_copy = bytearray(indices_slice)
-        # o = np.frombuffer(indices_copy, dtype=np.uint32)
-
-        # return {
-        #     "point_count": c,
-        #     "face_count": u,
-        #     "positions": p,
-        #     "uvs": r,
-        #     "indices": o
-        # }
-
-        # Return empty result to avoid breaking downstream code
+        self.add_value_arr(self.input, compressed_data)
+
+        some_v = math.floor(data["origin"][2] / data["resolution"])
+
+        self.generate(
+            self.store,
+            self.input,
+            len(compressed_data),
+            self.decompressBufferSize,
+            self.decompressBuffer,
+            self.decompressedSize,
+            self.positions,
+            self.uvs,
+            self.indices,
+            self.faceCount,
+            self.pointCount,
+            some_v
+        )
+
+        self.get_value(self.decompressedSize, "i32")
+        c = self.get_value(self.pointCount, "i32")
+        u = self.get_value(self.faceCount, "i32")
+
+        positions_slice = self.HEAPU8[self.positions:self.positions + u * 12]
+        positions_copy = bytearray(positions_slice)
+        p = np.frombuffer(positions_copy, dtype=np.uint8)
+
+        uvs_slice = self.HEAPU8[self.uvs:self.uvs + u * 8]
+        uvs_copy = bytearray(uvs_slice)
+        r = np.frombuffer(uvs_copy, dtype=np.uint8)
+
+        indices_slice = self.HEAPU8[self.indices:self.indices + u * 24]
+        indices_copy = bytearray(indices_slice)
+        o = np.frombuffer(indices_copy, dtype=np.uint32)
+
         return {
-            "point_count": 0,
-            "face_count": 0,
-            "positions": np.array([], dtype=np.uint8),
-            "uvs": np.array([], dtype=np.uint8),
-            "indices": np.array([], dtype=np.uint32)
+            "point_count": c,
+            "face_count": u,
+            "positions": p,
+            "uvs": r,
+            "indices": o
         }

diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
index 522b0f9..5dfd0cc 100644
--- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
+++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/webrtc/data_decoder.py
@@ -218,43 +218,27 @@ def deal_array_buffer(buffer: bytes, perform_decode: bool = True) -> Optional[Di
         return None
 
     try:
-        # COMMENTED OUT FOR CPU SPIKE TESTING - LiDAR processing suspected cause of blocking
         # Use original implementation for full compatibility
-        # if _global_lidar_decoder and perform_decode:
-        #     import struct
-        #     import json
-        #
-        #     length = struct.unpack("H", buffer[:2])[0]
-        #     json_segment = buffer[4: 4 + length]
-        #     compressed_data = buffer[4 + length:]
-        #     json_str = json_segment.decode("utf-8")
-        #     obj = json.loads(json_str)
-        #
-        #     if compressed_data:
-        #         decoded_data = _global_lidar_decoder.decode(compressed_data, obj['data'])
-        #         obj["decoded_data"] = decoded_data
-        #     else:
-        #         obj["compressed_data"] = compressed_data
-        #     return obj
-        # else:
-        #     # Fallback to new decoder
-        #     decoder = get_data_decoder(enable_lidar=perform_decode)
-        #     return decoder.decode_array_buffer(buffer)
-
-        # Return minimal data structure to avoid breaking downstream code
-        return {
-            "data": {
-                "origin": [0, 0, 0],
-                "resolution": 0.1
-            },
-            "decoded_data": {
-                "point_count": 0,
-                "face_count": 0,
-                "positions": [],
-                "uvs": [],
-                "indices": []
-            }
-        }
+        if _global_lidar_decoder and perform_decode:
+            import struct
+            import json
+
+            length = struct.unpack("H", buffer[:2])[0]
+            json_segment = buffer[4: 4 + length]
+            compressed_data = buffer[4 + length:]
+            json_str = json_segment.decode("utf-8")
+            obj = json.loads(json_str)
+
+            if compressed_data:
+                decoded_data = _global_lidar_decoder.decode(compressed_data, obj['data'])
+                obj["decoded_data"] = decoded_data
+            else:
+                obj["compressed_data"] = compressed_data
+            return obj
+        else:
+            # Fallback to new decoder
+            decoder = get_data_decoder(enable_lidar=perform_decode)
+            return decoder.decode_array_buffer(buffer)
 
     except Exception as e:
         logger.error(f"Error in deal_array_buffer: {e}")