From f806243f8430ed7942a27ffb9d4085a4c170ee3c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 27 Oct 2024 19:02:02 -0700 Subject: [PATCH 1/6] Initial implementation for the time bounded snapshot - Added the "--snapshot-duration" CLI option. Default value 0 indicates that the snapshot will be limited by the --max-cache-size parameter only. If the value is more than 0, the cyclic buffer for the snapshot will be limited by both the series of messages duration and the maximum cache size parameter. - To override the upper bound by total messages size, the "--maximum-cache-size" CLI option can be settled to 0. Signed-off-by: Michael Orlov --- ros2bag/ros2bag/verb/record.py | 14 +++++++ .../cache/circular_message_cache.hpp | 2 +- .../cache/message_cache_circular_buffer.hpp | 39 ++++++++++++------- .../cache/circular_message_cache.cpp | 8 ++-- .../cache/message_cache_circular_buffer.cpp | 29 +++++++++++--- .../rosbag2_cpp/writers/sequential_writer.cpp | 9 +++-- rosbag2_py/src/rosbag2_py/_storage.cpp | 6 ++- .../rosbag2_storage/storage_options.hpp | 4 ++ .../src/rosbag2_storage/storage_options.cpp | 2 + .../rosbag2_storage/test_storage_options.cpp | 2 + 10 files changed, 86 insertions(+), 29 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index dadb6807ba..f4972fece4 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -176,6 +176,15 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: help='Enable snapshot mode. Messages will not be written to the bagfile until ' 'the "/rosbag2_recorder/snapshot" service is called. e.g. \n ' 'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot') + parser.add_argument( + '--snapshot-duration', type=int, default=0, + help='Maximum snapshot duration in milliseconds.\n' + 'Default: %(default)d, indicates that the snapshot will be limited by the' + ' --max-cache-size parameter only. If the value is more than 0, the cyclic buffer' + ' for the snapshot will be limited by both the series of messages duration and the' + ' maximum cache size parameter.\n' + 'To override the upper bound by total messages size, the ' + '--maximum-cache-size parameter can be settled to 0.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -281,6 +290,10 @@ def validate_parsed_arguments(args, uri) -> str: if args.compression_queue_size < 0: return print_error('Compression queue size must be at least 0.') + if args.snapshot_mode and args.snapshot_duration == 0 and args.max_cache_size == 0: + return print_error('In snapshot mode, either the snapshot_duration or max_bytes_size shall' + ' not be set to zero.') + class RecordVerb(VerbExtension): """Record ROS data to a bag.""" @@ -325,6 +338,7 @@ def main(self, *, args): # noqa: D102 storage_preset_profile=args.storage_preset_profile, storage_config_uri=storage_config_file, snapshot_mode=args.snapshot_mode, + snapshot_duration_ms=args.snapshot_duration, custom_data=custom_data ) record_options = RecordOptions() diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp index 0777dcd788..6348c0a21a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -52,7 +52,7 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache : public MessageCacheInterface { public: - explicit CircularMessageCache(size_t max_buffer_size); + explicit CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns = 0); ~CircularMessageCache() override; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp index bd6a022f2e..227d56ab87 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Copyright 2021 Amazonhe t.com, Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -36,27 +36,35 @@ namespace rosbag2_cpp namespace cache { -/** -* This class implements a circular buffer message cache. Since the buffer -* size is limited by total byte size of the storage messages rather than -* a fix number of messages, a deque is used instead of a vector since -* older messages can always be dropped from the front and new messages added -* to the end. The buffer will never consume more than max_cache_size bytes, -* and will log a warning message if an individual message exceeds the buffer -* size. -*/ +/// This class implements a circular buffer message cache. Since the buffer +/// size is limited by the total byte size of the storage messages or a total messages duration +/// rather than a fix number of messages, a deque is used instead of a vector since +/// older messages can always be dropped from the front and new messages added +/// to the end. The buffer will never consume more than max_cache_size bytes, if max_cache_size > 0. +/// And will log a warning message if an individual message exceeds the buffer size. class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer : public CacheBufferInterface { public: // Delete default constructor since max_cache_size is required MessageCacheCircularBuffer() = delete; - explicit MessageCacheCircularBuffer(size_t max_cache_size); - /** - * If buffer size has some space left, we push the message regardless of its size, - * but if this results in exceeding buffer size, we begin dropping old messages. - */ + /// \brief Parametrized constructor + /// \param max_cache_size Maximum amount of memory which could be occupied by the messages stored + /// in the circular buffer. Note. If max_cache_size is zero, the circular buffer will be only + /// bounded by the max_cache_duration. + /// \param max_cache_duration_ns Maximum duration in nanoseconds of message sequence allowed to be + /// stored in the circular buffer. Note. If max_cache_duration is zero, the circular buffer will + /// be only bounded by the max_cache_size. + /// \throws std::invalid_argument if both max_cache_size and max_cache_duration are zero. + explicit MessageCacheCircularBuffer(size_t max_cache_size, int64_t max_cache_duration_ns = 0); + + /// \brief Push new message to the circular buffer + /// \details If buffer size has some space left, we push the message regardless of its size, + /// but if this results in exceeding buffer size, we begin dropping old messages. + /// \param msg Shared pointer to the rosbag2_storage::SerializedBagMessage + /// \return False if the buffer_bytes_size > 0 and + /// msg->serialized_data->buffer_length > max_bytes_size, otherwise true. bool push(CacheBufferInterface::buffer_element_t msg) override; /// Clear buffer @@ -73,6 +81,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer std::vector msg_vector_; size_t buffer_bytes_size_ {0u}; const size_t max_bytes_size_; + const int64_t max_cache_duration_; }; } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp index d26d04ea61..185d2131f7 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -27,10 +27,12 @@ namespace rosbag2_cpp namespace cache { -CircularMessageCache::CircularMessageCache(size_t max_buffer_size) +CircularMessageCache::CircularMessageCache(size_t max_buffer_size, int64_t max_buffer_duration_ns) { - producer_buffer_ = std::make_shared(max_buffer_size); - consumer_buffer_ = std::make_shared(max_buffer_size); + producer_buffer_ = + std::make_shared(max_buffer_size, max_buffer_duration_ns); + consumer_buffer_ = + std::make_shared(max_buffer_size, max_buffer_duration_ns); } CircularMessageCache::~CircularMessageCache() diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp index 8a1b212db6..bb131f5354 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp @@ -25,25 +25,42 @@ namespace rosbag2_cpp namespace cache { -MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size) -: max_bytes_size_(max_cache_size) +MessageCacheCircularBuffer::MessageCacheCircularBuffer( + size_t max_cache_size, + int64_t max_cache_duration_ns) +: max_bytes_size_(max_cache_size), max_cache_duration_(max_cache_duration_ns) { + if (max_bytes_size_ == 0 && max_cache_duration_ == 0) { + ROSBAG2_CPP_LOG_ERROR_STREAM("Invalid arguments for the MessageCacheCircularBuffer. " + "Both max_bytes_size and max_cache_duration are zero."); + throw std::invalid_argument("Invalid arguments for the MessageCacheCircularBuffer. " + "Both max_bytes_size and max_cache_duration are zero."); + } } bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg) { // Drop message if it exceeds the buffer size - if (msg->serialized_data->buffer_length > max_bytes_size_) { + if (buffer_bytes_size_ > 0 && msg->serialized_data->buffer_length > max_bytes_size_) { ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!"); return false; } - // Remove any old items until there is room for new message - while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { + // Remove any old items until there is room for a new message + while (buffer_bytes_size_ > 0 && + buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) + { buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; buffer_.pop_front(); } - // Add new message to end of buffer + // Remove any old items until the difference between last and newest message timestamp + // will be less than or equal to the max_cache_duration_. + auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) { + buffer_.pop_front(); + current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + } + // Add a new message to the end of the buffer buffer_bytes_size_ += msg->serialized_data->buffer_length; buffer_.push_back(msg); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index bd56eb8ade..b3f30e6187 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -145,16 +145,19 @@ void SequentialWriter::open( throw std::runtime_error{error.str()}; } - use_cache_ = storage_options.max_cache_size > 0u; + use_cache_ = storage_options.max_cache_size > 0u || + (storage_options.snapshot_mode && storage_options.snapshot_duration_ms > 0); if (storage_options.snapshot_mode && !use_cache_) { throw std::runtime_error( - "Max cache size must be greater than 0 when snapshot mode is enabled"); + "Either the max cache size or the maximum snapshot duration must be greater than 0" + " when snapshot mode is enabled"); } if (use_cache_) { if (storage_options.snapshot_mode) { + int64_t max_buffer_duration_ns = storage_options.snapshot_duration_ms * 1000000; message_cache_ = std::make_shared( - storage_options.max_cache_size); + storage_options.max_cache_size, max_buffer_duration_ns); } else { message_cache_ = std::make_shared( storage_options.max_cache_size); diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index e12898053d..d690171276 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -83,7 +83,7 @@ PYBIND11_MODULE(_storage, m) { .def( pybind11::init< std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool, - int64_t, int64_t, KEY_VALUE_MAP>(), + int64_t, int64_t, int64_t, KEY_VALUE_MAP>(), pybind11::arg("uri"), pybind11::arg("storage_id") = "", pybind11::arg("max_bagfile_size") = 0, @@ -92,6 +92,7 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("storage_preset_profile") = "", pybind11::arg("storage_config_uri") = "", pybind11::arg("snapshot_mode") = false, + pybind11::arg("snapshot_duration_ms") = 0, pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, pybind11::arg("custom_data") = KEY_VALUE_MAP{}) @@ -115,6 +116,9 @@ PYBIND11_MODULE(_storage, m) { .def_readwrite( "snapshot_mode", &rosbag2_storage::StorageOptions::snapshot_mode) + .def_readwrite( + "snapshot_duration_ms", + &rosbag2_storage::StorageOptions::snapshot_duration_ms) .def_readwrite( "start_time_ns", &rosbag2_storage::StorageOptions::start_time_ns) diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 8d6d188459..b62e2fc19b 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -56,6 +56,10 @@ struct StorageOptions // Defaults to disabled. bool snapshot_mode = false; + // The maximum snapshot duration in milliseconds. + // A value of 0 indicates that snapshot will be limited by the max_cache_size only. + int64_t snapshot_duration_ms = 0; + // Start and end time for cutting int64_t start_time_ns = -1; int64_t end_time_ns = -1; diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index 717fcb9082..d6ec6137bc 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -32,6 +32,7 @@ Node convert::encode( node["storage_preset_profile"] = storage_options.storage_preset_profile; node["storage_config_uri"] = storage_options.storage_config_uri; node["snapshot_mode"] = storage_options.snapshot_mode; + node["snapshot_duration_ms"] = storage_options.snapshot_duration_ms; node["start_time_ns"] = storage_options.start_time_ns; node["end_time_ns"] = storage_options.end_time_ns; node["custom_data"] = storage_options.custom_data; @@ -50,6 +51,7 @@ bool convert::decode( node, "storage_preset_profile", storage_options.storage_preset_profile); optional_assign(node, "storage_config_uri", storage_options.storage_config_uri); optional_assign(node, "snapshot_mode", storage_options.snapshot_mode); + optional_assign(node, "snapshot_duration_ms", storage_options.snapshot_duration_ms); optional_assign(node, "start_time_ns", storage_options.start_time_ns); optional_assign(node, "end_time_ns", storage_options.end_time_ns); using KEY_VALUE_MAP = std::unordered_map; diff --git a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp index aa21761178..2e30434ae2 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp @@ -29,6 +29,7 @@ TEST(storage_options, test_yaml_serialization) original.storage_preset_profile = "profile"; original.storage_config_uri = "config_uri"; original.snapshot_mode = true; + original.snapshot_duration_ms = 1500; original.start_time_ns = 12345000; original.end_time_ns = 23456000; original.custom_data["key1"] = "value1"; @@ -50,6 +51,7 @@ TEST(storage_options, test_yaml_serialization) ASSERT_EQ(original.storage_preset_profile, reconstructed.storage_preset_profile); ASSERT_EQ(original.storage_config_uri, reconstructed.storage_config_uri); ASSERT_EQ(original.snapshot_mode, reconstructed.snapshot_mode); + ASSERT_EQ(original.snapshot_duration_ms, reconstructed.snapshot_duration_ms); ASSERT_EQ(original.start_time_ns, reconstructed.start_time_ns); ASSERT_EQ(original.end_time_ns, reconstructed.end_time_ns); ASSERT_EQ(original.custom_data, reconstructed.custom_data); From 6e439e605dda782a67d50f7c3510376dbab84b5b Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 27 Oct 2024 19:32:56 -0700 Subject: [PATCH 2/6] Regenerate Python stub files (.pyi) Signed-off-by: Michael Orlov --- rosbag2_py/rosbag2_py/_storage.pyi | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rosbag2_py/rosbag2_py/_storage.pyi b/rosbag2_py/rosbag2_py/_storage.pyi index 2520c4e070..f8a4a96e7b 100644 --- a/rosbag2_py/rosbag2_py/_storage.pyi +++ b/rosbag2_py/rosbag2_py/_storage.pyi @@ -101,13 +101,14 @@ class StorageOptions: max_bagfile_duration: int max_bagfile_size: int max_cache_size: int + snapshot_duration_ms: int snapshot_mode: bool start_time_ns: int storage_config_uri: str storage_id: str storage_preset_profile: str uri: str - def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... + def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration_ms: int = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... class TopicInformation: message_count: int From 3d637f0e8e8dd9893402b556a44c8752b75f9bbe Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 28 Oct 2024 07:59:02 -0700 Subject: [PATCH 3/6] Bugfix for failing tests Signed-off-by: Michael Orlov --- .../cache/message_cache_circular_buffer.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp index bb131f5354..f836ef66ec 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp @@ -47,7 +47,7 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg } // Remove any old items until there is room for a new message - while (buffer_bytes_size_ > 0 && + while (max_bytes_size_ > 0 && buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; @@ -55,10 +55,12 @@ bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg } // Remove any old items until the difference between last and newest message timestamp // will be less than or equal to the max_cache_duration_. - auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; - while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) { - buffer_.pop_front(); - current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + if (buffer_.size() > 1) { + auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) { + buffer_.pop_front(); + current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; + } } // Add a new message to the end of the buffer buffer_bytes_size_ += msg->serialized_data->buffer_length; From a126442da201c014eb7a16dbf44cb0b2131d8bf6 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 28 Oct 2024 08:24:04 -0700 Subject: [PATCH 4/6] Rename snapshot_duration_ms to the snapshot_duration Signed-off-by: Michael Orlov --- ros2bag/ros2bag/verb/record.py | 2 +- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 4 ++-- rosbag2_py/rosbag2_py/_storage.pyi | 4 ++-- rosbag2_py/src/rosbag2_py/_storage.cpp | 6 +++--- rosbag2_storage/include/rosbag2_storage/storage_options.hpp | 2 +- rosbag2_storage/src/rosbag2_storage/storage_options.cpp | 4 ++-- .../test/rosbag2_storage/test_storage_options.cpp | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index f4972fece4..3260ee4748 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -338,7 +338,7 @@ def main(self, *, args): # noqa: D102 storage_preset_profile=args.storage_preset_profile, storage_config_uri=storage_config_file, snapshot_mode=args.snapshot_mode, - snapshot_duration_ms=args.snapshot_duration, + snapshot_duration=args.snapshot_duration, custom_data=custom_data ) record_options = RecordOptions() diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index b3f30e6187..87397262be 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -146,7 +146,7 @@ void SequentialWriter::open( } use_cache_ = storage_options.max_cache_size > 0u || - (storage_options.snapshot_mode && storage_options.snapshot_duration_ms > 0); + (storage_options.snapshot_mode && storage_options.snapshot_duration > 0); if (storage_options.snapshot_mode && !use_cache_) { throw std::runtime_error( "Either the max cache size or the maximum snapshot duration must be greater than 0" @@ -155,7 +155,7 @@ void SequentialWriter::open( if (use_cache_) { if (storage_options.snapshot_mode) { - int64_t max_buffer_duration_ns = storage_options.snapshot_duration_ms * 1000000; + int64_t max_buffer_duration_ns = storage_options.snapshot_duration * 1000000; message_cache_ = std::make_shared( storage_options.max_cache_size, max_buffer_duration_ns); } else { diff --git a/rosbag2_py/rosbag2_py/_storage.pyi b/rosbag2_py/rosbag2_py/_storage.pyi index f8a4a96e7b..1ff573c61a 100644 --- a/rosbag2_py/rosbag2_py/_storage.pyi +++ b/rosbag2_py/rosbag2_py/_storage.pyi @@ -101,14 +101,14 @@ class StorageOptions: max_bagfile_duration: int max_bagfile_size: int max_cache_size: int - snapshot_duration_ms: int + snapshot_duration: int snapshot_mode: bool start_time_ns: int storage_config_uri: str storage_id: str storage_preset_profile: str uri: str - def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration_ms: int = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... + def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration: int = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... class TopicInformation: message_count: int diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index d690171276..9abf1b382a 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -92,7 +92,7 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("storage_preset_profile") = "", pybind11::arg("storage_config_uri") = "", pybind11::arg("snapshot_mode") = false, - pybind11::arg("snapshot_duration_ms") = 0, + pybind11::arg("snapshot_duration") = 0, pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, pybind11::arg("custom_data") = KEY_VALUE_MAP{}) @@ -117,8 +117,8 @@ PYBIND11_MODULE(_storage, m) { "snapshot_mode", &rosbag2_storage::StorageOptions::snapshot_mode) .def_readwrite( - "snapshot_duration_ms", - &rosbag2_storage::StorageOptions::snapshot_duration_ms) + "snapshot_duration", + &rosbag2_storage::StorageOptions::snapshot_duration) .def_readwrite( "start_time_ns", &rosbag2_storage::StorageOptions::start_time_ns) diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index b62e2fc19b..00f51cd2b2 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -58,7 +58,7 @@ struct StorageOptions // The maximum snapshot duration in milliseconds. // A value of 0 indicates that snapshot will be limited by the max_cache_size only. - int64_t snapshot_duration_ms = 0; + int64_t snapshot_duration = 0; // Start and end time for cutting int64_t start_time_ns = -1; diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index d6ec6137bc..af63544eeb 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -32,7 +32,7 @@ Node convert::encode( node["storage_preset_profile"] = storage_options.storage_preset_profile; node["storage_config_uri"] = storage_options.storage_config_uri; node["snapshot_mode"] = storage_options.snapshot_mode; - node["snapshot_duration_ms"] = storage_options.snapshot_duration_ms; + node["snapshot_duration"] = storage_options.snapshot_duration; node["start_time_ns"] = storage_options.start_time_ns; node["end_time_ns"] = storage_options.end_time_ns; node["custom_data"] = storage_options.custom_data; @@ -51,7 +51,7 @@ bool convert::decode( node, "storage_preset_profile", storage_options.storage_preset_profile); optional_assign(node, "storage_config_uri", storage_options.storage_config_uri); optional_assign(node, "snapshot_mode", storage_options.snapshot_mode); - optional_assign(node, "snapshot_duration_ms", storage_options.snapshot_duration_ms); + optional_assign(node, "snapshot_duration", storage_options.snapshot_duration); optional_assign(node, "start_time_ns", storage_options.start_time_ns); optional_assign(node, "end_time_ns", storage_options.end_time_ns); using KEY_VALUE_MAP = std::unordered_map; diff --git a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp index 2e30434ae2..f36000781b 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp @@ -29,7 +29,7 @@ TEST(storage_options, test_yaml_serialization) original.storage_preset_profile = "profile"; original.storage_config_uri = "config_uri"; original.snapshot_mode = true; - original.snapshot_duration_ms = 1500; + original.snapshot_duration = 1500; original.start_time_ns = 12345000; original.end_time_ns = 23456000; original.custom_data["key1"] = "value1"; @@ -51,7 +51,7 @@ TEST(storage_options, test_yaml_serialization) ASSERT_EQ(original.storage_preset_profile, reconstructed.storage_preset_profile); ASSERT_EQ(original.storage_config_uri, reconstructed.storage_config_uri); ASSERT_EQ(original.snapshot_mode, reconstructed.snapshot_mode); - ASSERT_EQ(original.snapshot_duration_ms, reconstructed.snapshot_duration_ms); + ASSERT_EQ(original.snapshot_duration, reconstructed.snapshot_duration); ASSERT_EQ(original.start_time_ns, reconstructed.start_time_ns); ASSERT_EQ(original.end_time_ns, reconstructed.end_time_ns); ASSERT_EQ(original.custom_data, reconstructed.custom_data); From b4f250654d5ab98e2a62524bd7cbcc655719a58f Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 28 Oct 2024 10:17:29 -0700 Subject: [PATCH 5/6] Change type for snapshot_duration to the rclcpp::Duration Signed-off-by: Michael Orlov --- ros2bag/ros2bag/verb/record.py | 7 ++- .../rosbag2_cpp/writers/sequential_writer.cpp | 5 +- rosbag2_py/rosbag2_py/_storage.pyi | 4 +- rosbag2_py/src/rosbag2_py/_storage.cpp | 57 ++++++++++++++++--- .../rosbag2_storage/storage_options.hpp | 7 ++- .../src/rosbag2_storage/storage_options.cpp | 3 +- .../rosbag2_storage/test_storage_options.cpp | 2 +- 7 files changed, 65 insertions(+), 20 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 3260ee4748..92eab48630 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -18,6 +18,7 @@ from rclpy.qos import InvalidQoSProfileException from ros2bag.api import add_writer_storage_plugin_extensions +from ros2bag.api import check_not_negative_float from ros2bag.api import convert_service_to_service_event_topic from ros2bag.api import convert_yaml_to_qos_profile from ros2bag.api import print_error @@ -177,8 +178,8 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: 'the "/rosbag2_recorder/snapshot" service is called. e.g. \n ' 'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot') parser.add_argument( - '--snapshot-duration', type=int, default=0, - help='Maximum snapshot duration in milliseconds.\n' + '--snapshot-duration', type=check_not_negative_float, default=0.0, + help='Maximum snapshot duration in a fraction of seconds.\n' 'Default: %(default)d, indicates that the snapshot will be limited by the' ' --max-cache-size parameter only. If the value is more than 0, the cyclic buffer' ' for the snapshot will be limited by both the series of messages duration and the' @@ -290,7 +291,7 @@ def validate_parsed_arguments(args, uri) -> str: if args.compression_queue_size < 0: return print_error('Compression queue size must be at least 0.') - if args.snapshot_mode and args.snapshot_duration == 0 and args.max_cache_size == 0: + if args.snapshot_mode and args.snapshot_duration == 0.0 and args.max_cache_size == 0: return print_error('In snapshot mode, either the snapshot_duration or max_bytes_size shall' ' not be set to zero.') diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 87397262be..51ec0c2e5f 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -146,7 +146,7 @@ void SequentialWriter::open( } use_cache_ = storage_options.max_cache_size > 0u || - (storage_options.snapshot_mode && storage_options.snapshot_duration > 0); + (storage_options.snapshot_mode && storage_options.snapshot_duration.nanoseconds() > 0); if (storage_options.snapshot_mode && !use_cache_) { throw std::runtime_error( "Either the max cache size or the maximum snapshot duration must be greater than 0" @@ -155,9 +155,8 @@ void SequentialWriter::open( if (use_cache_) { if (storage_options.snapshot_mode) { - int64_t max_buffer_duration_ns = storage_options.snapshot_duration * 1000000; message_cache_ = std::make_shared( - storage_options.max_cache_size, max_buffer_duration_ns); + storage_options.max_cache_size, storage_options.snapshot_duration.nanoseconds()); } else { message_cache_ = std::make_shared( storage_options.max_cache_size); diff --git a/rosbag2_py/rosbag2_py/_storage.pyi b/rosbag2_py/rosbag2_py/_storage.pyi index 1ff573c61a..19a211d580 100644 --- a/rosbag2_py/rosbag2_py/_storage.pyi +++ b/rosbag2_py/rosbag2_py/_storage.pyi @@ -101,14 +101,14 @@ class StorageOptions: max_bagfile_duration: int max_bagfile_size: int max_cache_size: int - snapshot_duration: int + snapshot_duration: object snapshot_mode: bool start_time_ns: int storage_config_uri: str storage_id: str storage_preset_profile: str uri: str - def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration: int = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... + def __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., snapshot_duration: object = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ... class TopicInformation: message_count: int diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index 9abf1b382a..4739db6405 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include "rosbag2_cpp/converter_options.hpp" @@ -44,6 +45,16 @@ std::chrono::nanoseconds from_rclpy_duration(const pybind11::object & duration) return std::chrono::nanoseconds(nanos); } +pybind11::object rclcpp_duration_to_py_float(const rclcpp::Duration & duration) +{ + return pybind11::cast(duration.seconds()); +} + +rclcpp::Duration py_float_to_rclcpp_duration(const pybind11::object & obj) +{ + return rclcpp::Duration::from_seconds(obj.cast()); +} + template pybind11::object to_rclpy_time(T time) { @@ -81,9 +92,36 @@ PYBIND11_MODULE(_storage, m) { using KEY_VALUE_MAP = std::unordered_map; pybind11::class_(m, "StorageOptions") .def( - pybind11::init< - std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool, - int64_t, int64_t, int64_t, KEY_VALUE_MAP>(), + pybind11::init( + []( + std::string uri, + std::string storage_id, + uint64_t max_bagfile_size, + uint64_t max_bagfile_duration, + uint64_t max_cache_size, + std::string storage_preset_profile, + std::string storage_config_uri, + bool snapshot_mode, + const pybind11::object & snapshot_duration, + int64_t start_time_ns, + int64_t end_time_ns, + KEY_VALUE_MAP custom_data) + { + return rosbag2_storage::StorageOptions{ + std::move(uri), + std::move(storage_id), + max_bagfile_size, + max_bagfile_duration, + max_cache_size, + std::move(storage_preset_profile), + std::move(storage_config_uri), + snapshot_mode, + py_float_to_rclcpp_duration(snapshot_duration), + start_time_ns, + end_time_ns, + std::move(custom_data), + }; + }), pybind11::arg("uri"), pybind11::arg("storage_id") = "", pybind11::arg("max_bagfile_size") = 0, @@ -92,7 +130,7 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("storage_preset_profile") = "", pybind11::arg("storage_config_uri") = "", pybind11::arg("snapshot_mode") = false, - pybind11::arg("snapshot_duration") = 0, + pybind11::arg("snapshot_duration") = rclcpp_duration_to_py_float(rclcpp::Duration(0, 0)), pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, pybind11::arg("custom_data") = KEY_VALUE_MAP{}) @@ -116,9 +154,14 @@ PYBIND11_MODULE(_storage, m) { .def_readwrite( "snapshot_mode", &rosbag2_storage::StorageOptions::snapshot_mode) - .def_readwrite( - "snapshot_duration", - &rosbag2_storage::StorageOptions::snapshot_duration) + .def_property( + "snapshot_duration", + [](const rosbag2_storage::StorageOptions & self) { + return rclcpp_duration_to_py_float(self.snapshot_duration); + }, + [](rosbag2_storage::StorageOptions & self, const pybind11::object & obj) { + self.snapshot_duration = py_float_to_rclcpp_duration(obj); + }) .def_readwrite( "start_time_ns", &rosbag2_storage::StorageOptions::start_time_ns) diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 00f51cd2b2..68f652f740 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -19,6 +19,7 @@ #include #include +#include "rclcpp/duration.hpp" #include "rosbag2_storage/visibility_control.hpp" #include "rosbag2_storage/yaml.hpp" @@ -56,9 +57,9 @@ struct StorageOptions // Defaults to disabled. bool snapshot_mode = false; - // The maximum snapshot duration in milliseconds. - // A value of 0 indicates that snapshot will be limited by the max_cache_size only. - int64_t snapshot_duration = 0; + // The maximum snapshot duration. + // A value of 0.0 indicates that snapshot will be limited by the max_cache_size only. + rclcpp::Duration snapshot_duration{0, 0}; // Start and end time for cutting int64_t start_time_ns = -1; diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index af63544eeb..fbd37b91b6 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -15,6 +15,7 @@ #include #include +#include "rclcpp/duration.hpp" #include "rosbag2_storage/storage_options.hpp" namespace YAML @@ -51,7 +52,7 @@ bool convert::decode( node, "storage_preset_profile", storage_options.storage_preset_profile); optional_assign(node, "storage_config_uri", storage_options.storage_config_uri); optional_assign(node, "snapshot_mode", storage_options.snapshot_mode); - optional_assign(node, "snapshot_duration", storage_options.snapshot_duration); + optional_assign(node, "snapshot_duration", storage_options.snapshot_duration); optional_assign(node, "start_time_ns", storage_options.start_time_ns); optional_assign(node, "end_time_ns", storage_options.end_time_ns); using KEY_VALUE_MAP = std::unordered_map; diff --git a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp index f36000781b..116296de28 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp @@ -29,7 +29,7 @@ TEST(storage_options, test_yaml_serialization) original.storage_preset_profile = "profile"; original.storage_config_uri = "config_uri"; original.snapshot_mode = true; - original.snapshot_duration = 1500; + original.snapshot_duration = rclcpp::Duration::from_seconds(1.5); original.start_time_ns = 12345000; original.end_time_ns = 23456000; original.custom_data["key1"] = "value1"; From 351b529245b2c114bf131b291e343c4597170fe9 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 28 Oct 2024 10:31:49 -0700 Subject: [PATCH 6/6] Add snapshot_duration handling in composable recorder and player nodes Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/config_options_from_node_params.cpp | 3 +++ rosbag2_transport/test/resources/player_node_params.yaml | 3 +++ rosbag2_transport/test/resources/recorder_node_params.yaml | 3 +++ .../test/rosbag2_transport/test_composable_player.cpp | 1 + .../test/rosbag2_transport/test_composable_recorder.cpp | 1 + 5 files changed, 11 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index ef6408db7c..73df125b6b 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -375,6 +375,9 @@ get_storage_options_from_node_params(rclcpp::Node & node) storage_options.snapshot_mode = node.declare_parameter("storage.snapshot_mode", false); + storage_options.snapshot_duration = + param_utils::get_duration_from_node_param(node, "storage.snapshot_duration", 0, 0); + auto list_of_key_value_strings = node.declare_parameter>( "storage.custom_data", std::vector()); diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index 79e1b47a93..48cb71d8e4 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -49,4 +49,7 @@ player_params_node: max_cache_size: 9898 storage_preset_profile: "resilient" snapshot_mode: false + snapshot_duration: + sec: 1 + nsec: 500000000 custom_data: ["key1=value1", "key2=value2"] \ No newline at end of file diff --git a/rosbag2_transport/test/resources/recorder_node_params.yaml b/rosbag2_transport/test/resources/recorder_node_params.yaml index 701f623bf4..c295e803d6 100644 --- a/rosbag2_transport/test/resources/recorder_node_params.yaml +++ b/rosbag2_transport/test/resources/recorder_node_params.yaml @@ -41,6 +41,9 @@ recorder_params_node: max_cache_size: 989888 storage_preset_profile: "none" snapshot_mode: false + snapshot_duration: + sec: 1 + nsec: 500000000 custom_data: ["key1=value1", "key2=value2"] start_time_ns: 0 end_time_ns: 100000 diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index 3875a0b1b0..99b31de2b3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -186,6 +186,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ(storage_options.max_cache_size, 9898); EXPECT_EQ(storage_options.storage_preset_profile, "resilient"); EXPECT_EQ(storage_options.snapshot_mode, false); + EXPECT_DOUBLE_EQ(storage_options.snapshot_duration.seconds(), 1.5); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"} diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp index 97a89bc5a9..f6d18bbe30 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_recorder.cpp @@ -256,6 +256,7 @@ TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) { EXPECT_EQ(storage_options.max_cache_size, 989888); EXPECT_EQ(storage_options.storage_preset_profile, "none"); EXPECT_EQ(storage_options.snapshot_mode, false); + EXPECT_DOUBLE_EQ(storage_options.snapshot_duration.seconds(), 1.5); std::unordered_map custom_data{ std::pair{"key1", "value1"}, std::pair{"key2", "value2"}