Skip to content

Commit

Permalink
remove rosbag dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
YvesSchoenberg committed Oct 9, 2023
1 parent a92a804 commit bed90e7
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 127 deletions.
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "robologs-ros-utils"
version = "0.1.1a49"
version = "0.1.1a50"
description = "robologs-ros-utils is an open source library of containerized data transformations for the robotics and drone communities"
authors = ["roboto.ai <[email protected]>"]
license = "Apache-2.0"
Expand Down
134 changes: 67 additions & 67 deletions python/robologs_ros_utils/sources/ros1/ros_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

import cv2
import numpy as np
import rosbag
#import rosbag
from bagpy import bagreader
from PIL import Image
from rosbag import Bag
#from rosbag import Bag
from rosbags.rosbag1 import Reader
from rosbags.serde import deserialize_cdr, ros1_to_cdr
from tqdm import tqdm
Expand Down Expand Up @@ -610,68 +610,68 @@ def get_csv_data_from_bag(input_dir_or_file: str, output_dir: str, topic_list: l
return


def split_rosbag(input_path: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None:
"""
Split ROS bag files into smaller chunks.
Parameters:
input_path (Union[str, os.PathLike]): The path to the input .bag file or a folder containing .bag files.
chunks (int): The number of chunks to split the bag file into.
output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved.
Returns:
None
"""
# Create output folder if it doesn't exist
if not os.path.exists(output_folder):
os.makedirs(output_folder)

# If the input path is a directory, loop through all files
if os.path.isdir(input_path):
for filename in os.listdir(input_path):
if filename.endswith(".bag"):
full_path = os.path.join(input_path, filename)
split_single_rosbag(full_path, chunks, output_folder)

# If the input path is a file, just process that one file
elif os.path.isfile(input_path) and input_path.endswith(".bag"):
split_single_rosbag(input_path, chunks, output_folder)

else:
print("Invalid input path. Provide either a .bag file or a directory containing .bag files.")


def split_single_rosbag(file_in: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None:
"""
Split a single ROS bag file into smaller chunks.
Parameters:
file_in (Union[str, os.PathLike]): The path to the input .bag file.
chunks (int): The number of chunks to split the bag file into.
output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved.
Returns:
None
"""
bagfile = rosbag.Bag(file_in)
messages = bagfile.get_message_count()
m_per_chunk = int(round(float(messages) / float(chunks)))
chunk = 0
m = 0
filename = os.path.basename(file_in)
base_name, ext = os.path.splitext(filename)

outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag")
outbag = rosbag.Bag(outbag_path, "w")

for topic, msg, t in bagfile.read_messages():
if m and m % m_per_chunk == 0:
outbag.close()
chunk += 1
outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag")
outbag = rosbag.Bag(outbag_path, "w")

outbag.write(topic, msg, t)
m += 1

outbag.close()
# def split_rosbag(input_path: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None:
# """
# Split ROS bag files into smaller chunks.
#
# Parameters:
# input_path (Union[str, os.PathLike]): The path to the input .bag file or a folder containing .bag files.
# chunks (int): The number of chunks to split the bag file into.
# output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved.
#
# Returns:
# None
# """
# # Create output folder if it doesn't exist
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
#
# # If the input path is a directory, loop through all files
# if os.path.isdir(input_path):
# for filename in os.listdir(input_path):
# if filename.endswith(".bag"):
# full_path = os.path.join(input_path, filename)
# split_single_rosbag(full_path, chunks, output_folder)
#
# # If the input path is a file, just process that one file
# elif os.path.isfile(input_path) and input_path.endswith(".bag"):
# split_single_rosbag(input_path, chunks, output_folder)
#
# else:
# print("Invalid input path. Provide either a .bag file or a directory containing .bag files.")


# def split_single_rosbag(file_in: Union[str, os.PathLike], chunks: int, output_folder: Union[str, os.PathLike]) -> None:
# """
# Split a single ROS bag file into smaller chunks.
#
# Parameters:
# file_in (Union[str, os.PathLike]): The path to the input .bag file.
# chunks (int): The number of chunks to split the bag file into.
# output_folder (Union[str, os.PathLike]): The folder where the chunked bag files will be saved.
#
# Returns:
# None
# """
# bagfile = rosbag.Bag(file_in)
# messages = bagfile.get_message_count()
# m_per_chunk = int(round(float(messages) / float(chunks)))
# chunk = 0
# m = 0
# filename = os.path.basename(file_in)
# base_name, ext = os.path.splitext(filename)
#
# outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag")
# outbag = rosbag.Bag(outbag_path, "w")
#
# for topic, msg, t in bagfile.read_messages():
# if m and m % m_per_chunk == 0:
# outbag.close()
# chunk += 1
# outbag_path = os.path.join(output_folder, f"{base_name}_chunk_{chunk:04d}.bag")
# outbag = rosbag.Bag(outbag_path, "w")
#
# outbag.write(topic, msg, t)
# m += 1
#
# outbag.close()
118 changes: 59 additions & 59 deletions python/tests/sources/ros1/test_ros_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,53 +75,53 @@ def test_convert_offset_s_to_rosbag_ns():
1649764528071146477 + int(10*1e9)


def test_get_cropped_bag_file(request, tmp_path):
input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag")
output_path = os.path.join(tmp_path, "output.bag")
print(output_path)

topic_list = ["/alphasense/cam0/image_raw"]

assert os.path.exists(tmp_path)

assert not os.path.exists(output_path)

ros_utils.get_clipped_bag_file(input_bag_path=input_path,
output_bag_path=output_path,
topic_list=topic_list)

assert os.path.exists(output_path)

summary_dict = ros_utils.get_bag_info_from_file(output_path)
topics = [x["Topics"] for x in summary_dict["topics"]]

assert topics == ["/alphasense/cam0/image_raw"]

output_path_t = os.path.join(tmp_path, "output_.bag")

ros_utils.get_clipped_bag_file(input_bag_path=output_path,
output_bag_path=output_path_t,
start_time=1649764528171137838,
end_time=1649764528246129638)

summary_dict = ros_utils.get_bag_info_from_file(output_path_t)

assert summary_dict["topics"][0]["Message Count"] == 4

ros_utils.get_clipped_bag_file(input_bag_path=output_path,
output_bag_path=output_path_t,
start_time=0,
end_time=0.1,
timestamp_type="offset_s")

summary_dict = ros_utils.get_bag_info_from_file(output_path_t)

assert summary_dict["topics"][0]["Message Count"] == 5

with pytest.raises(Exception):
ros_utils.get_clipped_bag_file(input_bag_path=output_path,
output_bag_path=output_path_t,
timestamp_type="weird_type")
#def test_get_cropped_bag_file(request, tmp_path):
# input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag")
# output_path = os.path.join(tmp_path, "output.bag")
# print(output_path)
#
# topic_list = ["/alphasense/cam0/image_raw"]
#
# assert os.path.exists(tmp_path)
#
# assert not os.path.exists(output_path)
#
# ros_utils.get_clipped_bag_file(input_bag_path=input_path,
# output_bag_path=output_path,
# topic_list=topic_list)
#
# assert os.path.exists(output_path)
#
# summary_dict = ros_utils.get_bag_info_from_file(output_path)
# topics = [x["Topics"] for x in summary_dict["topics"]]
#
# assert topics == ["/alphasense/cam0/image_raw"]
#
# output_path_t = os.path.join(tmp_path, "output_.bag")
#
# ros_utils.get_clipped_bag_file(input_bag_path=output_path,
# output_bag_path=output_path_t,
# start_time=1649764528171137838,
# end_time=1649764528246129638)
#
# summary_dict = ros_utils.get_bag_info_from_file(output_path_t)
#
# assert summary_dict["topics"][0]["Message Count"] == 4
#
# ros_utils.get_clipped_bag_file(input_bag_path=output_path,
# output_bag_path=output_path_t,
# start_time=0,
# end_time=0.1,
# timestamp_type="offset_s")
#
# summary_dict = ros_utils.get_bag_info_from_file(output_path_t)
#
# assert summary_dict["topics"][0]["Message Count"] == 5
#
# with pytest.raises(Exception):
# ros_utils.get_clipped_bag_file(input_bag_path=output_path,
# output_bag_path=output_path_t,
# timestamp_type="weird_type")


def test_get_images_from_bag(request, tmp_path):
Expand Down Expand Up @@ -176,18 +176,18 @@ def test_get_csv_data_from_bags(request, tmp_path):
assert os.path.exists(csv_topic_path_2)


def test_split_rosbag(request, tmp_path):

input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag")
output_folder = tmp_path

ros_utils.split_single_rosbag(file_in=input_path,
chunks=3,
output_folder=output_folder)

list_files = file_utils.get_all_files_of_type_in_directory(output_folder, "bag")

assert len(list_files) == 3
#def test_split_rosbag(request, tmp_path):
#
# input_path = get_image_bag_file_path(request.fspath.dirname, "test_images.bag")
# output_folder = tmp_path
#
# ros_utils.split_single_rosbag(file_in=input_path,
# chunks=3,
# output_folder=output_folder)
#
# list_files = file_utils.get_all_files_of_type_in_directory(output_folder, "bag")
#
# assert len(list_files) == 3



Expand Down

0 comments on commit bed90e7

Please sign in to comment.