From 183dc169da54a598f299cf3e7529e8567e7d950c Mon Sep 17 00:00:00 2001 From: "Ma, Rong" Date: Wed, 6 Dec 2023 22:33:52 +0800 Subject: [PATCH] address comments --- velox/common/compression/Compression.cpp | 8 +- velox/common/compression/Compression.h | 2 - .../compression/tests/CompressionTest.cpp | 4 - velox/common/compression/v2/Compression.cpp | 75 +++----- velox/common/compression/v2/Compression.h | 128 ++++++------- .../v2/HadoopCompressionFormat.cpp | 6 +- .../compression/v2/HadoopCompressionFormat.h | 10 +- .../common/compression/v2/Lz4Compression.cpp | 152 ++++++++------- velox/common/compression/v2/Lz4Compression.h | 79 ++++---- .../compression/v2/tests/CompressionTest.cpp | 173 +++++++++--------- 10 files changed, 304 insertions(+), 333 deletions(-) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 779ed08a6d843..3843f5902a980 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -76,10 +76,6 @@ std::string compressionKindToString(CompressionKind kind) { return "lz4"; case CompressionKind_GZIP: return "gzip"; - case CompressionKind_LZ4RAW: - return "lz4_raw"; - case CompressionKind_LZ4HADOOP: - return "lz4_hadoop"; } return folly::to("unknown - ", kind); } @@ -93,9 +89,7 @@ CompressionKind stringToCompressionKind(const std::string& kind) { {"lzo", CompressionKind_LZO}, {"zstd", CompressionKind_ZSTD}, {"lz4", CompressionKind_LZ4}, - {"gzip", CompressionKind_GZIP}, - {"lz4_raw", CompressionKind_LZ4RAW}, - {"lz4_hadoop", CompressionKind_LZ4HADOOP}}; + {"gzip", CompressionKind_GZIP}}; auto iter = stringToCompressionKindMap.find(kind); if (iter != stringToCompressionKindMap.end()) { return iter->second; diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 072c59147edc6..317f9717a2ae5 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -29,8 +29,6 @@ enum CompressionKind { CompressionKind_ZSTD = 4, CompressionKind_LZ4 = 5, CompressionKind_GZIP = 6, - CompressionKind_LZ4RAW = 7, - CompressionKind_LZ4HADOOP = 8, CompressionKind_MAX = INT64_MAX }; diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index bad9231f7a7ef..312c103e2a98c 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -32,8 +32,6 @@ TEST_F(CompressionTest, testCompressionNames) { EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4)); EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD)); EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP)); - EXPECT_EQ("lz4_raw", compressionKindToString(CompressionKind_LZ4RAW)); - EXPECT_EQ("lz4_hadoop", compressionKindToString(CompressionKind_LZ4HADOOP)); EXPECT_EQ( "unknown - 99", compressionKindToString(static_cast(99))); @@ -59,8 +57,6 @@ TEST_F(CompressionTest, stringToCompressionKind) { EXPECT_EQ(stringToCompressionKind("lz4"), CompressionKind_LZ4); EXPECT_EQ(stringToCompressionKind("zstd"), CompressionKind_ZSTD); EXPECT_EQ(stringToCompressionKind("gzip"), CompressionKind_GZIP); - EXPECT_EQ(stringToCompressionKind("lz4_raw"), CompressionKind_LZ4RAW); - EXPECT_EQ(stringToCompressionKind("lz4_hadoop"), CompressionKind_LZ4HADOOP); VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp index 9b745baff3441..357dad8e5f9f0 100644 --- a/velox/common/compression/v2/Compression.cpp +++ b/velox/common/compression/v2/Compression.cpp @@ -49,7 +49,6 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { bool Codec::supportsCompressionLevel(CompressionKind kind) { switch (kind) { case CompressionKind::CompressionKind_LZ4: - case CompressionKind::CompressionKind_LZ4RAW: return true; default: return false; @@ -59,14 +58,19 @@ bool Codec::supportsCompressionLevel(CompressionKind kind) { bool Codec::supportsStreamingCompression(CompressionKind kind) { switch (kind) { case CompressionKind::CompressionKind_LZ4: - case CompressionKind::CompressionKind_GZIP: - case CompressionKind::CompressionKind_ZLIB: return true; default: return false; } } +bool Codec::supportsCompressFixedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + int32_t Codec::maximumCompressionLevel(CompressionKind kind) { checkSupportsCompressionLevel(kind); auto codec = Codec::create(kind); @@ -104,20 +108,29 @@ std::unique_ptr Codec::create( std::unique_ptr codec; switch (kind) { case CompressionKind::CompressionKind_LZ4: + if (auto options = dynamic_cast(&codecOptions)) { + switch (options->type) { + case Lz4CodecOptions::kLz4Frame: + codec = makeLz4FrameCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Raw: + codec = makeLz4RawCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Hadoop: + codec = makeLz4HadoopRawCodec(compressionLevel); + break; + } + } + // By default, create LZ4 Frame codec. codec = makeLz4FrameCodec(compressionLevel); break; - case CompressionKind::CompressionKind_LZ4RAW: - codec = makeLz4RawCodec(compressionLevel); - break; - case CompressionKind::CompressionKind_LZ4HADOOP: - codec = makeLz4HadoopRawCodec(); - break; default: break; } if (codec == nullptr) { - VELOX_UNSUPPORTED("LZO codec not implemented"); + VELOX_UNSUPPORTED( + "{} codec not implemented", compressionKindToString(kind)); } codec->init(); @@ -135,8 +148,6 @@ bool Codec::isAvailable(CompressionKind kind) { switch (kind) { case CompressionKind::CompressionKind_NONE: case CompressionKind::CompressionKind_LZ4: - case CompressionKind::CompressionKind_LZ4RAW: - case CompressionKind::CompressionKind_LZ4HADOOP: return true; case CompressionKind::CompressionKind_SNAPPY: case CompressionKind::CompressionKind_GZIP: @@ -150,45 +161,15 @@ bool Codec::isAvailable(CompressionKind kind) { std::optional Codec::getUncompressedLength( uint64_t inputLength, - const uint8_t* input, - std::optional uncompressedLength) const { - if (inputLength == 0) { - if (uncompressedLength.value_or(0) != 0) { - VELOX_USER_CHECK_EQ( - uncompressedLength.value_or(0), - 0, - "Invalid uncompressed length: {}.", - *uncompressedLength); - } - return 0; - } - auto actualLength = - doGetUncompressedLength(inputLength, input, uncompressedLength); - if (actualLength) { - if (uncompressedLength) { - VELOX_USER_CHECK_EQ( - *actualLength, - *uncompressedLength, - "Invalid uncompressed length: {}.", - *uncompressedLength); - } - return actualLength; - } - return uncompressedLength; + const uint8_t* input) const { + return std::nullopt; } -std::optional Codec::doGetUncompressedLength( - uint64_t inputLength, +uint64_t Codec::compressFixedLength( const uint8_t* input, - std::optional uncompressedLength) const { - return uncompressedLength; -} - -uint64_t Codec::compressPartial( uint64_t inputLength, - const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint8_t* output, + uint64_t outputLength) { VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name()); } } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h index 1816ca4e7007f..da8c4ddd91ca1 100644 --- a/velox/common/compression/v2/Compression.h +++ b/velox/common/compression/v2/Compression.h @@ -30,51 +30,50 @@ namespace facebook::velox::common { static constexpr int32_t kUseDefaultCompressionLevel = std::numeric_limits::min(); -// Streaming compressor interface. -class Compressor { +class StreamingCompressor { public: - virtual ~Compressor() = default; + virtual ~StreamingCompressor() = default; struct CompressResult { uint64_t bytesRead; uint64_t bytesWritten; bool outputTooSmall; }; + struct FlushResult { uint64_t bytesWritten; bool outputTooSmall; }; + struct EndResult { uint64_t bytesWritten; bool outputTooSmall; }; /// Compress some input. - /// If bytes_read is 0 on return, then a larger output buffer should be - /// supplied. + /// If CompressResult.outputTooSmall is true on return, then a larger output + /// buffer should be supplied. virtual CompressResult compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) = 0; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; /// Flush part of the compressed output. - /// If outputTooSmall is true on return, flush() should be called again - /// with a larger buffer. - virtual FlushResult flush(uint64_t outputLength, uint8_t* output) = 0; + /// If FlushResult.outputTooSmall is true on return, flush() should be called + /// again with a larger buffer. + virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0; /// End compressing, doing whatever is necessary to end the stream. - /// If outputTooSmall is true on return, end() should be called again - /// with a larger buffer. Otherwise, the Compressor should not be used - /// anymore. - /// end() implies flush(). - virtual EndResult end(uint64_t outputLength, uint8_t* output) = 0; + /// If EndResult.outputTooSmall is true on return, end() should be called + /// again with a larger buffer. Otherwise, the StreamingCompressor should not + /// be used anymore. end() will flush the compressed output. + virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0; }; -// Streaming decompressor interface -class Decompressor { +class StreamingDecompressor { public: - virtual ~Decompressor() = default; + virtual ~StreamingDecompressor() = default; struct DecompressResult { uint64_t bytesRead; @@ -86,19 +85,18 @@ class Decompressor { /// If outputTooSmall is true on return, a larger output buffer needs /// to be supplied. virtual DecompressResult decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) = 0; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; - // Return whether the compressed stream is finished. + /// Return whether the compressed stream is finished. virtual bool isFinished() = 0; - // Reinitialize decompressor, making it ready for a new compressed stream. + /// Reinitialize decompressor, making it ready for a new compressed stream. virtual void reset() = 0; }; -// Compression codec options class CodecOptions { public: explicit CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) @@ -109,7 +107,6 @@ class CodecOptions { int32_t compressionLevel; }; -// Compression codec class Codec { public: virtual ~Codec() = default; @@ -118,29 +115,33 @@ class Codec { /// should use its default compression level. static int32_t useDefaultCompressionLevel(); - // Create a kind for the given compression algorithm with CodecOptions. + /// Create a kind for the given compression algorithm with CodecOptions. static std::unique_ptr create( CompressionKind kind, const CodecOptions& codecOptions = CodecOptions{}); - // Create a kind for the given compression algorithm. + /// Create a kind for the given compression algorithm. static std::unique_ptr create( CompressionKind kind, int32_t compressionLevel); - // Return true if support for indicated kind has been enabled. + /// Return true if support for indicated kind has been enabled. static bool isAvailable(CompressionKind kind); /// Return true if indicated kind supports extracting uncompressed length /// from compressed data. static bool supportsGetUncompressedLength(CompressionKind kind); - // Return true if indicated kind supports setting a compression level. + /// Return true if indicated kind supports setting a compression level. static bool supportsCompressionLevel(CompressionKind kind); - // Return true if indicated kind supports creating streaming de/compressor. + /// Return true if indicated kind supports creating streaming de/compressor. static bool supportsStreamingCompression(CompressionKind kind); + /// Return true if indicated kind supports one-shot compression with fixed + /// compressed length. + static bool supportsCompressFixedLength(CompressionKind kind); + /// Return the smallest supported compression level for the kind /// Note: This function creates a temporary Codec instance. static int32_t minimumCompressionLevel(CompressionKind kind); @@ -153,13 +154,13 @@ class Codec { /// Note: This function creates a temporary Codec instance. static int32_t defaultCompressionLevel(CompressionKind kind); - // Return the smallest supported compression level. + /// Return the smallest supported compression level. virtual int32_t minimumCompressionLevel() const = 0; - // Return the largest supported compression level. + /// Return the largest supported compression level. virtual int32_t maximumCompressionLevel() const = 0; - // Return the default compression level. + /// Return the default compression level. virtual int32_t defaultCompressionLevel() const = 0; /// One-shot decompression function. @@ -169,10 +170,10 @@ class Codec { /// compression. Depending on the codec (e.g. LZ4), different formats may /// be used. virtual uint64_t decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) = 0; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; /// Performs one-shot compression. /// `outputLength` must first have been computed using maxCompressedLength(). @@ -181,10 +182,10 @@ class Codec { /// decompression. Depending on the codec (e.g. LZ4), different formats may /// be used. virtual uint64_t compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) = 0; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; /// Performs one-shot compression. /// This function compresses data and writes the output up to the specified @@ -192,55 +193,48 @@ class Codec { /// data, the function doesn't fail. Instead, it returns the number of bytes /// actually written to the output buffer. Any remaining data that couldn't /// be written in this call will be written in subsequent calls to this - /// function. This is useful when fixed-size compression blocks are required + /// function. This is useful when fixed-length compression blocks are required /// by the caller. /// Note: Only Gzip and Zstd codec supports this function. - virtual uint64_t compressPartial( - uint64_t inputLength, + virtual uint64_t compressFixedLength( const uint8_t* input, - uint64_t outputLength, - uint8_t* output); + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength); - // Maximum compressed length of given input length. + /// Maximum compressed length of given input length. virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; - /// Extracts the uncompressed length from the compressed data if possible. - /// If the codec doesn't store the uncompressed length, or the data is - /// corrupted it returns the given uncompressedLength. - /// If the uncompressed length is stored in the compressed data and - /// uncompressedLength is not none and they do not match a std::runtime_error - /// is thrown. + /// Retrieves the actual uncompressed length of data using the specified + /// compression library. + /// Note: This functionality is not universally supported by all compression + /// libraries. If not supported, `std::nullopt` will be returned. std::optional getUncompressedLength( uint64_t inputLength, - const uint8_t* input, - std::optional uncompressedLength = std::nullopt) const; + const uint8_t* input) const; - // Create a streaming compressor instance. - virtual std::shared_ptr makeCompressor() = 0; + /// Create a streaming compressor instance. + virtual std::shared_ptr makeStreamingCompressor() = 0; - // Create a streaming compressor instance. - virtual std::shared_ptr makeDecompressor() = 0; + /// Create a streaming compressor instance. + virtual std::shared_ptr + makeStreamingDecompressor() = 0; - // This Codec's compression type. + /// This Codec's compression type. virtual CompressionKind compressionKind() const = 0; - // The name of this Codec's compression type. + /// The name of this Codec's compression type. std::string name() const { return compressionKindToString(compressionKind()); } - // This Codec's compression level, if applicable. + /// This Codec's compression level, if applicable. virtual int32_t compressionLevel() const { return kUseDefaultCompressionLevel; } private: - // Initializes the codec's resources. + /// Initializes the codec's resources. virtual void init(); - - virtual std::optional doGetUncompressedLength( - uint64_t inputLength, - const uint8_t* input, - std::optional uncompressedLength) const; }; } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/HadoopCompressionFormat.cpp b/velox/common/compression/v2/HadoopCompressionFormat.cpp index 4f11884dfb553..df0f7034ea417 100644 --- a/velox/common/compression/v2/HadoopCompressionFormat.cpp +++ b/velox/common/compression/v2/HadoopCompressionFormat.cpp @@ -22,10 +22,10 @@ namespace facebook::velox::common { bool HadoopCompressionFormat::tryDecompressHadoop( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, + uint64_t inputLength, uint8_t* output, + uint64_t outputLength, uint64_t& actualDecompressedSize) { // Parquet files written with the Hadoop Lz4RawCodec use their own framing. // The input buffer can contain an arbitrary number of "frames", each @@ -58,7 +58,7 @@ bool HadoopCompressionFormat::tryDecompressHadoop( // Try decompressing and compare with expected decompressed length. try { auto decompressedSize = decompressInternal( - expectedCompressedSize, input, outputLength, output); + input, expectedCompressedSize, output, outputLength); if (decompressedSize != expectedDecompressedSize) { return false; } diff --git a/velox/common/compression/v2/HadoopCompressionFormat.h b/velox/common/compression/v2/HadoopCompressionFormat.h index aaf5e71e97a37..a23df0622ef93 100644 --- a/velox/common/compression/v2/HadoopCompressionFormat.h +++ b/velox/common/compression/v2/HadoopCompressionFormat.h @@ -23,17 +23,17 @@ namespace facebook::velox::common { class HadoopCompressionFormat { protected: bool tryDecompressHadoop( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, + uint64_t inputLength, uint8_t* output, + uint64_t outputLength, uint64_t& actualDecompressedSize); virtual uint64_t decompressInternal( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) = 0; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; // Offset starting at which page data can be read/written. static constexpr uint64_t kPrefixLength = sizeof(uint32_t) * 2; diff --git a/velox/common/compression/v2/Lz4Compression.cpp b/velox/common/compression/v2/Lz4Compression.cpp index e46ec98021e2d..0afc3c6f258fa 100644 --- a/velox/common/compression/v2/Lz4Compression.cpp +++ b/velox/common/compression/v2/Lz4Compression.cpp @@ -38,7 +38,7 @@ LZ4F_preferences_t defaultPreferences(int compressionLevel) { } } // namespace -class LZ4Compressor : public Compressor { +class LZ4Compressor : public StreamingCompressor { public: explicit LZ4Compressor(int32_t compressionLevel); @@ -47,14 +47,14 @@ class LZ4Compressor : public Compressor { void init(); CompressResult compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; - FlushResult flush(uint64_t outputLength, uint8_t* output) override; + FlushResult flush(uint8_t* output, uint64_t outputLength) override; - EndResult end(uint64_t outputLength, uint8_t* output) override; + EndResult end(uint8_t* output, uint64_t outputLength) override; protected: void @@ -66,7 +66,7 @@ class LZ4Compressor : public Compressor { bool firstTime_; }; -class LZ4Decompressor : public Decompressor { +class LZ4Decompressor : public StreamingDecompressor { public: LZ4Decompressor() {} @@ -81,10 +81,10 @@ class LZ4Decompressor : public Decompressor { void reset() override; DecompressResult decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; bool isFinished() override; @@ -113,11 +113,11 @@ void LZ4Compressor::init() { } } -Compressor::CompressResult LZ4Compressor::compress( - uint64_t inputLength, +StreamingCompressor::CompressResult LZ4Compressor::compress( const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { auto inputSize = static_cast(inputLength); auto outputSize = static_cast(outputLength); uint64_t bytesWritten = 0; @@ -144,9 +144,9 @@ Compressor::CompressResult LZ4Compressor::compress( return CompressResult{inputLength, bytesWritten, false}; } -Compressor::FlushResult LZ4Compressor::flush( - uint64_t outputLength, - uint8_t* output) { +StreamingCompressor::FlushResult LZ4Compressor::flush( + uint8_t* output, + uint64_t outputLength) { auto outputSize = static_cast(outputLength); uint64_t bytesWritten = 0; @@ -173,9 +173,9 @@ Compressor::FlushResult LZ4Compressor::flush( return FlushResult{bytesWritten, false}; } -Compressor::EndResult LZ4Compressor::end( - uint64_t outputLength, - uint8_t* output) { +StreamingCompressor::EndResult LZ4Compressor::end( + uint8_t* output, + uint64_t outputLength) { auto outputSize = static_cast(outputLength); uint64_t bytesWritten = 0; @@ -225,22 +225,24 @@ void common::LZ4Decompressor::init() { } void LZ4Decompressor::reset() { - if (ctx_) { #if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 - // LZ4F_resetDecompressionContext appeared in 1.8.0. - LZ4F_resetDecompressionContext(ctx_); + // LZ4F_resetDecompressionContext appeared in 1.8.0 + VELOX_CHECK_NOT_NULL(ctx_); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; #else + if (ctx_ != nullptr) { LZ4F_freeDecompressionContext(ctx_); - init(); -#endif } + init(); +#endif } -Decompressor::DecompressResult LZ4Decompressor::decompress( - uint64_t inputLength, +StreamingDecompressor::DecompressResult LZ4Decompressor::decompress( const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { auto inputSize = static_cast(inputLength); auto outputSize = static_cast(outputLength); @@ -286,6 +288,10 @@ int32_t Lz4CodecBase::compressionLevel() const { return compressionLevel_; } +CompressionKind Lz4CodecBase::compressionKind() const { + return CompressionKind::CompressionKind_LZ4; +} + Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) : Lz4CodecBase(compressionLevel), prefs_(defaultPreferences(compressionLevel_)) {} @@ -296,10 +302,10 @@ uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { } uint64_t Lz4FrameCodec::compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { auto ret = LZ4F_compressFrame( output, static_cast(outputLength), @@ -313,16 +319,16 @@ uint64_t Lz4FrameCodec::compress( } uint64_t Lz4FrameCodec::decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { - auto decompressor = makeDecompressor(); + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto decompressor = makeStreamingDecompressor(); uint64_t bytesWritten = 0; while (!decompressor->isFinished() && inputLength != 0) { auto result = - decompressor->decompress(inputLength, input, outputLength, output); + decompressor->decompress(input, inputLength, output, outputLength); input += result.bytesRead; inputLength -= result.bytesRead; output += result.bytesWritten; @@ -341,22 +347,19 @@ uint64_t Lz4FrameCodec::decompress( return bytesWritten; } -std::shared_ptr Lz4FrameCodec::makeCompressor() { +std::shared_ptr Lz4FrameCodec::makeStreamingCompressor() { auto ptr = std::make_shared(compressionLevel_); ptr->init(); return ptr; } -std::shared_ptr Lz4FrameCodec::makeDecompressor() { +std::shared_ptr +Lz4FrameCodec::makeStreamingDecompressor() { auto ptr = std::make_shared(); ptr->init(); return ptr; } -CompressionKind Lz4FrameCodec::compressionKind() const { - return CompressionKind::CompressionKind_LZ4; -} - Lz4RawCodec::Lz4RawCodec(int32_t compressionLevel) : Lz4CodecBase(compressionLevel) {} @@ -366,10 +369,10 @@ uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { } uint64_t Lz4RawCodec::decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { auto decompressedSize = LZ4_decompress_safe( reinterpret_cast(input), reinterpret_cast(output), @@ -382,10 +385,10 @@ uint64_t Lz4RawCodec::decompress( } uint64_t Lz4RawCodec::compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { uint64_t compressedSize; #ifdef LZ4HC_CLEVEL_MIN constexpr int kMinHcClevel = LZ4HC_CLEVEL_MIN; @@ -412,39 +415,37 @@ uint64_t Lz4RawCodec::compress( return static_cast(compressedSize); } -std::shared_ptr Lz4RawCodec::makeCompressor() { +std::shared_ptr Lz4RawCodec::makeStreamingCompressor() { VELOX_UNSUPPORTED( "Streaming compression unsupported with LZ4 raw format. " "Try using LZ4 frame format instead."); } -std::shared_ptr Lz4RawCodec::makeDecompressor() { +std::shared_ptr +Lz4RawCodec::makeStreamingDecompressor() { VELOX_UNSUPPORTED( "Streaming decompression unsupported with LZ4 raw format. " "Try using LZ4 frame format instead."); } -CompressionKind Lz4RawCodec::compressionKind() const { - return CompressionKind::CompressionKind_LZ4RAW; -} - -Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kUseDefaultCompressionLevel) {} +Lz4HadoopCodec::Lz4HadoopCodec(int32_t compressionLevel) + : Lz4RawCodec(compressionLevel) {} uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); } uint64_t Lz4HadoopCodec::compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { if (outputLength < kPrefixLength) { VELOX_FAIL("Output buffer too small for Lz4HadoopCodec compression."); } uint64_t compressedSize = Lz4RawCodec::compress( - inputLength, input, outputLength - kPrefixLength, output + kPrefixLength); + input, inputLength, output + kPrefixLength, outputLength - kPrefixLength); // Prepend decompressed size in bytes and compressed size in bytes // to be compatible with Hadoop Lz4RawCodec. @@ -459,36 +460,33 @@ uint64_t Lz4HadoopCodec::compress( } uint64_t Lz4HadoopCodec::decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { uint64_t decompressedSize; if (tryDecompressHadoop( - inputLength, input, outputLength, output, decompressedSize)) { + input, inputLength, output, outputLength, decompressedSize)) { return decompressedSize; } // Fall back on raw LZ4 codec (for files produces by earlier versions of // Parquet C++). - return Lz4RawCodec::decompress(inputLength, input, outputLength, output); + return Lz4RawCodec::decompress(input, inputLength, output, outputLength); } -std::shared_ptr Lz4HadoopCodec::makeCompressor() { +std::shared_ptr Lz4HadoopCodec::makeStreamingCompressor() { VELOX_UNSUPPORTED( "Streaming compression unsupported with LZ4 Hadoop raw format. " "Try using LZ4 frame format instead."); } -std::shared_ptr Lz4HadoopCodec::makeDecompressor() { +std::shared_ptr +Lz4HadoopCodec::makeStreamingDecompressor() { VELOX_UNSUPPORTED( "Streaming decompression unsupported with LZ4 Hadoop raw format. " "Try using LZ4 frame format instead."); } -CompressionKind Lz4HadoopCodec::compressionKind() const { - return CompressionKind::CompressionKind_LZ4HADOOP; -} - int32_t Lz4HadoopCodec::minimumCompressionLevel() const { return kUseDefaultCompressionLevel; } @@ -502,11 +500,11 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const { } uint64_t Lz4HadoopCodec::decompressInternal( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) { - return Lz4RawCodec::decompress(inputLength, input, outputLength, output); + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + return Lz4RawCodec::decompress(input, inputLength, output, outputLength); } std::unique_ptr makeLz4FrameCodec(int32_t compressionLevel) { @@ -517,7 +515,7 @@ std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { return std::make_unique(compressionLevel); } -std::unique_ptr makeLz4HadoopRawCodec() { - return std::make_unique(); +std::unique_ptr makeLz4HadoopRawCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); } } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Lz4Compression.h b/velox/common/compression/v2/Lz4Compression.h index 576657eba007e..a915c0fe8caad 100644 --- a/velox/common/compression/v2/Lz4Compression.h +++ b/velox/common/compression/v2/Lz4Compression.h @@ -28,6 +28,18 @@ namespace facebook::velox::common { static constexpr int32_t kLz4DefaultCompressionLevel = 1; static constexpr int32_t kLz4MinCompressionLevel = 1; +class Lz4CodecOptions : public CodecOptions { + public: + enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; + + explicit Lz4CodecOptions( + Lz4CodecOptions::Type type, + int32_t compressionLevel = kUseDefaultCompressionLevel) + : type(type), CodecOptions(compressionLevel) {} + + Lz4CodecOptions::Type type; +}; + class Lz4CodecBase : public Codec { public: explicit Lz4CodecBase(int32_t compressionLevel); @@ -40,6 +52,8 @@ class Lz4CodecBase : public Codec { int32_t compressionLevel() const override; + CompressionKind compressionKind() const override; + protected: const int compressionLevel_; }; @@ -51,22 +65,20 @@ class Lz4FrameCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; uint64_t compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; uint64_t decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; - - std::shared_ptr makeCompressor() override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; - std::shared_ptr makeDecompressor() override; + std::shared_ptr makeStreamingCompressor() override; - CompressionKind compressionKind() const override; + std::shared_ptr makeStreamingDecompressor() override; protected: const LZ4F_preferences_t prefs_; @@ -79,47 +91,43 @@ class Lz4RawCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; uint64_t compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; uint64_t decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; - - std::shared_ptr makeCompressor() override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; - std::shared_ptr makeDecompressor() override; + std::shared_ptr makeStreamingCompressor() override; - CompressionKind compressionKind() const override; + std::shared_ptr makeStreamingDecompressor() override; }; class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { public: - Lz4HadoopCodec(); + Lz4HadoopCodec(int32_t compressionLevel); uint64_t maxCompressedLength(uint64_t inputLength) override; uint64_t compress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; uint64_t decompress( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; - - std::shared_ptr makeCompressor() override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; - std::shared_ptr makeDecompressor() override; + std::shared_ptr makeStreamingCompressor() override; - CompressionKind compressionKind() const override; + std::shared_ptr makeStreamingDecompressor() override; int32_t minimumCompressionLevel() const override; @@ -129,10 +137,10 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { private: uint64_t decompressInternal( - uint64_t inputLength, const uint8_t* input, - uint64_t outputLength, - uint8_t* output) override; + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; }; // Lz4 frame format codec. @@ -144,5 +152,6 @@ std::unique_ptr makeLz4RawCodec( int32_t compressionLevel = kLz4DefaultCompressionLevel); // Lz4 "Hadoop" format codec (== Lz4 raw codec prefixed with lengths header) -std::unique_ptr makeLz4HadoopRawCodec(); +std::unique_ptr makeLz4HadoopRawCodec( + int32_t compressionLevel = kLz4DefaultCompressionLevel); } // namespace facebook::velox::common diff --git a/velox/common/compression/v2/tests/CompressionTest.cpp b/velox/common/compression/v2/tests/CompressionTest.cpp index 8f311a09f511b..c8d135f8c0f9a 100644 --- a/velox/common/compression/v2/tests/CompressionTest.cpp +++ b/velox/common/compression/v2/tests/CompressionTest.cpp @@ -24,6 +24,7 @@ #include "velox/common/base/VeloxException.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/Lz4Compression.h" namespace facebook::velox::common { @@ -32,14 +33,15 @@ namespace { const std::shared_ptr kDefaultCodecOptions = std::make_shared(); -struct TestParam { +struct TestParams { CompressionKind compressionKind; std::shared_ptr codecOptions; - TestParam( + explicit TestParams( common::CompressionKind compressionKind, std::shared_ptr codecOptions = kDefaultCodecOptions) - : compressionKind(compressionKind), codecOptions(codecOptions) {} + : compressionKind(compressionKind), + codecOptions(std::move(codecOptions)) {} }; std::vector makeRandomData(size_t n) { @@ -81,15 +83,15 @@ void checkCodecRoundtrip( // Compress with codec c1. auto compressedSize = c1->compress( - data.size(), data.data(), maxCompressedLen, compressed.data()); + data.data(), data.size(), compressed.data(), maxCompressedLen); compressed.resize(compressedSize); // Decompress with codec c2. auto decompressedSize = c2->decompress( - compressed.size(), compressed.data(), - decompressed.size(), - decompressed.data()); + compressed.size(), + decompressed.data(), + decompressed.size()); ASSERT_EQ(data, decompressed); ASSERT_EQ(data.size(), decompressedSize); @@ -111,7 +113,7 @@ void checkCodecRoundtrip( } void streamingCompress( - const std::shared_ptr& compressor, + const std::shared_ptr& compressor, const std::vector& uncompressed, std::vector& compressed) { const uint8_t* input = uncompressed.data(); @@ -119,34 +121,40 @@ void streamingCompress( uint64_t compressedSize = 0; compressed.resize(10); bool doFlush = false; + // Generate small random input buffer size. auto randomInputSize = makeRandomInputSize(); + // Continue decompressing until consuming all compressed data . while (remaining > 0) { // Feed a small amount each time. auto inputLength = std::min(remaining, randomInputSize()); auto outputLength = compressed.size() - compressedSize; uint8_t* output = compressed.data() + compressedSize; + // Compress once. auto compressResult = - compressor->compress(inputLength, input, outputLength, output); + compressor->compress(input, inputLength, output, outputLength); ASSERT_LE(compressResult.bytesRead, inputLength); ASSERT_LE(compressResult.bytesWritten, outputLength); + // Update result. compressedSize += compressResult.bytesWritten; input += compressResult.bytesRead; remaining -= compressResult.bytesRead; + // Grow compressed buffer if it's too small. if (compressResult.outputTooSmall) { compressed.resize(compressed.capacity() * 2); } + // Once every two iterations, do a flush. if (doFlush) { - Compressor::FlushResult flushResult; + StreamingCompressor::FlushResult flushResult; do { outputLength = compressed.size() - compressedSize; output = compressed.data() + compressedSize; - flushResult = compressor->flush(outputLength, output); + flushResult = compressor->flush(output, outputLength); ASSERT_LE(flushResult.bytesWritten, outputLength); compressedSize += flushResult.bytesWritten; if (flushResult.outputTooSmall) { @@ -156,13 +164,14 @@ void streamingCompress( } doFlush = !doFlush; } + // End the compressed stream. - Compressor::EndResult endResult; + StreamingCompressor::EndResult endResult; do { - int64_t output_len = compressed.size() - compressedSize; + int64_t outputLength = compressed.size() - compressedSize; uint8_t* output = compressed.data() + compressedSize; - endResult = compressor->end(output_len, output); - ASSERT_LE(endResult.bytesWritten, output_len); + endResult = compressor->end(output, outputLength); + ASSERT_LE(endResult.bytesWritten, outputLength); compressedSize += endResult.bytesWritten; if (endResult.outputTooSmall) { compressed.resize(compressed.capacity() * 2); @@ -172,34 +181,39 @@ void streamingCompress( } void streamingDecompress( - const std::shared_ptr& decompressor, + const std::shared_ptr& decompressor, const std::vector& compressed, std::vector& decompressed) { const uint8_t* input = compressed.data(); uint64_t remaining = compressed.size(); uint64_t decompressedSize = 0; decompressed.resize(10); + // Generate small random input buffer size. auto ramdomInputSize = makeRandomInputSize(); + // Continue decompressing until finishes. while (!decompressor->isFinished()) { // Feed a small amount each time. auto inputLength = std::min(remaining, ramdomInputSize()); auto outputLength = decompressed.size() - decompressedSize; uint8_t* output = decompressed.data() + decompressedSize; + // Decompress once. auto result = - decompressor->decompress(inputLength, input, outputLength, output); + decompressor->decompress(input, inputLength, output, outputLength); ASSERT_LE(result.bytesRead, inputLength); ASSERT_LE(result.bytesWritten, outputLength); ASSERT_TRUE( result.outputTooSmall || result.bytesWritten > 0 || result.bytesRead > 0) << "Decompression not progressing anymore"; + // Update result. decompressedSize += result.bytesWritten; input += result.bytesRead; remaining -= result.bytesRead; + // Grow decompressed buffer if it's too small. if (result.outputTooSmall) { decompressed.resize(decompressed.capacity() * 2); @@ -214,14 +228,15 @@ void streamingDecompress( void checkStreamingCompressor(Codec* codec, const std::vector& data) { // Run streaming compression. std::vector compressed; - streamingCompress(codec->makeCompressor(), data, compressed); + streamingCompress(codec->makeStreamingCompressor(), data, compressed); + // Check decompressing the compressed data. std::vector decompressed(data.size()); ASSERT_NO_THROW(codec->decompress( - compressed.size(), compressed.data(), - decompressed.size(), - decompressed.data())); + compressed.size(), + decompressed.data(), + decompressed.size())); ASSERT_EQ(data, decompressed); } @@ -233,11 +248,14 @@ void checkStreamingDecompressor( auto maxCompressedLen = codec->maxCompressedLength(data.size()); std::vector compressed(maxCompressedLen); auto compressedSize = codec->compress( - data.size(), data.data(), maxCompressedLen, compressed.data()); + data.data(), data.size(), compressed.data(), maxCompressedLen); compressed.resize(compressedSize); + // Run streaming decompression. std::vector decompressed; - streamingDecompress(codec->makeDecompressor(), compressed, decompressed); + streamingDecompress( + codec->makeStreamingDecompressor(), compressed, decompressed); + // Check the decompressed data. ASSERT_EQ(data.size(), decompressed.size()); ASSERT_EQ(data, decompressed); @@ -245,8 +263,8 @@ void checkStreamingDecompressor( // Check the streaming compressor and decompressor together. void checkStreamingRoundtrip( - const std::shared_ptr& compressor, - const std::shared_ptr& decompressor, + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, const std::vector& data) { std::vector compressed; streamingCompress(compressor, data, compressed); @@ -257,11 +275,13 @@ void checkStreamingRoundtrip( void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { checkStreamingRoundtrip( - codec->makeCompressor(), codec->makeDecompressor(), data); + codec->makeStreamingCompressor(), + codec->makeStreamingDecompressor(), + data); } } // namespace -class CodecTest : public ::testing::TestWithParam { +class CodecTest : public ::testing::TestWithParam { protected: static CompressionKind getCompressionKind() { return GetParam().compressionKind; @@ -299,53 +319,21 @@ TEST_P(CodecTest, specifyCompressionLevel) { TEST_P(CodecTest, getUncompressedLength) { auto codec = makeCodec(); - // Test non-empty input. - { - auto inputLength = 100; - auto input = makeRandomData(inputLength); - std::vector compressed(codec->maxCompressedLength(input.size())); - auto compressedLength = codec->compress( - inputLength, input.data(), compressed.size(), compressed.data()); - compressed.resize(compressedLength); - - if (Codec::supportsGetUncompressedLength(getCompressionKind())) { - auto uncompressedLength = - codec->getUncompressedLength(compressedLength, compressed.data()); - ASSERT_EQ( - codec->getUncompressedLength(compressedLength, compressed.data()), - inputLength); - ASSERT_EQ( - codec->getUncompressedLength( - compressedLength, compressed.data(), inputLength), - inputLength); - ASSERT_EQ( - codec->getUncompressedLength( - compressedLength, compressed.data(), std::nullopt), - inputLength); - VELOX_ASSERT_THROW( - codec->getUncompressedLength( - compressedLength, compressed.data(), inputLength + 1), - fmt::format("Invalid uncompressed length: {}", inputLength + 1)); - } else { - ASSERT_EQ( - codec->getUncompressedLength(input.size(), input.data()), - std::nullopt); - ASSERT_EQ( - codec->getUncompressedLength( - input.size(), input.data(), std::nullopt), - std::nullopt); - ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 0), 0); - ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 2), 2); - } - } - // Test empty input. - { - std::vector input{}; - ASSERT_EQ(codec->getUncompressedLength(0, input.data(), 0), 0); - ASSERT_EQ(codec->getUncompressedLength(0, input.data(), std::nullopt), 0); - VELOX_ASSERT_THROW( - codec->getUncompressedLength(0, input.data(), 1), - fmt::format("Invalid uncompressed length: {}", 1)); + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = codec->compress( + input.data(), inputLength, compressed.data(), compressed.size()); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + inputLength); + } else { + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + std::nullopt); } } @@ -436,33 +424,46 @@ TEST_P(CodecTest, streamingDecompressorReuse) { } auto codec = makeCodec(); - auto decompressor = codec->makeDecompressor(); + auto decompressor = codec->makeStreamingDecompressor(); checkStreamingRoundtrip( - codec->makeCompressor(), decompressor, makeRandomData(100)); + codec->makeStreamingCompressor(), decompressor, makeRandomData(100)); - // Decompressor::reset() should allow reusing decompressor for a new stream. + // StreamingDecompressor::reset() should allow reusing decompressor for a new + // stream. decompressor->reset(); checkStreamingRoundtrip( - codec->makeCompressor(), decompressor, makeRandomData(200)); + codec->makeStreamingCompressor(), decompressor, makeRandomData(200)); } INSTANTIATE_TEST_SUITE_P( - TestLZ4Frame, + TestLz4Frame, CodecTest, - ::testing::Values(CompressionKind::CompressionKind_LZ4)); + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Frame)})); + INSTANTIATE_TEST_SUITE_P( - TestLZ4Raw, + TestLz4Raw, CodecTest, - ::testing::Values(CompressionKind::CompressionKind_LZ4RAW)); + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Raw)})); + INSTANTIATE_TEST_SUITE_P( - TestLZ4Hadoop, + TestLz4Hadoop, CodecTest, - ::testing::Values(CompressionKind::CompressionKind_LZ4HADOOP)); + ::testing::Values(TestParams{ + CompressionKind::CompressionKind_LZ4, + std::make_shared(Lz4CodecOptions::kLz4Hadoop)})); TEST(CodecLZ4HadoopTest, compatibility) { // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. - auto c1 = Codec::create(CompressionKind::CompressionKind_LZ4RAW); - auto c2 = Codec::create(CompressionKind::CompressionKind_LZ4HADOOP); + auto c1 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Raw}); + auto c2 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Hadoop}); for (auto dataSize : {0, 10, 10000, 100000}) { checkCodecRoundtrip(c1, c2, makeRandomData(dataSize));