Skip to content

Commit

Permalink
Add pluggable compression
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagiac81 committed Nov 3, 2022
1 parent 59ea03f commit f316b22
Show file tree
Hide file tree
Showing 32 changed files with 1,037 additions and 92 deletions.
2 changes: 1 addition & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@

#include "rocksdb/advanced_options.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/compressor.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
1 change: 0 additions & 1 deletion db/blob/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "rocksdb/compression_type.h"
#include "rocksdb/rocksdb_namespace.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_read_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
#include <cinttypes>

#include "rocksdb/compression_type.h"
#include "rocksdb/compressor.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_read_request.h"
#include "rocksdb/cache.h"
#include "rocksdb/compressor.h"
#include "rocksdb/rocksdb_namespace.h"
#include "table/block_based/cachable_entry.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
2 changes: 1 addition & 1 deletion db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/compressor.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/compressor.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
2 changes: 1 addition & 1 deletion db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
Expand All @@ -23,7 +24,6 @@
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
#endif
#include "util/compressor.h"
#include "util/file_checksum_helper.h"
#include "util/random.h"
#include "utilities/counted_fs.h"
Expand Down
2 changes: 1 addition & 1 deletion db/db_block_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include "db/db_test_util.h"
#include "env/unique_id_gen.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/unique_id_impl.h"
#include "util/compressor.h"
#include "util/defer.h"
#include "util/hash.h"
#include "util/math.h"
Expand Down
9 changes: 8 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,17 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
#endif // !ROCKSDB_LITE

