From add7e7c656060ef6293d1f46e39e4cff0631a034 Mon Sep 17 00:00:00 2001 From: Rob Royce Date: Wed, 21 Aug 2024 10:03:34 -0700 Subject: [PATCH] ROS2 tools update (#10) * refactor: better error handling and response parsing for ROS2 tools, add blacklist where applicable. * feat(ros2): add ros2 topic echo tool. * chore: bump version to 1.0.4, update CHANGELOG.md --- CHANGELOG.md | 15 ++ Dockerfile | 2 +- setup.py | 2 +- src/rosa/tools/ros1.py | 5 +- src/rosa/tools/ros2.py | 426 +++++++++++++++++++++-------------------- 5 files changed, 233 insertions(+), 217 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 263cd3c..f4ecc9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.4] - 2024-08-21 + +### Added + +* Implemented ros2 topic echo tool. + +### Changed + +* Refactored ROS2 tools for better error handling and response parsing. +* Added blacklist parameters to relevant ROS2 tools. + +### Fixed + +* Fixed a bug where getting a list of ROS2 log files failed. + ## [1.0.3] - 2024-08-17 ### Added diff --git a/Dockerfile b/Dockerfile index 7ac98ed..b26d22f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,7 +28,7 @@ RUN apt-get update && apt-get install -y \ RUN apt-get update && apt-get install -y python3.9 RUN apt-get update && apt-get install -y python3-pip RUN python3 -m pip install -U python-dotenv catkin_tools -RUN python3.9 -m pip install -U jpl-rosa>=1.0.1 +RUN python3.9 -m pip install -U jpl-rosa>=1.0.4 # Configure ROS RUN rosdep update diff --git a/setup.py b/setup.py index 3d1a130..926ba7a 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ setup( name="jpl-rosa", - version="1.0.3", + version="1.0.4", license="Apache 2.0", description="ROSA: the Robot Operating System Agent", long_description=long_description, diff --git a/src/rosa/tools/ros1.py b/src/rosa/tools/ros1.py index 9ed7e2a..36bcd93 100644 --- a/src/rosa/tools/ros1.py +++ b/src/rosa/tools/ros1.py @@ -338,7 +338,7 @@ def rostopic_echo( timeout: float = 1.0, ) -> dict: """ - Opens a new terminal window and echoes the contents of a specific ROS topic. + Echoes the contents of a specific ROS topic. :param topic: The name of the ROS topic to echo. :param count: The number of messages to echo. Valid range is 1-100. @@ -675,7 +675,7 @@ def roslog_list(min_size: int = 2048, blacklist: Optional[List[str]] = None) -> """ logs = [] - log_dirs = get_roslog_directories.invoke({}) + log_dirs = get_roslog_directories() for _, log_dir in log_dirs.items(): if not log_dir: @@ -729,7 +729,6 @@ def roslog_list(min_size: int = 2048, blacklist: Optional[List[str]] = None) -> ) -@tool def get_roslog_directories() -> dict: """Returns any available ROS log directories.""" default_directory = rospkg.get_log_dir() diff --git a/src/rosa/tools/ros2.py b/src/rosa/tools/ros2.py index cd07229..b825b69 100644 --- a/src/rosa/tools/ros2.py +++ b/src/rosa/tools/ros2.py @@ -13,18 +13,21 @@ # limitations under the License. import os +import re import subprocess -from typing import List, Optional +import time +from typing import List, Optional, Tuple from langchain.agents import tool +from rclpy.logging import get_logging_directory -def execute_ros_command(command: str, regex_pattern: str = None) -> str: +def execute_ros_command(command: str) -> Tuple[bool, str]: """ Execute a ROS2 command. :param command: The ROS2 command to execute. - :return: The output of the command. + :return: A tuple containing a boolean indicating success and the output of the command. """ # Validate the command is a proper ROS2 command @@ -40,56 +43,129 @@ def execute_ros_command(command: str, regex_pattern: str = None) -> str: try: output = subprocess.check_output(command, shell=True).decode() + return True, output except Exception as e: - return f"Error executing command '{command}': {e}" + return False, str(e) - if regex_pattern: - output = subprocess.check_output( - f"echo '{output}' | grep -E '{regex_pattern}'", shell=True - ).decode() - return output +def get_entities( + cmd: str, + delimiter: str = "\n", + pattern: str = None, + blacklist: Optional[List[str]] = None, +) -> List[str]: + """ + Get a list of ROS2 entities (nodes, topics, services, etc.). + + :param cmd: the ROS2 command to execute. + :param delimiter: The delimiter to split the output by. + :param pattern: A regular expression pattern to filter the list of entities. + :return: + """ + success, output = execute_ros_command(cmd) + + if not success: + return [output] + + entities = output.split(delimiter) + + # Filter out blacklisted entities + if blacklist: + entities = list( + filter( + lambda x: not any( + re.match(f".*{pattern}.*", x) for pattern in blacklist + ), + entities, + ) + ) + + if pattern: + entities = list(filter(lambda x: re.match(f".*{pattern}.*", x), entities)) + + return entities @tool -def ros2_node_list( - regex_pattern: str = None, blacklist: Optional[List[str]] = None -) -> List[str]: +def ros2_node_list(pattern: str = None, blacklist: Optional[List[str]] = None) -> dict: """ Get a list of ROS2 nodes running on the system. - :param regex_pattern: A regular expression pattern to filter the list of nodes. + :param pattern: A regular expression pattern to filter the list of nodes. """ cmd = "ros2 node list" - output = execute_ros_command(cmd, regex_pattern) - nodes = output.split("\n") - return [node for node in nodes if node] + nodes = get_entities(cmd, pattern=pattern, blacklist=blacklist) + return {"nodes": nodes} @tool -def ros2_topic_list(regex_pattern: str = None) -> List[str]: +def ros2_topic_list(pattern: str = None, blacklist: Optional[List[str]] = None) -> dict: """ Get a list of ROS2 topics. - :param regex_pattern: A regular expression pattern to filter the list of topics. + :param pattern: A regular expression pattern to filter the list of topics. """ cmd = "ros2 topic list" - output = execute_ros_command(cmd, regex_pattern) - topics = output.split("\n") - return [topic for topic in topics if topic] + topics = get_entities(cmd, pattern=pattern, blacklist=blacklist) + return {"topics": topics} @tool -def ros2_service_list(regex_pattern: str = None) -> List[str]: +def ros2_topic_echo( + topic: str, + count: int = 1, + return_echoes: bool = False, + delay: float = 1.0, + timeout: float = 1.0, +) -> dict: + """ + Echoes the contents of a specific ROS2 topic. + + :param topic: The name of the ROS topic to echo. + :param count: The number of messages to echo. Valid range is 1-10. + :param return_echoes: If True, return the messages as a list with the response. + :param delay: Time to wait between each message in seconds. + :param timeout: Max time to wait for a message before timing out. + + :note: Do not set return_echoes to True if the number of messages is large. + This will cause the response to be too large and may cause the tool to fail. + """ + cmd = f"ros2 topic echo {topic} --once --spin-time {timeout}" + + if count < 1 or count > 10: + return {"error": "Count must be between 1 and 10."} + + echoes = [] + for i in range(count): + success, output = execute_ros_command(cmd) + + if not success: + return {"error": output} + + print(output) + if return_echoes: + echoes.append(output) + + time.sleep(delay) + + if return_echoes: + return {"echoes": echoes} + + return {"success": True} + + +@tool +def ros2_service_list( + pattern: str = None, blacklist: Optional[List[str]] = None +) -> dict: """ Get a list of ROS2 services. - :param regex_pattern: A regular expression pattern to filter the list of services. + :param pattern: A regular expression pattern to filter the list of services. """ cmd = "ros2 service list" - output = execute_ros_command(cmd, regex_pattern) - services = output.split("\n") - return [service for service in services if service] + services = get_entities(cmd, pattern=pattern, blacklist=blacklist) + return {"services": services} @tool @@ -104,56 +180,11 @@ def ros2_node_info(nodes: List[str]) -> dict: for node_name in nodes: cmd = f"ros2 node info {node_name}" - - try: - output = execute_ros_command(cmd) - except subprocess.CalledProcessError as e: - print(f"Error getting info for node '{node_name}': {e}") - data[node_name] = dict(error=str(e)) + success, output = execute_ros_command(cmd) + if not success: + data[node_name] = dict(error=output) continue - - data[node_name] = dict( - name=node_name, - subscribers=[], - publishers=[], - service_servers=[], - service_clients=[], - action_servers=[], - action_clients=[], - ) - - lines = output.split("\n") - # Find indices for each section - subscriber_idx = lines.index(" Subscribers:") - publisher_idx = lines.index(" Publishers:") - service_server_idx = lines.index(" Service Servers:") - service_client_idx = lines.index(" Service Clients:") - action_server_idx = lines.index(" Action Servers:") - action_client_idx = lines.index(" Action Clients:") - - # Get subscribers - for i in range(subscriber_idx + 1, publisher_idx): - data[node_name]["subscribers"].append(lines[i].strip()) - - # Get publishers - for i in range(publisher_idx + 1, service_server_idx): - data[node_name]["publishers"].append(lines[i].strip()) - - # Get service servers - for i in range(service_server_idx + 1, service_client_idx): - data[node_name]["service_servers"].append(lines[i].strip()) - - # Get service clients - for i in range(service_client_idx + 1, action_server_idx): - data[node_name]["service_clients"].append(lines[i].strip()) - - # Get action servers - for i in range(action_server_idx + 1, action_client_idx): - data[node_name]["action_servers"].append(lines[i].strip()) - - # Get action clients - for i in range(action_client_idx + 1, len(lines)): - data[node_name]["action_clients"].append(lines[i].strip()) + data[node_name] = output return data @@ -219,35 +250,40 @@ def ros2_topic_info(topics: List[str]) -> dict: data = {} for topic in topics: - try: - cmd = f"ros2 topic info {topic} --verbose" - output = execute_ros_command(cmd) + cmd = f"ros2 topic info {topic} --verbose" + success, output = execute_ros_command(cmd) + if not success: + topic_info = dict(error=output) + else: topic_info = parse_ros2_topic_info(output) - except subprocess.CalledProcessError as e: - topic_info = dict(error=str(e)) + data[topic] = topic_info return data @tool -def ros2_param_list(node_name: Optional[str]) -> dict: +def ros2_param_list( + node_name: Optional[str] = None, + pattern: str = None, + blacklist: Optional[List[str]] = None, +) -> dict: """ Get a list of parameters for a ROS2 node. :param node_name: An optional ROS2 node name to get parameters for. If not provided, all parameters are listed. + :param pattern: A regular expression pattern to filter the list of parameters. """ if node_name: cmd = f"ros2 param list {node_name}" - output = execute_ros_command(cmd) - - # Trim all whitespace and split by newline - params = output.strip().split("\n") - params = [param.strip() for param in params if param] + params = get_entities(cmd, pattern=pattern, blacklist=blacklist) return {node_name: params} else: cmd = f"ros2 param list" - output = execute_ros_command(cmd) + success, output = execute_ros_command(cmd) + + if not success: + return {"error": output} # When we get a list of all nodes params, we have to parse it # The node name starts with a '/' and the params are indented @@ -258,13 +294,13 @@ def ros2_param_list(node_name: Optional[str]) -> dict: if line.startswith("/"): current_node = line data[current_node] = [] - else: + elif line.strip() != "": data[current_node].append(line.strip()) return data @tool -def ros2_param_get(node_name: str, param_name: str) -> str: +def ros2_param_get(node_name: str, param_name: str) -> dict: """ Get the value of a parameter for a ROS2 node. @@ -272,12 +308,16 @@ def ros2_param_get(node_name: str, param_name: str) -> str: :param param_name: The name of the parameter. """ cmd = f"ros2 param get {node_name} {param_name}" - output = execute_ros_command(cmd) - return output + success, output = execute_ros_command(cmd) + + if not success: + return {"error": output} + + return {param_name: output} @tool -def ros2_param_set(node_name: str, param_name: str, param_value: str) -> str: +def ros2_param_set(node_name: str, param_name: str, param_value: str) -> dict: """ Set the value of a parameter for a ROS2 node. @@ -286,28 +326,12 @@ def ros2_param_set(node_name: str, param_name: str, param_value: str) -> str: :param param_value: The value to set the parameter to. """ cmd = f"ros2 param set {node_name} {param_name} {param_value}" - output = execute_ros_command(cmd) - return output + success, output = execute_ros_command(cmd) + if not success: + return {"error": output} -@tool -def ros2_service_info(services: List[str]) -> dict: - """ - Get information about a ROS2 service. - - :param service_name: The name of the ROS2 service. - """ - data = {} - - for service_name in services: - cmd = f"ros2 service info {service_name}" - try: - output = execute_ros_command(cmd) - data[service_name] = output - except subprocess.CalledProcessError as e: - data[service_name] = dict(error=str(e)) - - return data + return {param_name: output} @tool @@ -315,23 +339,25 @@ def ros2_service_info(services: List[str]) -> dict: """ Get information about a ROS2 service. - :param service_name: The name of the ROS2 service. + :param services: a list of ROS2 service names. """ data = {} for service_name in services: - cmd = f"ros2 service info {service_name}" - try: - output = execute_ros_command(cmd) - data[service_name] = output - except subprocess.CalledProcessError as e: - data[service_name] = dict(error=str(e)) + cmd = f"ros2 service type {service_name}" + success, output = execute_ros_command(cmd) + + if not success: + data[service_name] = dict(error=output) + continue + + data[service_name] = output return data @tool -def ros2_service_call(service_name: str, srv_type: str, request: str) -> str: +def ros2_service_call(service_name: str, srv_type: str, request: str) -> dict: """ Call a ROS2 service. @@ -340,116 +366,92 @@ def ros2_service_call(service_name: str, srv_type: str, request: str) -> str: :param request: The request to send to the service. """ cmd = f'ros2 service call {service_name} {srv_type} "{request}"' - try: - output = execute_ros_command(cmd) - except Exception as e: - output = f"Error calling '{service_name}'. Command that was run: {cmd}. Error message: {e}" - return output + success, output = execute_ros_command(cmd) + if not success: + return {"error": output} + return {"response": output} @tool -def ros2_doctor() -> str: +def ros2_doctor() -> dict: """ Check ROS setup and other potential issues. """ cmd = "ros2 doctor" - output = execute_ros_command(cmd) - return output - + success, output = execute_ros_command(cmd) + if not success: + return {"error": output} + return {"results": output} -def get_ros2_log_root() -> str: - """ - Get the root directory for ROS2 log files. - """ - ros2_log_dir = os.environ.get("ROS_LOG_DIR", None) - ros_home = os.environ.get("ROS_HOME", None) - if not ros2_log_dir and ros_home: - ros2_log_dir = os.path.join(ros_home, "log") - elif not ros2_log_dir: - ros2_log_dir = os.path.join(os.path.expanduser("~"), ".ros/log") +def ros2_log_directories(): + """Get any available ROS2 log directories.""" + log_dir = get_logging_directory() + print(f"ROS 2 logs are stored in: {log_dir}") - return ros2_log_dir + return {"default": f"{log_dir}"} @tool -def ros2_log_list(ros_log_dir: Optional[str]) -> dict: - """Returns a list of ROS2 log files. - - :param ros_log_dir: The directory where ROS2 log files are stored. If not provided, the default ROS2 log directory is used. +def roslog_list(min_size: int = 2048, blacklist: Optional[List[str]] = None) -> dict: """ + Returns a list of ROS log files. - # The log files will either be in $ROS_LOG_DIR (if it exists) or $ROS_HOME/log - # First check if either of those env variables are set, starting with ROS_LOG_DIR - ros_log_dir = ros_log_dir or get_ros2_log_root() - - if not os.path.exists(ros_log_dir): - return dict(error=f"ROS log directory '{ros_log_dir}' does not exist.") - - log_files = [f for f in os.listdir(ros_log_dir) if f.endswith(".log")] - - # Get metadata for each file - log_files_with_metadata = [] - for log_file in log_files: - log_file_path = os.path.join(ros_log_dir, log_file) - log_file_size = os.path.getsize(log_file_path) - - log_lines = [] - with open(log_file_path, "r") as f: - log_lines = f.readlines() - - debug = 0 - info = 0 - warnings = 0 - errors = 0 - for line in log_lines: - if line.startswith("[WARN]"): - warnings += 1 - elif line.startswith("[ERROR]"): - errors += 1 - elif line.startswith("[INFO]"): - info += 1 - elif line.startswith("[DEBUG]"): - debug += 1 - - log_file_lines = len(log_lines) - log_files_with_metadata.append( - dict( - name=log_file, - bytes=log_file_size, - lines=log_file_lines, - debug=debug, - info=info, - warnings=warnings, - errors=errors, - ) - ) - - return dict(log_file_directory=ros_log_dir, log_files=log_files_with_metadata) - - -@tool -def ros2_read_log(log_file_name: str, level: Optional[str]) -> dict: - """Read a ROS2 log file. - - :param log_file_name: The name of the log file to read. - :param level: (optional) The log level to filter by. If not provided, all log messages are returned. + :param min_size: The minimum size of the log file in bytes to include in the list. """ - ros_log_dir = get_ros2_log_root() - log_file_path = os.path.join(ros_log_dir, log_file_name) - if not os.path.exists(log_file_path): - return dict(error=f"Log file '{log_file_name}' does not exist.") + logs = [] + log_dirs = ros2_log_directories() - log_lines = [] - with open(log_file_path, "r") as f: - log_lines = f.readlines() + for _, log_dir in log_dirs.items(): + if not log_dir: + continue - res = dict(log_file=log_file_name, log_dir=ros_log_dir, lines=[]) + # Get all .log files in the directory + log_files = [ + os.path.join(log_dir, f) + for f in os.listdir(log_dir) + if os.path.isfile(os.path.join(log_dir, f)) and f.endswith(".log") + ] + + print(f"Log files: {log_files}") + + # Filter out blacklisted files + if blacklist: + log_files = list( + filter( + lambda x: not any( + re.match(f".*{pattern}.*", x) for pattern in blacklist + ), + log_files, + ) + ) - for line in log_lines: - if level and not line.startswith(f"[{level.upper()}]"): - continue - res["lines"].append(line.strip()) + # Filter out files that are too small + log_files = list(filter(lambda x: os.path.getsize(x) > min_size, log_files)) + + # Get the size of each log file in KB or MB if it's larger than 1 MB + log_files = [ + { + f.replace(log_dir, ""): ( + f"{round(os.path.getsize(f) / 1024, 2)} KB" + if os.path.getsize(f) < 1024 * 1024 + else f"{round(os.path.getsize(f) / (1024 * 1024), 2)} MB" + ), + } + for f in log_files + ] + + if len(log_files) > 0: + logs.append( + { + "directory": log_dir, + "total": len(log_files), + "files": log_files, + } + ) - return res + return dict( + total=len(logs), + logs=logs, + )