Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compression - 9] Enable writer to compress files/messages #250

Closed
wants to merge 9 commits into from

Conversation

piraka9011
Copy link
Contributor

@piraka9011 piraka9011 commented Jan 8, 2020

This PR adds compression options to the rosbag2_cpp SequentialWriter to support compression by file and message.
Compression options are used in the rosbag2_transport package to propagate the compression CLI flags.

Changes

  • Add a function to initialize a compressor.
  • Add a compression options struct.
  • Change signature of writer's open() method to support compression options.
  • Unit tests for compression.

Dependencies

@piraka9011 piraka9011 force-pushed the compression/writer branch 3 times, most recently from 7faf7a2 to fcba9c2 Compare January 11, 2020 00:32
{
if (compressor_ && compression_mode_ == CompressionMode::FILE) {
// Get the uri of the last file and pop the uri
const auto uncompressed_uri = metadata_.relative_file_paths.back();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how you guarantee metadata_.relative_file_paths.back() isn't already an URI to a compressed file. For example, split_bagfile() and reset() are called right after the other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a helper function to check if the URI is already compressed (The extension matches the compression identifier)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The happy case seems to be: split_bagfile() is called, the next rosbag starts being recorded a new element is pushed back on metadata_.relative_file_paths, then recording is stopped and reset() is called.

The bad case is: split_bagfile() is called, then recording is stopped and reset() is called. This still feels like a pretty normal situation.

It's probably better to have a flag that tracks whether metadata_.relative_file_paths.back() has been compressed already or not, which gets set when a new rosbag starts and a when the current rosbag ends (I can't tell if this needs to be also threadsafe or not). Just checking the extension doesn't scale when you have more compression algorithms / identifiers available.

Copy link
Contributor Author

@piraka9011 piraka9011 Jan 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit:

I realized there's three cases:

  1. No splitting: Compression always occurs at reset()
  2. Multiple Split bags: Compression occurs on split_bagfile() and then during reset() after recording but before a split occurs.
  3. Splitting specified but no bags split: Splitting is specified but split_bagfile() is never called. Compression should occur in reset().

However, split_bagfile() always adds a new file to the end so eventually, it should always be compressed on reset().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, split_bagfile() always adds a new file to the end so eventually, it should always be compressed on reset().

Sorry, I missed the metadata_.relative_file_paths.push_back() in split_bagfile(), so I thought it was happening elsewhere.

However, I also noticed in split_bagfile() that you're potentially throwing between compress_last_file() and metadata_.relative_file_paths.push_back(), so you can still potentially end up with metadata_.relative_file_paths.back() being a file that's already compressed.

I'm not sure how such a thrown exception will be handled. How do you guarantee that the next time you enter split_bagfile(), metadata_.relative_file_paths.back() is not already compressed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I also noticed in split_bagfile() that you're potentially throwing between compress_last_file() and metadata_.relative_file_paths.push_back(), so you can still potentially end up with metadata_.relative_file_paths.back() being a file that's already compressed.

That's correct, it seems a bool flag for that small section that could throw is required to make sure we don't double compress.

@zmichaels11 zmichaels11 self-requested a review January 11, 2020 20:14
Copy link
Contributor

@zmichaels11 zmichaels11 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally approved; requesting changes

@piraka9011 piraka9011 force-pushed the compression/writer branch 2 times, most recently from fc4a5ca to 7a2a087 Compare January 13, 2020 19:12
@piraka9011 piraka9011 marked this pull request as ready for review January 13, 2020 19:15
@piraka9011 piraka9011 changed the title [WIP] [Compression - 9] Enable writer to compress files/messages [Compression - 9] Enable writer to compress files/messages Jan 13, 2020
Anas Abou Allaban and others added 6 commits January 13, 2020 11:28
…write docs, fix styles

Signed-off-by: Anas Abou Allaban <[email protected]>
Signed-off-by: Anas Abou Allaban <[email protected]>
Signed-off-by: Anas Abou Allaban <[email protected]>
Copy link
Contributor

@dabonnie dabonnie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to review the tests.

Signed-off-by: Anas Abou Allaban <[email protected]>
Signed-off-by: Anas Abou Allaban <[email protected]>
Copy link
Collaborator

@Karsten1987 Karsten1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am requesting changes because of the hard dependency to zstd. It should be possible to compile rosbag2_cpp (and it's stack below) without any (hardcoded) compression algorithm.

#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/storage_options.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_compression/base_compressor_interface.hpp"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to consider a new line after this to group the includes on a per-package basis.

*
* \throws runtime_error If the compression implementation does not exist.
*/
virtual void init_compression(const CompressionOptions & compression_options);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the bag is already opened with a set of specific compression options and then this function is called with a different, potentially conflicting, set of it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only called in open() and is a protected method (for unit testing) so we don't expect to call it anywhere else.
We could simply move the functionality into open() only but that might make the function too large.

#include "rcutils/filesystem.h"
#include "rosbag2_compression/zstd_compressor.hpp"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new line above to group per package.

{
if (compression_options.compression_mode != rosbag2_cpp::CompressionMode::NONE) {
if (compression_options.compression_format == "zstd") {
compressor_ = std::make_unique<rosbag2_compression::ZstdCompressor>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am having a problem with this line as it's imposing a hard dependency on Zstd and all it's dependencies.
rosbag2_cpp should potentially be compiled without any compression algorithm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this issue on our backlog next to make compression a plugin instead so that its not hardcoded.

Do you have any other suggestions in the meantime on how to avoid the hard dependency?
Maybe something along the lines of:

auto zstd_compressor = std::make_unique<rosbag2_compression::ZstdCompressor>();
compressor_ = std::make_unique<rosbag2_compression::BaseCompressorInterface>(std::move(zstd_compressor));

If not, I suggest we work with the current implementation and then after introducing the plugin changes to rosbag2_compression use a CompressorFactory to make the writer more robust for different compression implementations.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get your proposition right. We can either use a factory for loading it as a plugin dynamically or as discussed in a previous PR, inherit from the SequentialWriter or the BaseReaderInterface and make it live in the compression package. It can still be passed in by the caller side when instantiating the final Reader class. Example here: https://github.com/ros2/rosbag2/blob/master/rosbag2_transport/src/rosbag2_transport/player.cpp#L44-L46

The moment we include a compression header in the rosbag2_cpp package we introduce the dependency. That being said, the current PR also lacks the dependency of rosbag2_compression in its package.xml.

Copy link
Contributor Author

@piraka9011 piraka9011 Jan 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, you prefer to isolate compression from rosbag2_cpp.
So instead there would be a CompressionWriter in rosbag2_compression that would implement compression by message or file which can be used in the other rosbag2 packages.
However, that means losing the ability to split and compress simultaneously since we're overriding write() of the SequentialWriter.
Moreover, I don't see the issue of introducing compression as a dependency since we'd like to have that feature available for both the API and the CLI. We'd have to introduce it as a dependency at some point.

One option I see is compressing at the transport layer instead, though I'm not sure we want to do that.

Copy link
Collaborator

@Karsten1987 Karsten1987 Jan 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing the compression functionality is not the problem. So things like compression_options and general compression related structs might very well be living in this package. However, hard coding ztd is giving people no change to not compile zstd and its vendor package etc. This might not suit very well to every use case.
Just the sqlite3 it is possible to compile rosbag2 without any notion of sqlite3.

Copy link
Contributor Author

@piraka9011 piraka9011 Jan 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, hard coding ztd is giving people no change to not compile zstd and its vendor package etc.

I agree, which is why we want to work on making compression a plugin and work like storage and storage_factory does as opposed to the current implementation which is only Zstd.

However, since this is the first iteration, we want to make sure the whole compression pipeline works with a single compression format before extending it to other formats though a plugin.


P.S. Accidentally edited your comment, my bad.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P.S. Accidentally edited your comment, my bad.

That's one way to steer the direction of aa conversation ;-)

However, since this is the first iteration, we want to make sure the whole compression pipeline works with a single compression format before extending it to other formats though a plugin.

That's fine. I think having a compression writer/reader implementation within rosbag2_compression is then the right way to go. It gives you the freedom to test your compression pipeline without introducing a dependency to zstd in rosbag2_cpp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having a compression writer/reader implementation within rosbag2_compression is then the right way to go.

What about splitting then? Basically implementing a SequentialCompressionWriter?
If that's the case we can consider going down that route instead for now.

Signed-off-by: Anas Abou Allaban <[email protected]>
@zmichaels11
Copy link
Contributor

zmichaels11 commented Jan 22, 2020

@zmichaels11 zmichaels11 deleted the compression/writer branch January 22, 2020 18:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants