Skip to content

Commit

Permalink
Add pluggable compression
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagiac81 committed Jan 16, 2024
1 parent e267425 commit 3bedd21
Show file tree
Hide file tree
Showing 47 changed files with 1,169 additions and 153 deletions.
2 changes: 1 addition & 1 deletion db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compressor_(mutable_cf_options->blob_compressor),
blob_compressor_(mutable_cf_options->derived_blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
write_options_(write_options),
Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

#include "rocksdb/advanced_options.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/compressor.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
1 change: 0 additions & 1 deletion db/blob/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "rocksdb/compression_type.h"
#include "rocksdb/rocksdb_namespace.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_read_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
#include <cinttypes>

#include "rocksdb/compression_type.h"
#include "rocksdb/compressor.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
2 changes: 1 addition & 1 deletion db/blob/blob_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_read_request.h"
#include "rocksdb/cache.h"
#include "rocksdb/compressor.h"
#include "rocksdb/rocksdb_namespace.h"
#include "table/block_based/cachable_entry.h"
#include "util/autovector.h"
#include "util/compressor.h"

namespace ROCKSDB_NAMESPACE {

Expand Down
24 changes: 13 additions & 11 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,18 @@ void GetIntTblPropCollectorFactory(
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
MutableCFOptions moptions(cf_options);
ImmutableCFOptions ioptions(cf_options);
if (moptions.compressor && !moptions.compressor->Supported()) {
if (moptions.derived_compressor &&
!moptions.derived_compressor->Supported()) {
return Status::InvalidArgument("Compression type " +
moptions.compressor->GetId() +
moptions.derived_compressor->GetId() +
" is not linked with the binary.");
} else if (moptions.bottommost_compressor &&
!moptions.bottommost_compressor->Supported()) {
return Status::InvalidArgument("Compression type " +
moptions.bottommost_compressor->GetId() +
" is not linked with the binary.");
} else if (!moptions.compressor_per_level.empty()) {
for (const auto& compressor : moptions.compressor_per_level) {
} else if (moptions.derived_bottommost_compressor &&
!moptions.derived_bottommost_compressor->Supported()) {
return Status::InvalidArgument(
"Compression type " + moptions.derived_bottommost_compressor->GetId() +
" is not linked with the binary.");
} else if (!moptions.derived_compressor_per_level.empty()) {
for (const auto& compressor : moptions.derived_compressor_per_level) {
if (compressor == nullptr) {
return Status::InvalidArgument("Compression type is invalid.");
} else if (!compressor->Supported()) {
Expand Down Expand Up @@ -153,9 +154,10 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
"should be nonzero if we're using zstd's dictionary generator.");
}
}
if (moptions.blob_compressor && !moptions.blob_compressor->Supported()) {
if (moptions.derived_blob_compressor &&
!moptions.derived_blob_compressor->Supported()) {
return Status::InvalidArgument("Blob compression type " +
moptions.blob_compressor->GetId() +
moptions.derived_blob_compressor->GetId() +
" is not linked with the binary.");
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/compressor.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/compressor.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
Expand Down
13 changes: 7 additions & 6 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use it.
bool bottom_level = (level >= (vstorage->num_non_empty_levels() - 1));
if (moptions.bottommost_compressor != nullptr && bottom_level) {
return moptions.bottommost_compressor;
if (moptions.derived_bottommost_compressor != nullptr && bottom_level) {
return moptions.derived_bottommost_compressor;
}
// If the user has specified a different compression level for each level,
// then pick the compression for that level.
if (!moptions.compressor_per_level.empty()) {
if (!moptions.derived_compressor_per_level.empty()) {
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
Expand All @@ -101,7 +101,8 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
assert(level == 0 || level >= base_level);
int lvl = std::max(0, level - base_level + 1);
int idx = std::min(
static_cast<int>(moptions.compressor_per_level.size()) - 1, lvl);
static_cast<int>(moptions.derived_compressor_per_level.size()) - 1,
lvl);
// If not specified directly by the user, compressors in
// compressor_per_level are instantiated using compression_opts. If the user
// enabled bottommost_compression_opts, we need to create a new compressor
Expand All @@ -112,10 +113,10 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
moptions.compression_per_level[idx],
moptions.bottommost_compression_opts);
} else {
return moptions.compressor_per_level[idx];
return moptions.derived_compressor_per_level[idx];
}
} else {
return moptions.compressor;
return moptions.derived_compressor;
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compressor,
0 /* output path ID */, mutable_cf_options.derived_compressor,
Temperature::kUnknown, 0 /* max_subcompactions */, {},
/* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
Expand Down Expand Up @@ -419,7 +419,7 @@ Compaction* FIFOCompactionPicker::PickTemperatureChangeCompaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0 /* output file size limit */,
0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
mutable_cf_options.compressor, compaction_target_temp,
mutable_cf_options.derived_compressor, compaction_target_temp,
/* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
/* is deletion compaction */ false, /* l0_files_might_overlap */ true,
Expand Down
2 changes: 1 addition & 1 deletion db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
Expand All @@ -21,7 +22,6 @@
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "test_util/sync_point.h"
#include "util/compressor.h"
#include "util/file_checksum_helper.h"
#include "util/random.h"
#include "utilities/counted_fs.h"
Expand Down
2 changes: 1 addition & 1 deletion db/db_block_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include "db/db_test_util.h"
#include "env/unique_id_gen.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/unique_id_impl.h"
#include "util/compressor.h"
#include "util/defer.h"
#include "util/hash.h"
#include "util/math.h"
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ std::shared_ptr<Compressor> GetCompressionFlush(
// latency overhead is not offset by saving much space.
if (ioptions.compaction_style == kCompactionStyleUniversal) {
if (moptions.compaction_options_universal.compression_size_percent < 0) {
return moptions.compressor;
return moptions.derived_compressor;
} else {
return BuiltinCompressor::GetCompressor(kNoCompression);
}
} else if (moptions.compressor_per_level.empty()) {
return moptions.compressor;
} else if (moptions.derived_compressor_per_level.empty()) {
return moptions.derived_compressor;
} else {
return moptions.compressor_per_level[0];
return moptions.derived_compressor_per_level[0];
}
}

Expand Down
7 changes: 4 additions & 3 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1868,8 +1868,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
->compaction_style) /* output file size limit, not applicable */
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compressor,
Temperature::kUnknown, 0 /* max_subcompactions, not applicable */,
0 /* output path ID, not applicable */,
mutable_cf_options.derived_compressor, Temperature::kUnknown,
0 /* max_subcompactions, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
Expand Down Expand Up @@ -4179,7 +4180,7 @@ void DBImpl::BuildCompactionJobInfo(
newf.first, file_number, meta.oldest_blob_file_number});
}
compaction_job_info->blob_compression_type =
c->mutable_cf_options()->blob_compressor->GetCompressionType();
c->mutable_cf_options()->derived_blob_compressor->GetCompressionType();

// Update BlobFilesInfo.
for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
Expand Down
23 changes: 22 additions & 1 deletion db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,8 @@ TEST_F(DBOptionsTest, ChangeCompression) {
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compression", "kSnappyCompression"},
{{"bottommost_compressor", "nullptr"},
{"bottommost_compression", "kSnappyCompression"},
{"bottommost_compression_opts", "0:6:0:0:4:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
Expand All @@ -1429,6 +1430,26 @@ TEST_F(DBOptionsTest, ChangeCompression) {
ASSERT_EQ(32767, compression_opt_used.level);
// Right now parallel_level is not yet allowed to be changed.

if (!Zlib_Supported()) {
return;
}
compression_used = CompressionType::kLZ4Compression;
compacted = false;
ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compressor", "nullptr"},
{"bottommost_compression", "kZlibCompression"},
{"bottommost_compression_opts", "0:6:0:0:4:true"}}));
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kZlibCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level);

SyncPoint::GetInstance()->DisableProcessing();
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/compressor.h"
#include "rocksdb/experimental.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/persistent_cache.h"
Expand All @@ -26,7 +27,6 @@
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h"
#include "test_util/testutil.h"
#include "util/compressor.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"

Expand Down
5 changes: 3 additions & 2 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,9 @@ void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
*/
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compressor,
Temperature::kUnknown, 0 /* max_subcompaction, not applicable */,
0 /* output path ID, not applicable */,
mutable_cf_options.derived_compressor, Temperature::kUnknown,
0 /* max_subcompaction, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
info->table_properties = table_properties_;
info->flush_reason = flush_reason_;
info->blob_compression_type =
mutable_cf_options_.blob_compressor->GetCompressionType();
mutable_cf_options_.derived_blob_compressor->GetCompressionType();

// Update BlobFilesInfo.
for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
Expand Down
2 changes: 1 addition & 1 deletion db/table_properties_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void MakeBuilder(
const WriteOptions write_options;
TableBuilderOptions tboptions(
ioptions, moptions, read_options, write_options, internal_comparator,
int_tbl_prop_collector_factories, moptions.compressor,
int_tbl_prop_collector_factories, moptions.derived_compressor,
kTestColumnFamilyId, kTestColumnFamilyName, kTestLevel);
builder->reset(NewTableBuilder(tboptions, writable->get()));
}
Expand Down
27 changes: 27 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace ROCKSDB_NAMESPACE {

class Compressor;
class Slice;
class SliceTransform;
class TablePropertiesCollectorFactory;
Expand Down Expand Up @@ -427,6 +428,19 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
std::vector<CompressionType> compression_per_level;

// Similar to compression_per_level, but the algorithms are encapsulated in
// Compressor objects. This adds the ability to select custom compressors,
// beyond the built-in ones provided through CompressionType.
//
// If compressor_per_level is specified (not empty), it overrides
// compression_per_level.
//
// If compressor_per_level is not specified (empty),
// compression_per_level is applied as described for that option.
//
// Default: empty
std::vector<std::shared_ptr<Compressor>> compressor_per_level;

// Number of levels for this database
int num_levels = 7;

Expand Down Expand Up @@ -927,6 +941,19 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through the SetOptions() API
CompressionType blob_compression_type = kNoCompression;

// Similar to blob_compression_type, but the algorithm is encapsulated in a
// Compressor class. This adds the ability to select plugin compressors,
// beyond the built-in ones provided through CompressionType.
//
// If blob_compressor is specified (not null), it overrides
// blob_compression_type.
//
// If blob_compressor is not specified (null), blob_compression_type is
// applied.
//
// Default: nullptr
std::shared_ptr<Compressor> blob_compressor = nullptr;

// Enables garbage collection of blobs. Blob GC is performed as part of
// compaction. Valid blobs residing in blob files older than a cutoff get
// relocated to new files as they are encountered during compaction, which
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/compression_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ enum CompressionType : unsigned char {
// eventually remove the option from the public API.
kZSTDNotFinalCompression = 0x40,

// Used as block compression type identifier when using a non built-in
// compressor.
kPluginCompression = 0x41,

// kDisableCompressionOption is used to disable some compression options.
kDisableCompressionOption = 0xff,
};
Expand Down
4 changes: 3 additions & 1 deletion util/compressor.h → include/rocksdb/compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ class Compressor : public Customizable {
static std::vector<std::string> GetDictSupported();

// Get the numeric type associated with this compressor
virtual CompressionType GetCompressionType() const = 0;
virtual CompressionType GetCompressionType() const {
return kPluginCompression;
};

// Whether the compressor is supported.
// For example, a compressor can implement this method to verify its
Expand Down
Loading

0 comments on commit 3bedd21

Please sign in to comment.