diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a76da360c872..e3db36395555 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -84,7 +84,7 @@ add_third_party( add_third_party( lz4 - URL https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz + URL https://github.com/lz4/lz4/archive/refs/tags/v1.10.0.tar.gz BUILD_IN_SOURCE 1 CONFIGURE_COMMAND echo skip diff --git a/src/server/detail/compressor.cc b/src/server/detail/compressor.cc index 331eb6349d14..8a2c741c2422 100644 --- a/src/server/detail/compressor.cc +++ b/src/server/detail/compressor.cc @@ -52,32 +52,47 @@ io::Result ZstdCompressor::Compress(io::Bytes data) { class Lz4Compressor : public CompressorImpl { public: Lz4Compressor() { - lz4_pref_.compressionLevel = compression_level_; + LZ4F_errorCode_t code = LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION); + CHECK(!LZ4F_isError(code)); } ~Lz4Compressor() { + LZ4F_errorCode_t code = LZ4F_freeCompressionContext(cctx_); + CHECK(!LZ4F_isError(code)); } // compress a string of data io::Result Compress(io::Bytes data); private: - LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES; + LZ4F_cctx* cctx_; }; io::Result Lz4Compressor::Compress(io::Bytes data) { - lz4_pref_.frameInfo.contentSize = data.size(); - size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_); + LZ4F_preferences_t lz4_pref = LZ4F_INIT_PREFERENCES; + lz4_pref.compressionLevel = compression_level_; + lz4_pref.frameInfo.contentSize = data.size(); + + size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref); if (compr_buf_.capacity() < buf_size) { compr_buf_.reserve(buf_size); } +// TODO: to remove LZ4F_compressFrame code once we confirm this code actually works. +#if 1 + size_t frame_size = + LZ4F_compressFrame_usingCDict(cctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(), + data.size(), nullptr /* dict */, &lz4_pref); +#else size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(), - data.size(), &lz4_pref_); + data.size(), &lz4_pref); +#endif + if (LZ4F_isError(frame_size)) { LOG(ERROR) << "LZ4F_compressFrame failed with error " << LZ4F_getErrorName(frame_size); return nonstd::make_unexpected(make_error_code(errc::operation_not_supported)); } + compressed_size_total_ += frame_size; uncompressed_size_total_ += data.size(); return io::Bytes(compr_buf_.data(), frame_size); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1567fa1c8778..83cbdb616c31 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1604,13 +1604,20 @@ void SerializerBase::CompressBlob() { } AllocateCompressorOnce(); - // Compress the data - auto ec = compressor_impl_->Compress(blob_to_compress); - if (!ec) { + + // Compress the data. We copy compressed data once into the internal buffer of compressor_impl_ + // and then we copy it again into the mem_buf_. + // + // TODO: it is possible to avoid double copying here by changing the compressor interface, + // so that the compressor will accept the output buffer and return the final size. This requires + // exposing the additional compress bound interface as well. + io::Result res = compressor_impl_->Compress(blob_to_compress); + if (!res) { ++compression_stats_->compression_failed; return; } - Bytes compressed_blob = *ec; + + Bytes compressed_blob = *res; if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) { ++compression_stats_->compression_no_effective; return; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 08941e68eeb7..861141106460 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -42,6 +42,14 @@ class RdbTest : public BaseFamilyTest { protected: void SetUp(); + static void SetUpTestSuite() { + static bool init = true; + if (exchange(init, false)) { + fb2::SetDefaultStackResource(&fb2::std_malloc_resource, 32_KB); + } + BaseFamilyTest::SetUpTestSuite(); + } + io::FileSource GetSource(string name); std::error_code LoadRdb(const string& filename) { @@ -56,6 +64,7 @@ class RdbTest : public BaseFamilyTest { void RdbTest::SetUp() { InitWithDbFilename(); + CHECK_EQ(zmalloc_used_memory_tl, 0); max_memory_limit = 40000000; } @@ -457,6 +466,8 @@ TEST_F(RdbTest, JsonTest) { class HllRdbTest : public RdbTest, public testing::WithParamInterface {}; TEST_P(HllRdbTest, Hll) { + LOG(INFO) << " max memory: " << max_memory_limit + << " used_mem_current: " << used_mem_current.load(); auto ec = LoadRdb("hll.rdb"); ASSERT_FALSE(ec) << ec.message();