Skip to content

Commit

Permalink
Make snapshot writing into a new file each time it is triggered (#1842)
Browse files Browse the repository at this point in the history
* Make snapshot writing into a new file each time when it is triggered

- Note. Snapshot now became a blocking call and mutually exclusive with
writer::write(message) method to avoid race conditions.
i.e. blocking the same writer_mutex_

Signed-off-by: Michael Orlov <[email protected]>

* Add unit test to make sure that snapshot writing in the new file

Co-authored-by: Clemens Mühlbacher <[email protected]>
Signed-off-by: Michael Orlov <[email protected]>

* Add support for snapshot with file compression

Signed-off-by: Michael Orlov <[email protected]>

* Rename newly added tests to avoid misunderstanding

Signed-off-by: Michael Orlov <[email protected]>

* Address review comments in tests

Signed-off-by: Michael Orlov <[email protected]>

* Change order of includes in the test_sequential_compression_writer.cpp

Signed-off-by: Michael Orlov <[email protected]>

* Update metadata_.message_count unconditionally in write_messages(..)

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
Co-authored-by: Clemens Mühlbacher <[email protected]>
  • Loading branch information
MichaelOrlov and cmuehlbacher authored Nov 1, 2024
1 parent ee7f68a commit 3f2281f
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_storage/ros_helper.hpp"
#include "rosbag2_storage/storage_options.hpp"

#include "mock_converter_factory.hpp"
Expand Down Expand Up @@ -624,6 +625,82 @@ TEST_P(SequentialCompressionWriterTest, split_event_calls_callback_with_file_com
}
}

TEST_F(SequentialCompressionWriterTest, snapshot_writes_to_new_file_with_file_compression)
{
tmp_dir_storage_options_.max_bagfile_size = 0;
tmp_dir_storage_options_.max_cache_size = 200;
tmp_dir_storage_options_.snapshot_mode = true;

initializeFakeFileStorage();
// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::FILE,
1,
1,
kDefaultCompressionQueueThreadsPriority
};
initializeWriter(compression_options);

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

std::string msg_content = "Hello";
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_length);

writer_->open(tmp_dir_storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (size_t i = 0; i < 100; i++) {
writer_->write(message);
}
writer_->take_snapshot();
writer_->close();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < 2; i++) {
auto expected_closed = fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i) + "." + DefaultTestCompressor);
auto expected_opened = (i == 1) ?
// The last opened file shall be empty string when we do "writer->close();"
"" : fs::path(tmp_dir_storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str());
}
}

INSTANTIATE_TEST_SUITE_P(
SequentialCompressionWriterTestQueueSizes,
SequentialCompressionWriterTest,
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type

bool Writer::take_snapshot()
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
return writer_impl_->take_snapshot();
}

Expand Down
20 changes: 18 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ void SequentialWriter::init_metadata()
metadata_.storage_identifier = storage_->get_storage_identifier();
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
metadata_.duration = std::chrono::nanoseconds(0);
metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())};
rosbag2_storage::FileInformation file_info{};
file_info.path = strip_parent_path(storage_->get_relative_file_path());
file_info.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
file_info.duration = std::chrono::nanoseconds(0);
file_info.message_count = 0;
metadata_.custom_data = storage_options_.custom_data;
metadata_.files = {file_info};
Expand Down Expand Up @@ -409,7 +411,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
is_first_message_ = false;
}

if (should_split_bagfile(message_timestamp)) {
if (!storage_options_.snapshot_mode && should_split_bagfile(message_timestamp)) {
split_bagfile();
metadata_.files.back().starting_time = message_timestamp;
}
Expand Down Expand Up @@ -441,10 +443,13 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
bool SequentialWriter::take_snapshot()
{
if (!storage_options_.snapshot_mode) {
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled");
ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snapshot called when snapshot mode is disabled");
return false;
}
// Note: Information about start, duration and num messages for the current file in metadata_
// will be updated in the write_messages(..), when cache_consumer call it as a callback.
message_cache_->notify_data_ready();
split_bagfile();
return true;
}

Expand Down Expand Up @@ -528,6 +533,17 @@ void SequentialWriter::write_messages(
return;
}
storage_->write(messages);
if (storage_options_.snapshot_mode) {
// Update FileInformation about the last file in metadata in case of snapshot mode
const auto first_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.front()->recv_timestamp));
const auto last_msg_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages.back()->recv_timestamp));
metadata_.files.back().starting_time = first_msg_timestamp;
metadata_.files.back().duration = last_msg_timestamp - first_msg_timestamp;
metadata_.files.back().message_count = messages.size();
}
metadata_.message_count += messages.size();
std::lock_guard<std::mutex> lock(topics_info_mutex_);
for (const auto & msg : messages) {
if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) {
Expand Down
200 changes: 200 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,206 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception)
EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error);
}

