diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index dadb6807ba..92eab48630 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -18,6 +18,7 @@ from rclpy.qos import InvalidQoSProfileException from ros2bag.api import add_writer_storage_plugin_extensions +from ros2bag.api import check_not_negative_float from ros2bag.api import convert_service_to_service_event_topic from ros2bag.api import convert_yaml_to_qos_profile from ros2bag.api import print_error @@ -176,6 +177,15 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: help='Enable snapshot mode. Messages will not be written to the bagfile until ' 'the "/rosbag2_recorder/snapshot" service is called. e.g. \n ' 'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot') + parser.add_argument( + '--snapshot-duration', type=check_not_negative_float, default=0.0, + help='Maximum snapshot duration in a fraction of seconds.\n' + 'Default: %(default)d, indicates that the snapshot will be limited by the' + ' --max-cache-size parameter only. If the value is more than 0, the cyclic buffer' + ' for the snapshot will be limited by both the series of messages duration and the' + ' maximum cache size parameter.\n' + 'To override the upper bound by total messages size, the ' + '--maximum-cache-size parameter can be settled to 0.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -281,6 +291,10 @@ def validate_parsed_arguments(args, uri) -> str: if args.compression_queue_size < 0: return print_error('Compression queue size must be at least 0.') + if args.snapshot_mode and args.snapshot_duration == 0.0 and args.max_cache_size == 0: + return print_error('In snapshot mode, either the snapshot_duration or max_bytes_size shall' + ' not be set to zero.') + class RecordVerb(VerbExtension): """Record ROS data to a bag.""" @@ -325,6 +339,7 @@ def main(self, *, args): # noqa: D102 storage_preset_profile=args.storage_preset_profile, storage_config_uri=storage_config_file, snapshot_mode=args.snapshot_mode, + snapshot_duration=args.snapshot_duration, custom_data=custom_data ) record_options = RecordOptions() diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp index 0777dcd788..6348c0a21a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -52,7 +52,7 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache : public MessageCacheInterface { public: - explicit CircularMessageCache(size_t max_buffer_size); + explicit CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns = 0); ~CircularMessageCache() override; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp index bd6a022f2e..227d56ab87 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Copyright 2021 Amazonhe t.com, Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -36,27 +36,35 @@ namespace rosbag2_cpp namespace cache { -/** -* This class implements a circular buffer message cache. Since the buffer -* size is limited by total byte size of the storage messages rather than -* a fix number of messages, a deque is used instead of a vector since -* older messages can always be dropped from the front and new messages added -* to the end. The buffer will never consume more than max_cache_size bytes, -* and will log a warning message if an individual message exceeds the buffer -* size. -*/ +/// This class implements a circular buffer message cache. Since the buffer +/// size is limited by the total byte size of the storage messages or a total messages duration +/// rather than a fix number of messages, a deque is used instead of a vector since +/// older messages can always be dropped from the front and new messages added +/// to the end. The buffer will never consume more than max_cache_size bytes, if max_cache_size > 0. +/// And will log a warning message if an individual message exceeds the buffer size. class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer : public CacheBufferInterface { public: // Delete default constructor since max_cache_size is required MessageCacheCircularBuffer() = delete; - explicit MessageCacheCircularBuffer(size_t max_cache_size); - /** - * If buffer size has some space left, we push the message regardless of its size, - * but if this results in exceeding buffer size, we begin dropping old messages. - */ + /// \brief Parametrized constructor + /// \param max_cache_size Maximum amount of memory which could be occupied by the messages stored + /// in the circular buffer. Note. If max_cache_size is zero, the circular buffer will be only + /// bounded by the max_cache_duration. + /// \param max_cache_duration_ns Maximum duration in nanoseconds of message sequence allowed to be + /// stored in the circular buffer. Note. If max_cache_duration is zero, the circular buffer will + /// be only bounded by the max_cache_size. + /// \throws std::invalid_argument if both max_cache_size and max_cache_duration are zero. + explicit MessageCacheCircularBuffer(size_t max_cache_size, int64_t max_cache_duration_ns = 0); + + /// \brief Push new message to the circular buffer + /// \details If buffer size has some space left, we push the message regardless of its size, + /// but if this results in exceeding buffer size, we begin dropping old messages. + /// \param msg Shared pointer to the rosbag2_storage::SerializedBagMessage + /// \return False if the buffer_bytes_size > 0 and + /// msg->serialized_data->buffer_length > max_bytes_size, otherwise true. bool push(CacheBufferInterface::buffer_element_t msg) override; /// Clear buffer @@ -73,6 +81,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer std::vector msg_vector_; size_t buffer_bytes_size_ {0u}; const size_t max_bytes_size_; + const int64_t max_cache_duration_; }; } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp index d26d04ea61..185d2131f7 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -27,10 +27,12 @@ namespace rosbag2_cpp namespace cache { -CircularMessageCache::CircularMessageCache(size_t max_buffer_size) +CircularMessageCache::CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns) { - producer_buffer_ = std::make_shared(max_buffer_size); - consumer_buffer_ = std::make_shared(max_buffer_size); + producer_buffer_ = + std::make_shared(max_buffer_size, max_buffer_duration_ns); + consumer_buffer_ = + std::make_shared(max_buffer_size, max_buffer_duration_ns); } CircularMessageCache::~CircularMessageCache() diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp index 8a1b212db6..f836ef66ec 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp @@ -25,25 +25,44 @@ namespace rosbag2_cpp namespace cache { -MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size) -: max_bytes_size_(max_cache_size) +MessageCacheCircularBuffer::MessageCacheCircularBuffer( + size_t max_cache_size, + int64_t max_cache_duration_ns) +: max_bytes_size_(max_cache_size), max_cache_duration_(max_cache_duration_ns) { + if (max_bytes_size_ == 0 && max_cache_duration_ == 0) { + ROSBAG2_CPP_LOG_ERROR_STREAM("Invalid arguments for the MessageCacheCircularBuffer. " + "Both max_bytes_size and max_cache_duration are zero."); + throw std::invalid_argument("Invalid arguments for the MessageCacheCircularBuffer. " + "Both max_bytes_size and max_cache_duration are zero."); + } } bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg) { // Drop message if it exceeds the buffer size - if (msg->serialized_data->buffer_length > max_bytes_size_) { + if (buffer_bytes_size_ > 0 && msg->serialized_data->buffer_length > max_bytes_size_) { ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!"); return false; } - // Remove any old items until there is room for new message - while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { + // Remove any old items until there is room for a new message + while (max_bytes_size_ > 0 && + buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) + { buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; buffer_.pop_front(); } - // Add new message to end of buffer + // Remove any old items until the difference between last and newest message timestamp + // will be less than or equal to the max_cache_duration_. + if (buffer_.size() > 1) { + auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) { + buffer_.pop_front(); + current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + } + } + // Add a new message to the end of the buffer buffer_bytes_size_ += msg->serialized_data->buffer_length; buffer_.push_back(msg); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index bd56eb8ade..51ec0c2e5f 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -145,16 +145,18 @@ void SequentialWriter::open( throw std::runtime_error{error.str()}; } - use_cache_ = storage_options.max_cache_size > 0u; + use_cache_ = storage_options.max_cache_size > 0u || + (storage_options.snapshot_mode && storage_options.snapshot_duration.nanoseconds() > 0); if (storage_options.snapshot_mode && !use_cache_) { throw std::runtime_error( - "Max cache size must be greater than 0 when snapshot mode is enabled"); + "Either the max cache size or the maximum snapshot duration must be greater than 0" + " when snapshot mode is enabled"); } if (use_cache_) { if (storage_options.snapshot_mode) { message_cache_ = std::make_shared( - storage_options.max_cache_size); + storage_options.max_cache_size, storage_options.snapshot_duration.nanoseconds()); } else { message_cache_ = std::make_shared( storage_options.max_cache_size); diff --git a/rosbag2_py/rosbag2_py/_storage.pyi b/rosbag2_py/rosbag2_py/_storage.pyi index 2520c4e070..19a211d580 100644 --- a/rosbag2_py/rosbag2_py/_storage.pyi +++ b/rosbag2_py/rosbag2_py/_storage.pyi @@ -101,13 +101,14 @@ class StorageOptions: max_bagfile_duration: int max_bagfile_size: int max_cache_size: int + snapshot_duration: object snapshot_mode: bool start_time_ns: int storage_config_uri: str storage_id: str storage_preset_profile: str uri: str - def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... + def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration: object = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... class TopicInformation: message_count: int diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index e12898053d..4739db6405 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include "rosbag2_cpp/converter_options.hpp" @@ -44,6 +45,16 @@ std::chrono::nanoseconds from_rclpy_duration(const pybind11::object & duration) return std::chrono::nanoseconds(nanos); } +pybind11::object rclcpp_duration_to_py_float(const rclcpp::Duration & duration) +{ + return pybind11::cast(duration.seconds()); +} + +rclcpp::Duration py_float_to_rclcpp_duration(const pybind11::object & obj) +{ + return rclcpp::Duration::from_seconds(obj.cast()); +} + template pybind11::object to_rclpy_time(T time) { @@ -81,9 +92,36 @@ PYBIND11_MODULE(_storage, m) { using KEY_VALUE_MAP = std::unordered_map; pybind11::class_(m, "StorageOptions") .def( - pybind11::init< - std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool, - int64_t, int64_t, KEY_VALUE_MAP>(), + pybind11::init( + []( + std::string uri, + std::string storage_id, + uint64_t max_bagfile_size, + uint64_t max_bagfile_duration, + uint64_t max_cache_size, + std::string storage_preset_profile, + std::string storage_config_uri, + bool snapshot_mode, + const pybind11::object & snapshot_duration, + int64_t start_time_ns, + int64_t end_time_ns, + KEY_VALUE_MAP custom_data) + { + return rosbag2_storage::StorageOptions{ + std::move(uri), + std::move(storage_id), + max_bagfile_size, + max_bagfile_duration, + max_cache_size, + std::move(storage_preset_profile), + std::move(storage_config_uri), + snapshot_mode, + py_float_to_rclcpp_duration(snapshot_duration), + start_time_ns, + end_time_ns, + std::move(custom_data), + }; + }), pybind11::arg("uri"), pybind11::arg("storage_id") = "", pybind11::arg("max_bagfile_size") = 0, @@ -92,6 +130,7 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("storage_preset_profile") = "", pybind11::arg("storage_config_uri") = "", pybind11::arg("snapshot_mode") = false, + pybind11::arg("snapshot_duration") = rclcpp_duration_to_py_float(rclcpp::Duration(0, 0)), pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, pybind11::arg("custom_data") = KEY_VALUE_MAP{}) @@ -115,6 +154,14 @@ PYBIND11_MODULE(_storage, m) { .def_readwrite( "snapshot_mode", &rosbag2_storage::StorageOptions::snapshot_mode) + .def_property( + "snapshot_duration", + [](const rosbag2_storage::StorageOptions & self) { + return rclcpp_duration_to_py_float(self.snapshot_duration); + }, + [](rosbag2_storage::StorageOptions & self, const pybind11::object & obj) { + self.snapshot_duration = py_float_to_rclcpp_duration(obj); + }) .def_readwrite( "start_time_ns", &rosbag2_storage::StorageOptions::start_time_ns) diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 8d6d188459..68f652f740 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -19,6 +19,7 @@ #include #include +#include "rclcpp/duration.hpp" #include "rosbag2_storage/visibility_control.hpp" #include "rosbag2_storage/yaml.hpp" @@ -56,6 +57,10 @@ struct StorageOptions // Defaults to disabled. bool snapshot_mode = false; + // The maximum snapshot duration. + // A value of 0.0 indicates that snapshot will be limited by the max_cache_size only. + rclcpp::Duration snapshot_duration{0, 0}; + // Start and end time for cutting int64_t start_time_ns = -1; int64_t end_time_ns = -1; diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index 717fcb9082..fbd37b91b6 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -15,6 +15,7 @@ #include #include +#include "rclcpp/duration.hpp" #include "rosbag2_storage/storage_options.hpp" namespace YAML @@ -32,6 +33,7 @@ Node convert::encode( node["storage_preset_profile"] = storage_options.storage_preset_profile; node["storage_config_uri"] = storage_options.storage_config_uri; node["snapshot_mode"] = storage_options.snapshot_mode; + node["snapshot_duration"] = storage_options.snapshot_duration; node["start_time_ns"] = storage_options.start_time_ns; node["end_time_ns"] = storage_options.end_time_ns; node["custom_data"] = storage_options.custom_data; @@ -50,6 +52,7 @@ bool convert::decode( node, "storage_preset_profile", storage_options.storage_preset_profile); optional_assign(node, "storage_config_uri", storage_options.storage_config_uri); optional_assign(node, "snapshot_mode", storage_options.snapshot_mode); + optional_assign(node, "snapshot_duration", storage_options.snapshot_duration); optional_assign(node, "start_time_ns", storage_options.start_time_ns); optional_assign(node, "end_time_ns", storage_options.end_time_ns); using KEY_VALUE_MAP = std::unordered_map; diff --git a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp index aa21761178..116296de28 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp @@ -29,6 +29,7 @@ TEST(storage_options, test_yaml_serialization) original.storage_preset_profile = "profile"; original.storage_config_uri = "config_uri"; original.snapshot_mode = true; + original.snapshot_duration = rclcpp::Duration::from_seconds(1.5); original.start_time_ns = 12345000; original.end_time_ns = 23456000; original.custom_data["key1"] = "value1"; @@ -50,6 +51,7 @@ TEST(storage_options, test_yaml_serialization) ASSERT_EQ(original.storage_preset_profile, reconstructed.storage_preset_profile); ASSERT_EQ(original.storage_config_uri, reconstructed.storage_config_uri); ASSERT_EQ(original.snapshot_mode, reconstructed.snapshot_mode); + ASSERT_EQ(original.snapshot_duration, reconstructed.snapshot_duration); ASSERT_EQ(original.start_time_ns, reconstructed.start_time_ns); ASSERT_EQ(original.end_time_ns, reconstructed.end_time_ns); ASSERT_EQ(original.custom_data, reconstructed.custom_data); diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index ef6408db7c..73df125b6b 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -375,6 +375,9 @@ get_storage_options_from_node_params(rclcpp::Node & node) storage_options.snapshot_mode = node.declare_parameter("storage.snapshot_mode", false); + storage_options.snapshot_duration = + param_utils::get_duration_from_node_param(node, "storage.snapshot_duration", 0, 0); + auto list_of_key_value_strings = node.declare_parameter>( "storage.custom_data", std::vector()); diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index 79e1b47a93..48cb71d8e4 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -49,4 +49,7 @@ player_params_node: max_cache_size: 9898 storage_preset_profile: "resilient" snapshot_mode: false + snapshot_duration: + sec: 1 + nsec: 500000000 custom_data: ["key1=value1", "key2=value2"] \ No newline at end of file diff --git a/rosbag2_transport/test/resources/recorder_node_params.yaml b/rosbag2_transport/test/resources/recorder_node_params.yaml index 701f623bf4..c295e803d6 100644 --- a/rosbag2_transport/test/resources/recorder_node_params.yaml +++ b/rosbag2_transport/test/resources/recorder_node_params.yaml @@ -41,6 +41,9 @@ recorder_params_node: max_cache_size: 989888 storage_preset_profile: "none" snapshot_mode: false + snapshot_duration: + sec: 1 + nsec: 500000000 custom_data: ["key1=value1", "key2=value2"] start_time_ns: 0 end_time_ns: 100000 diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index 3875a0b1b0..99b31de2b3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -186,6 +186,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ(storage_options.max_cache_size, 9898); EXPECT_EQ(storage_options.storage_preset_profile, "resilient"); EXPECT_EQ(storage_options.snapshot_mode, false); + EXPECT_DOUBLE_EQ(storage_options.snapshot_duration.seconds(), 1.5); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"} diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp index 97a89bc5a9..f6d18bbe30 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp @@ -256,6 +256,7 @@ TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) { EXPECT_EQ(storage_options.max_cache_size, 989888); EXPECT_EQ(storage_options.storage_preset_profile, "none"); EXPECT_EQ(storage_options.snapshot_mode, false); + EXPECT_DOUBLE_EQ(storage_options.snapshot_duration.seconds(), 1.5); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"}