-
Notifications
You must be signed in to change notification settings - Fork 249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] The time bounded snapshot feature #1844
base: rolling
Are you sure you want to change the base?
Changes from all commits
f806243
6e439e6
3d637f0
a126442
b4f2506
351b529
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,25 +25,44 @@ namespace rosbag2_cpp | |
namespace cache | ||
{ | ||
|
||
MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size) | ||
: max_bytes_size_(max_cache_size) | ||
MessageCacheCircularBuffer::MessageCacheCircularBuffer( | ||
size_t max_cache_size, | ||
int64_t max_cache_duration_ns) | ||
: max_bytes_size_(max_cache_size), max_cache_duration_(max_cache_duration_ns) | ||
{ | ||
if (max_bytes_size_ == 0 && max_cache_duration_ == 0) { | ||
ROSBAG2_CPP_LOG_ERROR_STREAM("Invalid arguments for the MessageCacheCircularBuffer. " | ||
"Both max_bytes_size and max_cache_duration are zero."); | ||
Comment on lines
+34
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this is debug message, it should include some extra debug information that the following exception cannot throw as in the message? otherwise, it is the same information with the following exception, so maybe debugging message here is not really useful since user can see the exact same message via exception? |
||
throw std::invalid_argument("Invalid arguments for the MessageCacheCircularBuffer. " | ||
"Both max_bytes_size and max_cache_duration are zero."); | ||
} | ||
} | ||
|
||
bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg) | ||
{ | ||
// Drop message if it exceeds the buffer size | ||
if (msg->serialized_data->buffer_length > max_bytes_size_) { | ||
if (buffer_bytes_size_ > 0 && msg->serialized_data->buffer_length > max_bytes_size_) { | ||
ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!"); | ||
return false; | ||
} | ||
|
||
// Remove any old items until there is room for new message | ||
while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { | ||
// Remove any old items until there is room for a new message | ||
while (max_bytes_size_ > 0 && | ||
buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) | ||
{ | ||
buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; | ||
buffer_.pop_front(); | ||
} | ||
// Add new message to end of buffer | ||
// Remove any old items until the difference between last and newest message timestamp | ||
// will be less than or equal to the max_cache_duration_. | ||
if (buffer_.size() > 1) { | ||
auto current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; | ||
while (max_cache_duration_ > 0 && current_buffer_duration > max_cache_duration_) { | ||
buffer_.pop_front(); | ||
current_buffer_duration = buffer_.front()->recv_timestamp - buffer_.back()->recv_timestamp; | ||
} | ||
} | ||
// Add a new message to the end of the buffer | ||
buffer_bytes_size_ += msg->serialized_data->buffer_length; | ||
buffer_.push_back(msg); | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -145,16 +145,18 @@ void SequentialWriter::open( | |||||||
throw std::runtime_error{error.str()}; | ||||||||
} | ||||||||
|
||||||||
use_cache_ = storage_options.max_cache_size > 0u; | ||||||||
use_cache_ = storage_options.max_cache_size > 0u || | ||||||||
(storage_options.snapshot_mode && storage_options.snapshot_duration.nanoseconds() > 0); | ||||||||
Comment on lines
+148
to
+149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checking
Suggested change
|
||||||||
if (storage_options.snapshot_mode && !use_cache_) { | ||||||||
throw std::runtime_error( | ||||||||
"Max cache size must be greater than 0 when snapshot mode is enabled"); | ||||||||
"Either the max cache size or the maximum snapshot duration must be greater than 0" | ||||||||
" when snapshot mode is enabled"); | ||||||||
} | ||||||||
|
||||||||
if (use_cache_) { | ||||||||
if (storage_options.snapshot_mode) { | ||||||||
message_cache_ = std::make_shared<rosbag2_cpp::cache::CircularMessageCache>( | ||||||||
storage_options.max_cache_size); | ||||||||
storage_options.max_cache_size, storage_options.snapshot_duration.nanoseconds()); | ||||||||
} else { | ||||||||
message_cache_ = std::make_shared<rosbag2_cpp::cache::MessageCache>( | ||||||||
storage_options.max_cache_size); | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,4 +49,7 @@ player_params_node: | |
max_cache_size: 9898 | ||
storage_preset_profile: "resilient" | ||
snapshot_mode: false | ||
snapshot_duration: | ||
sec: 1 | ||
nsec: 500000000 | ||
custom_data: ["key1=value1", "key2=value2"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Requires a newline at the end of the file? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
accidental typo?