TEST_F(SequentialWriterTest, snapshot_writes_to_new_file_with_bag_split)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const rcutils_time_point_value_t first_msg_timestamp = 100;
const size_t num_msgs_to_write = 100;
const std::string topic_name = "test_topic";
std::string msg_content = "Hello";
const size_t serialized_msg_buffer_length = msg_content.length();
const size_t num_expected_msgs = storage_options_.max_cache_size / serialized_msg_buffer_length;
const size_t expected_start_time = first_msg_timestamp + (num_msgs_to_write - num_expected_msgs);
const auto expected_last_msg_timestamp = (first_msg_timestamp + num_msgs_to_write - 1);
const size_t expected_duration = expected_last_msg_timestamp - expected_start_time;
// Prepare vector of messages
std::vector<rosbag2_storage::SerializedBagMessageSharedPtr> messages;
for (size_t i = 0; i < num_msgs_to_write; i++) {
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->recv_timestamp = first_msg_timestamp + static_cast<rcutils_time_point_value_t>(i);
message->send_timestamp = first_msg_timestamp + static_cast<rcutils_time_point_value_t>(i);
message->topic_name = topic_name;
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());
messages.push_back(message);
}

// Expect a single write call when the snapshot is triggered
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(1);

ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

for (const auto & message : messages) {
writer_->write(message);
}
writer_->take_snapshot();

EXPECT_THAT(closed_files.size(), 1);
EXPECT_THAT(opened_files.size(), 1);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 1))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 1);
ASSERT_EQ(closed_files.size(), 1);

const auto expected_closed = fs::path(storage_options_.uri) / (bag_base_dir_ + "_0");
const auto expected_opened = fs::path(storage_options_.uri) / (bag_base_dir_ + "_1");
ASSERT_STREQ(closed_files[0].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[0].c_str(), expected_opened.generic_string().c_str());

// Check metadata
ASSERT_EQ(v_intercepted_update_metadata_.size(), 3u);
// The v_intercepted_update_metadata_[0] is the very first metadata saved from the writer's
// constructor. We don't update it during the snapshot, and it doesn't make sense checking it.
// The v_intercepted_update_metadata_[1] is the metadata written right before closing the file
// with the new snapshot.
// The v_intercepted_update_metadata_[2] is the metadata written when we are opening a new file
// after switching to a new storage.
EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, num_expected_msgs);
EXPECT_EQ(v_intercepted_update_metadata_[2].message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
v_intercepted_update_metadata_[1].starting_time).time_since_epoch().count(),
first_msg_timestamp);

ASSERT_FALSE(v_intercepted_update_metadata_[1].files.empty());
const auto & first_file_info = v_intercepted_update_metadata_[1].files[0];
EXPECT_STREQ(first_file_info.path.c_str(), std::string(bag_base_dir_ + "_0").c_str());
EXPECT_EQ(first_file_info.message_count, num_expected_msgs);
EXPECT_EQ(
std::chrono::time_point_cast<std::chrono::nanoseconds>(
first_file_info.starting_time).time_since_epoch().count(),
expected_start_time);
EXPECT_EQ(first_file_info.duration.count(), expected_duration);
}

TEST_F(SequentialWriterTest, snapshot_can_be_called_twice)
{
storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = 200;
storage_options_.snapshot_mode = true;
const size_t num_msgs_to_write = 100;

// Expect to call write method twice. Once per each snapshot.
EXPECT_CALL(
*storage_, write(
An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())
).Times(2);

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::vector<std::string> closed_files;
std::vector<std::string> opened_files;
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) {
closed_files.emplace_back(info.closed_file);
opened_files.emplace_back(info.opened_file);
};
writer_->add_event_callbacks(callbacks);

std::string rmw_format = "rmw_format";

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""});

std::string msg_content = "Hello";
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data =
rosbag2_storage::make_serialized_message(msg_content.c_str(), msg_content.length());

for (size_t i = 0; i < num_msgs_to_write / 2; i++) {
writer_->write(message);
}
writer_->take_snapshot();

for (size_t i = num_msgs_to_write / 2; i < num_msgs_to_write; i++) {
writer_->write(message);
}
writer_->take_snapshot();

EXPECT_THAT(closed_files.size(), 2);
EXPECT_THAT(opened_files.size(), 2);

if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 2))) {
// Output debug info
for (size_t i = 0; i < opened_files.size(); i++) {
std::cout << "opened_file[" << i << "] = '" << opened_files[i] <<
"'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl;
}
}

ASSERT_EQ(opened_files.size(), 2);
ASSERT_EQ(closed_files.size(), 2);

for (size_t i = 0; i < opened_files.size(); i++) {
const auto expected_closed = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i));
const auto expected_opened = fs::path(storage_options_.uri) /
(bag_base_dir_ + "_" + std::to_string(i + 1));
ASSERT_STREQ(closed_files[i].c_str(), expected_closed.generic_string().c_str());
ASSERT_STREQ(opened_files[i].c_str(), expected_opened.generic_string().c_str());
}
}

TEST_F(SequentialWriterTest, split_event_calls_callback)
{
const uint64_t max_bagfile_size = 3;
Expand Down

0 comments on commit 3f2281f

Please sign in to comment.