diff --git a/CMakeLists.txt b/CMakeLists.txt index 669356dab8a8..31c14788aa1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -424,20 +424,18 @@ resolve_dependency(glog) set_source(fmt) resolve_dependency(fmt) -if(NOT ${VELOX_BUILD_MINIMAL}) - find_package(ZLIB REQUIRED) - find_package(lz4 REQUIRED) - find_package(lzo2 REQUIRED) - find_package(zstd REQUIRED) - find_package(Snappy REQUIRED) - if(NOT TARGET zstd::zstd) - if(TARGET zstd::libzstd_static) - set(ZSTD_TYPE static) - else() - set(ZSTD_TYPE shared) - endif() - add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) +find_package(ZLIB REQUIRED) +find_package(lz4 REQUIRED) +find_package(lzo2 REQUIRED) +find_package(zstd REQUIRED) +find_package(Snappy REQUIRED) +if(NOT TARGET zstd::zstd) + if(TARGET zstd::libzstd_static) + set(ZSTD_TYPE static) + else() + set(ZSTD_TYPE shared) endif() + add_library(zstd::zstd ALIAS zstd::libzstd_${ZSTD_TYPE}) endif() set_source(re2) diff --git a/velox/common/CMakeLists.txt b/velox/common/CMakeLists.txt index e96b44b9d2c6..adfe61bac562 100644 --- a/velox/common/CMakeLists.txt +++ b/velox/common/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(base) add_subdirectory(caching) add_subdirectory(compression) +add_subdirectory(compression/v2) add_subdirectory(config) add_subdirectory(encode) add_subdirectory(file) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 3843f5902a98..779ed08a6d84 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -76,6 +76,10 @@ std::string compressionKindToString(CompressionKind kind) { return "lz4"; case CompressionKind_GZIP: return "gzip"; + case CompressionKind_LZ4RAW: + return "lz4_raw"; + case CompressionKind_LZ4HADOOP: + return "lz4_hadoop"; } return folly::to("unknown - ", kind); } @@ -89,7 +93,9 @@ CompressionKind stringToCompressionKind(const std::string& kind) { {"lzo", CompressionKind_LZO}, {"zstd", CompressionKind_ZSTD}, {"lz4", CompressionKind_LZ4}, - {"gzip", CompressionKind_GZIP}}; + {"gzip", CompressionKind_GZIP}, + {"lz4_raw", CompressionKind_LZ4RAW}, + {"lz4_hadoop", CompressionKind_LZ4HADOOP}}; auto iter = stringToCompressionKindMap.find(kind); if (iter != stringToCompressionKindMap.end()) { return iter->second; diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 317f9717a2ae..072c59147edc 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -29,6 +29,8 @@ enum CompressionKind { CompressionKind_ZSTD = 4, CompressionKind_LZ4 = 5, CompressionKind_GZIP = 6, + CompressionKind_LZ4RAW = 7, + CompressionKind_LZ4HADOOP = 8, CompressionKind_MAX = INT64_MAX }; diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 0b036c55c512..bad9231f7a7e 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -31,6 +31,9 @@ TEST_F(CompressionTest, testCompressionNames) { EXPECT_EQ("lzo", compressionKindToString(CompressionKind_LZO)); EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4)); EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD)); + EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP)); + EXPECT_EQ("lz4_raw", compressionKindToString(CompressionKind_LZ4RAW)); + EXPECT_EQ("lz4_hadoop", compressionKindToString(CompressionKind_LZ4HADOOP)); EXPECT_EQ( "unknown - 99", compressionKindToString(static_cast(99))); @@ 
-56,6 +59,8 @@ TEST_F(CompressionTest, stringToCompressionKind) { EXPECT_EQ(stringToCompressionKind("lz4"), CompressionKind_LZ4); EXPECT_EQ(stringToCompressionKind("zstd"), CompressionKind_ZSTD); EXPECT_EQ(stringToCompressionKind("gzip"), CompressionKind_GZIP); + EXPECT_EQ(stringToCompressionKind("lz4_raw"), CompressionKind_LZ4RAW); + EXPECT_EQ(stringToCompressionKind("lz4_hadoop"), CompressionKind_LZ4HADOOP); VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } diff --git a/velox/common/compression/v2/CMakeLists.txt b/velox/common/compression/v2/CMakeLists.txt new file mode 100644 index 000000000000..9354b35f294a --- /dev/null +++ b/velox/common/compression/v2/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() + +add_library(velox_common_compression_v2 + Compression.cpp HadoopCompressionFormat.cpp Lz4Compression.cpp) + +target_link_libraries( + velox_common_compression_v2 + velox_common_base + Folly::folly + Snappy::snappy + zstd::zstd + ZLIB::ZLIB + lz4::lz4) diff --git a/velox/common/compression/v2/Compression.cpp b/velox/common/compression/v2/Compression.cpp new file mode 100644 index 000000000000..78b1bc1d305f --- /dev/null +++ b/velox/common/compression/v2/Compression.cpp @@ -0,0 +1,196 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Arrow. 
+ +#include "velox/common/compression/v2/Compression.h" +#include +#include +#include +#include "velox/common/base/Exceptions.h" +#include "velox/common/compression/v2/Lz4Compression.h" + +namespace facebook::velox::common { + +namespace { +void checkSupportsCompressionLevel(CompressionKind kind) { + VELOX_USER_CHECK( + Codec::supportsCompressionLevel(kind), + "Codec '" + compressionKindToString(kind) + + "' doesn't support setting a compression level."); +} +} // namespace + +int32_t Codec::useDefaultCompressionLevel() { + return kUseDefaultCompressionLevel; +} + +void Codec::init() {} + +bool Codec::supportsGetUncompressedLength(CompressionKind kind) { + switch (kind) { + default: + return false; + } +} + +bool Codec::supportsCompressionLevel(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_LZ4RAW: + return true; + default: + return false; + } +} + +bool Codec::supportsStreamingCompression(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + return true; + default: + return false; + } +} + +int32_t Codec::maximumCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->maximumCompressionLevel(); +} + +int32_t Codec::minimumCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->minimumCompressionLevel(); +} + +int32_t Codec::defaultCompressionLevel(CompressionKind kind) { + checkSupportsCompressionLevel(kind); + auto codec = Codec::create(kind); + return codec->defaultCompressionLevel(); +} + +std::unique_ptr Codec::create( + CompressionKind kind, + const CodecOptions& codecOptions) { + if (!isAvailable(kind)) { + auto name = compressionKindToString(kind); + if (folly::StringPiece({name}).startsWith("unknown")) { + VELOX_UNSUPPORTED("Unrecognized codec '{}'", name); + } + VELOX_UNSUPPORTED("Support for codec '{}' not implemented.", name); + } + + auto compressionLevel = codecOptions.compressionLevel; + if (compressionLevel != kUseDefaultCompressionLevel) { + checkSupportsCompressionLevel(kind); + } + + std::unique_ptr codec; + switch (kind) { + case CompressionKind::CompressionKind_LZ4: + codec = makeLz4FrameCodec(compressionLevel); + break; + case CompressionKind::CompressionKind_LZ4RAW: + codec = makeLz4RawCodec(compressionLevel); + break; + case CompressionKind::CompressionKind_LZ4HADOOP: + codec = makeLz4HadoopRawCodec(); + break; + default: + break; + } + + if (codec == nullptr) { + VELOX_UNSUPPORTED("LZO codec not implemented"); + } + + codec->init(); + + return codec; +} + +// use compression level to create Codec +std::unique_ptr Codec::create( + CompressionKind kind, + int32_t compressionLevel) { + return create(kind, CodecOptions{compressionLevel}); +} + +bool Codec::isAvailable(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_NONE: + case CompressionKind::CompressionKind_LZ4: + case CompressionKind::CompressionKind_LZ4RAW: + case CompressionKind::CompressionKind_LZ4HADOOP: + return true; + case CompressionKind::CompressionKind_SNAPPY: + case CompressionKind::CompressionKind_GZIP: + case CompressionKind::CompressionKind_ZLIB: + case CompressionKind::CompressionKind_ZSTD: + case CompressionKind::CompressionKind_LZO: + default: + return false; + } +} + +std::optional Codec::getUncompressedLength( + 
uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + if (inputLength == 0) { + if (uncompressedLength.value_or(0) != 0) { + VELOX_USER_CHECK_EQ( + uncompressedLength.value_or(0), + 0, + "Invalid uncompressed length: {}.", + *uncompressedLength); + } + return 0; + } + auto actualLength = + doGetUncompressedLength(inputLength, input, uncompressedLength); + if (actualLength) { + if (uncompressedLength) { + VELOX_USER_CHECK_EQ( + *actualLength, + *uncompressedLength, + "Invalid uncompressed length: {}.", + *uncompressedLength); + } + return actualLength; + } + return uncompressedLength; +} + +std::optional Codec::doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const { + return uncompressedLength; +} + +uint64_t Codec::compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + VELOX_UNSUPPORTED("'{}' doesn't support partial compression", name()); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Compression.h b/velox/common/compression/v2/Compression.h new file mode 100644 index 000000000000..d6ef89b9017c --- /dev/null +++ b/velox/common/compression/v2/Compression.h @@ -0,0 +1,246 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Derived from Apache Arrow. + +#pragma once + +#include +#include +#include +#include +#include +#include "velox/common/compression/Compression.h" + +namespace facebook::velox::common { + +static constexpr int32_t kUseDefaultCompressionLevel = + std::numeric_limits::min(); + +/// Streaming compressor interface. +class Compressor { + public: + virtual ~Compressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + struct EndResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Compress some input. + /// If bytes_read is 0 on return, then a larger output buffer should be + /// supplied. + virtual CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Flush part of the compressed output. + /// If outputTooSmall is true on return, flush() should be called again + /// with a larger buffer. + virtual FlushResult flush(uint64_t outputLength, uint8_t* output) = 0; + + /// End compressing, doing whatever is necessary to end the stream. + /// If outputTooSmall is true on return, end() should be called again + /// with a larger buffer. Otherwise, the Compressor should not be used + /// anymore. + /// end() implies flush(). 
+ virtual EndResult end(uint64_t outputLength, uint8_t* output) = 0; +}; + +/// Streaming decompressor interface +class Decompressor { + public: + virtual ~Decompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If outputTooSmall is true on return, a larger output buffer needs + /// to be supplied. + virtual DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + /// Reinitialize decompressor, making it ready for a new compressed stream. + virtual void reset() = 0; +}; + +/// Compression codec options +class CodecOptions { + public: + explicit CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + : compressionLevel(compressionLevel) {} + + virtual ~CodecOptions() = default; + + int32_t compressionLevel; +}; + +/// Compression codec +class Codec { + public: + virtual ~Codec() = default; + + /// Return special value to indicate that a codec implementation + /// should use its default compression level. + static int32_t useDefaultCompressionLevel(); + + /// Create a kind for the given compression algorithm with CodecOptions. + static std::unique_ptr create( + CompressionKind kind, + const CodecOptions& codecOptions = CodecOptions{}); + + /// Create a kind for the given compression algorithm. + static std::unique_ptr create( + CompressionKind kind, + int32_t compressionLevel); + + /// Return true if support for indicated kind has been enabled. + static bool isAvailable(CompressionKind kind); + + /// Return true if indicated kind supports extracting uncompressed length + /// from compressed data. + static bool supportsGetUncompressedLength(CompressionKind kind); + + /// Return true if indicated kind supports setting a compression level. + static bool supportsCompressionLevel(CompressionKind kind); + + /// Return true if indicated kind supports creating streaming de/compressor. + static bool supportsStreamingCompression(CompressionKind kind); + + /// Return the smallest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t minimumCompressionLevel(CompressionKind kind); + + /// Return the largest supported compression level for the kind + /// Note: This function creates a temporary Codec instance. + static int32_t maximumCompressionLevel(CompressionKind kind); + + /// Return the default compression level. + /// Note: This function creates a temporary Codec instance. + static int32_t defaultCompressionLevel(CompressionKind kind); + + /// Return the smallest supported compression level. + virtual int32_t minimumCompressionLevel() const = 0; + + /// Return the largest supported compression level. + virtual int32_t maximumCompressionLevel() const = 0; + + /// Return the default compression level. + virtual int32_t defaultCompressionLevel() const = 0; + + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Performs one-shot compression. 
+ /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length is returned. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + /// Performs one-shot compression. + /// This function compresses data and writes the output up to the specified + /// outputLength. If outputLength is too small to hold all the compressed + /// data, the function doesn't fail. Instead, it returns the number of bytes + /// actually written to the output buffer. Any remaining data that couldn't + /// be written in this call will be written in subsequent calls to this + /// function. This is useful when fixed-size compression blocks are required + /// by the caller. + /// Note: Only Gzip and Zstd codec supports this function. + virtual uint64_t compressPartial( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output); + + /// Maximum compressed length of given input length. + virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; + + /// Extracts the uncompressed length from the compressed data if possible. + /// If the codec doesn't store the uncompressed length, or the data is + /// corrupted it returns the given uncompressedLength. + /// If the uncompressed length is stored in the compressed data and + /// uncompressedLength is not none and they do not match a std::runtime_error + /// is thrown. + std::optional getUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength = std::nullopt) const; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeCompressor() = 0; + + /// Create a streaming compressor instance. + virtual std::shared_ptr makeDecompressor() = 0; + + /// This Codec's compression type. + virtual CompressionKind compressionKind() const = 0; + + /// The name of this Codec's compression type. + std::string name() const { + return compressionKindToString(compressionKind()); + } + + /// This Codec's compression level, if applicable. + virtual int32_t compressionLevel() const { + return kUseDefaultCompressionLevel; + } + + private: + /// Initializes the codec's resources. + virtual void init(); + + virtual std::optional doGetUncompressedLength( + uint64_t inputLength, + const uint8_t* input, + std::optional uncompressedLength) const; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/HadoopCompressionFormat.cpp b/velox/common/compression/v2/HadoopCompressionFormat.cpp new file mode 100644 index 000000000000..d2b6e4852a70 --- /dev/null +++ b/velox/common/compression/v2/HadoopCompressionFormat.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "velox/common/compression/v2/HadoopCompressionFormat.h"
+#include "velox/common/base/Exceptions.h"
+
+#include <folly/lang/Bits.h>
+
+namespace facebook::velox::common {
+
+bool HadoopCompressionFormat::tryDecompressHadoop(
+    uint64_t inputLength,
+    const uint8_t* input,
+    uint64_t outputLength,
+    uint8_t* output,
+    uint64_t& actualDecompressedSize) {
+  // Parquet files written with the Hadoop Lz4RawCodec use their own framing.
+  // The input buffer can contain an arbitrary number of "frames", each
+  // with the following structure:
+  // - bytes 0..3: big-endian uint32_t representing the frame decompressed
+  //   size
+  // - bytes 4..7: big-endian uint32_t representing the frame compressed size
+  // - bytes 8...: frame compressed data
+  //
+  // The Hadoop Lz4Codec source code can be found here:
+  // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+  uint64_t totalDecompressedSize = 0;
+
+  while (inputLength >= kPrefixLength) {
+    const uint32_t expectedDecompressedSize =
+        folly::Endian::big(folly::loadUnaligned<uint32_t>(input));
+    const uint32_t expectedCompressedSize = folly::Endian::big(
+        folly::loadUnaligned<uint32_t>(input + sizeof(uint32_t)));
+    input += kPrefixLength;
+    inputLength -= kPrefixLength;
+
+    if (inputLength < expectedCompressedSize) {
+      // Not enough bytes for Hadoop "frame".
+      return false;
+    }
+    if (outputLength < expectedDecompressedSize) {
+      // Not enough bytes to hold advertised output => probably not Hadoop.
+      return false;
+    }
+    // Try decompressing and compare with expected decompressed length.
+    try {
+      auto decompressedSize = decompressInternal(
+          expectedCompressedSize, input, outputLength, output);
+      if (decompressedSize != expectedDecompressedSize) {
+        return false;
+      }
+    } catch (const VeloxException& e) {
+      return false;
+    }
+    input += expectedCompressedSize;
+    inputLength -= expectedCompressedSize;
+    output += expectedDecompressedSize;
+    outputLength -= expectedDecompressedSize;
+    totalDecompressedSize += expectedDecompressedSize;
+  }
+
+  if (inputLength == 0) {
+    actualDecompressedSize = totalDecompressedSize;
+    return true;
+  }
+  return false;
+}
+} // namespace facebook::velox::common
diff --git a/velox/common/compression/v2/HadoopCompressionFormat.h b/velox/common/compression/v2/HadoopCompressionFormat.h
new file mode 100644
index 000000000000..aaf5e71e97a3
--- /dev/null
+++ b/velox/common/compression/v2/HadoopCompressionFormat.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#pragma once + +#include + +namespace facebook::velox::common { + +class HadoopCompressionFormat { + protected: + bool tryDecompressHadoop( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output, + uint64_t& actualDecompressedSize); + + virtual uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) = 0; + + // Offset starting at which page data can be read/written. + static constexpr uint64_t kPrefixLength = sizeof(uint32_t) * 2; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/Lz4Compression.cpp b/velox/common/compression/v2/Lz4Compression.cpp new file mode 100644 index 000000000000..d7440ccea77e --- /dev/null +++ b/velox/common/compression/v2/Lz4Compression.cpp @@ -0,0 +1,527 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/v2/Lz4Compression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { + +namespace { + +void lz4Error(LZ4F_errorCode_t errorCode, const char* prefixMessage) { + VELOX_FAIL(prefixMessage, LZ4F_getErrorName(errorCode)); +} + +LZ4F_preferences_t defaultPreferences() { + LZ4F_preferences_t prefs; + memset(&prefs, 0, sizeof(prefs)); + return prefs; +} + +LZ4F_preferences_t defaultPreferences(int compressionLevel) { + LZ4F_preferences_t prefs = defaultPreferences(); + prefs.compressionLevel = compressionLevel; + return prefs; +} +} // namespace + +class LZ4Compressor : public Compressor { + public: + explicit LZ4Compressor(int32_t compressionLevel); + + ~LZ4Compressor() override; + + void init(); + + CompressResult compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + FlushResult flush(uint64_t outputLength, uint8_t* output) override; + + EndResult end(uint64_t outputLength, uint8_t* output) override; + + protected: + void + compressBegin(uint8_t* output, size_t& outputLen, uint64_t& bytesWritten); + + int compressionLevel_; + LZ4F_compressionContext_t ctx_{nullptr}; + LZ4F_preferences_t prefs_; + bool firstTime_; +}; + +class LZ4Decompressor : public Decompressor { + public: + LZ4Decompressor() {} + + ~LZ4Decompressor() override { + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + } + + void init(); + + void reset() override; + + DecompressResult decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + bool isFinished() override; + + protected: + LZ4F_decompressionContext_t ctx_ = nullptr; + bool finished_; +}; + +LZ4Compressor::LZ4Compressor(int32_t compressionLevel) + : compressionLevel_(compressionLevel) {} + +LZ4Compressor::~LZ4Compressor() { + if (ctx_ != nullptr) { + LZ4F_freeCompressionContext(ctx_); + } +} + +void LZ4Compressor::init() { + LZ4F_errorCode_t ret; + prefs_ = defaultPreferences(compressionLevel_); + firstTime_ = true; + + ret = 
LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 init failed: "); + } +} + +Compressor::CompressResult LZ4Compressor::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return CompressResult{0, 0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(inputSize, &prefs_)) { + // Output too small to compress into. + return CompressResult{0, bytesWritten, true}; + } + auto numBytesOrError = LZ4F_compressUpdate( + ctx_, output, outputSize, input, inputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 compress update failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputSize); + return CompressResult{inputLength, bytesWritten, false}; +} + +Compressor::FlushResult LZ4Compressor::flush( + uint64_t outputLength, + uint8_t* output) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return FlushResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to flush into. + return FlushResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 flush failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputLength); + return FlushResult{bytesWritten, false}; +} + +Compressor::EndResult LZ4Compressor::end( + uint64_t outputLength, + uint8_t* output) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return EndResult{0, true}; + } + compressBegin(output, outputSize, bytesWritten); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to end frame into. 
+ return EndResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 end failed: "); + } + bytesWritten += static_cast(numBytesOrError); + DCHECK_LE(bytesWritten, outputLength); + return EndResult{bytesWritten, false}; +} + +void LZ4Compressor::compressBegin( + uint8_t* output, + size_t& outputLen, + uint64_t& bytesWritten) { + auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); + if (LZ4F_isError(numBytesOrError)) { + lz4Error(numBytesOrError, "LZ4 compress begin failed: "); + } + firstTime_ = false; + output += numBytesOrError; + outputLen -= numBytesOrError; + bytesWritten += static_cast(numBytesOrError); +} + +void common::LZ4Decompressor::init() { + LZ4F_errorCode_t ret; + finished_ = false; + + ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 init failed: "); + } +} + +void LZ4Decompressor::reset() { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + DCHECK_NE(ctx_, nullptr); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; +#else + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + init(); +#endif +} + +Decompressor::DecompressResult LZ4Decompressor::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + + auto ret = LZ4F_decompress( + ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); + if (LZ4F_isError(ret)) { + lz4Error(ret, "LZ4 decompress failed: "); + } + finished_ = (ret == 0); + return DecompressResult{ + static_cast(inputSize), + static_cast(outputSize), + (inputSize == 0 && outputSize == 0)}; +} + +bool LZ4Decompressor::isFinished() { + return finished_; +} + +Lz4CodecBase::Lz4CodecBase(int32_t compressionLevel) + : compressionLevel_( + compressionLevel == kUseDefaultCompressionLevel + ? 
kLz4DefaultCompressionLevel + : compressionLevel) {} + +int32_t Lz4CodecBase::minimumCompressionLevel() const { + return kLz4MinCompressionLevel; +} + +int32_t Lz4CodecBase::maximumCompressionLevel() const { +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + return 12; +#else + return LZ4F_compressionLevel_max(); +#endif +} + +int32_t Lz4CodecBase::defaultCompressionLevel() const { + return kLz4DefaultCompressionLevel; +} + +int32_t Lz4CodecBase::compressionLevel() const { + return compressionLevel_; +} + +Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel), + prefs_(defaultPreferences(compressionLevel_)) {} + +uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { + return static_cast( + LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); +} + +uint64_t Lz4FrameCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto ret = LZ4F_compressFrame( + output, + static_cast(outputLength), + input, + static_cast(inputLength), + &prefs_); + if (LZ4F_isError(ret)) { + lz4Error(ret, "Lz4 compression failure: "); + } + return static_cast(ret); +} + +uint64_t Lz4FrameCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto decompressor = makeDecompressor(); + + uint64_t bytesWritten = 0; + while (!decompressor->isFinished() && inputLength != 0) { + auto result = + decompressor->decompress(inputLength, input, outputLength, output); + input += result.bytesRead; + inputLength -= result.bytesRead; + output += result.bytesWritten; + outputLength -= result.bytesWritten; + bytesWritten += result.bytesWritten; + if (result.outputTooSmall) { + VELOX_FAIL("Lz4 decompression buffer too small."); + } + } + if (!decompressor->isFinished()) { + VELOX_FAIL("Lz4 compressed input contains less than one frame."); + } + if (inputLength != 0) { + VELOX_FAIL("Lz4 compressed input contains more than one frame."); + } + return bytesWritten; +} + +std::shared_ptr Lz4FrameCodec::makeCompressor() { + auto ptr = std::make_shared(compressionLevel_); + ptr->init(); + return ptr; +} + +std::shared_ptr Lz4FrameCodec::makeDecompressor() { + auto ptr = std::make_shared(); + ptr->init(); + return ptr; +} + +CompressionKind Lz4FrameCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4; +} + +Lz4RawCodec::Lz4RawCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel) {} + +uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { + return static_cast( + LZ4_compressBound(static_cast(inputLength))); +} + +uint64_t Lz4RawCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + auto decompressedSize = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + if (decompressedSize < 0) { + VELOX_FAIL("Corrupt Lz4 compressed data."); + } + return static_cast(decompressedSize); +} + +uint64_t Lz4RawCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + uint64_t compressedSize; +#ifdef LZ4HC_CLEVEL_MIN + constexpr int kMinHcClevel = LZ4HC_CLEVEL_MIN; +#else // For older versions of the lz4 library. 
+ constexpr int kMinHcClevel = 3; +#endif + if (compressionLevel_ < kMinHcClevel) { + compressedSize = LZ4_compress_default( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + } else { + compressedSize = LZ4_compress_HC( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength), + compressionLevel_); + } + if (compressedSize == 0) { + VELOX_FAIL("Lz4 compression failure."); + } + return static_cast(compressedSize); +} + +std::shared_ptr Lz4RawCodec::makeCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr Lz4RawCodec::makeDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); +} + +CompressionKind Lz4RawCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4RAW; +} + +Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kUseDefaultCompressionLevel) {} + +uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { + return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); +} + +uint64_t Lz4HadoopCodec::compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + if (outputLength < kPrefixLength) { + VELOX_FAIL("Output buffer too small for Lz4HadoopCodec compression."); + } + + uint64_t compressedSize = Lz4RawCodec::compress( + inputLength, input, outputLength - kPrefixLength, output + kPrefixLength); + + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4RawCodec. + const uint32_t decompressedLength = + folly::Endian::big(static_cast(inputLength)); + const uint32_t compressedLength = + folly::Endian::big(static_cast(compressedSize)); + folly::storeUnaligned(output, decompressedLength); + folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); + + return kPrefixLength + compressedSize; +} + +uint64_t Lz4HadoopCodec::decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + uint64_t decompressedSize; + if (tryDecompressHadoop( + inputLength, input, outputLength, output, decompressedSize)) { + return decompressedSize; + } + // Fall back on raw LZ4 codec (for files produces by earlier versions of + // Parquet C++). + return Lz4RawCodec::decompress(inputLength, input, outputLength, output); +} + +std::shared_ptr Lz4HadoopCodec::makeCompressor() { + VELOX_UNSUPPORTED( + "Streaming compression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); +} + +std::shared_ptr Lz4HadoopCodec::makeDecompressor() { + VELOX_UNSUPPORTED( + "Streaming decompression unsupported with LZ4 Hadoop raw format. 
" + "Try using LZ4 frame format instead."); +} + +CompressionKind Lz4HadoopCodec::compressionKind() const { + return CompressionKind::CompressionKind_LZ4HADOOP; +} + +int32_t Lz4HadoopCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +uint64_t Lz4HadoopCodec::decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) { + return Lz4RawCodec::decompress(inputLength, input, outputLength, output); +} + +std::unique_ptr makeLz4FrameCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4HadoopRawCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common \ No newline at end of file diff --git a/velox/common/compression/v2/Lz4Compression.h b/velox/common/compression/v2/Lz4Compression.h new file mode 100644 index 000000000000..576657eba007 --- /dev/null +++ b/velox/common/compression/v2/Lz4Compression.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include "velox/common/compression/v2/Compression.h" +#include "velox/common/compression/v2/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +static constexpr int32_t kLz4DefaultCompressionLevel = 1; +static constexpr int32_t kLz4MinCompressionLevel = 1; + +class Lz4CodecBase : public Codec { + public: + explicit Lz4CodecBase(int32_t compressionLevel); + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + protected: + const int compressionLevel_; +}; + +class Lz4FrameCodec : public Lz4CodecBase { + public: + explicit Lz4FrameCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + protected: + const LZ4F_preferences_t prefs_; +}; + +class Lz4RawCodec : public Lz4CodecBase { + public: + explicit Lz4RawCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; +}; + +class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { + public: + Lz4HadoopCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + uint64_t compress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + uint64_t decompress( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; + + std::shared_ptr makeCompressor() override; + + std::shared_ptr makeDecompressor() override; + + CompressionKind compressionKind() const override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + uint64_t decompressInternal( + uint64_t inputLength, + const uint8_t* input, + uint64_t outputLength, + uint8_t* output) override; +}; + +// Lz4 frame format codec. +std::unique_ptr makeLz4FrameCodec( + int32_t compressionLevel = kLz4DefaultCompressionLevel); + +// Lz4 "raw" format codec. +std::unique_ptr makeLz4RawCodec( + int32_t compressionLevel = kLz4DefaultCompressionLevel); + +// Lz4 "Hadoop" format codec (== Lz4 raw codec prefixed with lengths header) +std::unique_ptr makeLz4HadoopRawCodec(); +} // namespace facebook::velox::common diff --git a/velox/common/compression/v2/tests/CMakeLists.txt b/velox/common/compression/v2/tests/CMakeLists.txt new file mode 100644 index 000000000000..983fd6c81fda --- /dev/null +++ b/velox/common/compression/v2/tests/CMakeLists.txt @@ -0,0 +1,21 @@ +# Copyright (c) Facebook, Inc. and its affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_common_compression_v2_test CompressionTest.cpp) +add_test(velox_common_compression_v2_test velox_common_compression_v2_test) +target_link_libraries( + velox_common_compression_v2_test + PUBLIC velox_link_libs + PRIVATE velox_common_compression_v2 velox_exception glog::glog gtest + gtest_main) diff --git a/velox/common/compression/v2/tests/CompressionTest.cpp b/velox/common/compression/v2/tests/CompressionTest.cpp new file mode 100644 index 000000000000..8c6dc5ce7ae6 --- /dev/null +++ b/velox/common/compression/v2/tests/CompressionTest.cpp @@ -0,0 +1,471 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "velox/common/base/VeloxException.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/compression/v2/Compression.h" + +namespace facebook::velox::common { + +namespace { + +const std::shared_ptr kDefaultCodecOptions = + std::make_shared(); + +struct TestParam { + CompressionKind compressionKind; + std::shared_ptr codecOptions; + + TestParam( + common::CompressionKind compressionKind, + std::shared_ptr codecOptions = kDefaultCodecOptions) + : compressionKind(compressionKind), codecOptions(codecOptions) {} +}; + +std::vector makeRandomData(size_t n) { + std::vector data(n); + std::default_random_engine engine(42); + std::uniform_int_distribution dist(0, 255); + std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); + return data; +} + +std::vector makeCompressibleData(size_t size) { + std::string baseData = "The quick brown fox jumps over the lazy dog"; + auto repeats = static_cast(1 + size / baseData.size()); + + std::vector data(baseData.size() * repeats); + for (int i = 0; i < repeats; ++i) { + std::memcpy( + data.data() + i * baseData.size(), baseData.data(), baseData.size()); + } + data.resize(size); + return data; +} + +std::function makeRandomInputSize() { + std::default_random_engine engine(42); + std::uniform_int_distribution sizeDistribution(10, 40); + return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; +} + +// Check roundtrip of one-shot compression and decompression functions. 
+void checkCodecRoundtrip(
+    Codec* c1,
+    Codec* c2,
+    const std::vector<uint8_t>& data) {
+  auto maxCompressedLen =
+      static_cast<uint64_t>(c1->maxCompressedLength(data.size()));
+  std::vector<uint8_t> compressed(maxCompressedLen);
+  std::vector<uint8_t> decompressed(data.size());
+
+  // Compress with codec c1.
+  auto compressedSize = c1->compress(
+      data.size(), data.data(), maxCompressedLen, compressed.data());
+  compressed.resize(compressedSize);
+
+  // Decompress with codec c2.
+  auto decompressedSize = c2->decompress(
+      compressed.size(),
+      compressed.data(),
+      decompressed.size(),
+      decompressed.data());
+
+  ASSERT_EQ(data, decompressed);
+  ASSERT_EQ(data.size(), decompressedSize);
+}
+
+// Use same codec for both compression and decompression.
+void checkCodecRoundtrip(
+    const std::unique_ptr<Codec>& codec,
+    const std::vector<uint8_t>& data) {
+  checkCodecRoundtrip(codec.get(), codec.get(), data);
+}
+
+// Compress with codec c1 and decompress with codec c2.
+void checkCodecRoundtrip(
+    const std::unique_ptr<Codec>& c1,
+    const std::unique_ptr<Codec>& c2,
+    const std::vector<uint8_t>& data) {
+  checkCodecRoundtrip(c1.get(), c2.get(), data);
+}
+
+void streamingCompress(
+    const std::shared_ptr<Compressor>& compressor,
+    const std::vector<uint8_t>& uncompressed,
+    std::vector<uint8_t>& compressed) {
+  const uint8_t* input = uncompressed.data();
+  uint64_t remaining = uncompressed.size();
+  uint64_t compressedSize = 0;
+  compressed.resize(10);
+  bool doFlush = false;
+  // Generate small random input buffer size.
+  auto randomInputSize = makeRandomInputSize();
+  // Continue compressing until all input data has been consumed.
+  while (remaining > 0) {
+    // Feed a small amount each time.
+    auto inputLength = std::min(remaining, randomInputSize());
+    auto outputLength = compressed.size() - compressedSize;
+    uint8_t* output = compressed.data() + compressedSize;
+    // Compress once.
+    auto compressResult =
+        compressor->compress(inputLength, input, outputLength, output);
+    ASSERT_LE(compressResult.bytesRead, inputLength);
+    ASSERT_LE(compressResult.bytesWritten, outputLength);
+    // Update result.
+    compressedSize += compressResult.bytesWritten;
+    input += compressResult.bytesRead;
+    remaining -= compressResult.bytesRead;
+    // Grow compressed buffer if it's too small.
+    if (compressResult.outputTooSmall) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+    // Once every two iterations, do a flush.
+    if (doFlush) {
+      Compressor::FlushResult flushResult;
+      do {
+        outputLength = compressed.size() - compressedSize;
+        output = compressed.data() + compressedSize;
+        flushResult = compressor->flush(outputLength, output);
+        ASSERT_LE(flushResult.bytesWritten, outputLength);
+        compressedSize += flushResult.bytesWritten;
+        if (flushResult.outputTooSmall) {
+          compressed.resize(compressed.capacity() * 2);
+        }
+      } while (flushResult.outputTooSmall);
+    }
+    doFlush = !doFlush;
+  }
+  // End the compressed stream.
+  Compressor::EndResult endResult;
+  do {
+    uint64_t outputLength = compressed.size() - compressedSize;
+    uint8_t* output = compressed.data() + compressedSize;
+    endResult = compressor->end(outputLength, output);
+    ASSERT_LE(endResult.bytesWritten, outputLength);
+    compressedSize += endResult.bytesWritten;
+    if (endResult.outputTooSmall) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+  } while (endResult.outputTooSmall);
+  compressed.resize(compressedSize);
+}
+
+void streamingDecompress(
+    const std::shared_ptr<Decompressor>& decompressor,
+    const std::vector<uint8_t>& compressed,
+    std::vector<uint8_t>& decompressed) {
+  const uint8_t* input = compressed.data();
+  uint64_t remaining = compressed.size();
+  uint64_t decompressedSize = 0;
+  decompressed.resize(10);
+  // Generate small random input buffer size.
+  auto randomInputSize = makeRandomInputSize();
+  // Continue decompressing until the decompressor finishes.
+  while (!decompressor->isFinished()) {
+    // Feed a small amount each time.
+    auto inputLength = std::min(remaining, randomInputSize());
+    auto outputLength = decompressed.size() - decompressedSize;
+    uint8_t* output = decompressed.data() + decompressedSize;
+    // Decompress once.
+    auto result =
+        decompressor->decompress(inputLength, input, outputLength, output);
+    ASSERT_LE(result.bytesRead, inputLength);
+    ASSERT_LE(result.bytesWritten, outputLength);
+    ASSERT_TRUE(
+        result.outputTooSmall || result.bytesWritten > 0 ||
+        result.bytesRead > 0)
+        << "Decompression not progressing anymore";
+    // Update result.
+    decompressedSize += result.bytesWritten;
+    input += result.bytesRead;
+    remaining -= result.bytesRead;
+    // Grow decompressed buffer if it's too small.
+    if (result.outputTooSmall) {
+      decompressed.resize(decompressed.capacity() * 2);
+    }
+  }
+  ASSERT_TRUE(decompressor->isFinished());
+  ASSERT_EQ(remaining, 0);
+  decompressed.resize(decompressedSize);
+}
+
+// Check the streaming compressor against one-shot decompression.
+void checkStreamingCompressor(Codec* codec, const std::vector<uint8_t>& data) {
+  // Run streaming compression.
+  std::vector<uint8_t> compressed;
+  streamingCompress(codec->makeCompressor(), data, compressed);
+  // Check decompressing the compressed data.
+  std::vector<uint8_t> decompressed(data.size());
+  ASSERT_NO_THROW(codec->decompress(
+      compressed.size(),
+      compressed.data(),
+      decompressed.size(),
+      decompressed.data()));
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming decompressor against one-shot compression.
+void checkStreamingDecompressor(
+    Codec* codec,
+    const std::vector<uint8_t>& data) {
+  // Create compressed data.
+  auto maxCompressedLen = codec->maxCompressedLength(data.size());
+  std::vector<uint8_t> compressed(maxCompressedLen);
+  auto compressedSize = codec->compress(
+      data.size(), data.data(), maxCompressedLen, compressed.data());
+  compressed.resize(compressedSize);
+  // Run streaming decompression.
+  std::vector<uint8_t> decompressed;
+  streamingDecompress(codec->makeDecompressor(), compressed, decompressed);
+  // Check the decompressed data.
+  ASSERT_EQ(data.size(), decompressed.size());
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming compressor and decompressor together.
+void checkStreamingRoundtrip( + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, + const std::vector& data) { + std::vector compressed; + streamingCompress(compressor, data, compressed); + std::vector decompressed; + streamingDecompress(decompressor, compressed, decompressed); + ASSERT_EQ(data, decompressed); +} + +void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { + checkStreamingRoundtrip( + codec->makeCompressor(), codec->makeDecompressor(), data); +} +} // namespace + +class CodecTest : public ::testing::TestWithParam { + protected: + static CompressionKind getCompressionKind() { + return GetParam().compressionKind; + } + + static const CodecOptions& getCodecOptions() { + return *GetParam().codecOptions; + } + + static std::unique_ptr makeCodec() { + return Codec::create(getCompressionKind(), getCodecOptions()); + } +}; + +TEST_P(CodecTest, specifyCompressionLevel) { + std::vector data = makeRandomData(2000); + const auto kind = getCompressionKind(); + if (!Codec::isAvailable(kind)) { + // Support for this codec hasn't been built. + VELOX_ASSERT_THROW( + Codec::create(kind, Codec::useDefaultCompressionLevel()), + "Support for codec '" + compressionKindToString(kind) + + "' not implemented."); + } else if (!Codec::supportsCompressionLevel(kind)) { + VELOX_ASSERT_THROW( + Codec::create(kind, 1), + fmt::format( + "Codec '{}' doesn't support setting a compression level.", + compressionKindToString(kind))); + } else { + auto codec = Codec::create(kind, Codec::minimumCompressionLevel(kind)); + checkCodecRoundtrip(codec, data); + } +} + +TEST_P(CodecTest, getUncompressedLength) { + auto codec = makeCodec(); + // Test non-empty input. + { + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = codec->compress( + inputLength, input.data(), compressed.size(), compressed.data()); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + auto uncompressedLength = + codec->getUncompressedLength(compressedLength, compressed.data()); + ASSERT_EQ( + codec->getUncompressedLength(compressedLength, compressed.data()), + inputLength); + ASSERT_EQ( + codec->getUncompressedLength( + compressedLength, compressed.data(), inputLength), + inputLength); + ASSERT_EQ( + codec->getUncompressedLength( + compressedLength, compressed.data(), std::nullopt), + inputLength); + VELOX_ASSERT_THROW( + codec->getUncompressedLength( + compressedLength, compressed.data(), inputLength + 1), + fmt::format("Invalid uncompressed length: {}", inputLength + 1)); + } else { + ASSERT_EQ( + codec->getUncompressedLength(input.size(), input.data()), + std::nullopt); + ASSERT_EQ( + codec->getUncompressedLength( + input.size(), input.data(), std::nullopt), + std::nullopt); + ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 0), 0); + ASSERT_EQ(codec->getUncompressedLength(input.size(), input.data(), 2), 2); + } + } + // Test empty input. 
+ { + std::vector input{}; + ASSERT_EQ(codec->getUncompressedLength(0, input.data(), 0), 0); + ASSERT_EQ(codec->getUncompressedLength(0, input.data(), std::nullopt), 0); + VELOX_ASSERT_THROW( + codec->getUncompressedLength(0, input.data(), 1), + fmt::format("Invalid uncompressed length: {}", 1)); + } +} + +TEST_P(CodecTest, codecRoundtrip) { + auto codec = makeCodec(); + for (int dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(codec, makeRandomData(dataSize)); + checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, minMaxCompressionLevel) { + auto type = getCompressionKind(); + auto codec = makeCodec(); + auto notSupportCompressionLevel = [](CompressionKind kind) { + return fmt::format( + "Codec '{}' doesn't support setting a compression level.", + compressionKindToString(kind)); + }; + + if (Codec::supportsCompressionLevel(type)) { + auto minLevel = Codec::minimumCompressionLevel(type); + auto maxLevel = Codec::maximumCompressionLevel(type); + auto defaultLevel = Codec::defaultCompressionLevel(type); + ASSERT_NE(minLevel, Codec::useDefaultCompressionLevel()); + ASSERT_NE(maxLevel, Codec::useDefaultCompressionLevel()); + ASSERT_NE(defaultLevel, Codec::useDefaultCompressionLevel()); + ASSERT_LT(minLevel, maxLevel); + ASSERT_EQ(minLevel, codec->minimumCompressionLevel()); + ASSERT_EQ(maxLevel, codec->maximumCompressionLevel()); + ASSERT_GE(defaultLevel, minLevel); + ASSERT_LE(defaultLevel, maxLevel); + } else { + VELOX_ASSERT_THROW( + Codec::minimumCompressionLevel(type), notSupportCompressionLevel(type)); + VELOX_ASSERT_THROW( + Codec::maximumCompressionLevel(type), notSupportCompressionLevel(type)); + VELOX_ASSERT_THROW( + Codec::defaultCompressionLevel(type), notSupportCompressionLevel(type)); + ASSERT_EQ( + codec->minimumCompressionLevel(), Codec::useDefaultCompressionLevel()); + ASSERT_EQ( + codec->maximumCompressionLevel(), Codec::useDefaultCompressionLevel()); + ASSERT_EQ( + codec->defaultCompressionLevel(), Codec::useDefaultCompressionLevel()); + } +} + +TEST_P(CodecTest, streamingCompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingRoundtrip) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); + checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressorReuse) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + auto codec = makeCodec(); + auto decompressor = codec->makeDecompressor(); + checkStreamingRoundtrip( + codec->makeCompressor(), decompressor, makeRandomData(100)); + + // Decompressor::reset() should allow reusing decompressor for a new stream. 
+ decompressor->reset(); + checkStreamingRoundtrip( + codec->makeCompressor(), decompressor, makeRandomData(200)); +} + +INSTANTIATE_TEST_SUITE_P( + TestLZ4Frame, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4)); +INSTANTIATE_TEST_SUITE_P( + TestLZ4Raw, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4RAW)); +INSTANTIATE_TEST_SUITE_P( + TestLZ4Hadoop, + CodecTest, + ::testing::Values(CompressionKind::CompressionKind_LZ4HADOOP)); + +TEST(CodecLZ4HadoopTest, compatibility) { + // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. + auto c1 = Codec::create(CompressionKind::CompressionKind_LZ4RAW); + auto c2 = Codec::create(CompressionKind::CompressionKind_LZ4HADOOP); + + for (auto dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); + } +} +} // namespace facebook::velox::common
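Usage note (not part of the patch): the sketch below shows how a caller might drive the new one-shot Codec API introduced by this change, assuming only the headers added above; the main() scaffolding and the choice of the LZ4 raw format are illustrative, not prescribed by the patch. For the LZ4 frame format (CompressionKind_LZ4), makeCompressor() and makeDecompressor() provide the streaming path exercised by the new tests.

#include "velox/common/compression/v2/Compression.h"

#include <cstdint>
#include <string>
#include <vector>

using namespace facebook::velox::common;

int main() {
  const std::string text = "The quick brown fox jumps over the lazy dog";
  const std::vector<uint8_t> input(text.begin(), text.end());

  // LZ4 raw blocks, e.g. the framing used inside Parquet data pages.
  auto codec = Codec::create(CompressionKind_LZ4RAW);

  // Size the output buffer with maxCompressedLength() before compressing.
  std::vector<uint8_t> compressed(codec->maxCompressedLength(input.size()));
  auto compressedSize = codec->compress(
      input.size(), input.data(), compressed.size(), compressed.data());
  compressed.resize(compressedSize);

  // One-shot decompression requires the exact uncompressed size up front.
  std::vector<uint8_t> decompressed(input.size());
  auto decompressedSize = codec->decompress(
      compressed.size(),
      compressed.data(),
      decompressed.size(),
      decompressed.data());

  return (decompressedSize == input.size() && decompressed == input) ? 0 : 1;
}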