From 8a7ce11af71172de26c9aadda6ef95b62075b066 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 13 Apr 2020 17:45:22 -0700 Subject: [PATCH 1/2] Deduplicate code in SequentialCompressionReader Signed-off-by: Emerson Knapp --- .../sequential_compression_reader.hpp | 72 +---------- .../sequential_compression_reader.cpp | 117 +----------------- .../rosbag2_cpp/readers/sequential_reader.hpp | 1 - 3 files changed, 7 insertions(+), 183 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_reader.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_reader.hpp index e0d275675e..90e9fd435b 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_reader.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_reader.hpp @@ -23,7 +23,7 @@ #include "rosbag2_compression/compression_options.hpp" #include "rosbag2_cpp/converter.hpp" -#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp" +#include "rosbag2_cpp/readers/sequential_reader.hpp" #include "rosbag2_cpp/serialization_format_converter_factory.hpp" #include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp" @@ -47,7 +47,7 @@ namespace rosbag2_compression { class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader - : public ::rosbag2_cpp::reader_interfaces::BaseReaderInterface + : public rosbag2_cpp::readers::SequentialReader { public: explicit SequentialCompressionReader( @@ -66,36 +66,8 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader const rosbag2_cpp::StorageOptions & storage_options, const rosbag2_cpp::ConverterOptions & converter_options) override; - void reset() override; - - bool has_next() override; - std::shared_ptr read_next() override; - std::vector get_all_topics_and_types() override; - - void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override; - - void reset_filter() override; - - /** - * Ask whether there is another database file to read from the list of relative - * file paths. - * - * \return true if there are still files to read in the list - */ - virtual bool has_next_file() const; - - /** - * Return the relative file path pointed to by the current file iterator. - */ - virtual std::string get_current_file() const; - - /** - * Return the URI of the current file (i.e. no extensions). - */ - virtual std::string get_current_uri() const; - protected: /** * Increment the current file iterator to point to the next file in the list of relative file @@ -104,7 +76,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader * Expected usage: * if (has_next_file()) load_next_file(); */ - virtual void load_next_file(); + void load_next_file() override; /** * Initializes the decompressor if a compression mode is specified in the metadata. @@ -113,46 +85,8 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader */ virtual void setup_decompression(); - /** - * Set the file path currently pointed to by the iterator. - * - * \param file Relative path to the file. - */ - virtual void set_current_file(const std::string & file); - private: - /** - * Checks if all topics in the bagfile have the same RMW serialization format. - * Currently a bag file can only be played if all topics have the same serialization format. - * - * \param topics Vector of TopicInformation with metadata. - * \throws runtime_error if any topic has a different serialization format from the rest. - */ - virtual void check_topics_serialization_formats( - const std::vector & topics); - - /** - * Checks if the serialization format of the converter factory is the same as that of the storage - * factory. - * If not, changes the serialization format of the converter factory to use the serialization - * format of the storage factory. - * - * \param converter_serialization_format - * \param storage_serialization_format - */ - virtual void check_converter_serialization_format( - const std::string & converter_serialization_format, - const std::string & storage_serialization_format); - - std::unique_ptr storage_factory_{}; - std::shared_ptr converter_factory_{}; - std::shared_ptr storage_{}; - std::unique_ptr converter_{}; std::unique_ptr decompressor_{}; - std::unique_ptr metadata_io_{}; - rosbag2_storage::BagMetadata metadata_{}; - std::vector file_paths_{}; // List of database files. - std::vector::iterator current_file_iterator_{}; // Index of file to read from rosbag2_compression::CompressionMode compression_mode_{ rosbag2_compression::CompressionMode::NONE}; std::unique_ptr compression_factory_{}; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_reader.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_reader.cpp index aa288b49e6..0870d94e29 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_reader.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_reader.cpp @@ -35,21 +35,12 @@ SequentialCompressionReader::SequentialCompressionReader( std::unique_ptr storage_factory, std::shared_ptr converter_factory, std::unique_ptr metadata_io) -: storage_factory_{std::move(storage_factory)}, - converter_factory_{std::move(converter_factory)}, - metadata_io_{std::move(metadata_io)}, +: SequentialReader(std::move(storage_factory), converter_factory, std::move(metadata_io)), compression_factory_{std::move(compression_factory)} {} SequentialCompressionReader::~SequentialCompressionReader() -{ - reset(); -} - -void SequentialCompressionReader::reset() -{ - storage_.reset(); -} +{} void SequentialCompressionReader::setup_decompression() { @@ -58,7 +49,7 @@ void SequentialCompressionReader::setup_decompression() decompressor_ = compression_factory_->create_decompressor(metadata_.compression_format); // Decompress the first file so that it is readable. ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str()); - set_current_file(decompressor_->decompress_uri(get_current_file())); + *current_file_iterator_ = decompressor_->decompress_uri(get_current_file()); } else { throw std::invalid_argument{ "SequentialCompressionReader requires a CompressionMode that is not NONE!"}; @@ -107,21 +98,6 @@ void SequentialCompressionReader::open( topics[0].topic_metadata.serialization_format); } -bool SequentialCompressionReader::has_next() -{ - if (storage_) { - // If there's no new message, check if there's at least another file to read and update storage - // to read from there. Otherwise, check if there's another message. - if (!storage_->has_next() && has_next_file()) { - load_next_file(); - storage_ = storage_factory_->open_read_only( - *current_file_iterator_, metadata_.storage_identifier); - } - return storage_->has_next(); - } - throw std::runtime_error{"Bag is not open. Call open() before reading."}; -} - std::shared_ptr SequentialCompressionReader::read_next() { if (storage_ && decompressor_) { @@ -134,43 +110,6 @@ std::shared_ptr SequentialCompressionRead throw std::runtime_error{"Bag is not open. Call open() before reading."}; } -std::vector SequentialCompressionReader::get_all_topics_and_types() -{ - if (storage_) { - return storage_->get_all_topics_and_types(); - } - throw std::runtime_error{"Bag is not open. Call open() before reading."}; -} - -void SequentialCompressionReader::set_filter( - const rosbag2_storage::StorageFilter & storage_filter) -{ - if (storage_) { - storage_->set_filter(storage_filter); - return; - } - throw std::runtime_error( - "Bag is not open. Call open() before setting filter."); -} - -void SequentialCompressionReader::reset_filter() -{ - if (storage_) { - storage_->reset_filter(); - } - throw std::runtime_error( - "Bag is not open. Call open() before resetting filter."); -} - -bool SequentialCompressionReader::has_next_file() const -{ - // Handle case where bagfile is not split - if (current_file_iterator_ == file_paths_.end()) { - return false; - } - - return current_file_iterator_ + 1 != file_paths_.end(); -} void SequentialCompressionReader::load_next_file() { @@ -179,7 +118,7 @@ void SequentialCompressionReader::load_next_file() } if (compression_mode_ == rosbag2_compression::CompressionMode::NONE) { - throw std::runtime_error{"Cannot use SequentialCompressionWriter with NONE compression mode."}; + throw std::runtime_error{"Cannot use SequentialCompressionReader with NONE compression mode."}; } ++current_file_iterator_; @@ -195,52 +134,4 @@ void SequentialCompressionReader::load_next_file() *current_file_iterator_ = decompressor_->decompress_uri(get_current_file()); } } - -std::string SequentialCompressionReader::get_current_uri() const -{ - const auto current_file = get_current_file(); - const auto current_uri = rcpputils::fs::remove_extension(current_file); - return current_uri.string(); -} - -std::string SequentialCompressionReader::get_current_file() const -{ - return *current_file_iterator_; -} - -void SequentialCompressionReader::set_current_file(const std::string & file) -{ - *current_file_iterator_ = file; -} - -void SequentialCompressionReader::check_topics_serialization_formats( - const std::vector & topics) -{ - const auto & storage_serialization_format = - topics[0].topic_metadata.serialization_format; - - for (const auto & topic : topics) { - if (topic.topic_metadata.serialization_format != storage_serialization_format) { - throw std::runtime_error{ - "Topics with different rwm serialization format have been found. " - "All topics must have the same serialization format."}; - } - } -} - -void SequentialCompressionReader::check_converter_serialization_format( - const std::string & converter_serialization_format, - const std::string & storage_serialization_format) -{ - if (converter_serialization_format != storage_serialization_format) { - converter_ = std::make_unique( - storage_serialization_format, - converter_serialization_format, - converter_factory_); - const auto topics = storage_->get_all_topics_and_types(); - for (const auto & topic_with_type : topics) { - converter_->add_topic(topic_with_type.name, topic_with_type.type); - } - } -} } // namespace rosbag2_compression diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index beb26023e4..b99d89a85a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -101,7 +101,6 @@ class ROSBAG2_CPP_PUBLIC SequentialReader */ virtual void load_next_file(); -private: /** * Checks if all topics in the bagfile have the same RMW serialization format. * Currently a bag file can only be played if all topics have the same serialization format. From f2fea4247fdcfa85ae4e8a3e95400c085aa83399 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 13 Apr 2020 18:22:55 -0700 Subject: [PATCH 2/2] Make converter_factory_ private Signed-off-by: Emerson Knapp --- rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp | 4 +++- rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index b99d89a85a..4010fccfea 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -126,13 +126,15 @@ class ROSBAG2_CPP_PUBLIC SequentialReader const std::string & storage_serialization_format); std::unique_ptr storage_factory_{}; - std::shared_ptr converter_factory_{}; std::shared_ptr storage_{}; std::unique_ptr converter_{}; std::unique_ptr metadata_io_{}; rosbag2_storage::BagMetadata metadata_{}; std::vector file_paths_{}; // List of database files. std::vector::iterator current_file_iterator_{}; // Index of file to read from + +private: + std::shared_ptr converter_factory_{}; }; } // namespace readers diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 18740e5d39..6e43017e1b 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -61,9 +61,9 @@ SequentialReader::SequentialReader( std::shared_ptr converter_factory, std::unique_ptr metadata_io) : storage_factory_(std::move(storage_factory)), - converter_factory_(std::move(converter_factory)), converter_(nullptr), - metadata_io_(std::move(metadata_io)) + metadata_io_(std::move(metadata_io)), + converter_factory_(std::move(converter_factory)) {} SequentialReader::~SequentialReader()