Skip to content

Commit

Permalink
Fix playback of compressed bagfiles (#417)
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <[email protected]>
  • Loading branch information
emersonknapp authored May 26, 2020
1 parent 75c8126 commit 2438e91
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void SequentialCompressionReader::open(
ROSBAG2_COMPRESSION_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class SequentialCompressionReaderTest : public Test
ON_CALL(*storage_factory_, open_read_only(_, _)).WillByDefault(Return(storage_));
}

rosbag2_storage::BagMetadata construct_default_bag_metadata() const
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
return metadata;
}

std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -65,12 +76,8 @@ class SequentialCompressionReaderTest : public Test

TEST_F(SequentialCompressionReaderTest, open_throws_if_unsupported_compressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
metadata.compression_format = "bad_format";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
EXPECT_CALL(*metadata_io_, read_metadata(_)).WillRepeatedly(Return(metadata));
EXPECT_CALL(*metadata_io_, metadata_file_exists(_)).WillRepeatedly(Return(true));
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();
Expand All @@ -87,14 +94,36 @@ TEST_F(SequentialCompressionReaderTest, open_throws_if_unsupported_compressor)
std::invalid_argument);
}

TEST_F(SequentialCompressionReaderTest, returns_all_topics_and_types)
{
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));

auto decompressor = std::make_unique<NiceMock<MockDecompressor>>();
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();

ON_CALL(*compression_factory, create_decompressor(_))
.WillByDefault(Return(ByMove(std::move(decompressor))));
EXPECT_CALL(*compression_factory, create_decompressor(_)).Times(1);
EXPECT_CALL(*storage_factory_, open_read_only(_, _)).Times(1);

auto compression_reader = std::make_unique<rosbag2_compression::SequentialCompressionReader>(
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));

compression_reader->open(
rosbag2_cpp::StorageOptions(), {"", storage_serialization_format_});

auto topics_and_types = compression_reader->get_all_topics_and_types();
EXPECT_FALSE(topics_and_types.empty());
}

TEST_F(SequentialCompressionReaderTest, open_supports_zstd_compressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();
Expand All @@ -114,12 +143,7 @@ TEST_F(SequentialCompressionReaderTest, open_supports_zstd_compressor)

TEST_F(SequentialCompressionReaderTest, reader_calls_create_decompressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));

Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::string & converter_serialization_format,
const std::string & storage_serialization_format);

/**
* Fill topics_metadata_ cache vector with information from metadata_
*/
virtual void fill_topics_metadata();

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<Converter> converter_{};
Expand Down
26 changes: 12 additions & 14 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,6 @@
#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"

namespace
{
void fill_topics_and_types(
const rosbag2_storage::BagMetadata & metadata,
std::vector<rosbag2_storage::TopicMetadata> & topics_and_types)
{
topics_and_types.clear();
topics_and_types.reserve(metadata.topics_with_message_count.size());
for (const auto & topic_information : metadata.topics_with_message_count) {
topics_and_types.push_back(topic_information.topic_metadata);
}
}
} // unnamed namespace

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -133,7 +120,7 @@ void SequentialReader::open(
ROSBAG2_CPP_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_and_types(metadata_, topics_metadata_);
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
Expand Down Expand Up @@ -251,5 +238,16 @@ void SequentialReader::check_converter_serialization_format(
}
}
}

void SequentialReader::fill_topics_metadata()
{
rcpputils::check_true(storage_ != nullptr, "Bag is not open. Call open() before reading.");
topics_metadata_.clear();
topics_metadata_.reserve(metadata_.topics_with_message_count.size());
for (const auto & topic_information : metadata_.topics_with_message_count) {
topics_metadata_.push_back(topic_information.topic_metadata);
}
}

} // namespace readers
} // namespace rosbag2_cpp

0 comments on commit 2438e91

Please sign in to comment.