diff --git a/src/go2_robot_sdk/calibration/front_camera_480.yaml b/src/go2_robot_sdk/calibration/front_camera_480.yaml new file mode 100644 index 0000000..b2a97d9 --- /dev/null +++ b/src/go2_robot_sdk/calibration/front_camera_480.yaml @@ -0,0 +1,26 @@ +image_width: 640 +image_height: 480 +camera_name: front_camera +camera_matrix: + rows: 3 + cols: 3 + data: [436.92609, 0. , 339.57048, + 0. , 586.78253, 283.05632, + 0. , 0. , 1. ] +distortion_model: plumb_bob +distortion_coefficients: + rows: 1 + cols: 5 + data: [-0.415971, 0.158898, -0.015395, -0.008031, 0.000000] +rectification_matrix: + rows: 3 + cols: 3 + data: [1., 0., 0., + 0., 1., 0., + 0., 0., 1.] +projection_matrix: + rows: 3 + cols: 4 + data: [330.75336, 0. , 331.36332, 0. , + 0. , 542.10351, 284.02454, 0. , + 0. , 0. , 1. , 0. ] diff --git a/src/go2_robot_sdk/config/nav2_params.yaml b/src/go2_robot_sdk/config/nav2_params.yaml index 1aa6319..3b9a2af 100644 --- a/src/go2_robot_sdk/config/nav2_params.yaml +++ b/src/go2_robot_sdk/config/nav2_params.yaml @@ -1763,7 +1763,7 @@ global_costmap: # Expected: Sensor's actual update rate (should match increased processing rate) # Impact: Used for timeout detection of stale sensor data # 10.0Hz matches increased global costmap processing for fast obstacle clearing - expected_update_rate: 10.0 + # expected_update_rate: 10.0 # min_obstacle_height: Minimum height to consider as obstacle (meters) # Expected: 0.0-0.3 (depends on what robot should avoid) diff --git a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py index 93894cd..ff21095 100644 --- a/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py +++ b/src/go2_robot_sdk/go2_robot_sdk/application/services/robot_data_service.py @@ -70,12 +70,19 @@ def _process_odometry_data(self, msg: Dict[str, Any], robot_data: RobotData) -> position = pose_data['position'] orientation = pose_data['orientation'] - # Data validation + # Data validation - Phase 1 optimization: type-check first for faster rejection pos_vals = [position['x'], position['y'], position['z']] rot_vals = [orientation['x'], orientation['y'], orientation['z'], orientation['w']] + all_vals = pos_vals + rot_vals - if not all(isinstance(v, (int, float)) and math.isfinite(v) for v in pos_vals + rot_vals): - logger.warning("Invalid odometry data - skipping") + # Fast path: check types first (cheaper operation) + if not all(isinstance(v, (int, float)) for v in all_vals): + logger.warning("Invalid odometry data types - skipping") + return + + # Only check finite values if types are correct (more expensive operation) + if not all(math.isfinite(v) for v in all_vals): + logger.warning("Invalid odometry data values - skipping") return robot_data.odometry_data = OdometryData( @@ -90,16 +97,23 @@ def _process_sport_mode_state(self, msg: Dict[str, Any], robot_data: RobotData) try: data = msg["data"] - # Data validation - if not self._validate_float_list(data.get("position", [])): - return - if not self._validate_float_list(data.get("range_obstacle", [])): - return - if not self._validate_float_list(data.get("foot_position_body", [])): - return - if not self._validate_float_list(data.get("foot_speed_body", [])): - return + # Data validation - Phase 1 optimization: consolidated validation to reduce function call overhead + validation_fields = [ + ("position", data.get("position", [])), + ("range_obstacle", data.get("range_obstacle", [])), + ("foot_position_body", data.get("foot_position_body", [])), + ("foot_speed_body", data.get("foot_speed_body", [])) + ] + + # Validate all float lists in one pass + for field_name, field_data in validation_fields: + if not self._validate_float_list(field_data): + logger.warning(f"Invalid sport mode state data in field '{field_name}' - skipping") + return + + # Validate single float value if not self._validate_float(data.get("body_height")): + logger.warning("Invalid sport mode state body_height - skipping") return robot_data.robot_state = RobotState( @@ -144,9 +158,17 @@ def _process_low_state(self, msg: Dict[str, Any], robot_data: RobotData) -> None logger.error(f"Error processing low state: {e}") def _validate_float_list(self, data: list) -> bool: - """Validate a list of float values""" - return all(isinstance(x, (int, float)) and math.isfinite(x) for x in data) + """Validate a list of float values - Phase 1 optimization: type-check first""" + # Fast path: check types first (cheaper operation) + if not all(isinstance(x, (int, float)) for x in data): + return False + # Only check finite values if types are correct (more expensive operation) + return all(math.isfinite(x) for x in data) def _validate_float(self, value: Any) -> bool: - """Validate a float value""" - return isinstance(value, (int, float)) and math.isfinite(value) \ No newline at end of file + """Validate a float value - Phase 1 optimization: type-check first""" + # Fast path: check type first (cheaper operation) + if not isinstance(value, (int, float)): + return False + # Only check finite if type is correct (more expensive operation) + return math.isfinite(value) \ No newline at end of file diff --git a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py index 4da379b..b6b67f4 100644 --- a/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py +++ b/src/go2_robot_sdk/go2_robot_sdk/infrastructure/ros2/ros2_publisher.py @@ -42,19 +42,22 @@ def publish_odometry(self, robot_data: RobotData) -> None: try: robot_idx = int(robot_data.robot_id) + # Single timestamp for both transform and odometry to ensure logical consistency + current_time = self.node.get_clock().now().to_msg() + # Publish transform - self._publish_transform(robot_data, robot_idx) + self._publish_transform(robot_data, robot_idx, current_time) # Publish odometry topic - self._publish_odometry_topic(robot_data, robot_idx) + self._publish_odometry_topic(robot_data, robot_idx, current_time) except Exception as e: logger.error(f"Error publishing odometry: {e}") - def _publish_transform(self, robot_data: RobotData, robot_idx: int) -> None: + def _publish_transform(self, robot_data: RobotData, robot_idx: int, current_time) -> None: """Publish TF transform""" odom_trans = TransformStamped() - odom_trans.header.stamp = self.node.get_clock().now().to_msg() + odom_trans.header.stamp = current_time odom_trans.header.frame_id = 'odom' if self.config.conn_mode == 'single': @@ -76,10 +79,10 @@ def _publish_transform(self, robot_data: RobotData, robot_idx: int) -> None: self.broadcaster.sendTransform(odom_trans) - def _publish_odometry_topic(self, robot_data: RobotData, robot_idx: int) -> None: + def _publish_odometry_topic(self, robot_data: RobotData, robot_idx: int, current_time) -> None: """Publish Odometry topic""" odom_msg = Odometry() - odom_msg.header.stamp = self.node.get_clock().now().to_msg() + odom_msg.header.stamp = current_time odom_msg.header.frame_id = 'odom' if self.config.conn_mode == 'single': @@ -108,8 +111,11 @@ def publish_joint_state(self, robot_data: RobotData) -> None: try: robot_idx = int(robot_data.robot_id) + # Phase 1 optimization: single timestamp call per message to reduce system call overhead + current_time = self.node.get_clock().now().to_msg() + joint_state = JointState() - joint_state.header.stamp = self.node.get_clock().now().to_msg() + joint_state.header.stamp = current_time # Define joint names if self.config.conn_mode == 'single': @@ -196,9 +202,12 @@ def publish_lidar_data(self, robot_data: RobotData) -> None: 0 ) + # Phase 1 optimization: single timestamp call per message to reduce system call overhead + current_time = self.node.get_clock().now().to_msg() + point_cloud = PointCloud2() point_cloud.header = Header(frame_id="odom") - point_cloud.header.stamp = self.node.get_clock().now().to_msg() + point_cloud.header.stamp = current_time fields = [ PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1), @@ -213,6 +222,16 @@ def publish_lidar_data(self, robot_data: RobotData) -> None: except Exception as e: logger.error(f"Error publishing lidar data: {e}") + def has_camera_subscribers(self, robot_id: str) -> bool: + """Check if there are active subscribers to camera topics""" + try: + robot_idx = int(robot_id) + camera_subs = self.publishers['camera'][robot_idx].get_subscription_count() + camera_info_subs = self.publishers['camera_info'][robot_idx].get_subscription_count() + return camera_subs > 0 or camera_info_subs > 0 + except Exception: + return True # Default to processing if check fails + def publish_camera_data(self, robot_data: RobotData) -> None: """Publish camera data""" if not robot_data.camera_data: @@ -222,13 +241,23 @@ def publish_camera_data(self, robot_data: RobotData) -> None: robot_idx = int(robot_data.robot_id) camera = robot_data.camera_data + # Phase 1 optimization: single timestamp call per message to reduce system call overhead + current_time = self.node.get_clock().now().to_msg() + # Convert to ROS Image ros_image = self.bridge.cv2_to_imgmsg(camera.image, encoding=camera.encoding) - ros_image.header.stamp = self.node.get_clock().now().to_msg() - - # Camera info + ros_image.header.stamp = current_time + + # Camera info - reuse same timestamp for consistency + # Phase 1: Direct lookup with explicit error handling (no fallback) + if camera.height not in self.camera_info: + available = list(self.camera_info.keys()) + raise ValueError(f"No camera calibration found for {camera.height}p resolution. " + f"Available resolutions: {available}. " + f"Ensure front_camera_{camera.height}.yaml exists in calibration directory.") + camera_info = self.camera_info[camera.height] - camera_info.header.stamp = ros_image.header.stamp + camera_info.header.stamp = current_time if self.config.conn_mode == 'single': camera_info.header.frame_id = 'front_camera' diff --git a/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py b/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py index 71851b3..4a50575 100644 --- a/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py +++ b/src/go2_robot_sdk/go2_robot_sdk/presentation/go2_driver_node.py @@ -1,9 +1,17 @@ # Copyright (c) 2024, RoboVerse community # SPDX-License-Identifier: BSD-3-Clause +# Phase 2A+: Configure FFmpeg environment variables before any imports +import os +os.environ['AV_LOG_LEVEL'] = 'fatal' # Suppress FFmpeg log messages +os.environ['FFMPEG_LOG_LEVEL'] = 'fatal' # Alternative FFmpeg log control +os.environ['AV_CODEC_FLAG_ERROR_CONCEALMENT'] = '1' # Enable error concealment + import asyncio +import collections import logging import os +import time from typing import Dict, Any from aiortc import MediaStreamTrack @@ -31,6 +39,44 @@ logger.setLevel(logging.INFO) +class VideoErrorTracker: + """Track H.264 error rates for adaptive behavior""" + + def __init__(self, window_size=50): + self.error_window = collections.deque(maxlen=window_size) + self.total_errors = 0 + self.last_error_time = 0 + + def record_error(self): + """Record a video processing error""" + current_time = time.time() + self.error_window.append(1) + self.total_errors += 1 + self.last_error_time = current_time + + def record_success(self): + """Record successful video processing""" + self.error_window.append(0) + + def get_error_rate(self) -> float: + """Get current error rate (0.0 to 1.0)""" + if len(self.error_window) < 10: + return 0.0 + return sum(self.error_window) / len(self.error_window) + + def is_error_storm(self) -> bool: + """Check if we're in an error storm (>50% error rate)""" + return self.get_error_rate() > 0.5 + + def get_stats(self) -> dict: + """Get error statistics for logging""" + return { + 'error_rate': self.get_error_rate(), + 'total_errors': self.total_errors, + 'window_size': len(self.error_window) + } + + class Go2DriverNode(Node): """Main Go2 driver node - entry point to the application""" @@ -68,6 +114,12 @@ def __init__(self, event_loop=None): # Set callback for data self.webrtc_adapter.set_data_callback(self._on_robot_data_received) + # Phase 2A: Configure aiortc logging to reduce H.264 error spam + self._configure_aiortc_logging() + + # Phase 2A+: Configure FFmpeg error concealment for robust H.264 decoding + self._configure_ffmpeg_error_concealment() + # Subscribers initialization self._setup_subscribers() @@ -289,14 +341,84 @@ def _on_robot_data_received(self, msg: Dict[str, Any], robot_id: str) -> None: """Callback for receiving data from robot""" self.robot_data_service.process_webrtc_message(msg, robot_id) + def _configure_aiortc_logging(self) -> None: + """Configure aiortc logging to reduce H.264 error spam""" + logging.getLogger('aiortc.codecs.h264').setLevel(logging.ERROR) + logging.getLogger('aiortc.codecs').setLevel(logging.ERROR) + logging.getLogger('aiortc').setLevel(logging.WARNING) + logger.info("Configured aiortc logging to reduce H.264 error spam") + + def _configure_ffmpeg_error_concealment(self) -> None: + """Configure FFmpeg error concealment and log suppression for robust H.264 decoding""" + try: + import av + import av.logging + + # Primary approach: PyAV log level control (most reliable) + av.logging.set_level(av.logging.QUIET) # Complete suppression of FFmpeg messages + logger.info("Configured PyAV to completely suppress FFmpeg H.264 decoder messages") + + # Secondary approach: Try to configure codec options (may not work in all versions) + try: + # Attempt to set global decoder options for error concealment + h264_codec = av.codec.Codec('h264', 'r') + if hasattr(h264_codec, 'options'): + h264_codec.options.update({ + 'error_concealment': '3', # All error concealment methods + 'ec': '3', # Alternative parameter name + }) + logger.info("Configured H.264 decoder error concealment options") + except Exception as codec_error: + logger.debug(f"Could not configure codec options (this is normal): {codec_error}") + + except ImportError: + logger.warning("PyAV not available - cannot suppress FFmpeg logging") + except Exception as e: + logger.warning(f"Could not configure PyAV logging: {e}") + + # Environment variables were set at module import time + logger.info("FFmpeg environment variables configured at module import (AV_LOG_LEVEL=fatal)") + async def _on_video_frame(self, track: MediaStreamTrack, robot_id: str) -> None: """Callback for processing video frames""" - # logger.info(f"Video frame received for robot {robot_id}") + # Phase 1 optimizations: Frame rate limiting, resolution downscaling, + # subscription checking, and enhanced error handling + max_fps = 15 # Limit to 15 FPS for CPU efficiency + frame_interval = 1.0 / max_fps + last_frame_time = 0 + consecutive_errors = 0 + max_consecutive_errors = 5 + + # Phase 2A: Error tracking for adaptive behavior and diagnostics + error_tracker = VideoErrorTracker() + last_stats_log = 0 while True: try: - frame = await track.recv() - img = frame.to_ndarray(format="rgb24") + # Phase 2A: Add timeout to prevent indefinite blocking during H.264 decoder recovery + frame = await asyncio.wait_for(track.recv(), timeout=0.1) + + # Phase 1: Frame rate limiting - skip frames if processing too fast + current_time = time.time() + if current_time - last_frame_time < frame_interval: + continue # Skip frame to maintain target FPS + + # Phase 1: Enhanced frame skipping - more aggressive under load + if consecutive_errors > 3: # High stress - skip 75% of frames + if int(current_time * 10) % 4 != 0: + continue + elif consecutive_errors > 1: # Moderate stress - skip 50% of frames + if int(current_time * 10) % 2 == 0: + continue + + last_frame_time = current_time + + # Phase 1: Subscription checking - skip processing if no subscribers + if not self.ros2_publisher.has_camera_subscribers(robot_id): + continue # Skip frame processing when no one is listening + + # Phase 1: Resolution downscaling - 85% data reduction (1920x1080 -> 640x480) + img = frame.to_ndarray(format="rgb24", width=640, height=480) # Create camera data camera_data = CameraData( @@ -314,11 +436,44 @@ async def _on_video_frame(self, track: MediaStreamTrack, robot_id: str) -> None: # Publish via ROS2Publisher self.ros2_publisher.publish_camera_data(robot_data) + + # Reset error counter on successful processing + consecutive_errors = 0 + + # Phase 2A: Record successful frame processing + error_tracker.record_success() + + # Phase 2A: Periodic error statistics logging (every 30 seconds) + current_time = time.time() + if current_time - last_stats_log > 30: + stats = error_tracker.get_stats() + if stats['total_errors'] > 0: + logger.info(f"Video error stats: {stats['error_rate']:.1%} error rate, " + f"{stats['total_errors']} total errors") + last_stats_log = current_time + await asyncio.sleep(0) + except asyncio.TimeoutError: + # Phase 2A: Handle frame receive timeout - skip frame without counting as error + continue + except Exception as e: - logger.error(f"Error processing video frame: {e}") - break + # Phase 1: Enhanced error handling for video failures + consecutive_errors += 1 + + # Phase 2A: Record error for tracking and statistics + error_tracker.record_error() + + logger.error(f"Error processing video frame (attempt {consecutive_errors}): {e}") + + # Phase 1: Graceful degradation - break after too many consecutive errors + if consecutive_errors >= max_consecutive_errors: + logger.error(f"Too many consecutive video errors ({consecutive_errors}), stopping video processing for robot {robot_id}") + break + + # Brief pause before retry to avoid tight error loop + await asyncio.sleep(0.1) # CycloneDDS callbacks def _on_cyclonedds_low_state(self, msg: LowState) -> None: