From f316b229298a1d1fd61f2f29729258686aca2d36 Mon Sep 17 00:00:00 2001 From: Luca Giacchino Date: Mon, 14 Feb 2022 18:57:08 -0800 Subject: [PATCH] Add pluggable compression --- db/blob/blob_file_builder.h | 2 +- db/blob/blob_file_reader.h | 1 - db/blob/blob_read_request.h | 2 +- db/blob/blob_source.h | 2 +- db/compact_files_test.cc | 2 +- db/db_basic_test.cc | 2 +- db/db_block_cache_test.cc | 2 +- db/db_impl/db_impl_open.cc | 9 +- db/db_options_test.cc | 27 +- db/db_test2.cc | 2 +- include/rocksdb/advanced_options.h | 27 + include/rocksdb/compression_type.h | 4 + {util => include/rocksdb}/compressor.h | 4 +- include/rocksdb/options.h | 56 ++ options/cf_options.cc | 66 +- options/db_options.cc | 6 + options/db_options.h | 1 - options/options.cc | 2 + options/options_helper.cc | 5 + options/options_settable_test.cc | 10 + .../block_based/block_based_table_builder.cc | 13 +- .../block_based_table_reader_test.cc | 3 +- table/block_fetcher_test.cc | 2 +- table/format.cc | 2 +- table/format.h | 1 + test_util/testutil.cc | 2 +- tools/db_bench_tool.cc | 148 ++-- tools/sst_dump_test.cc | 2 +- util/compression.h | 5 +- util/compression_test.cc | 707 +++++++++++++++++- util/compressor.cc | 11 +- utilities/blob_db/blob_dump_tool.h | 1 - 32 files changed, 1037 insertions(+), 92 deletions(-) rename {util => include/rocksdb}/compressor.h (99%) diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 86bf359c8cd..f41d2f0d1fc 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -12,10 +12,10 @@ #include "rocksdb/advanced_options.h" #include "rocksdb/compression_type.h" +#include "rocksdb/compressor.h" #include "rocksdb/env.h" #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/types.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index f9414f4c0dc..3b8764c3432 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -13,7 +13,6 @@ #include "rocksdb/compression_type.h" #include "rocksdb/rocksdb_namespace.h" #include "util/autovector.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/blob/blob_read_request.h b/db/blob/blob_read_request.h index 94bffb9c745..a8085ebd1ef 100644 --- a/db/blob/blob_read_request.h +++ b/db/blob/blob_read_request.h @@ -8,10 +8,10 @@ #include #include "rocksdb/compression_type.h" +#include "rocksdb/compressor.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "util/autovector.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index 347330159e0..1195d2bbcbb 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -13,10 +13,10 @@ #include "db/blob/blob_file_cache.h" #include "db/blob/blob_read_request.h" #include "rocksdb/cache.h" +#include "rocksdb/compressor.h" #include "rocksdb/rocksdb_namespace.h" #include "table/block_based/cachable_entry.h" #include "util/autovector.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 8cc51b7e8ed..1a5d30a00e7 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -12,12 +12,12 @@ #include "db/db_impl/db_impl.h" #include "port/port.h" +#include "rocksdb/compressor.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/cast_util.h" -#include "util/compressor.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 7ffec5a62b6..0153d5092d6 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -12,6 +12,7 @@ #include "db/db_test_util.h" #include "options/options_helper.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/filter_policy.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/merge_operator.h" @@ -23,7 +24,6 @@ #if !defined(ROCKSDB_LITE) #include "test_util/sync_point.h" #endif -#include "util/compressor.h" #include "util/file_checksum_helper.h" #include "util/random.h" #include "utilities/counted_fs.h" diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index 59ea69d47db..32934cda97f 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -20,13 +20,13 @@ #include "db/db_test_util.h" #include "env/unique_id_gen.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/persistent_cache.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "table/block_based/block_based_table_reader.h" #include "table/unique_id_impl.h" -#include "util/compressor.h" #include "util/defer.h" #include "util/hash.h" #include "util/math.h" diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a7c184d5f74..9386c43cc5b 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -199,10 +199,17 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, #endif // !ROCKSDB_LITE // Supported wal compression types - if (!StreamingCompressionTypeSupported(result.wal_compression)) { + CompressionType wal_compression = result.wal_compression; + if (result.wal_compressor != nullptr) { + wal_compression = result.wal_compressor->GetCompressionType(); + } + if (!StreamingCompressionTypeSupported(wal_compression)) { + result.wal_compressor = BuiltinCompressor::GetCompressor(kNoCompression); result.wal_compression = kNoCompression; ROCKS_LOG_WARN(result.info_log, "wal_compression is disabled since only zstd is supported"); + } else if (result.wal_compressor == nullptr) { + result.wal_compressor = BuiltinCompressor::GetCompressor(wal_compression); } if (!result.paranoid_checks) { diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 9160924e56e..780a3dfc37c 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -1110,6 +1110,8 @@ TEST_F(DBOptionsTest, ChangeCompression) { "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { Compaction* c = reinterpret_cast(arg); compression_used = c->output_compressor()->GetCompressionType(); + compression_opt_used = + *(c->output_compressor()->GetOptions()); compacted = true; }); SyncPoint::GetInstance()->EnableProcessing(); @@ -1130,7 +1132,8 @@ TEST_F(DBOptionsTest, ChangeCompression) { compression_used = CompressionType::kLZ4Compression; compacted = false; ASSERT_OK(dbfull()->SetOptions( - {{"bottommost_compression", "kSnappyCompression"}, + {{"bottommost_compressor", "nullptr"}, + {"bottommost_compression", "kSnappyCompression"}, {"bottommost_compression_opts", "0:6:0:0:4:true"}})); ASSERT_OK(Put("foo", "foofoofoo")); ASSERT_OK(Put("bar", "foofoofoo")); @@ -1141,8 +1144,30 @@ TEST_F(DBOptionsTest, ChangeCompression) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_TRUE(compacted); ASSERT_EQ(CompressionType::kSnappyCompression, compression_used); + // Snappy compressor does not define level option. Default is returned. + ASSERT_EQ(32767, compression_opt_used.level); // Right now parallel_level is not yet allowed to be changed. + if (!Zlib_Supported()) { + return; + } + compression_used = CompressionType::kLZ4Compression; + compacted = false; + ASSERT_OK(dbfull()->SetOptions( + {{"bottommost_compressor", "nullptr"}, + {"bottommost_compression", "kZlibCompression"}, + {"bottommost_compression_opts", "0:6:0:0:4:true"}})); + ASSERT_OK(Put("foo", "foofoofoo")); + ASSERT_OK(Put("bar", "foofoofoo")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "foofoofoo")); + ASSERT_OK(Put("bar", "foofoofoo")); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_TRUE(compacted); + ASSERT_EQ(CompressionType::kZlibCompression, compression_used); + ASSERT_EQ(6, compression_opt_used.level); + SyncPoint::GetInstance()->DisableProcessing(); } diff --git a/db/db_test2.cc b/db/db_test2.cc index 5d7bd22f417..d84f90a3e5e 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -17,6 +17,7 @@ #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/experimental.h" #include "rocksdb/iostats_context.h" #include "rocksdb/persistent_cache.h" @@ -25,7 +26,6 @@ #include "rocksdb/utilities/replayer.h" #include "rocksdb/wal_filter.h" #include "test_util/testutil.h" -#include "util/compressor.h" #include "util/random.h" #include "utilities/fault_injection_env.h" diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index b41e50ca1b4..4f0c84cd1af 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -17,6 +17,7 @@ namespace ROCKSDB_NAMESPACE { +class Compressor; class Slice; class SliceTransform; class TablePropertiesCollectorFactory; @@ -532,6 +533,19 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API std::vector compression_per_level; + // Similar to compression_per_level, but the algorithms are encapsulated in + // Compressor objects. This adds the ability to select custom compressors, + // beyond the built-in ones provided through CompressionType. + // + // If compressor_per_level is specified (not empty), it overrides + // compression_per_level. + // + // If compressor_per_level is not specified (empty), + // compression_per_level is applied as described for that option. + // + // Default: empty + std::vector> compressor_per_level; + // Number of levels for this database int num_levels = 7; @@ -994,6 +1008,19 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through the SetOptions() API CompressionType blob_compression_type = kNoCompression; + // Similar to blob_compression_type, but the algorithm is encapsulated in a + // Compressor class. This adds the ability to select plugin compressors, + // beyond the built-in ones provided through CompressionType. + // + // If blob_compressor is specified (not null), it overrides + // blob_compression_type. + // + // If blob_compressor is not specified (null), blob_compression_type is + // applied. + // + // Default: nullptr + std::shared_ptr blob_compressor = nullptr; + // Enables garbage collection of blobs. Blob GC is performed as part of // compaction. Valid blobs residing in blob files older than a cutoff get // relocated to new files as they are encountered during compaction, which diff --git a/include/rocksdb/compression_type.h b/include/rocksdb/compression_type.h index bfeb00bdef0..a02ff53d330 100644 --- a/include/rocksdb/compression_type.h +++ b/include/rocksdb/compression_type.h @@ -33,6 +33,10 @@ enum CompressionType : unsigned char { // eventually remove the option from the public API. kZSTDNotFinalCompression = 0x40, + // Used as block compression type identifier when using a non built-in + // compressor. + kPluginCompression = 0x41, + // kDisableCompressionOption is used to disable some compression options. kDisableCompressionOption = 0xff, }; diff --git a/util/compressor.h b/include/rocksdb/compressor.h similarity index 99% rename from util/compressor.h rename to include/rocksdb/compressor.h index 9442b215435..5366909c679 100644 --- a/util/compressor.h +++ b/include/rocksdb/compressor.h @@ -157,7 +157,9 @@ class Compressor : public Customizable { static std::vector GetDictSupported(); // Get the numeric type associated with this compressor - virtual CompressionType GetCompressionType() const = 0; + virtual CompressionType GetCompressionType() const { + return kPluginCompression; + }; // Whether the compressor is supported. // For example, a compressor can implement this method to verify its diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3ba13d12993..932cf8328c9 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -214,6 +214,20 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API CompressionType compression; + // Similar to compression, but the algorithm is encapsulated in a Compressor + // class. This adds the ability to select plugin compressors, beyond the + // built-in ones provided through CompressionType. + // + // If compressor is specified (not null), it overrides + // compression/compression_opts (the compressor includes values for its + // options). + // + // If compressor is not specified (null), compression/compression_opts are + // applied as described for those options. + // + // Default: nullptr + std::shared_ptr compressor = nullptr; + // Compression algorithm that will be used for the bottommost level that // contain files. The behavior for num_levels = 1 is not well defined. // Right now, with num_levels = 1, all compaction outputs will use @@ -223,6 +237,22 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Default: kDisableCompressionOption (Disabled) CompressionType bottommost_compression = kDisableCompressionOption; + // Similar to bottommost_compression, but the algorithm is encapsulated in a + // Compressor object. This adds the ability to select custom compressors, + // beyond the built-in ones provided through CompressionType. + // + // If bottommost_compressor is specified (not null), it overrides + // bottommost_compression/compression_opts/bottommost_compression_opts (the + // compressor includes values for its options). + // + // If bottommost_compressor is not specified (null), + // bottommost_compression/compression_opts/bottommost_compression_opts are + // applied as described for those options. + // + // Default: nullptr (equivalent to bottommost_compression = + // kDisableCompressionOption) + std::shared_ptr bottommost_compressor = nullptr; + // different options for compression algorithms used by bottommost_compression // if it is enabled. To enable it, please see the definition of // CompressionOptions. Behavior for num_levels = 1 is the same as @@ -1236,6 +1266,19 @@ struct DBOptions { // versions regardless of the wal_compression settings. CompressionType wal_compression = kNoCompression; + // Similar to wal_compression, but the algorithm is encapsulated in a + // Compressor class. This adds the ability to select plugin compressors, + // beyond the built-in ones provided through CompressionType. + // + // If wal_compressor is specified (not null), it overrides + // wal_compression. + // + // If wal_compressor is not specified (null), wal_compression is + // applied. + // + // Default: nullptr + std::shared_ptr wal_compressor = nullptr; + // If true, RocksDB supports flushing multiple column families and committing // their results atomically to MANIFEST. Note that it is not // necessary to set atomic_flush to true if WAL is always enabled since WAL @@ -1821,6 +1864,18 @@ struct CompactionOptions { // according to the `ColumnFamilyOptions`, taking into account the output // level if `compression_per_level` is specified. CompressionType compression; + + // Similar to compression, but the algorithm is encapsulated in a Compressor + // class. This adds the ability to select plugin compressors, beyond the + // built-in ones provided through CompressionType. + // + // If compressor is specified (not nullptr), it overrides compression. + // + // If compressor is not specified (nullptr), compression is applied. + // + // Default: nullptr + std::shared_ptr compressor; + // Compaction will create files of size `output_file_size_limit`. // Default: MAX, which means that compaction will create a single file uint64_t output_file_size_limit; @@ -1829,6 +1884,7 @@ struct CompactionOptions { CompactionOptions() : compression(kSnappyCompression), + compressor(nullptr), output_file_size_limit(std::numeric_limits::max()), max_subcompactions(0) {} }; diff --git a/options/cf_options.cc b/options/cf_options.cc index 37b430dc584..b7d4b1e0c69 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -17,6 +17,7 @@ #include "options/options_parser.h" #include "port/port.h" #include "rocksdb/compaction_filter.h" +#include "rocksdb/compressor.h" #include "rocksdb/concurrent_task_limiter.h" #include "rocksdb/configurable.h" #include "rocksdb/convenience.h" @@ -29,7 +30,6 @@ #include "rocksdb/utilities/options_type.h" #include "util/cast_util.h" #include "util/compression.h" -#include "util/compressor.h" #include "util/string_util.h" // NOTE: in this file, many option flags that were deprecated @@ -376,6 +376,11 @@ static std::unordered_map {offsetof(struct MutableCFOptions, compression), OptionType::kCompressionType, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"compressor", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct MutableCFOptions, compressor), + OptionVerificationType::kByNameAllowNull, + OptionTypeFlags::kMutable | OptionTypeFlags::kAllowNull)}, {"prefix_extractor", OptionTypeInfo::AsCustomSharedPtr( offsetof(struct MutableCFOptions, prefix_extractor), @@ -441,6 +446,11 @@ static std::unordered_map {offsetof(struct MutableCFOptions, blob_compression_type), OptionType::kCompressionType, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"blob_compressor", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct MutableCFOptions, blob_compressor), + OptionVerificationType::kByNameAllowNull, + OptionTypeFlags::kMutable | OptionTypeFlags::kAllowNull)}, {"enable_blob_garbage_collection", {offsetof(struct MutableCFOptions, enable_blob_garbage_collection), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -474,11 +484,22 @@ static std::unordered_map {offsetof(struct MutableCFOptions, bottommost_compression), OptionType::kCompressionType, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"bottommost_compressor", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct MutableCFOptions, bottommost_compressor), + OptionVerificationType::kByNameAllowNull, + OptionTypeFlags::kMutable | OptionTypeFlags::kAllowNull)}, {"compression_per_level", OptionTypeInfo::Vector( offsetof(struct MutableCFOptions, compression_per_level), OptionVerificationType::kNormal, OptionTypeFlags::kMutable, {0, OptionType::kCompressionType})}, + {"compressor_per_level", + OptionTypeInfo::Vector>( + offsetof(struct MutableCFOptions, compressor_per_level), + OptionVerificationType::kByName, OptionTypeFlags::kMutable, + OptionTypeInfo::AsCustomSharedPtr( + 0, OptionVerificationType::kByName, OptionTypeFlags::kNone))}, {"experimental_mempurge_threshold", {offsetof(struct MutableCFOptions, experimental_mempurge_threshold), OptionType::kDouble, OptionVerificationType::kNormal, @@ -1024,6 +1045,7 @@ MutableCFOptions::MutableCFOptions(const ColumnFamilyOptions& options) min_blob_size(options.min_blob_size), blob_file_size(options.blob_file_size), blob_compression_type(options.blob_compression_type), + blob_compressor(options.blob_compressor), enable_blob_garbage_collection(options.enable_blob_garbage_collection), blob_garbage_collection_age_cutoff( options.blob_garbage_collection_age_cutoff), @@ -1039,7 +1061,9 @@ MutableCFOptions::MutableCFOptions(const ColumnFamilyOptions& options) paranoid_file_checks(options.paranoid_file_checks), report_bg_io_stats(options.report_bg_io_stats), compression(options.compression), + compressor(options.compressor), bottommost_compression(options.bottommost_compression), + bottommost_compressor(options.bottommost_compressor), compression_opts(options.compression_opts), bottommost_compression_opts(options.bottommost_compression_opts), last_level_temperature(options.last_level_temperature == @@ -1050,7 +1074,8 @@ MutableCFOptions::MutableCFOptions(const ColumnFamilyOptions& options) options.memtable_protection_bytes_per_key), sample_for_compression( options.sample_for_compression), // TODO: is 0 fine here? - compression_per_level(options.compression_per_level) { + compression_per_level(options.compression_per_level), + compressor_per_level(options.compressor_per_level) { RefreshDerivedOptions(options.num_levels, options.compaction_style); } @@ -1116,22 +1141,37 @@ void MutableCFOptions::RefreshDerivedOptions(int num_levels, max_file_size[i] = target_file_size_base; } } - compressor = BuiltinCompressor::GetCompressor(compression, compression_opts); - if (bottommost_compression != kDisableCompressionOption) { - if (bottommost_compression_opts.enabled) { - bottommost_compressor = BuiltinCompressor::GetCompressor( - bottommost_compression, bottommost_compression_opts); - } else { - bottommost_compressor = BuiltinCompressor::GetCompressor( - bottommost_compression, compression_opts); + if (compressor == nullptr) { + compressor = + BuiltinCompressor::GetCompressor(compression, compression_opts); + } + if (compressor == nullptr) { + compressor = BuiltinCompressor::GetCompressor(kSnappyCompression); + } + + if (bottommost_compressor == nullptr) { + if (bottommost_compression != kDisableCompressionOption) { + if (bottommost_compression_opts.enabled) { + bottommost_compressor = BuiltinCompressor::GetCompressor( + bottommost_compression, bottommost_compression_opts); + } else { + bottommost_compressor = BuiltinCompressor::GetCompressor( + bottommost_compression, compression_opts); + } } } - if (blob_compression_type != kDisableCompressionOption) { - blob_compressor = BuiltinCompressor::GetCompressor(blob_compression_type, - compression_opts); + if (blob_compressor == nullptr) { + if (blob_compression_type != kDisableCompressionOption) { + blob_compressor = BuiltinCompressor::GetCompressor(blob_compression_type, + compression_opts); + } + } + if (blob_compressor == nullptr) { + blob_compressor = BuiltinCompressor::GetCompressor(kNoCompression); } + if (compressor_per_level.empty()) { for (auto type : compression_per_level) { compressor_per_level.push_back( diff --git a/options/db_options.cc b/options/db_options.cc index 0217add873a..403cdb5c41f 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -388,6 +388,11 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, wal_compression), OptionType::kCompressionType, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"wal_compressor", + OptionTypeInfo::AsCustomSharedPtr( + offsetof(struct ImmutableDBOptions, wal_compressor), + OptionVerificationType::kByNameAllowNull, + OptionTypeFlags::kAllowNull)}, {"seq_per_batch", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, @@ -745,6 +750,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) two_write_queues(options.two_write_queues), manual_wal_flush(options.manual_wal_flush), wal_compression(options.wal_compression), + wal_compressor(options.wal_compressor), atomic_flush(options.atomic_flush), avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io), persist_stats_to_disk(options.persist_stats_to_disk), diff --git a/options/db_options.h b/options/db_options.h index 59e081055a1..784137b5c73 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -9,7 +9,6 @@ #include #include "rocksdb/options.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { class SystemClock; diff --git a/options/options.cc b/options/options.cc index 817eaa0d2e1..351f72a373e 100644 --- a/options/options.cc +++ b/options/options.cc @@ -61,6 +61,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) bloom_locality(options.bloom_locality), arena_block_size(options.arena_block_size), compression_per_level(options.compression_per_level), + compressor_per_level(options.compressor_per_level), num_levels(options.num_levels), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), level0_stop_writes_trigger(options.level0_stop_writes_trigger), @@ -102,6 +103,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) min_blob_size(options.min_blob_size), blob_file_size(options.blob_file_size), blob_compression_type(options.blob_compression_type), + blob_compressor(options.blob_compressor), enable_blob_garbage_collection(options.enable_blob_garbage_collection), blob_garbage_collection_age_cutoff( options.blob_garbage_collection_age_cutoff), diff --git a/options/options_helper.cc b/options/options_helper.cc index 021411242e2..4449f2684d2 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -168,6 +168,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.two_write_queues = immutable_db_options.two_write_queues; options.manual_wal_flush = immutable_db_options.manual_wal_flush; options.wal_compression = immutable_db_options.wal_compression; + options.wal_compressor = immutable_db_options.wal_compressor; options.atomic_flush = immutable_db_options.atomic_flush; options.avoid_unnecessary_blocking_io = immutable_db_options.avoid_unnecessary_blocking_io; @@ -252,6 +253,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, cf_opts->min_blob_size = moptions.min_blob_size; cf_opts->blob_file_size = moptions.blob_file_size; cf_opts->blob_compression_type = moptions.blob_compression_type; + cf_opts->blob_compressor = moptions.blob_compressor; cf_opts->enable_blob_garbage_collection = moptions.enable_blob_garbage_collection; cf_opts->blob_garbage_collection_age_cutoff = @@ -271,11 +273,14 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, cf_opts->paranoid_file_checks = moptions.paranoid_file_checks; cf_opts->report_bg_io_stats = moptions.report_bg_io_stats; cf_opts->compression = moptions.compression; + cf_opts->compressor = moptions.compressor; cf_opts->compression_opts = moptions.compression_opts; cf_opts->bottommost_compression = moptions.bottommost_compression; + cf_opts->bottommost_compressor = moptions.bottommost_compressor; cf_opts->bottommost_compression_opts = moptions.bottommost_compression_opts; cf_opts->sample_for_compression = moptions.sample_for_compression; cf_opts->compression_per_level = moptions.compression_per_level; + cf_opts->compressor_per_level = moptions.compressor_per_level; cf_opts->last_level_temperature = moptions.last_level_temperature; cf_opts->bottommost_temperature = moptions.last_level_temperature; } diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 4a6e3017ed2..96f5f312ed2 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -244,6 +244,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { sizeof(std::vector>)}, {offsetof(struct DBOptions, row_cache), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, wal_filter), sizeof(const WalFilter*)}, + {offsetof(struct DBOptions, wal_compressor), + sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, file_checksum_gen_factory), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, db_host_id), sizeof(std::string)}, @@ -393,6 +395,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, compression_per_level), sizeof(std::vector)}, + {offsetof(struct ColumnFamilyOptions, compressor_per_level), + sizeof(std::vector>)}, {offsetof(struct ColumnFamilyOptions, max_bytes_for_level_multiplier_additional), sizeof(std::vector)}, @@ -405,6 +409,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(uint64_t)}, {offsetof(struct ColumnFamilyOptions, preserve_internal_time_seconds), sizeof(uint64_t)}, + {offsetof(struct ColumnFamilyOptions, blob_compressor), + sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, blob_cache), sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, @@ -414,6 +420,10 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(const CompactionFilter*)}, {offsetof(struct ColumnFamilyOptions, compaction_filter_factory), sizeof(std::shared_ptr)}, + {offsetof(struct ColumnFamilyOptions, compressor), + sizeof(std::shared_ptr)}, + {offsetof(struct ColumnFamilyOptions, bottommost_compressor), + sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, prefix_extractor), sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, snap_refresh_nanos), diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 5805bcdb162..a346d3e8239 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -1647,7 +1647,18 @@ void BlockBasedTableBuilder::WritePropertiesBlock( rep_->ioptions.merge_operator != nullptr ? rep_->ioptions.merge_operator->Name() : "nullptr"; - rep_->props.compression_name = rep_->compressor->GetId(); +#ifndef ROCKSDB_LITE + std::string compression_name; + if (rep_->compressor->GetCompressionType() == kPluginCompression) { + ConfigOptions config_options; + compression_name = rep_->compressor->ToString(config_options); + } else { + compression_name = rep_->compressor->GetId(); + } +#else + std::string compression_name = rep_->compressor->GetId(); +#endif // ROCKSDB_LITE + rep_->props.compression_name = compression_name; rep_->props.prefix_extractor_name = rep_->moptions.prefix_extractor != nullptr ? rep_->moptions.prefix_extractor->AsString() diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 4baf5c9190b..5ba0d5ccba7 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -16,7 +16,7 @@ #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" -#include "rocksdb/compression_type.h" +#include "rocksdb/compressor.h" #include "rocksdb/db.h" #include "rocksdb/file_system.h" #include "table/block_based/block_based_table_builder.h" @@ -25,7 +25,6 @@ #include "table/format.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/compressor.h" #include "util/random.h" namespace ROCKSDB_NAMESPACE { diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc index f0e65406969..21b3fa52e29 100644 --- a/table/block_fetcher_test.cc +++ b/table/block_fetcher_test.cc @@ -10,6 +10,7 @@ #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/file_system.h" @@ -19,7 +20,6 @@ #include "table/block_based/block_based_table_reader.h" #include "table/format.h" #include "test_util/testharness.h" -#include "util/compressor.h" #include "utilities/memory_allocators.h" namespace ROCKSDB_NAMESPACE { diff --git a/table/format.cc b/table/format.cc index e3fbc68ebd6..de81d53150a 100644 --- a/table/format.cc +++ b/table/format.cc @@ -18,6 +18,7 @@ #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "options/options_helper.h" +#include "rocksdb/compressor.h" #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/table.h" @@ -26,7 +27,6 @@ #include "table/persistent_cache_helper.h" #include "util/cast_util.h" #include "util/coding.h" -#include "util/compressor.h" #include "util/crc32c.h" #include "util/hash.h" #include "util/stop_watch.h" diff --git a/table/format.h b/table/format.h index 33214845725..ae6ecd5b030 100644 --- a/table/format.h +++ b/table/format.h @@ -19,6 +19,7 @@ #include "options/cf_options.h" #include "port/malloc.h" #include "port/port.h" // noexcept +#include "rocksdb/compressor.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/table.h" diff --git a/test_util/testutil.cc b/test_util/testutil.cc index d58a5837e44..019fb640d9b 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -23,12 +23,12 @@ #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "port/port.h" +#include "rocksdb/compressor.h" #include "rocksdb/convenience.h" #include "rocksdb/system_clock.h" #include "rocksdb/utilities/object_registry.h" #include "test_util/mock_time_env.h" #include "test_util/sync_point.h" -#include "util/compressor.h" #include "util/random.h" #include "util/string_util.h" diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 09f7548b52f..735ca6508ac 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -47,6 +47,7 @@ #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/cache.h" +#include "rocksdb/compressor.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -602,9 +603,8 @@ DEFINE_double(compressed_secondary_cache_low_pri_pool_ratio, 0.0, DEFINE_string(compressed_secondary_cache_compression_type, "lz4", "The compression algorithm to use for large " "values stored in CompressedSecondaryCache."); -static enum ROCKSDB_NAMESPACE::CompressionType - FLAGS_compressed_secondary_cache_compression_type_e = - ROCKSDB_NAMESPACE::kLZ4Compression; +static std::shared_ptr + FLAGS_compressed_secondary_cache_compressor; DEFINE_uint32( compressed_secondary_cache_compress_format_version, 2, @@ -830,8 +830,7 @@ DEFINE_bool(manual_wal_flush, false, DEFINE_string(wal_compression, "none", "Algorithm to use for WAL compression. none to disable."); -static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e = - ROCKSDB_NAMESPACE::kNoCompression; +static std::shared_ptr FLAGS_wal_compressor; DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL"); @@ -1045,8 +1044,7 @@ DEFINE_uint64(blob_db_file_size, DEFINE_string( blob_db_compression_type, "snappy", "[Stacked BlobDB] Algorithm to use to compress blobs in blob files."); -static enum ROCKSDB_NAMESPACE::CompressionType - FLAGS_blob_db_compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression; +static std::shared_ptr FLAGS_blob_db_compressor; #endif // ROCKSDB_LITE @@ -1255,30 +1253,48 @@ DEFINE_uint64( "num_file_reads_for_auto_readahead indicates after how many sequential " "reads into that file internal auto prefetching should be start."); -static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( - const char* ctype) { +static std::shared_ptr StringToCompressor( + const char* ctype, const std::string& opts = "") { assert(ctype); + std::string compressor_name = std::string(ctype); if (!strcasecmp(ctype, "none")) - return ROCKSDB_NAMESPACE::kNoCompression; + compressor_name = ROCKSDB_NAMESPACE::NoCompressor::kClassName(); else if (!strcasecmp(ctype, "snappy")) - return ROCKSDB_NAMESPACE::kSnappyCompression; + compressor_name = ROCKSDB_NAMESPACE::SnappyCompressor::kClassName(); else if (!strcasecmp(ctype, "zlib")) - return ROCKSDB_NAMESPACE::kZlibCompression; + compressor_name = ROCKSDB_NAMESPACE::ZlibCompressor::kClassName(); else if (!strcasecmp(ctype, "bzip2")) - return ROCKSDB_NAMESPACE::kBZip2Compression; + compressor_name = ROCKSDB_NAMESPACE::BZip2Compressor::kClassName(); else if (!strcasecmp(ctype, "lz4")) - return ROCKSDB_NAMESPACE::kLZ4Compression; + compressor_name = ROCKSDB_NAMESPACE::LZ4Compressor::kClassName(); else if (!strcasecmp(ctype, "lz4hc")) - return ROCKSDB_NAMESPACE::kLZ4HCCompression; + compressor_name = ROCKSDB_NAMESPACE::LZ4HCCompressor::kClassName(); else if (!strcasecmp(ctype, "xpress")) - return ROCKSDB_NAMESPACE::kXpressCompression; + compressor_name = ROCKSDB_NAMESPACE::XpressCompressor::kClassName(); else if (!strcasecmp(ctype, "zstd")) - return ROCKSDB_NAMESPACE::kZSTD; - else { - fprintf(stderr, "Cannot parse compression type '%s'\n", ctype); - exit(1); + compressor_name = ROCKSDB_NAMESPACE::ZSTDCompressor::kClassName(); + + ROCKSDB_NAMESPACE::ConfigOptions config_options; + config_options.ignore_unknown_options = true; + std::shared_ptr compressor; + ROCKSDB_NAMESPACE::Status s; + if (opts.empty()) { + s = ROCKSDB_NAMESPACE::Compressor::CreateFromString( + config_options, compressor_name, &compressor); + } else { + s = ROCKSDB_NAMESPACE::Compressor::CreateFromString( + config_options, + ROCKSDB_NAMESPACE::OptionTypeInfo::kIdPropName() + std::string("=") + + compressor_name + config_options.delimiter + opts, + &compressor); + } + if (s.ok() && compressor != nullptr) { + return compressor; } + + fprintf(stderr, "Cannot parse compression type '%s'\n", ctype); + exit(1); } static std::string ColumnFamilyName(size_t i) { @@ -1292,9 +1308,10 @@ static std::string ColumnFamilyName(size_t i) { } DEFINE_string(compression_type, "snappy", - "Algorithm to use to compress the database"); -static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_compression_type_e = - ROCKSDB_NAMESPACE::kSnappyCompression; + "Algorithm to use to compress the database. Built-in " + "compressors: none, snappy, zlib, bzip2, lz4, lz4hc, xpress, " + "zstd. Plugin compressors: use the compressor name."); +static std::shared_ptr FLAGS_compressor; DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression"); @@ -1331,6 +1348,27 @@ DEFINE_bool(compression_use_zstd_dict_trainer, "If true, use ZSTD_TrainDictionary() to create dictionary, else" "use ZSTD_FinalizeDictionary() to create dictionary"); +DEFINE_string(compressor_options, "", "Options for specific compressor."); + +static std::string GetCompressorOptions() { + // If compressor_options is empty, use the individual options + if (FLAGS_compressor_options.empty()) { + ROCKSDB_NAMESPACE::CompressionOptions compression_opts; + compression_opts.level = FLAGS_compression_level; + compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes; + compression_opts.zstd_max_train_bytes = + FLAGS_compression_zstd_max_train_bytes; + compression_opts.parallel_threads = FLAGS_compression_parallel_threads; + compression_opts.max_dict_buffer_bytes = + FLAGS_compression_max_dict_buffer_bytes; + compression_opts.use_zstd_dict_trainer = + FLAGS_compression_use_zstd_dict_trainer; + return ROCKSDB_NAMESPACE::CompressionOptionsToString(compression_opts); + } else { + return FLAGS_compressor_options; + } +} + static bool ValidateTableCacheNumshardbits(const char* flagname, int32_t value) { if (0 >= value || value >= 20) { @@ -2835,7 +2873,7 @@ class Benchmark { #endif } - auto compression = CompressionTypeToString(FLAGS_compression_type_e); + auto compression = FLAGS_compressor->GetId(); fprintf(stdout, "Compression: %s\n", compression.c_str()); fprintf(stdout, "Compression sampling rate: %" PRId64 "\n", FLAGS_sample_for_compression); @@ -2859,16 +2897,15 @@ class Benchmark { fprintf(stdout, "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); #endif - if (FLAGS_compression_type_e != ROCKSDB_NAMESPACE::kNoCompression) { + if (FLAGS_compressor->GetCompressionType() != + ROCKSDB_NAMESPACE::kNoCompression) { // The test string should not be too small. const int len = FLAGS_block_size; std::string input_str(len, 'y'); std::string compressed; - std::shared_ptr compressor = - BuiltinCompressor::GetCompressor(FLAGS_compression_type_e); CompressionInfo info(FLAGS_sample_for_compression); - bool result = - info.CompressData(compressor.get(), Slice(input_str), &compressed); + bool result = info.CompressData(FLAGS_compressor.get(), Slice(input_str), + &compressed); if (!result) { fprintf(stdout, "WARNING: %s compression is not enabled\n", @@ -3088,7 +3125,7 @@ class Benchmark { secondary_cache_opts.low_pri_pool_ratio = FLAGS_compressed_secondary_cache_low_pri_pool_ratio; secondary_cache_opts.compression_type = - FLAGS_compressed_secondary_cache_compression_type_e; + FLAGS_compressed_secondary_cache_compressor->GetCompressionType(); secondary_cache_opts.compress_format_version = FLAGS_compressed_secondary_cache_compress_format_version; opts.secondary_cache = @@ -4020,10 +4057,9 @@ class Benchmark { int64_t produced = 0; Status s; std::string compressed; - std::shared_ptr compressor = - BuiltinCompressor::GetCompressor(FLAGS_compression_type_e); - auto raw_compressor = compressor.get(); + auto raw_compressor = FLAGS_compressor.get(); CompressionInfo info(FLAGS_sample_for_compression); + // Compress 1G while (s.ok() && bytes < int64_t(1) << 30) { compressed.clear(); @@ -4049,18 +4085,16 @@ class Benchmark { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; - std::shared_ptr compressor = - BuiltinCompressor::GetCompressor(FLAGS_compression_type_e); CompressionInfo compression_info(FLAGS_sample_for_compression); UncompressionInfo uncompression_info; - bool ok = - compression_info.CompressData(compressor.get(), input, &compressed); + bool ok = compression_info.CompressData(FLAGS_compressor.get(), input, + &compressed); int64_t bytes = 0; size_t uncompressed_size = 0; while (ok && bytes < 1024 * 1048576) { CacheAllocationPtr uncompressed = uncompression_info.UncompressData( - compressor.get(), compressed.data(), compressed.size(), + FLAGS_compressor.get(), compressed.data(), compressed.size(), &uncompressed_size); ok = uncompressed.get() != nullptr; @@ -4157,7 +4191,7 @@ class Benchmark { options.use_direct_io_for_flush_and_compaction = FLAGS_use_direct_io_for_flush_and_compaction; options.manual_wal_flush = FLAGS_manual_wal_flush; - options.wal_compression = FLAGS_wal_compression_e; + options.wal_compressor = FLAGS_wal_compressor; #ifndef ROCKSDB_LITE options.ttl = FLAGS_fifo_compaction_ttl; options.compaction_options_fifo = CompactionOptionsFIFO( @@ -4506,7 +4540,7 @@ class Benchmark { FLAGS_level0_file_num_compaction_trigger; options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; - options.compression = FLAGS_compression_type_e; + options.compressor = FLAGS_compressor; if (FLAGS_simulate_hybrid_fs_file != "") { options.bottommost_temperature = Temperature::kWarm; } @@ -4521,12 +4555,13 @@ class Benchmark { if (FLAGS_min_level_to_compress >= 0) { assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); - options.compression_per_level.resize(FLAGS_num_levels); + options.compressor_per_level.resize(FLAGS_num_levels); for (int i = 0; i < FLAGS_min_level_to_compress; i++) { - options.compression_per_level[i] = kNoCompression; + options.compressor_per_level[i] = + BuiltinCompressor::GetCompressor(kNoCompression); } for (int i = FLAGS_min_level_to_compress; i < FLAGS_num_levels; i++) { - options.compression_per_level[i] = FLAGS_compression_type_e; + options.compressor_per_level[i] = FLAGS_compressor; } } options.soft_pending_compaction_bytes_limit = @@ -4621,8 +4656,8 @@ class Benchmark { options.enable_blob_files = FLAGS_enable_blob_files; options.min_blob_size = FLAGS_min_blob_size; options.blob_file_size = FLAGS_blob_file_size; - options.blob_compression_type = - StringToCompressionType(FLAGS_blob_compression_type.c_str()); + options.blob_compressor = + StringToCompressor(FLAGS_blob_compression_type.c_str()); options.enable_blob_garbage_collection = FLAGS_enable_blob_garbage_collection; options.blob_garbage_collection_age_cutoff = @@ -4891,7 +4926,8 @@ class Benchmark { blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size; blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync; blob_db_options.blob_file_size = FLAGS_blob_db_file_size; - blob_db_options.compression = FLAGS_blob_db_compression_type_e; + blob_db_options.compression = + FLAGS_blob_db_compressor->GetCompressionType(); blob_db::BlobDB* ptr = nullptr; s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr); if (s.ok()) { @@ -5546,7 +5582,7 @@ class Benchmark { for (size_t i = 0; i < num_db; i++) { auto db = db_list[i]; auto compactionOptions = CompactionOptions(); - compactionOptions.compression = FLAGS_compression_type_e; + compactionOptions.compressor = FLAGS_compressor; auto options = db->GetOptions(); MutableCFOptions mutable_cf_options(options); for (size_t j = 0; j < sorted_runs[i].size(); j++) { @@ -5601,7 +5637,7 @@ class Benchmark { for (size_t i = 0; i < num_db; i++) { auto db = db_list[i]; auto compactionOptions = CompactionOptions(); - compactionOptions.compression = FLAGS_compression_type_e; + compactionOptions.compressor = FLAGS_compressor; auto options = db->GetOptions(); MutableCFOptions mutable_cf_options(options); for (size_t j = 0; j < sorted_runs[i].size(); j++) { @@ -8553,19 +8589,19 @@ int db_bench_tool(int argc, char** argv) { #endif } - FLAGS_compression_type_e = - StringToCompressionType(FLAGS_compression_type.c_str()); + FLAGS_compressor = StringToCompressor(FLAGS_compression_type.c_str(), + GetCompressorOptions()); - FLAGS_wal_compression_e = - StringToCompressionType(FLAGS_wal_compression.c_str()); + FLAGS_wal_compressor = StringToCompressor(FLAGS_wal_compression.c_str()); - FLAGS_compressed_secondary_cache_compression_type_e = StringToCompressionType( - FLAGS_compressed_secondary_cache_compression_type.c_str()); + FLAGS_compressed_secondary_cache_compressor = StringToCompressor( + FLAGS_compressed_secondary_cache_compression_type.c_str(), + GetCompressorOptions()); #ifndef ROCKSDB_LITE // Stacked BlobDB - FLAGS_blob_db_compression_type_e = - StringToCompressionType(FLAGS_blob_db_compression_type.c_str()); + FLAGS_blob_db_compressor = + StringToCompressor(FLAGS_blob_db_compression_type.c_str()); int env_opts = !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty(); if (env_opts > 1) { diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index d446e3a6988..0a757a93a94 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -12,6 +12,7 @@ #include "file/random_access_file_reader.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/convenience.h" #include "rocksdb/filter_policy.h" #include "rocksdb/sst_dump_tool.h" @@ -19,7 +20,6 @@ #include "table/table_builder.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/compressor.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { diff --git a/util/compression.h b/util/compression.h index d60a348bb96..f6f2ed41d8b 100644 --- a/util/compression.h +++ b/util/compression.h @@ -22,12 +22,12 @@ #include #include "memory/memory_allocator.h" +#include "rocksdb/compressor.h" #include "rocksdb/convenience.h" #include "rocksdb/options.h" #include "test_util/sync_point.h" #include "util/coding.h" #include "util/compression_context_cache.h" -#include "util/compressor.h" #include "util/string_util.h" #ifdef SNAPPY @@ -271,6 +271,9 @@ inline std::string CompressionOptionsToString( result.append("enabled=") .append(std::to_string(compression_options.enabled)) .append("; "); + result.append("parallel_threads=") + .append(std::to_string(compression_options.parallel_threads)) + .append("; "); result.append("max_dict_buffer_bytes=") .append(std::to_string(compression_options.max_dict_buffer_bytes)) .append("; "); diff --git a/util/compression_test.cc b/util/compression_test.cc index 51d9ffc873b..51731e5afa6 100644 --- a/util/compression_test.cc +++ b/util/compression_test.cc @@ -11,6 +11,7 @@ #include "util/compression.h" #include "port/stack_trace.h" +#include "rocksdb/compressor.h" #include "rocksdb/configurable.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" @@ -18,10 +19,255 @@ #include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_util.h" #include "test_util/testharness.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE + +// Registers a factory function for a Compressor in the default ObjectLibrary. +// The factory function calls the Compressor constructor. +template +void AddCompressorFactory(std::shared_ptr library) { + library->AddFactory( + T::kClassName(), + [](const std::string& /* uri */, std::unique_ptr* c, + std::string* /* errmsg */) { + c->reset(new T()); + return c->get(); + }); +} + +struct DummyCompressorOptions { + static const char* kName() { return "DummyCompressorOptions"; }; + std::string option_str = "default"; + int option_int = 0; +}; + +static std::unordered_map + dummy_compressor_type_info = { + {"option_str", + {offsetof(struct DummyCompressorOptions, option_str), + OptionType::kString, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, + {"option_int", + {offsetof(struct DummyCompressorOptions, option_int), OptionType::kInt, + OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}}; + +template +class DummyCompressor : public Compressor { + public: + DummyCompressor() { + RegisterOptions(&options_, &dummy_compressor_type_info); + }; + + static const char* kClassName() { + name_ = "DummyCompressor" + std::to_string(id); + return name_.c_str(); + } + + const char* Name() const override { return kClassName(); } + + Status Compress(const CompressionInfo& info, const Slice& input, + std::string* output) override { + (void)info; + (void)input; + (void)output; + return Status::OK(); + } + + Status Uncompress(const UncompressionInfo& info, const char* input, + size_t input_length, char** output, + size_t* output_length) override { + (void)info; + (void)input; + (void)input_length; + (void)output; + (void)output_length; + return Status::OK(); + } + + private: + static std::string name_; + DummyCompressorOptions options_; +}; + +template +std::string DummyCompressor::name_; + +struct DummyDictionaryCompressorOptions { + static const char* kName() { return "DummyDictionaryCompressorOptions"; }; + uint32_t max_dict_bytes; + uint32_t max_train_bytes; + uint64_t max_dict_buffer_bytes; +}; + +static std::unordered_map + dummy_dictionary_compressor_type_info = { + {"max_dict_bytes", + {offsetof(struct DummyDictionaryCompressorOptions, max_dict_bytes), + OptionType::kInt, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, + {"max_train_bytes", + {offsetof(struct DummyDictionaryCompressorOptions, max_train_bytes), + OptionType::kUInt32T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, + {"max_dict_buffer_bytes", + {offsetof(struct DummyDictionaryCompressorOptions, + max_dict_buffer_bytes), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}}; + +class DummyDictionaryCompressor : public Compressor { + public: + DummyDictionaryCompressor() { + RegisterOptions(&options_, &dummy_dictionary_compressor_type_info); + }; + + static const char* kClassName() { return "DummyDictionaryCompressor"; } + + const char* Name() const override { return kClassName(); } + + bool DictCompressionSupported() const override { return true; } + + Status Compress(const CompressionInfo& info, const Slice& input, + std::string* output) override { + (void)info; + output->append(input.data(), input.size()); + return Status::OK(); + } + + Status Uncompress(const UncompressionInfo& info, const char* input, + size_t input_length, char** output, + size_t* output_length) override { + *output = Allocate(input_length, info.GetMemoryAllocator()); + if (!*output) { + return Status::MemoryLimit(); + } + memcpy(*output, input, input_length); + *output_length = input_length; + return Status::OK(); + } + + Status CreateDict(std::vector& /*data_block_buffers*/, + std::unique_ptr* dict) override { + *dict = NewCompressionDict(std::string()); + num_create_dictionary_calls++; + return Status::OK(); + } + + uint64_t GetMaxDictBufferBytes() const override { + return options_.max_dict_buffer_bytes; + } + + int num_create_dictionary_calls = 0; + + private: + uint32_t GetMaxDictBytes() const override { return options_.max_dict_bytes; } + + uint32_t GetMaxTrainBytes() const override { + return options_.max_train_bytes; + } + + DummyDictionaryCompressorOptions options_; +}; + +// Simple RLE compressor for testing purposes. +// It needs to compress enough to pass GoodCompressionRatio check. +class SimpleRLECompressor : public Compressor { + public: + static const char* kClassName() { return "SimpleRLECompressor"; } + + const char* Name() const override { return kClassName(); } + + Status Compress(const CompressionInfo& info, const Slice& input, + std::string* output) override; + + Status Uncompress(const UncompressionInfo& info, const char* input, + size_t input_length, char** output, + size_t* output_length) override; + + std::atomic num_compress_calls = 0; + std::atomic num_uncompress_calls = 0; + + private: + const char delim_ = '~'; + + void outputSeq(char last, char seq, std::string* output); +}; + +Status SimpleRLECompressor::Compress(const CompressionInfo& info, + const Slice& input, std::string* output) { + (void)info; + + output->clear(); + char last = input[0]; + char seq = 0; + for (size_t i = 0; i < input.size(); i++) { + if (input[i] == last && seq < delim_ - 1) { + seq++; + } else { + outputSeq(last, seq, output); + seq = 1; + } + last = input[i]; + } + outputSeq(last, seq, output); + + num_compress_calls++; + return Status::OK(); +} + +Status SimpleRLECompressor::Uncompress(const UncompressionInfo& info, + const char* input, size_t input_length, + char** output, size_t* output_length) { + (void)info; + + std::string uncompressed; + size_t i = 0; + while (i < input_length) { + if (i < input_length - 1 && input[i] == delim_ && input[i + 1] == delim_) { + uncompressed += delim_; + i += 2; + } else if (i < input_length - 2 && input[i] == delim_) { + uncompressed.append(input[i + 1], input[i + 2]); + i += 3; + } else { + uncompressed += input[i]; + i++; + } + } + + *output = Allocate(uncompressed.length(), info.GetMemoryAllocator()); + if (!*output) { + return Status::MemoryLimit(); + } + memcpy(*output, uncompressed.c_str(), uncompressed.length()); + *output_length = uncompressed.length(); + num_uncompress_calls++; + return Status::OK(); +} + +void SimpleRLECompressor::outputSeq(char last, char seq, std::string* output) { + if (last != delim_) { + if (seq >= 4) { + *output += delim_; + *output += seq; + *output += last; + } else { + output->append(seq, last); + } + } else { + if (seq >= 2) { + *output += delim_; + *output += seq; + *output += last; + } else { + output->append(seq * 2, last); + } + } +} +#endif // ROCKSDB_LITE + TEST(Compression, CreateFromString) { ConfigOptions config_options; config_options.ignore_unsupported_options = false; @@ -104,6 +350,243 @@ TEST(Compression, GetSupportedCompressions) { } } } + +TEST(Compression, SimpleRLECompressor) { + SimpleRLECompressor compressor; + char input[] = "aaaaaaaaaabbbbbbbbbb"; + size_t input_length = 21; + std::string compressed; + CompressionInfo compression_info(CompressionDict::GetEmptyDict(), 0, 0); + Slice data(input, input_length); + Status s = compressor.Compress(compression_info, data, &compressed); + ASSERT_OK(s); + ASSERT_STREQ(compressed.c_str(), "~\na~\nb"); + char* decompress_data = nullptr; + size_t decompress_size = 0; + UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(), 0); + s = compressor.Uncompress(uncompression_info, compressed.c_str(), + compressed.length(), &decompress_data, + &decompress_size); + CacheAllocationPtr original; + original.reset(decompress_data); + ASSERT_OK(s); + ASSERT_NE(original, nullptr); + ASSERT_EQ(decompress_size, input_length); + ASSERT_STREQ(original.get(), input); +} + +TEST(Compression, CreatePluginCompressor) { + ConfigOptions config_options; + auto library = config_options.registry->AddLibrary("CreatePluginCompressor"); + std::shared_ptr compressor; + std::vector> compressors; + + AddCompressorFactory(library); + std::string compressor_name = SimpleRLECompressor::kClassName(); + Status s = Compressor::CreateFromString(config_options, compressor_name, + &compressor); + + // Was compressor created? + ASSERT_OK(s); + ASSERT_NE(compressor, nullptr); + ASSERT_EQ(compressor->Name(), compressor_name); + + // Was compressor assigned the expected numeric type? + CompressionType compression_type = compressor->GetCompressionType(); + ASSERT_EQ(compression_type, kPluginCompression); + + // Was factory added to ObjectLibrary? + auto entry = library->FindFactory(compressor_name); + ASSERT_NE(entry, nullptr); +} + +TEST(Compression, CreateCompressorsWithDifferentOptions) { + ConfigOptions config_options; + auto library = config_options.registry->AddLibrary( + "CreateCompressorsWithDifferentOptions"); + std::vector> compressors; + + AddCompressorFactory>(library); + std::string compressor_name = DummyCompressor<1>::kClassName(); + + // Create DummyCompressor with default configuration + std::shared_ptr compressor_config0; + Status s = Compressor::CreateFromString(config_options, compressor_name, + &compressor_config0); + ASSERT_OK(s); + ASSERT_NE(compressor_config0, nullptr); + + // Create DummyCompressor with configuration 1 + std::shared_ptr compressor_config1; + s = Compressor::CreateFromString( + config_options, + "id=" + std::string(DummyCompressor<1>::kClassName()) + + ";option_str=str1;option_int=1", + &compressor_config1); + ASSERT_OK(s); + ASSERT_NE(compressor_config1, nullptr); + + // Create DummyCompressor with configuration 2 + std::shared_ptr compressor_config2; + s = Compressor::CreateFromString( + config_options, + "id=" + std::string(DummyCompressor<1>::kClassName()) + + ";option_str=str2;option_int=2", + &compressor_config2); + ASSERT_OK(s); + ASSERT_NE(compressor_config2, nullptr); + + // Verify default configuration + ASSERT_EQ( + compressor_config0->GetOptions()->option_str, + DummyCompressorOptions().option_str); + ASSERT_EQ( + compressor_config0->GetOptions()->option_int, + DummyCompressorOptions().option_int); + + // Verify configuration 1 + ASSERT_STREQ(compressor_config1->GetOptions() + ->option_str.c_str(), + "str1"); + ASSERT_EQ( + compressor_config1->GetOptions()->option_int, 1); + + // Verify configuration 2 + ASSERT_STREQ(compressor_config2->GetOptions() + ->option_str.c_str(), + "str2"); + ASSERT_EQ( + compressor_config2->GetOptions()->option_int, 2); +} + +TEST(Compression, ColumnFamilyOptionsFromStringWithMissingCompressor) { + std::vector> compressors; + ColumnFamilyOptions options, new_options; + ConfigOptions config_options; + config_options.ignore_unsupported_options = false; + + Status s = GetColumnFamilyOptionsFromString( + config_options, options, "compressor=MissingCompressor;", &new_options); + ASSERT_NOK(s); + ASSERT_EQ(s.ToString(), + "Invalid argument: Cannot find compressor : MissingCompressor"); +} + +TEST(Compression, ColumnFamilyOptionsFromStringWithAddedCompressor) { + ConfigOptions config_options; + auto library = config_options.registry->AddLibrary( + "ColumnFamilyOptionsFromStringWithAddedCompressor"); + std::vector> compressors; + ColumnFamilyOptions options, new_options; + config_options.ignore_unsupported_options = false; + + AddCompressorFactory(library); + std::string compressor_name = SimpleRLECompressor::kClassName(); + Status s = GetColumnFamilyOptionsFromString( + config_options, options, "compressor=" + compressor_name + ";", + &new_options); + ASSERT_OK(s); + ASSERT_NE(new_options.compressor, nullptr); + ASSERT_EQ(new_options.compressor->Name(), compressor_name); + ASSERT_EQ(new_options.compressor->GetCompressionType(), kPluginCompression); +} + +TEST(Compression, StringFromColumnFamilyOptions) { + ConfigOptions config_options; + auto library = + config_options.registry->AddLibrary("StringFromColumnFamilyOptions"); + std::shared_ptr compressor; + std::vector> compressors; + + // Select compressor with options + AddCompressorFactory>(library); + std::string compressor_name = DummyCompressor<1>::kClassName(); + std::string opts_str = + "{id=" + compressor_name + ";option_int=1;option_str=str1;}"; + + ColumnFamilyOptions options, new_options; + Status s = GetColumnFamilyOptionsFromString( + config_options, options, "compressor=" + opts_str, &new_options); + ASSERT_OK(s); + ASSERT_NE(new_options.compressor, nullptr); + ASSERT_EQ(new_options.compressor->Name(), compressor_name); + ASSERT_EQ(new_options.compressor->GetCompressionType(), kPluginCompression); + + // Verify compressor was configured with options + ASSERT_STREQ(new_options.compressor->GetOptions() + ->option_str.c_str(), + "str1"); + ASSERT_EQ( + new_options.compressor->GetOptions()->option_int, + 1); + + // Verify serialization of options + std::string opts_serialized; + s = GetStringFromColumnFamilyOptions(config_options, new_options, + &opts_serialized); + ASSERT_OK(s); + ASSERT_TRUE(opts_serialized.find("compressor=" + opts_str) != + std::string::npos); + ASSERT_TRUE(opts_serialized.find("bottommost_compressor=nullptr") != + std::string::npos); + ASSERT_TRUE(opts_serialized.find("blob_compressor={id=" + + std::string(NoCompressor::kClassName())) != + std::string::npos); + + // Re-parse serialized options + s = GetColumnFamilyOptionsFromString(config_options, options, opts_serialized, + &new_options); + ASSERT_OK(s); +} + +TEST(Compression, ColumnFamilyOptionsFromStringWithCompressionPerLevel) { + ConfigOptions config_options; + auto library = config_options.registry->AddLibrary( + "ColumnFamilyOptionsFromStringWithCompressionPerLevel"); + std::vector> compressors; + + // Select two different compressor configurations (per level) + AddCompressorFactory>(library); + std::string opts_str1 = + "{id=" + std::string(DummyCompressor<1>::kClassName()) + + ";option_int=1;option_str=str1;}"; + std::string opts_str2 = + "{id=" + std::string(DummyCompressor<1>::kClassName()) + + ";option_int=2;option_str=str2;}"; + + ColumnFamilyOptions options, new_options; + std::string opts = "compressor_per_level={NoCompression:" + opts_str1 + ":" + + opts_str2 + "}"; + Status s = GetColumnFamilyOptionsFromString(config_options, options, opts, + &new_options); + ASSERT_OK(s); + + std::shared_ptr compressor1 = new_options.compressor_per_level[1]; + ASSERT_NE(compressor1, nullptr); + std::shared_ptr compressor2 = new_options.compressor_per_level[2]; + ASSERT_NE(compressor2, nullptr); + + // Verify compressors were set up with options + ASSERT_STREQ( + compressor1->GetOptions()->option_str.c_str(), + "str1"); + ASSERT_EQ(compressor1->GetOptions()->option_int, 1); + + ASSERT_STREQ( + compressor2->GetOptions()->option_str.c_str(), + "str2"); + ASSERT_EQ(compressor2->GetOptions()->option_int, 2); + + // Verify serialization of options + std::string opts_serialized; + s = GetStringFromColumnFamilyOptions(config_options, new_options, + &opts_serialized); + ASSERT_OK(s); + ASSERT_TRUE( + opts_serialized.find( + "compressor_per_level={{id=NoCompression;parallel_threads=1;}:" + + opts_str1 + ":" + opts_str2 + "}") != std::string::npos); +} #endif // ROCKSDB_LITE static void WriteDBAndFlush(DB* db, int num_keys, const std::string& val) { @@ -123,6 +606,99 @@ static void CloseDB(DB* db) { delete db; } +#ifndef ROCKSDB_LITE +TEST(Compression, DBWithSimpleRLECompressor) { + Options options; + std::string dbname = test::PerThreadDBPath("compression_test"); + ASSERT_OK(DestroyDB(dbname, options)); + + // Select SimpleRLECompressor through options.compressor + options.create_if_missing = true; + ConfigOptions config_options; + auto library = + config_options.registry->AddLibrary("DBWithSimpleRLECompressor"); + AddCompressorFactory(library); + Status s = Compressor::CreateFromString( + config_options, SimpleRLECompressor::kClassName(), &options.compressor); + ASSERT_OK(s); + ASSERT_NE(options.compressor, nullptr); + SimpleRLECompressor* compressor = + reinterpret_cast(options.compressor.get()); + unsigned int compression_type = options.compressor->GetCompressionType(); + + // Open database + DB* db = nullptr; + s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + ASSERT_EQ(compressor->num_compress_calls, 0); + ASSERT_EQ(compressor->num_uncompress_calls, 0); + + // Write 200 values, each 20 bytes + int num_keys = 200; + std::string val = "aaaaaaaaaabbbbbbbbbb"; + WriteDBAndFlush(db, num_keys, val); + ASSERT_EQ(compressor->num_compress_calls, 3); + + // Read and verify + ReadOptions ro; + std::string result; + for (int i = 0; i < num_keys; i++) { + std::string key = std::to_string(i); + s = db->Get(ro, key, &result); + ASSERT_OK(s); + ASSERT_EQ(result, val); + } + // Index block not compressed, as not passing GoodCompressionRatio test + ASSERT_EQ(compressor->num_uncompress_calls, 2); + + // Verify options file + DBOptions db_options; + std::vector cf_descs; + s = LoadLatestOptions(config_options, db->GetName(), &db_options, &cf_descs); + ASSERT_OK(s); + if (BuiltinCompressor::TypeSupported(kSnappyCompression)) { + ASSERT_EQ(cf_descs[0].options.compression, kSnappyCompression); + } else { + ASSERT_EQ(cf_descs[0].options.compression, kNoCompression); + } + ASSERT_EQ(cf_descs[0].options.compressor->GetCompressionType(), + compression_type); + + CloseDB(db); + + // Reopen database + compressor->num_compress_calls = 0; + compressor->num_uncompress_calls = 0; + db = nullptr; + Options reopen_options; + s = DB::Open(reopen_options, dbname, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + + // Verify table properties + TablePropertiesCollection all_tables_props; + s = db->GetPropertiesOfAllTables(&all_tables_props); + ASSERT_OK(s); + for (auto it = all_tables_props.begin(); it != all_tables_props.end(); ++it) { + ASSERT_EQ(it->second->compression_name, std::string(compressor->Name())); + } + + // Read and verify + for (int i = 0; i < num_keys; i++) { + std::string key = std::to_string(i); + s = db->Get(ro, key, &result); + ASSERT_OK(s); + ASSERT_EQ(result, val); + } + ASSERT_EQ(compressor->num_compress_calls, 0); + ASSERT_EQ(compressor->num_uncompress_calls, 2); + + CloseDB(db); + ASSERT_OK(DestroyDB(dbname, options)); +} +#endif // ROCKSDB_LITE + TEST(Compression, DBWithZlibAndCompressionOptions) { if (!BuiltinCompressor::TypeSupported(kZlibCompression)) { ROCKSDB_GTEST_BYPASS("Test requires ZLIB compression"); @@ -199,6 +775,135 @@ TEST(Compression, DBWithCompressionPerLevel) { ASSERT_OK(DestroyDB(dbname, options)); } +TEST(Compression, DBwithZlibAndCompressorOptions) { + if (!ZlibCompressor().Supported()) { + return; + } + + Options options; + std::string dbname = test::PerThreadDBPath("compression_test"); + ASSERT_OK(DestroyDB(dbname, options)); + + // Select Zlib and its options through options.compressor + options.create_if_missing = true; + Status s; +#ifndef ROCKSDB_LITE + ConfigOptions config_options; + s = Compressor::CreateFromString( + config_options, + "id=" + std::string(ZlibCompressor::kClassName()) + ";window_bits=-13", + &options.compressor); + ASSERT_OK(s); +#else + CompressionOptions compression_opts; + compression_opts.window_bits = -13; + options.compressor = + BuiltinCompressor::GetCompressor(kZlibCompression, compression_opts); +#endif + ASSERT_NE(options.compressor, nullptr); + + // Open database + DB* db = nullptr; + s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + + // Write 200 values, each 20 bytes + WriteDBAndFlush(db, 200, "aaaaaaaaaabbbbbbbbbb"); + +#ifndef ROCKSDB_LITE + CompressionType compression_type = options.compressor->GetCompressionType(); + + // Verify ZlibCompressor created with selected options + auto compressor = BuiltinCompressor::GetCompressor(compression_type); + ASSERT_NE(compressor, nullptr); + std::string value; + s = compressor->GetOption(config_options, "window_bits", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "-13"); + + // Verify table properties + TablePropertiesCollection all_tables_props; + s = db->GetPropertiesOfAllTables(&all_tables_props); + ASSERT_OK(s); + for (auto it = all_tables_props.begin(); it != all_tables_props.end(); ++it) { + ASSERT_EQ(it->second->compression_name, + BuiltinCompressor::TypeToString(kZlibCompression)); + } + + // Verify options file + DBOptions db_options; + std::vector cf_descs; + s = LoadLatestOptions(config_options, db->GetName(), &db_options, &cf_descs); + ASSERT_OK(s); + // CompressionOptions are still default + ASSERT_EQ(cf_descs[0].options.compression_opts.window_bits, -14); +#endif + + CloseDB(db); + + // Reopen database + db = nullptr; + Options reopen_options; + s = DB::Open(reopen_options, dbname, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + +#ifndef ROCKSDB_LITE + // Verify table properties + s = db->GetPropertiesOfAllTables(&all_tables_props); + ASSERT_OK(s); + for (auto it = all_tables_props.begin(); it != all_tables_props.end(); ++it) { + ASSERT_EQ(it->second->compression_name, + BuiltinCompressor::TypeToString(kZlibCompression)); + } +#endif + + CloseDB(db); + ASSERT_OK(DestroyDB(dbname, options)); +} + +#ifndef ROCKSDB_LITE +TEST(Compression, DBWithDictionaryCompression) { + Options options; + std::string dbname = test::PerThreadDBPath("compression_test"); + ASSERT_OK(DestroyDB(dbname, options)); + + // Select compressor that supports dictionary compression + options.create_if_missing = true; + ConfigOptions config_options; + auto library = + config_options.registry->AddLibrary("DBWithDictionaryCompression"); + AddCompressorFactory(library); + Status s = Compressor::CreateFromString( + config_options, + "id=" + std::string(DummyDictionaryCompressor::kClassName()) + + ";max_dict_bytes=4096;", + &options.compressor); + ASSERT_OK(s); + ASSERT_NE(options.compressor, nullptr); + DummyDictionaryCompressor* compressor = + reinterpret_cast(options.compressor.get()); + + // Open database + DB* db = nullptr; + s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_NE(db, nullptr); + + // Write 2000 values, each 20 bytes + int num_keys = 2000; + std::string val = "aaaaaaaaaabbbbbbbbbb"; + WriteDBAndFlush(db, num_keys, val); + + // Dictionary was enabled + ASSERT_EQ(compressor->num_create_dictionary_calls, 1); + + CloseDB(db); + ASSERT_OK(DestroyDB(dbname, options)); +} +#endif // ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/util/compressor.cc b/util/compressor.cc index f18e6442b1c..8339a5debef 100644 --- a/util/compressor.cc +++ b/util/compressor.cc @@ -86,7 +86,7 @@ bool CreateIfMatches(const std::string& id, std::shared_ptr* c) { } } -static Status NewCompressor(const ConfigOptions& /*config_options*/, +static Status NewCompressor(const ConfigOptions& config_options, const std::string& id, std::shared_ptr* result) { if (CreateIfMatches(id, result) || @@ -100,6 +100,15 @@ static Status NewCompressor(const ConfigOptions& /*config_options*/, CreateIfMatches(id, result)) { return Status::OK(); } else { +#ifndef ROCKSDB_LITE + Status s = + LoadSharedObject(config_options, id, nullptr, result); + if (s.ok() && result != nullptr) { + return Status::OK(); + } +#else + (void)config_options; +#endif // ROCKSDB_LITE return Status::NotSupported("Cannot find compressor ", id); } } diff --git a/utilities/blob_db/blob_dump_tool.h b/utilities/blob_db/blob_dump_tool.h index 36250019b89..4922f6fac22 100644 --- a/utilities/blob_db/blob_dump_tool.h +++ b/utilities/blob_db/blob_dump_tool.h @@ -13,7 +13,6 @@ #include "file/random_access_file_reader.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "util/compressor.h" namespace ROCKSDB_NAMESPACE { namespace blob_db {