// Supported wal compression types
if (!StreamingCompressionTypeSupported(result.wal_compression)) {
CompressionType wal_compression = result.wal_compression;
if (result.wal_compressor != nullptr) {
wal_compression = result.wal_compressor->GetCompressionType();
}
if (!StreamingCompressionTypeSupported(wal_compression)) {
result.wal_compressor = BuiltinCompressor::GetCompressor(kNoCompression);
result.wal_compression = kNoCompression;
ROCKS_LOG_WARN(result.info_log,
"wal_compression is disabled since only zstd is supported");
} else if (result.wal_compressor == nullptr) {
result.wal_compressor = BuiltinCompressor::GetCompressor(wal_compression);
}

if (!result.paranoid_checks) {
Expand Down
27 changes: 26 additions & 1 deletion db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,8 @@ TEST_F(DBOptionsTest, ChangeCompression) {
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* c = reinterpret_cast<Compaction*>(arg);
compression_used = c->output_compressor()->GetCompressionType();
compression_opt_used =
*(c->output_compressor()->GetOptions<CompressionOptions>());
compacted = true;
});
SyncPoint::GetInstance()->EnableProcessing();
Expand All @@ -1130,7 +1132,8 @@ TEST_F(DBOptionsTest, ChangeCompression) {
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compression", "kSnappyCompression"},
{{"bottommost_compressor", "nullptr"},
{"bottommost_compression", "kSnappyCompression"},
{"bottommost_compression_opts", "0:6:0:0:4:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
Expand All @@ -1141,8 +1144,30 @@ TEST_F(DBOptionsTest, ChangeCompression) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kSnappyCompression, compression_used);
// Snappy compressor does not define level option. Default is returned.
ASSERT_EQ(32767, compression_opt_used.level);
// Right now parallel_level is not yet allowed to be changed.

if (!Zlib_Supported()) {
return;
}
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compressor", "nullptr"},
{"bottommost_compression", "kZlibCompression"},
{"bottommost_compression_opts", "0:6:0:0:4:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kZlibCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level);

SyncPoint::GetInstance()->DisableProcessing();
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/experimental.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/persistent_cache.h"
Expand All @@ -25,7 +26,6 @@
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "test_util/testutil.h"
#include "util/compressor.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"

Expand Down
27 changes: 27 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace ROCKSDB_NAMESPACE {

class Compressor;
class Slice;
class SliceTransform;
class TablePropertiesCollectorFactory;
Expand Down Expand Up @@ -532,6 +533,19 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
std::vector<CompressionType> compression_per_level;

// Similar to compression_per_level, but the algorithms are encapsulated in
// Compressor objects. This adds the ability to select custom compressors,
// beyond the built-in ones provided through CompressionType.
//
// If compressor_per_level is specified (not empty), it overrides
// compression_per_level.
//
// If compressor_per_level is not specified (empty),
// compression_per_level is applied as described for that option.
//
// Default: empty
std::vector<std::shared_ptr<Compressor>> compressor_per_level;

// Number of levels for this database
int num_levels = 7;

Expand Down Expand Up @@ -994,6 +1008,19 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through the SetOptions() API
CompressionType blob_compression_type = kNoCompression;

// Similar to blob_compression_type, but the algorithm is encapsulated in a
// Compressor class. This adds the ability to select plugin compressors,
// beyond the built-in ones provided through CompressionType.
//
// If blob_compressor is specified (not null), it overrides
// blob_compression_type.
//
// If blob_compressor is not specified (null), blob_compression_type is
// applied.
//
// Default: nullptr
std::shared_ptr<Compressor> blob_compressor = nullptr;

// Enables garbage collection of blobs. Blob GC is performed as part of
// compaction. Valid blobs residing in blob files older than a cutoff get
// relocated to new files as they are encountered during compaction, which
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/compression_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ enum CompressionType : unsigned char {
// eventually remove the option from the public API.
kZSTDNotFinalCompression = 0x40,

// Used as block compression type identifier when using a non built-in
// compressor.
kPluginCompression = 0x41,

// kDisableCompressionOption is used to disable some compression options.
kDisableCompressionOption = 0xff,
};
Expand Down
4 changes: 3 additions & 1 deletion util/compressor.h → include/rocksdb/compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ class Compressor : public Customizable {
static std::vector<std::string> GetDictSupported();

// Get the numeric type associated with this compressor
virtual CompressionType GetCompressionType() const = 0;
virtual CompressionType GetCompressionType() const {
return kPluginCompression;
};

// Whether the compressor is supported.
// For example, a compressor can implement this method to verify its
Expand Down
56 changes: 56 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
CompressionType compression;

// Similar to compression, but the algorithm is encapsulated in a Compressor
// class. This adds the ability to select plugin compressors, beyond the
// built-in ones provided through CompressionType.
//
// If compressor is specified (not null), it overrides
// compression/compression_opts (the compressor includes values for its
// options).
//
// If compressor is not specified (null), compression/compression_opts are
// applied as described for those options.
//
// Default: nullptr
std::shared_ptr<Compressor> compressor = nullptr;

// Compression algorithm that will be used for the bottommost level that
// contain files. The behavior for num_levels = 1 is not well defined.
// Right now, with num_levels = 1, all compaction outputs will use
Expand All @@ -223,6 +237,22 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: kDisableCompressionOption (Disabled)
CompressionType bottommost_compression = kDisableCompressionOption;

// Similar to bottommost_compression, but the algorithm is encapsulated in a
// Compressor object. This adds the ability to select custom compressors,
// beyond the built-in ones provided through CompressionType.
//
// If bottommost_compressor is specified (not null), it overrides
// bottommost_compression/compression_opts/bottommost_compression_opts (the
// compressor includes values for its options).
//
// If bottommost_compressor is not specified (null),
// bottommost_compression/compression_opts/bottommost_compression_opts are
// applied as described for those options.
//
// Default: nullptr (equivalent to bottommost_compression =
// kDisableCompressionOption)
std::shared_ptr<Compressor> bottommost_compressor = nullptr;

// different options for compression algorithms used by bottommost_compression
// if it is enabled. To enable it, please see the definition of
// CompressionOptions. Behavior for num_levels = 1 is the same as
Expand Down Expand Up @@ -1236,6 +1266,19 @@ struct DBOptions {
// versions regardless of the wal_compression settings.
CompressionType wal_compression = kNoCompression;

// Similar to wal_compression, but the algorithm is encapsulated in a
// Compressor class. This adds the ability to select plugin compressors,
// beyond the built-in ones provided through CompressionType.
//
// If wal_compressor is specified (not null), it overrides
// wal_compression.
//
// If wal_compressor is not specified (null), wal_compression is
// applied.
//
// Default: nullptr
std::shared_ptr<Compressor> wal_compressor = nullptr;

// If true, RocksDB supports flushing multiple column families and committing
// their results atomically to MANIFEST. Note that it is not
// necessary to set atomic_flush to true if WAL is always enabled since WAL
Expand Down Expand Up @@ -1821,6 +1864,18 @@ struct CompactionOptions {
// according to the `ColumnFamilyOptions`, taking into account the output
// level if `compression_per_level` is specified.
CompressionType compression;

// Similar to compression, but the algorithm is encapsulated in a Compressor
// class. This adds the ability to select plugin compressors, beyond the
// built-in ones provided through CompressionType.
//
// If compressor is specified (not nullptr), it overrides compression.
//
// If compressor is not specified (nullptr), compression is applied.
//
// Default: nullptr
std::shared_ptr<Compressor> compressor;

// Compaction will create files of size `output_file_size_limit`.
// Default: MAX, which means that compaction will create a single file
uint64_t output_file_size_limit;
Expand All @@ -1829,6 +1884,7 @@ struct CompactionOptions {

CompactionOptions()
: compression(kSnappyCompression),
compressor(nullptr),
output_file_size_limit(std::numeric_limits<uint64_t>::max()),
max_subcompactions(0) {}
};
Expand Down
Loading

0 comments on commit f316b22

Please sign in to comment.