Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate code in SequentialCompressionReader #372

Merged
merged 2 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "rosbag2_compression/compression_options.hpp"

#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"

Expand All @@ -47,7 +47,7 @@ namespace rosbag2_compression
{

class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader
: public ::rosbag2_cpp::reader_interfaces::BaseReaderInterface
: public rosbag2_cpp::readers::SequentialReader
{
public:
explicit SequentialCompressionReader(
Expand All @@ -66,36 +66,8 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader
const rosbag2_cpp::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options) override;

void reset() override;

bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;

std::vector<rosbag2_storage::TopicMetadata> get_all_topics_and_types() override;

void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override;

void reset_filter() override;

/**
* Ask whether there is another database file to read from the list of relative
* file paths.
*
* \return true if there are still files to read in the list
*/
virtual bool has_next_file() const;

/**
* Return the relative file path pointed to by the current file iterator.
*/
virtual std::string get_current_file() const;

/**
* Return the URI of the current file (i.e. no extensions).
*/
virtual std::string get_current_uri() const;

protected:
/**
* Increment the current file iterator to point to the next file in the list of relative file
Expand All @@ -104,7 +76,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader
* Expected usage:
* if (has_next_file()) load_next_file();
*/
virtual void load_next_file();
void load_next_file() override;

/**
* Initializes the decompressor if a compression mode is specified in the metadata.
Expand All @@ -113,46 +85,8 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader
*/
virtual void setup_decompression();

/**
* Set the file path currently pointed to by the iterator.
*
* \param file Relative path to the file.
*/
virtual void set_current_file(const std::string & file);

private:
/**
* Checks if all topics in the bagfile have the same RMW serialization format.
* Currently a bag file can only be played if all topics have the same serialization format.
*
* \param topics Vector of TopicInformation with metadata.
* \throws runtime_error if any topic has a different serialization format from the rest.
*/
virtual void check_topics_serialization_formats(
const std::vector<rosbag2_storage::TopicInformation> & topics);

/**
* Checks if the serialization format of the converter factory is the same as that of the storage
* factory.
* If not, changes the serialization format of the converter factory to use the serialization
* format of the storage factory.
*
* \param converter_serialization_format
* \param storage_serialization_format
*/
virtual void check_converter_serialization_format(
const std::string & converter_serialization_format,
const std::string & storage_serialization_format);

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<rosbag2_cpp::Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
rosbag2_storage::BagMetadata metadata_{};
std::vector<std::string> file_paths_{}; // List of database files.
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from
rosbag2_compression::CompressionMode compression_mode_{
rosbag2_compression::CompressionMode::NONE};
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,12 @@ SequentialCompressionReader::SequentialCompressionReader(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_{std::move(storage_factory)},
converter_factory_{std::move(converter_factory)},
metadata_io_{std::move(metadata_io)},
: SequentialReader(std::move(storage_factory), converter_factory, std::move(metadata_io)),
compression_factory_{std::move(compression_factory)}
{}

SequentialCompressionReader::~SequentialCompressionReader()
{
reset();
}

void SequentialCompressionReader::reset()
{
storage_.reset();
}
{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

= default?


void SequentialCompressionReader::setup_decompression()
{
Expand All @@ -58,7 +49,7 @@ void SequentialCompressionReader::setup_decompression()
decompressor_ = compression_factory_->create_decompressor(metadata_.compression_format);
// Decompress the first file so that it is readable.
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
set_current_file(decompressor_->decompress_uri(get_current_file()));
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
} else {
throw std::invalid_argument{
"SequentialCompressionReader requires a CompressionMode that is not NONE!"};
Expand Down Expand Up @@ -107,21 +98,6 @@ void SequentialCompressionReader::open(
topics[0].topic_metadata.serialization_format);
}

bool SequentialCompressionReader::has_next()
{
if (storage_) {
// If there's no new message, check if there's at least another file to read and update storage
// to read from there. Otherwise, check if there's another message.
if (!storage_->has_next() && has_next_file()) {
load_next_file();
storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
}
return storage_->has_next();
}
throw std::runtime_error{"Bag is not open. Call open() before reading."};
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionReader::read_next()
{
if (storage_ && decompressor_) {
Expand All @@ -134,43 +110,6 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionRead
throw std::runtime_error{"Bag is not open. Call open() before reading."};
}

std::vector<rosbag2_storage::TopicMetadata> SequentialCompressionReader::get_all_topics_and_types()
{
if (storage_) {
return storage_->get_all_topics_and_types();
}
throw std::runtime_error{"Bag is not open. Call open() before reading."};
}

void SequentialCompressionReader::set_filter(
const rosbag2_storage::StorageFilter & storage_filter)
{
if (storage_) {
storage_->set_filter(storage_filter);
return;
}
throw std::runtime_error(
"Bag is not open. Call open() before setting filter.");
}

void SequentialCompressionReader::reset_filter()
{
if (storage_) {
storage_->reset_filter();
}
throw std::runtime_error(
"Bag is not open. Call open() before resetting filter.");
}

bool SequentialCompressionReader::has_next_file() const
{
// Handle case where bagfile is not split
if (current_file_iterator_ == file_paths_.end()) {
return false;
}

return current_file_iterator_ + 1 != file_paths_.end();
}

void SequentialCompressionReader::load_next_file()
{
Expand All @@ -179,7 +118,7 @@ void SequentialCompressionReader::load_next_file()
}

if (compression_mode_ == rosbag2_compression::CompressionMode::NONE) {
throw std::runtime_error{"Cannot use SequentialCompressionWriter with NONE compression mode."};
throw std::runtime_error{"Cannot use SequentialCompressionReader with NONE compression mode."};
}

++current_file_iterator_;
Expand All @@ -195,52 +134,4 @@ void SequentialCompressionReader::load_next_file()
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
}

std::string SequentialCompressionReader::get_current_uri() const
{
const auto current_file = get_current_file();
const auto current_uri = rcpputils::fs::remove_extension(current_file);
return current_uri.string();
}

std::string SequentialCompressionReader::get_current_file() const
{
return *current_file_iterator_;
}

void SequentialCompressionReader::set_current_file(const std::string & file)
{
*current_file_iterator_ = file;
}

void SequentialCompressionReader::check_topics_serialization_formats(
const std::vector<rosbag2_storage::TopicInformation> & topics)
{
const auto & storage_serialization_format =
topics[0].topic_metadata.serialization_format;

for (const auto & topic : topics) {
if (topic.topic_metadata.serialization_format != storage_serialization_format) {
throw std::runtime_error{
"Topics with different rwm serialization format have been found. "
"All topics must have the same serialization format."};
}
}
}

void SequentialCompressionReader::check_converter_serialization_format(
const std::string & converter_serialization_format,
const std::string & storage_serialization_format)
{
if (converter_serialization_format != storage_serialization_format) {
converter_ = std::make_unique<rosbag2_cpp::Converter>(
storage_serialization_format,
converter_serialization_format,
converter_factory_);
const auto topics = storage_->get_all_topics_and_types();
for (const auto & topic_with_type : topics) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}
}
}
} // namespace rosbag2_compression
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
*/
virtual void load_next_file();

private:
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
/**
* Checks if all topics in the bagfile have the same RMW serialization format.
* Currently a bag file can only be played if all topics have the same serialization format.
Expand All @@ -127,13 +126,15 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::string & storage_serialization_format);

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<Converter> converter_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
rosbag2_storage::BagMetadata metadata_{};
std::vector<std::string> file_paths_{}; // List of database files.
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from

private:
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};
Comment on lines +136 to +137
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from all the things which moved into the protected section, why not the converter_factory? I am not proposing otherwise, just asking for the rationale.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every other previously-private member is currently referenced directly in the SequentialCompressionReader subclass. We can probably clean it up a little so that more can be private, but my intention with this was to go for the minimum necessary set of visibility increase. Which happened to just be everything except this

};

} // namespace readers
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ SequentialReader::SequentialReader(
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_(std::move(storage_factory)),
converter_factory_(std::move(converter_factory)),
converter_(nullptr),
metadata_io_(std::move(metadata_io))
metadata_io_(std::move(metadata_io)),
converter_factory_(std::move(converter_factory))
{}

SequentialReader::~SequentialReader()
Expand Down