diff --git a/README.md b/README.md index bc8b777..3741667 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ ros_utils.get_images_from_bag( output_folder="/output_imgs/", file_format="jpg", create_manifest=True, - topics="/camera/image_raw", + topics=["/camera/image_raw"], naming="sequential", resize=[640,480], sample=2, @@ -65,7 +65,7 @@ Do you have a request for a data format that's not listed above? Raise an issue ## Community -If you have any questions, comments, or want to chat, please join [our Slack channel](#). +If you have any questions, comments, or want to chat, please join [our Discord channel](https://discord.com/invite/rvXqP6EjwF). ## Contribute ### How to Contribute diff --git a/python/README.md b/python/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/python/robologs_ros_utils/cli.py b/python/robologs_ros_utils/cli.py index ee94251..9b9db54 100644 --- a/python/robologs_ros_utils/cli.py +++ b/python/robologs_ros_utils/cli.py @@ -1,13 +1,9 @@ import click -from robologs_ros_utils.sources.ros1 import ( - clip_rosbag, - get_csv_data_from_bag, - get_images_from_bag, - get_summary_from_bag, - get_videos_from_bag, - split_rosbag, -) +from robologs_ros_utils.sources.ros1 import (get_csv_data_from_bag, + get_images_from_bag, + get_summary_from_bag, + get_videos_from_bag) @click.group(invoke_without_command=True) @@ -33,9 +29,7 @@ def cli(ctx): cli.add_command(get_images_from_bag.get_images) cli.add_command(get_videos_from_bag.get_videos) cli.add_command(get_summary_from_bag.get_summary) -cli.add_command(clip_rosbag.clip_rosbag) cli.add_command(get_csv_data_from_bag.get_csv_data_from_bag) -cli.add_command(split_rosbag.split_rosbag) def main(): diff --git a/python/robologs_ros_utils/sources/ros1/argument_parsers.py b/python/robologs_ros_utils/sources/ros1/argument_parsers.py index 1b2340a..cb8153b 100644 --- a/python/robologs_ros_utils/sources/ros1/argument_parsers.py +++ b/python/robologs_ros_utils/sources/ros1/argument_parsers.py @@ -1,9 +1,10 @@ def get_width_height_from_args(resize_arg: str) -> list: """ + Get width and height from resize argument Args: resize_arg (str): - Returns: + Returns: list of width and height """ if not resize_arg: diff --git a/python/robologs_ros_utils/sources/ros1/clip_rosbag.py b/python/robologs_ros_utils/sources/ros1/clip_rosbag.py deleted file mode 100644 index 65b1211..0000000 --- a/python/robologs_ros_utils/sources/ros1/clip_rosbag.py +++ /dev/null @@ -1,62 +0,0 @@ -import os.path - -import click - -from robologs_ros_utils.sources.ros1 import ros_utils -from robologs_ros_utils.utils import file_utils - - -@click.command() -@click.option("--input", "-i", type=str, required=True, help="A single rosbag or folder with rosbags") -@click.option("--output", "-o", type=str, required=True, help="Output directory") -@click.option("--topics", type=str, help="Topic names used for extraction, comma-separated list") -@click.option("--start-time", type=float, help="Only extract from start time") -@click.option("--end-time", type=float, help="Only extract until end time") -@click.option("--name", type=str, help="Output name of rosbag, only is used when there is a single input bag") -@click.option("--timestamp_type", type=str, help="valid values are [rosbag_ns, offset_s]", default="rosbag_ns") -def clip_rosbag(input, output, topics, start_time, end_time, name, timestamp_type): - """Clip rosbags based on topics and timestamps""" - input_path = input - output_path = output - topics = topics.split(",") if topics is not None else topics - start_time = start_time if start_time is not None else start_time - end_time = end_time if end_time is not None else end_time - timestamp_type = timestamp_type - - if os.path.isdir(input_path): - rosbag_file_list = file_utils.get_all_files_of_type_in_directory(input_folder=input_path, file_format="bag") - for it, rosbag_path in enumerate(rosbag_file_list): - - _, output_name = os.path.split(rosbag_path) - output_name = output_name.replace(".bag", "_cropped_.bag") - - output_path_cropped_bag = os.path.join(output_path, output_name) - - ros_utils.get_clipped_bag_file( - input_bag_path=rosbag_path, - output_bag_path=output_path_cropped_bag, - start_time=start_time, - end_time=end_time, - topic_list=topics, - timestamp_type=timestamp_type, - ) - - if os.path.isfile(input_path): - if name: - output_name = name - else: - _, output_name = os.path.split(input_path) - output_name = output_name.replace(".bag", "_cropped.bag") - output_path_cropped_bag = os.path.join(output_path, output_name) - ros_utils.get_clipped_bag_file( - input_bag_path=input_path, - output_bag_path=output_path_cropped_bag, - start_time=start_time, - end_time=end_time, - topic_list=topics, - timestamp_type=timestamp_type, - ) - - -if __name__ == "__main__": - clip_rosbag() diff --git a/python/robologs_ros_utils/sources/ros1/commands.py b/python/robologs_ros_utils/sources/ros1/commands.py index 88bd923..da291d7 100644 --- a/python/robologs_ros_utils/sources/ros1/commands.py +++ b/python/robologs_ros_utils/sources/ros1/commands.py @@ -1,6 +1,6 @@ import click -from . import clip_rosbag, get_images_from_bag, get_summary_from_bag, get_videos_from_bag, split_rosbag +from . import get_images_from_bag, get_summary_from_bag, get_videos_from_bag @click.group() @@ -14,5 +14,3 @@ def ros(): ros.add_command(get_images_from_bag.get_images) ros.add_command(get_videos_from_bag.get_videos) ros.add_command(get_summary_from_bag.get_summary) -ros.add_command(clip_rosbag.clip_rosbag) -ros.add_command(split_rosbag.split_rosbag) diff --git a/python/robologs_ros_utils/sources/ros1/get_csv_data_from_bag.py b/python/robologs_ros_utils/sources/ros1/get_csv_data_from_bag.py index 36a0f65..47e8f9a 100644 --- a/python/robologs_ros_utils/sources/ros1/get_csv_data_from_bag.py +++ b/python/robologs_ros_utils/sources/ros1/get_csv_data_from_bag.py @@ -1,9 +1,6 @@ -import os - import click from robologs_ros_utils.sources.ros1 import ros_utils -from robologs_ros_utils.utils import file_utils def process_topics(ctx, param, value): diff --git a/python/robologs_ros_utils/sources/ros1/get_summary_from_bag.py b/python/robologs_ros_utils/sources/ros1/get_summary_from_bag.py index 3ca4e72..2c6f2b1 100644 --- a/python/robologs_ros_utils/sources/ros1/get_summary_from_bag.py +++ b/python/robologs_ros_utils/sources/ros1/get_summary_from_bag.py @@ -1,6 +1,6 @@ -import os.path import json import os +import os.path import click diff --git a/python/robologs_ros_utils/sources/ros1/ros_img_tools.py b/python/robologs_ros_utils/sources/ros1/ros_img_tools.py index 2d08039..bb18938 100644 --- a/python/robologs_ros_utils/sources/ros1/ros_img_tools.py +++ b/python/robologs_ros_utils/sources/ros1/ros_img_tools.py @@ -1,66 +1,75 @@ import glob -import io import os import subprocess import cv2 import numpy as np -from PIL import Image def convert_compressed_depth_to_cv2(compressed_depth): """ - Convert a compressedDepth topic image into a cv2 image. - compressed_depth must be from a topic /bla/compressedDepth - as it's encoded in PNG - Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ + Converts a ROS compressedDepth image into an OpenCV image. + + Args: + compressed_depth: A sensor_msgs/CompressedImage message from a ROS compressedDepth topic. + + Returns: + OpenCV image representing the depth image. + + Raises: + Exception: If the compression type is not 'compressedDepth' or the depth image could not be decoded. + + Note: + Code adapted from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ """ depth_fmt, compr_type = compressed_depth.format.split(";") - # remove white space depth_fmt = depth_fmt.strip() compr_type = compr_type.strip().replace(" png", "") + if compr_type != "compressedDepth": - raise Exception("Compression type is not 'compressedDepth'." "You probably subscribed to the wrong topic.") + raise Exception("Compression type is not 'compressedDepth'. Incorrect topic subscribed.") depth_header_size = 12 raw_data = compressed_depth.data[depth_header_size:] - depth_img_raw = cv2.imdecode(np.frombuffer(raw_data, np.uint8), 0) + depth_img_raw = cv2.imdecode(np.frombuffer(raw_data, np.uint8), cv2.IMREAD_UNCHANGED) if depth_img_raw is None: - # probably wrong header size - raise Exception("Could not decode compressed depth image." "You may need to change 'depth_header_size'!") - result = cv2.normalize(depth_img_raw, depth_img_raw, 0, 255, norm_type=cv2.NORM_MINMAX) + raise Exception("Could not decode compressed depth image. Adjust 'depth_header_size' if necessary.") - im_color = cv2.applyColorMap(depth_img_raw, cv2.COLORMAP_JET) + depth_img_norm = cv2.normalize(depth_img_raw, depth_img_raw, 0, 255, norm_type=cv2.NORM_MINMAX) + depth_img_colormap = cv2.applyColorMap(depth_img_raw, cv2.COLORMAP_JET) - return im_color + return depth_img_colormap def convert_image_to_cv2(msg): """ + Converts a ROS image message into an OpenCV image. + Args: - msg (): + msg: A sensor_msgs/Image message. Returns: - + OpenCV image. """ - np_arr = np.fromstring(msg.data, np.uint8) - image_np = cv2.imdecode(np_arr, cv2.IMREAD_UNCHANGED) - return image_np + np_arr = np.frombuffer(msg.data, np.uint8) + return cv2.imdecode(np_arr, cv2.IMREAD_UNCHANGED) def create_video_from_images(input_path, output_path, output_name="video.mp4", frame_rate=12, resize=None): """ + Creates a video from a collection of images stored in a specified directory. + Args: - input_path (): - output_path (): - output_name (): - frame_rate (): - resize (): + input_path (str): Directory where the input images are stored. + output_path (str): Directory where the output video will be saved. + output_name (str): Name of the output video file. Defaults to 'video.mp4'. + frame_rate (int): Frame rate of the output video. Defaults to 12. + resize (float, optional): Factor by which to resize the images. If None, images are not resized. Returns: - + None """ img_array = [] output_video_path = os.path.join(output_path, output_name) @@ -69,21 +78,18 @@ def create_video_from_images(input_path, output_path, output_name="video.mp4", f img_list = sorted(glob.glob(os.path.join(input_path, "./*.jpg"))) if not img_list: - sorted(glob.glob(os.path.join(input_path, "./*.png"))) + img_list = sorted(glob.glob(os.path.join(input_path, "./*.png"))) if not img_list: - return + print("No images found in the specified directory.") + return for filename in img_list: img = cv2.imread(filename) if resize: img = cv2.resize(img, (0, 0), fx=resize, fy=resize, interpolation=cv2.INTER_LANCZOS4) - if len(img.shape) == 3: - height, width, _ = img.shape - else: - height, width = img.shape - + height, width = (img.shape[0], img.shape[1]) if len(img.shape) == 3 else img.shape size = (width, height) img_array.append(img) @@ -93,6 +99,6 @@ def create_video_from_images(input_path, output_path, output_name="video.mp4", f out.write(img_array[i]) out.release() + # Using FFmpeg to convert the temporary video to final format subprocess.call(["ffmpeg", "-i", output_video_path_temp, "-vcodec", "libx264", "-y", output_video_path]) os.remove(output_video_path_temp) - return diff --git a/python/robologs_ros_utils/sources/ros1/ros_utils.py b/python/robologs_ros_utils/sources/ros1/ros_utils.py index 82e8683..84795eb 100644 --- a/python/robologs_ros_utils/sources/ros1/ros_utils.py +++ b/python/robologs_ros_utils/sources/ros1/ros_utils.py @@ -1,20 +1,19 @@ -"""Rosbag utilities +"""Rosbag Utilities -This module contains functions to extract data from rosbags. +This module contains functions to extract and handle data from ROS bag files. """ + import glob import logging import os import shutil -from typing import Optional, Tuple, Union +from typing import List, Optional, Tuple, Union import cv2 import numpy as np -#import rosbag from bagpy import bagreader from PIL import Image -#from rosbag import Bag from rosbags.rosbag1 import Reader from rosbags.serde import deserialize_cdr, ros1_to_cdr from tqdm import tqdm @@ -25,14 +24,17 @@ def get_bag_info_from_file(rosbag_path: str) -> dict: """ + Retrieve metadata from a specified ROS bag file. + Args: - rosbag_path (str): Input file path rosbag. + rosbag_path (str): The file path to the rosbag. Returns: - dict: Dictionary with rosbag metadata. + dict: A dictionary containing rosbag metadata such as start time, end time, duration, size, and topics. + Raises: + Exception: If the rosbag file does not exist or the file provided is not a valid rosbag. """ - if not os.path.exists(rosbag_path): raise Exception(f"{rosbag_path} does not exist.") @@ -41,49 +43,53 @@ def get_bag_info_from_file(rosbag_path: str) -> dict: try: bag = bagreader(rosbag_path, tmp=True) - - except: - f"Couldn't open rosbag...skipping" + except Exception as e: + logging.error(f"Couldn't open rosbag due to error: {e}. Skipping...") return dict() file_stats = os.stat(rosbag_path) - summary_dict = dict() - summary_dict["file_name"] = os.path.split(rosbag_path)[1] - summary_dict["start_time"] = bag.start_time - summary_dict["end_time"] = bag.end_time - summary_dict["duration"] = bag.end_time - bag.start_time - summary_dict["file_size_mb"] = str(file_stats.st_size / (1024 * 1024)) - summary_dict["topics"] = bag.topic_table.to_dict("records") + summary_dict = { + "file_name": os.path.split(rosbag_path)[1], + "start_time": bag.start_time, + "end_time": bag.end_time, + "duration": bag.end_time - bag.start_time, + "file_size_mb": str(file_stats.st_size / (1024 * 1024)), + "topics": bag.topic_table.to_dict("records"), + } return summary_dict def get_topic_dict(rosbag_metadata_dict: dict) -> dict: - topic_dict = dict() + """ + Extracts the topic information from the rosbag metadata dictionary. - if "topics" in rosbag_metadata_dict: - for entry in rosbag_metadata_dict["topics"]: - topic_dict[entry["Topics"]] = entry + Args: + rosbag_metadata_dict (dict): A dictionary containing rosbag metadata. + + Returns: + dict: A dictionary where keys are topic names and values are metadata about each topic. + """ + topic_dict = {} + + for entry in rosbag_metadata_dict.get("topics", []): + topic_dict[entry["Topics"]] = entry return topic_dict def get_bag_info_from_file_or_folder(input_path: str) -> dict: """ - Recursively searches for rosbag files in the given path and returns a dictionary with their metadata. - If a rosbag file is provided, returns the metadata for that file only. - If a directory is provided, returns metadata for all rosbag files in that directory and its subdirectories. + Retrieves metadata from a rosbag file or recursively from rosbag files within a given directory. Args: - input_path (str): Input file path of a rosbag, or folder with multiple rosbags, possibly nested. + input_path (str): The file path to a rosbag or a directory containing one or more rosbag files. Returns: - dict: Dictionary with rosbag metadata for each rosbag found. - The key of the dictionary is the rosbag's absolute file path. + dict: A dictionary where each key is the absolute path of a rosbag, and the value is its corresponding metadata. """ - - rosbag_info_dict = dict() + rosbag_info_dict = {} if input_path.endswith(".bag"): rosbag_info_dict[os.path.abspath(input_path)] = get_bag_info_from_file(input_path) @@ -99,14 +105,16 @@ def get_bag_info_from_file_or_folder(input_path: str) -> dict: def create_manifest_entry_dict(msg_timestamp: int, rosbag_timestamp: int, file_path: str, index: int) -> dict: """ - Args: - msg_timestamp (int): - rosbag_timestamp (int): - file_path (str): - index (int): + Create a dictionary entry for the manifest, containing metadata about the message. - Returns: dict + Args: + msg_timestamp (int): The timestamp from the message. + rosbag_timestamp (int): The timestamp from the rosbag. + file_path (str): The file path of the image or message. + index (int): The index of the message in the topic. + Returns: + dict: A dictionary containing metadata about the message. """ _, img_name = os.path.split(file_path) return { @@ -120,152 +128,152 @@ def create_manifest_entry_dict(msg_timestamp: int, rosbag_timestamp: int, file_p def get_image_topic_types() -> list: """ - Returns: + Retrieve the list of image topic types. + Returns: + list: A list of strings representing ROS image topic types. """ return ["sensor_msgs/CompressedImage", "sensor_msgs/Image"] def get_topic_names_of_type(all_topics: list, filter_topic_types: list) -> list: """ + Filter topic names based on the specified topic types. + Args: - all_topics (list): - filter_topic_types (list): + all_topics (list): A list of dictionaries, each containing information about a topic. + filter_topic_types (list): A list of strings representing the topic types to filter by. Returns: - + list: A list of topic names that match the filtered topic types. """ return [x["Topics"] for x in all_topics if x["Types"] in filter_topic_types] def get_image_name_from_timestamp(timestamp: int, file_format: str = "jpg") -> str: """ + Generate an image file name based on the timestamp. + Args: - timestamp (int): - file_format (str): + timestamp (int): The timestamp to be incorporated into the image file name. + file_format (str): The image file format. Defaults to "jpg". Returns: - + str: A string representing the image file name. """ - img_name = f"{str(timestamp)}.{file_format}" - return img_name + return f"{timestamp}.{file_format}" def get_image_name_from_index(index: int, file_format: str = "jpg", zero_padding: int = 6) -> str: """ + Generate an image file name based on the index. + Args: - index (): - file_format (): - zero_padding (): + index (int): The index to be incorporated into the image file name. + file_format (str): The image file format. Defaults to "jpg". + zero_padding (int): The number of zeros used to pad the index in the file name. Defaults to 6. Returns: - + str: A string representing the image file name. """ - img_name = f"{str(index).zfill(zero_padding)}.{file_format}" - return img_name + return f"{str(index).zfill(zero_padding)}.{file_format}" def replace_ros_topic_name(topic_name: str, replace_character: str = "_") -> str: """ + Replace slashes in a ROS topic name with the specified character. + Args: - topic_name (): - replace_character (): + topic_name (str): The ROS topic name to be modified. + replace_character (str): The character to replace slashes with. Defaults to "_". Returns: - + str: The topic name with slashes replaced by the specified character. """ - return topic_name.replace("/", replace_character)[1:] + return topic_name.replace("/", replace_character).lstrip(replace_character) -def get_filter_fraction(start_time: Optional[float], end_time: Optional[float], start_rosbag: float, end_rosbag: float) -> Optional[float]: +def get_filter_fraction( + start_time: Optional[float], end_time: Optional[float], start_rosbag: float, end_rosbag: float +) -> Optional[float]: """ + Calculate the fraction of the rosbag duration based on the provided start and end times. + Args: - start_time (float or None): - end_time (float or None): - start_rosbag (float): - end_rosbag (float): + start_time (Optional[float]): The start time in seconds from the beginning of the rosbag. + end_time (Optional[float]): The end time in seconds until the end of the rosbag. + start_rosbag (float): The start time of the rosbag in seconds. + end_rosbag (float): The end time of the rosbag in seconds. Returns: - + Optional[float]: The fraction of the rosbag that is covered by the start and end times, if applicable; otherwise, None. """ rosbag_duration = end_rosbag - start_rosbag if rosbag_duration <= 0: return None - if start_time and end_time: - return float((end_time - start_time) / rosbag_duration) - - if start_time and not end_time: - return float((end_rosbag - start_time) / rosbag_duration) + if start_time is not None and end_time is not None: + return (end_time - start_time) / rosbag_duration - if end_time and not start_time: - return float((end_time - start_rosbag) / rosbag_duration) + if start_time is not None: + return (end_rosbag - start_time) / rosbag_duration - if not end_time and not start_time: - return 1 + if end_time is not None: + return (end_time - start_rosbag) / rosbag_duration - return None + return 1.0 # if neither start_time nor end_time is specified, the whole rosbag is considered. -def check_if_in_time_range(t: float, start_time: float, end_time: float) -> bool: +def check_if_in_time_range(t: float, start_time: Optional[float], end_time: Optional[float]) -> bool: """ + Check if a given time is within the specified time range. + Args: - t (float): - start_time (float): - end_time (float): + t (float): The time to check. + start_time (Optional[float]): The start of the time range. + end_time (Optional[float]): The end of the time range. Returns: - + bool: True if 't' is in the range specified by 'start_time' and 'end_time', False otherwise. """ - if start_time and end_time: - if t >= start_time and t <= end_time: - return True - else: - return False + if start_time is not None and t < start_time: + return False - if not start_time and end_time: - if t <= end_time: - return True - else: - return False + if end_time is not None and t > end_time: + return False - if start_time and not end_time: - if t >= start_time: - return True - else: - return False - - if not start_time and not end_time: - return True - - return False + return True # 't' is in the range if it hasn't failed any of the above conditions. def get_name_img_manifest() -> str: """ - Returns: + Get the standard file name for the image manifest file. + Returns: + str: The file name of the image manifest. """ return "img_manifest.json" def get_video_from_image_folder(folder: str, save_imgs: bool = True) -> None: """ + Create a video from images located in a specified folder. + Args: - folder (str): - delete_imgs (bool): + folder (str): The directory containing the images. + save_imgs (bool): If False, the original images are deleted after the video is created. Defaults to True. Returns: - + None """ img_manifest = file_utils.read_json(os.path.join(folder, get_name_img_manifest())) frame_rate = round(img_manifest["topic"]["Frequency"], 2) ros_img_tools.create_video_from_images(input_path=folder, output_path=folder, frame_rate=frame_rate) if not save_imgs: - file_utils.delete_files_of_type(folder) + file_utils.delete_files_of_type(folder, file_extension=".jpg") # assuming the images are in '.jpg' format return @@ -274,29 +282,31 @@ def get_images_from_bag( rosbag_path: str, output_folder: str, file_format: str = "jpg", - topics: Optional[list] = None, + topics: Optional[List[str]] = None, create_manifest: bool = True, naming: str = "sequential", - resize: Optional[list] = None, + resize: Optional[List[int]] = None, sample: Optional[int] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, -) -> list: +) -> List[str]: """ - Args: - rosbag_path (str): - output_folder (str): - file_format (str): - topics (list): - create_manifest (bool): - naming (str): - resize (list): - sample (int): - start_time (float): - end_time (float): + Extract images from a ROS bag file, with various filtering and configuration options. - Returns: None + Args: + rosbag_path (str): Path to the ROS bag file. + output_folder (str): Directory where the extracted images will be saved. + file_format (str): Image format for saved files (default is "jpg"). + topics (Optional[List[str]]): List of ROS topics to extract images from. If None, auto-detects image topics. + create_manifest (bool): Whether to create a manifest file for the extracted images. + naming (str): Naming scheme for the saved image files ("sequential", "rosbag_timestamp", or "msg_timestamp"). + resize (Optional[List[int]]): If specified, resizes images to [width, height]. + sample (Optional[int]): If specified, only one out of 'sample' images will be extracted. + start_time (Optional[float]): Start time for extracting messages from the rosbag (in seconds). + end_time (Optional[float]): End time for extracting messages from the rosbag (in seconds). + Returns: + List[str]: List of paths to directories containing the extracted images, one for each topic. """ rosbag_metadata_dict = get_bag_info_from_file(rosbag_path=rosbag_path) @@ -447,8 +457,17 @@ def get_images_from_bag( return output_imgs_folder_list -def convert_offset_s_to_rosbag_ns(offset_s: int, first_rosbag_time_ns: int): +def convert_offset_s_to_rosbag_ns(offset_s: int, first_rosbag_time_ns: int) -> int: + """ + Convert a time offset in seconds to nanoseconds and add it to the timestamp of the first entry in the rosbag. + + Args: + offset_s (int): Time offset in seconds. + first_rosbag_time_ns (int): Timestamp of the first entry in the rosbag in nanoseconds. + Returns: + int: New timestamp in nanoseconds. + """ offset_ns = int(offset_s * 1e9) return offset_ns + first_rosbag_time_ns @@ -457,91 +476,108 @@ def is_message_within_time_range( time_ns: int, start_time_rosbag_ns: Optional[int] = None, end_time_rosbag_ns: Optional[int] = None ) -> Tuple[bool, bool]: """ - This function checks if a timestamp is withing a specified timerange and returns True or False if we are. It also - returns a second boolean to indicate if we are past the end time. This can be used to break out of the loop early. - Args: - time_ns (int): - start_time_rosbag_ns (int): - end_time_rosbag_ns (int): - - Returns: Tuple of booleans - - """ - - if start_time_rosbag_ns: - if time_ns < start_time_rosbag_ns: - return False, False - - if end_time_rosbag_ns: - if time_ns > end_time_rosbag_ns: - return False, True - - return True, False + Check if a ROS message timestamp falls within a specified time range. - -def get_clipped_bag_file( - input_bag_path: str, - output_bag_path: str, - topic_list: Optional[list] = None, - start_time: Optional[Union[float, int]] = None, - end_time: Optional[Union[float, int]] = None, - timestamp_type: str = "rosbag_ns", -): - """ Args: - input_bag_path (): - output_bag_path (): - topic_list (): - start_time (): - end_time (): - timestamp_type (): + time_ns (int): The message's timestamp in nanoseconds. + start_time_rosbag_ns (Optional[int]): The start of the time range in nanoseconds. + end_time_rosbag_ns (Optional[int]): The end of the time range in nanoseconds. Returns: - + Tuple[bool, bool]: + - True if the timestamp is within the range, False otherwise. + - True if the timestamp exceeds the end_time, False otherwise. """ - if timestamp_type not in ["rosbag_ns", "offset_s"]: - raise Exception(f"Robologs: invalid timestamp_type parameter: {timestamp_type}") - - with Bag(output_bag_path, "w") as outbag: - msg_counter = 0 - first_time_stamp = -1 - for topic, msg, t in Bag(input_bag_path).read_messages(): - if first_time_stamp < 0: - first_time_stamp = t.to_nsec() - if timestamp_type == "offset_s": - if start_time: - start_time = convert_offset_s_to_rosbag_ns( - offset_s=start_time, first_rosbag_time_ns=first_time_stamp - ) + # Check if the message is before the start of the desired time range + if start_time_rosbag_ns is not None and time_ns < start_time_rosbag_ns: + return False, False - if end_time: - end_time = convert_offset_s_to_rosbag_ns( - offset_s=end_time, first_rosbag_time_ns=first_time_stamp - ) + # Check if the message is after the end of the desired time range + if end_time_rosbag_ns is not None and time_ns > end_time_rosbag_ns: + return False, True - if topic_list: - if topic not in topic_list: - continue - - in_time_range, past_end_time = is_message_within_time_range( - time_ns=t.to_nsec(), start_time_rosbag_ns=start_time, end_time_rosbag_ns=end_time - ) + return True, False - # stop iterating over rosbag if we're past the user specified end-time - if past_end_time: - break - if not in_time_range: - continue +# def get_clipped_bag_file( +# input_bag_path: str, +# output_bag_path: str, +# topic_list: Optional[List[str]] = None, +# start_time: Optional[Union[float, int]] = None, +# end_time: Optional[Union[float, int]] = None, +# timestamp_type: str = "rosbag_ns", +# ) -> None: +# """ +# Create a clipped rosbag file from an input bag, filtered by topic and time range. +# +# Args: +# input_bag_path (str): Path to the input rosbag file. +# output_bag_path (str): Path where the new clipped bag should be written. +# topic_list (Optional[List[str]]): List of string topics to be included in the clipped bag. +# start_time (Optional[Union[float, int]]): The start time from which to include messages. +# end_time (Optional[Union[float, int]]): The end time until which to include messages. +# timestamp_type (str): Type of the provided timestamps, "rosbag_ns" for nanoseconds and +# "offset_s" for seconds offset. +# +# Raises: +# Exception: If an invalid timestamp_type is provided. +# +# Returns: +# None +# """ +# +# if timestamp_type not in ["rosbag_ns", "offset_s"]: +# raise Exception(f"Robologs: invalid timestamp_type parameter: {timestamp_type}") +# +# with Bag(output_bag_path, "w") as outbag: +# msg_counter = 0 +# first_time_stamp = -1 +# for topic, msg, t in Bag(input_bag_path).read_messages(): +# if first_time_stamp < 0: +# first_time_stamp = t.to_nsec() +# if timestamp_type == "offset_s": +# if start_time: +# start_time = convert_offset_s_to_rosbag_ns( +# offset_s=start_time, first_rosbag_time_ns=first_time_stamp +# ) +# +# if end_time: +# end_time = convert_offset_s_to_rosbag_ns( +# offset_s=end_time, first_rosbag_time_ns=first_time_stamp +# ) +# +# if topic_list: +# if topic not in topic_list: +# continue +# +# in_time_range, past_end_time = is_message_within_time_range( +# time_ns=t.to_nsec(), start_time_rosbag_ns=start_time, end_time_rosbag_ns=end_time +# ) +# +# # stop iterating over rosbag if we're past the user specified end-time +# if past_end_time: +# break +# +# if not in_time_range: +# continue +# +# msg_counter += 1 +# outbag.write(topic, msg, t) +# +# return - msg_counter += 1 - outbag.write(topic, msg, t) - return +def get_all_topics(rosbag_list: List[str]) -> List[str]: + """ + Retrieve all unique topics from a list of rosbag files. + Args: + rosbag_list (List[str]): List of rosbag file paths. -def get_all_topics(rosbag_list): + Returns: + List[str]: List of unique topics across all provided rosbag files. + """ topic_list = list() for rosbag_file in rosbag_list: summary_dict = get_bag_info_from_file(rosbag_file) @@ -608,70 +644,3 @@ def get_csv_data_from_bag(input_dir_or_file: str, output_dir: str, topic_list: l shutil.move(data, destination) return - - -# def split_rosbag(input_path: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None: -# """ -# Split ROS bag files into smaller chunks. -# -# Parameters: -# input_path (Union[str, os.PathLike]): The path to the input .bag file or a folder containing .bag files. -# chunks (int): The number of chunks to split the bag file into. -# output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved. -# -# Returns: -# None -# """ -# # Create output folder if it doesn't exist -# if not os.path.exists(output_folder): -# os.makedirs(output_folder) -# -# # If the input path is a directory, loop through all files -# if os.path.isdir(input_path): -# for filename in os.listdir(input_path): -# if filename.endswith(".bag"): -# full_path = os.path.join(input_path, filename) -# split_single_rosbag(full_path, chunks, output_folder) -# -# # If the input path is a file, just process that one file -# elif os.path.isfile(input_path) and input_path.endswith(".bag"): -# split_single_rosbag(input_path, chunks, output_folder) -# -# else: -# print("Invalid input path. Provide either a .bag file or a directory containing .bag files.") - - -# def split_single_rosbag(file_in: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None: -# """ -# Split a single ROS bag file into smaller chunks. -# -# Parameters: -# file_in (Union[str, os.PathLike]): The path to the input .bag file. -# chunks (int): The number of chunks to split the bag file into. -# output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved. -# -# Returns: -# None -# """ -# bagfile = rosbag.Bag(file_in) -# messages = bagfile.get_message_count() -# m_per_chunk = int(round(float(messages) / float(chunks))) -# chunk = 0 -# m = 0 -# filename = os.path.basename(file_in) -# base_name, ext = os.path.splitext(filename) -# -# outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag") -# outbag = rosbag.Bag(outbag_path, "w") -# -# for topic, msg, t in bagfile.read_messages(): -# if m and m % m_per_chunk == 0: -# outbag.close() -# chunk += 1 -# outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag") -# outbag = rosbag.Bag(outbag_path, "w") -# -# outbag.write(topic, msg, t) -# m += 1 -# -# outbag.close() diff --git a/python/robologs_ros_utils/sources/ros1/split_rosbag.py b/python/robologs_ros_utils/sources/ros1/split_rosbag.py deleted file mode 100644 index 0d33fd7..0000000 --- a/python/robologs_ros_utils/sources/ros1/split_rosbag.py +++ /dev/null @@ -1,18 +0,0 @@ -import click - -from robologs_ros_utils.sources.ros1 import ros_utils - - -@click.command() -@click.option("--input", "-i", type=str, required=True, help="A single rosbag, or directory containing rosbags") -@click.option("--output", "-o", type=str, required=True, help="Output directory for split rosbags") -@click.option("--chunks", "-c", type=int, required=True, help="Number of chunks to split each rosbag into") -def split_rosbag(input, output, chunks): - """ - Split rosbag files into smaller chunks and save them to the specified output directory. - """ - ros_utils.split_rosbag(input_path=input, chunks=chunks, output_folder=output) - - -if __name__ == "__main__": - split_rosbag() diff --git a/python/tests/sources/ros1/test_ros_utils.py b/python/tests/sources/ros1/test_ros_utils.py index ef4f518..1a62c09 100644 --- a/python/tests/sources/ros1/test_ros_utils.py +++ b/python/tests/sources/ros1/test_ros_utils.py @@ -75,55 +75,6 @@ def test_convert_offset_s_to_rosbag_ns(): 1649764528071146477 + int(10*1e9) -#def test_get_cropped_bag_file(request, tmp_path): -# input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag") -# output_path = os.path.join(tmp_path, "output.bag") -# print(output_path) -# -# topic_list = ["/alphasense/cam0/image_raw"] -# -# assert os.path.exists(tmp_path) -# -# assert not os.path.exists(output_path) -# -# ros_utils.get_clipped_bag_file(input_bag_path=input_path, -# output_bag_path=output_path, -# topic_list=topic_list) -# -# assert os.path.exists(output_path) -# -# summary_dict = ros_utils.get_bag_info_from_file(output_path) -# topics = [x["Topics"] for x in summary_dict["topics"]] -# -# assert topics == ["/alphasense/cam0/image_raw"] -# -# output_path_t = os.path.join(tmp_path, "output_.bag") -# -# ros_utils.get_clipped_bag_file(input_bag_path=output_path, -# output_bag_path=output_path_t, -# start_time=1649764528171137838, -# end_time=1649764528246129638) -# -# summary_dict = ros_utils.get_bag_info_from_file(output_path_t) -# -# assert summary_dict["topics"][0]["Message Count"] == 4 -# -# ros_utils.get_clipped_bag_file(input_bag_path=output_path, -# output_bag_path=output_path_t, -# start_time=0, -# end_time=0.1, -# timestamp_type="offset_s") -# -# summary_dict = ros_utils.get_bag_info_from_file(output_path_t) -# -# assert summary_dict["topics"][0]["Message Count"] == 5 -# -# with pytest.raises(Exception): -# ros_utils.get_clipped_bag_file(input_bag_path=output_path, -# output_bag_path=output_path_t, -# timestamp_type="weird_type") - - def test_get_images_from_bag(request, tmp_path): input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag") @@ -174,21 +125,3 @@ def test_get_csv_data_from_bags(request, tmp_path): output_dir=output_folder) assert os.path.exists(csv_topic_path_2) - - -#def test_split_rosbag(request, tmp_path): -# -# input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag") -# output_folder = tmp_path -# -# ros_utils.split_single_rosbag(file_in=input_path, -# chunks=3, -# output_folder=output_folder) -# -# list_files = file_utils.get_all_files_of_type_in_directory(output_folder, "bag") -# -# assert len(list_files) == 3 - - - -