From 9c0ae08f29818e96bcf3d298d45c1526da465847 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 26 Feb 2025 13:09:35 +0000 Subject: [PATCH 1/4] add compression v2 API and lz4_frame/lz4_raw/lz4_hadoop codec --- CMakeLists.txt | 12 +- velox/common/compression/CMakeLists.txt | 10 +- velox/common/compression/Compression.cpp | 131 +++++ velox/common/compression/Compression.h | 198 +++++++ .../compression/HadoopCompressionFormat.cpp | 69 +++ .../compression/HadoopCompressionFormat.h | 56 ++ velox/common/compression/Lz4Compression.cpp | 513 ++++++++++++++++++ velox/common/compression/Lz4Compression.h | 148 +++++ .../compression/tests/CompressionTest.cpp | 455 +++++++++++++++- velox/dwio/dwrf/test/TestDecompression.cpp | 12 +- 10 files changed, 1597 insertions(+), 7 deletions(-) create mode 100644 velox/common/compression/HadoopCompressionFormat.cpp create mode 100644 velox/common/compression/HadoopCompressionFormat.h create mode 100644 velox/common/compression/Lz4Compression.cpp create mode 100644 velox/common/compression/Lz4Compression.h diff --git a/CMakeLists.txt b/CMakeLists.txt index eb781253f1ca..29e755144b08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -140,6 +140,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" ON) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) @@ -190,6 +191,12 @@ if(VELOX_BUILD_TESTING OR VELOX_BUILD_TEST_UTILS) set(VELOX_ENABLE_PARSE ON) endif() +if(${VELOX_BUILD_TESTING} + OR ${VELOX_BUILD_MINIMAL_WITH_DWIO} + OR ${VELOX_ENABLE_HIVE_CONNECTOR}) + set(VELOX_ENABLE_COMPRESSION_LZ4 ON) +endif() + if(${VELOX_ENABLE_EXAMPLES}) set(VELOX_ENABLE_EXPRESSION ON) endif() @@ -457,12 +464,15 @@ velox_resolve_dependency(glog) velox_set_source(fmt) velox_resolve_dependency(fmt 9.0.0) +if(VELOX_ENABLE_COMPRESSION_LZ4) + find_package(lz4 REQUIRED) +endif() + if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) # DWIO needs all sorts of stream compression libraries. # # TODO: make these optional and pluggable. 
find_package(ZLIB REQUIRED) - find_package(lz4 REQUIRED) find_package(lzo2 REQUIRED) find_package(zstd REQUIRED) find_package(Snappy REQUIRED) diff --git a/velox/common/compression/CMakeLists.txt b/velox/common/compression/CMakeLists.txt index 25835c26c4bb..d6ff2579ea8b 100644 --- a/velox/common/compression/CMakeLists.txt +++ b/velox/common/compression/CMakeLists.txt @@ -19,5 +19,13 @@ endif() velox_add_library(velox_common_compression Compression.cpp LzoDecompressor.cpp) velox_link_libraries( velox_common_compression - PUBLIC Folly::folly + PUBLIC velox_status Folly::folly PRIVATE velox_exception) + +if(VELOX_ENABLE_COMPRESSION_LZ4) + velox_sources(velox_common_compression PRIVATE Lz4Compression.cpp + HadoopCompressionFormat.cpp) + velox_link_libraries(velox_common_compression PUBLIC lz4::lz4) + velox_compile_definitions(velox_common_compression + PRIVATE VELOX_ENABLE_COMPRESSION_LZ4) +endif() diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index c0031cb792c7..fc4eedc760e5 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -16,6 +16,9 @@ #include "velox/common/compression/Compression.h" #include "velox/common/base/Exceptions.h" +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 +#include "velox/common/compression/Lz4Compression.h" +#endif #include @@ -99,4 +102,132 @@ CompressionKind stringToCompressionKind(const std::string& kind) { VELOX_UNSUPPORTED("Not support compression kind {}", kind); } } + +Status Codec::init() { + return Status::OK(); +} + +bool Codec::supportsGetUncompressedLength(CompressionKind kind) { + // TODO: Return true if it's supported by compression kind. + return false; +} + +bool Codec::supportsStreamingCompression(CompressionKind kind) { + switch (kind) { +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 + case CompressionKind::CompressionKind_LZ4: + return true; +#endif + default: + return false; + } +} + +bool Codec::supportsCompressFixedLength(CompressionKind kind) { + // TODO: Return true if it's supported by compression kind. + return false; +} + +Expected> Codec::create( + CompressionKind kind, + const CodecOptions& codecOptions) { + if (!isAvailable(kind)) { + auto name = compressionKindToString(kind); + VELOX_RETURN_UNEXPECTED_IF( + folly::StringPiece({name}).startsWith("unknown"), + Status::Invalid("Unrecognized codec: ", name)); + return folly::makeUnexpected(Status::Invalid( + "Support for codec '{}' is either not built or not implemented.", + name)); + } + + auto compressionLevel = codecOptions.compressionLevel; + std::unique_ptr codec; + switch (kind) { +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 + case CompressionKind::CompressionKind_LZ4: + if (auto options = dynamic_cast(&codecOptions)) { + switch (options->type) { + case Lz4CodecOptions::kLz4Frame: + codec = makeLz4FrameCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Raw: + codec = makeLz4RawCodec(compressionLevel); + break; + case Lz4CodecOptions::kLz4Hadoop: + codec = makeLz4HadoopCodec(); + break; + } + } + // By default, create LZ4 Frame codec. 
+ codec = makeLz4FrameCodec(compressionLevel); + break; +#endif + default: + break; + } + VELOX_RETURN_UNEXPECTED_IF( + codec == nullptr, + Status::Invalid(fmt::format( + "Support for codec '{}' is either not built or not implemented.", + compressionKindToString(kind)))); + + VELOX_RETURN_UNEXPECTED_NOT_OK(codec->init()); + + return codec; +} + +Expected> Codec::create( + CompressionKind kind, + int32_t compressionLevel) { + return create(kind, CodecOptions{compressionLevel}); +} + +bool Codec::isAvailable(CompressionKind kind) { + switch (kind) { + case CompressionKind::CompressionKind_NONE: + return true; +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 + case CompressionKind::CompressionKind_LZ4: + return true; +#endif + default: + return false; + } +} + +std::optional Codec::getUncompressedLength( + const uint8_t* input, + uint64_t inputLength) const { + return std::nullopt; +} + +Expected Codec::compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + return folly::makeUnexpected( + Status::Invalid("'{}' doesn't support fixed-length compression", name())); +} + +Expected> +Codec::makeStreamingCompressor() { + return folly::makeUnexpected(Status::Invalid( + "Streaming compression is unsupported with {} format.", name())); +} + +Expected> +Codec::makeStreamingDecompressor() { + return folly::makeUnexpected(Status::Invalid( + "Streaming decompression is unsupported with {} format.", name())); +} + +int32_t Codec::compressionLevel() const { + return kUseDefaultCompressionLevel; +} + +std::string Codec::name() const { + return compressionKindToString(compressionKind()); +} } // namespace facebook::velox::common diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 218760787861..925b5e27891b 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -17,9 +17,12 @@ #pragma once #include +#include #include #include +#include "velox/common/base/Status.h" + namespace facebook::velox::common { enum CompressionKind { @@ -45,6 +48,201 @@ CompressionKind stringToCompressionKind(const std::string& kind); constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024; +static constexpr int32_t kUseDefaultCompressionLevel = + std::numeric_limits::min(); + +class StreamingCompressor { + public: + virtual ~StreamingCompressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct EndResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Compress some input. + /// If CompressResult.outputTooSmall is true on return, then a larger output + /// buffer should be supplied. + virtual Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Flush part of the compressed output. + /// If FlushResult.outputTooSmall is true on return, flush() should be called + /// again with a larger buffer. + virtual Expected flush( + uint8_t* output, + uint64_t outputLength) = 0; + + /// End compressing, doing whatever is necessary to end the stream. + /// If EndResult.outputTooSmall is true on return, end() should be called + /// again with a larger buffer. Otherwise, the StreamingCompressor should not + /// be used anymore. end() will flush the compressed output. 
+ virtual Expected end(uint8_t* output, uint64_t outputLength) = 0; +}; + +class StreamingDecompressor { + public: + virtual ~StreamingDecompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If outputTooSmall is true on return, a larger output buffer needs + /// to be supplied. + virtual Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + // Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + // Reinitialize decompressor, making it ready for a new compressed stream. + virtual Status reset() = 0; +}; + +struct CodecOptions { + int32_t compressionLevel; + + CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + : compressionLevel(compressionLevel) {} + + virtual ~CodecOptions() = default; +}; + +class Codec { + public: + virtual ~Codec() = default; + + // Create a kind for the given compression algorithm with CodecOptions. + static Expected> create( + CompressionKind kind, + const CodecOptions& codecOptions = CodecOptions{}); + + // Create a kind for the given compression algorithm. + static Expected> create( + CompressionKind kind, + int32_t compressionLevel); + + // Return true if support for indicated kind has been enabled. + static bool isAvailable(CompressionKind kind); + + /// Return true if indicated kind supports extracting uncompressed length + /// from compressed data. + static bool supportsGetUncompressedLength(CompressionKind kind); + + /// Return true if indicated kind supports one-shot compression with fixed + /// compressed length. + static bool supportsCompressFixedLength(CompressionKind kind); + + // Return true if indicated kind supports creating streaming de/compressor. + static bool supportsStreamingCompression(CompressionKind kind); + + /// Return the smallest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t minimumCompressionLevel() const = 0; + + /// Return the largest supported compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t maximumCompressionLevel() const = 0; + + /// Return the default compression level. + /// If the codec doesn't support compression level, + /// `kUseDefaultCompressionLevel` will be returned. + virtual int32_t defaultCompressionLevel() const = 0; + + /// Performs one-shot compression. + /// `outputLength` must first have been computed using maxCompressedLength(). + /// The actual compressed length will be written to actualOutputLength. + /// Note: One-shot compression is not always compatible with streaming + /// decompression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// One-shot decompression function. + /// `outputLength` must be correct and therefore be obtained in advance. + /// The actual decompressed length is returned. + /// Note: One-shot decompression is not always compatible with streaming + /// compression. Depending on the codec (e.g. LZ4), different formats may + /// be used. + virtual Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Performs one-shot compression. 
+ /// This function compresses data and writes the output up to the specified + /// outputLength. If outputLength is too small to hold all the compressed + /// data, the function doesn't fail. Instead, it returns the number of bytes + /// actually written to the output buffer. Any remaining data that couldn't + /// be written in this call will be written in subsequent calls to this + /// function. This is useful when fixed-length compression blocks are required + /// by the caller. + /// Note: Only Gzip and Zstd codec supports this function. + virtual Expected compressFixedLength( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength); + + // Maximum compressed length of given input length. + virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; + + /// Retrieves the actual uncompressed length of data using the specified + /// compression library. + /// Note: This functionality is not universally supported by all compression + /// libraries. If not supported, `std::nullopt` will be returned. + virtual std::optional getUncompressedLength( + const uint8_t* input, + uint64_t inputLength) const; + + // Create a streaming compressor instance. + virtual Expected> + makeStreamingCompressor(); + + // Create a streaming compressor instance. + virtual Expected> + makeStreamingDecompressor(); + + // This Codec's compression type. + virtual CompressionKind compressionKind() const = 0; + + // This Codec's compression level, if applicable. + virtual int32_t compressionLevel() const; + + // The name of this Codec's compression type. + std::string name() const; + + private: + // Initializes the codec's resources. + virtual Status init(); +}; } // namespace facebook::velox::common template <> diff --git a/velox/common/compression/HadoopCompressionFormat.cpp b/velox/common/compression/HadoopCompressionFormat.cpp new file mode 100644 index 000000000000..91731fc226ea --- /dev/null +++ b/velox/common/compression/HadoopCompressionFormat.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/HadoopCompressionFormat.h" +#include "velox/common/base/Exceptions.h" + +#include + +namespace facebook::velox::common { + +bool HadoopCompressionFormat::tryDecompressHadoop( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength, + uint64_t& actualDecompressedSize) { + uint64_t totalDecompressedSize = 0; + + while (inputLength >= kPrefixLength) { + const uint32_t expectedDecompressedSize = + folly::Endian::big(folly::loadUnaligned(input)); + const uint32_t expectedCompressedSize = folly::Endian::big( + folly::loadUnaligned(input + sizeof(uint32_t))); + input += kPrefixLength; + inputLength -= kPrefixLength; + + if (inputLength < expectedCompressedSize) { + // Not enough bytes for Hadoop "frame". 
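+      // The advertised compressed size extends past the remaining input, so
+      // this cannot be a complete Hadoop frame.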
+ return false; + } + if (outputLength < expectedDecompressedSize) { + // Not enough bytes to hold advertised output => probably not Hadoop. + return false; + } + // Try decompressing and compare with expected decompressed length. + auto maybeDecompressedSize = + decompressInternal(input, expectedCompressedSize, output, outputLength); + if (maybeDecompressedSize.hasError() || + maybeDecompressedSize.value() != expectedDecompressedSize) { + return false; + } + + input += expectedCompressedSize; + inputLength -= expectedCompressedSize; + output += expectedDecompressedSize; + outputLength -= expectedDecompressedSize; + totalDecompressedSize += expectedDecompressedSize; + } + + if (inputLength == 0) { + actualDecompressedSize = totalDecompressedSize; + return true; + } + return false; +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/HadoopCompressionFormat.h b/velox/common/compression/HadoopCompressionFormat.h new file mode 100644 index 000000000000..39bdb960cb9b --- /dev/null +++ b/velox/common/compression/HadoopCompressionFormat.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "velox/common/base/Status.h" + +namespace facebook::velox::common { + +/// Parquet files written with the Hadoop compression codecs use their own +/// framing. +/// The input buffer can contain an arbitrary number of "frames", each +/// with the following structure: +/// - bytes 0..3: big-endian uint32_t representing the frame decompressed +/// size +/// - bytes 4..7: big-endian uint32_t representing the frame compressed size +/// - bytes 8...: frame compressed data +class HadoopCompressionFormat { + protected: + /// Try to decompress input data in Hadoop's compression format. + /// Returns true if decompression is successful, false otherwise. + bool tryDecompressHadoop( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength, + uint64_t& actualDecompressedSize); + + /// Called by tryDecompressHadoop to decompress a single frame and + /// should be implemented based on the specific compression format. + /// E.g. Lz4HadoopCodec uses Lz4RawCodec::decompress to decompress a frame. + virtual Expected decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + // Offset starting at which page data can be read/written. + static constexpr uint64_t kPrefixLength = sizeof(uint32_t) * 2; +}; +} // namespace facebook::velox::common diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp new file mode 100644 index 000000000000..463e2963b634 --- /dev/null +++ b/velox/common/compression/Lz4Compression.cpp @@ -0,0 +1,513 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/compression/Lz4Compression.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::common { +namespace { + +constexpr int32_t kLz4DefaultCompressionLevel = 1; +constexpr int32_t kLz4MinCompressionLevel = 1; + +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) +constexpr int32_t kLegacyLz4MaxCompressionLevel = 12; +#endif + +static inline Status lz4Error( + const char* prefixMessage, + LZ4F_errorCode_t errorCode) { + return Status::IOError(prefixMessage, LZ4F_getErrorName(errorCode)); +} + +LZ4F_preferences_t defaultPreferences() { + LZ4F_preferences_t prefs; + memset(&prefs, 0, sizeof(prefs)); + return prefs; +} + +LZ4F_preferences_t defaultPreferences(int32_t compressionLevel) { + LZ4F_preferences_t prefs = defaultPreferences(); + prefs.compressionLevel = compressionLevel; + return prefs; +} +} // namespace + +class LZ4Compressor : public StreamingCompressor { + public: + explicit LZ4Compressor(int32_t compressionLevel); + + ~LZ4Compressor() override; + + Status init(); + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected flush(uint8_t* output, uint64_t outputLength) override; + + Expected end(uint8_t* output, uint64_t outputLength) override; + + protected: + Status + compressBegin(uint8_t* output, size_t& outputLen, uint64_t& bytesWritten); + + int32_t compressionLevel_; + LZ4F_compressionContext_t ctx_{nullptr}; + LZ4F_preferences_t prefs_; + bool firstTime_; +}; + +class LZ4Decompressor : public StreamingDecompressor { + public: + LZ4Decompressor() {} + + ~LZ4Decompressor() override { + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + } + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + bool isFinished() override; + + Status reset() override; + + Status init(); + + protected: + LZ4F_decompressionContext_t ctx_{nullptr}; + bool finished_{false}; +}; + +LZ4Compressor::LZ4Compressor(int32_t compressionLevel) + : compressionLevel_(compressionLevel) {} + +LZ4Compressor::~LZ4Compressor() { + if (ctx_ != nullptr) { + LZ4F_freeCompressionContext(ctx_); + } +} + +Status LZ4Compressor::init() { + LZ4F_errorCode_t ret; + prefs_ = defaultPreferences(compressionLevel_); + firstTime_ = true; + + ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); + return Status::OK(); +} + +Expected LZ4Compressor::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return CompressResult{0, 0, true}; + } + VELOX_RETURN_UNEXPECTED_NOT_OK( + compressBegin(output, outputSize, bytesWritten)); + } + + if (outputSize < LZ4F_compressBound(inputSize, &prefs_)) { + // Output too small to compress into. 
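+    // bytesWritten may already include the frame header emitted by
+    // compressBegin(); report it so the caller keeps those bytes before
+    // retrying with a larger output buffer.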
+ return CompressResult{0, bytesWritten, true}; + } + auto numBytesOrError = LZ4F_compressUpdate( + ctx_, output, outputSize, input, inputSize, nullptr /* options */); + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 compress updated failed: ", numBytesOrError)); + bytesWritten += static_cast(numBytesOrError); + + VELOX_DCHECK_LE(bytesWritten, outputSize); + return CompressResult{inputLength, bytesWritten, false}; +} + +Expected LZ4Compressor::flush( + uint8_t* output, + uint64_t outputLength) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return FlushResult{0, true}; + } + VELOX_RETURN_UNEXPECTED_NOT_OK( + compressBegin(output, outputSize, bytesWritten)); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to flush into. + return FlushResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_flush(ctx_, output, outputSize, nullptr /* options */); + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 flush failed: ", numBytesOrError)); + bytesWritten += static_cast(numBytesOrError); + + VELOX_DCHECK_LE(bytesWritten, outputLength); + return FlushResult{bytesWritten, false}; +} + +Expected LZ4Compressor::end( + uint8_t* output, + uint64_t outputLength) { + auto outputSize = static_cast(outputLength); + uint64_t bytesWritten = 0; + + if (firstTime_) { + // Output too small to write LZ4F header. + if (outputLength < LZ4F_HEADER_SIZE_MAX) { + return EndResult{0, true}; + } + VELOX_RETURN_UNEXPECTED_NOT_OK( + compressBegin(output, outputSize, bytesWritten)); + } + + if (outputSize < LZ4F_compressBound(0, &prefs_)) { + // Output too small to end frame into. 
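+    // As in compress() and flush(), bytesWritten may already include the
+    // frame header emitted by compressBegin().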
+ return EndResult{bytesWritten, true}; + } + + auto numBytesOrError = + LZ4F_compressEnd(ctx_, output, outputSize, nullptr /* options */); + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 end failed: ", numBytesOrError)); + bytesWritten += static_cast(numBytesOrError); + + VELOX_DCHECK_LE(bytesWritten, outputLength); + return EndResult{bytesWritten, false}; +} + +Status LZ4Compressor::compressBegin( + uint8_t* output, + size_t& outputLen, + uint64_t& bytesWritten) { + auto numBytesOrError = LZ4F_compressBegin(ctx_, output, outputLen, &prefs_); + VELOX_RETURN_IF( + LZ4F_isError(numBytesOrError), + lz4Error("LZ4 compress begin failed: ", numBytesOrError)); + firstTime_ = false; + output += numBytesOrError; + outputLen -= numBytesOrError; + bytesWritten += static_cast(numBytesOrError); + return Status::OK(); +} + +Status common::LZ4Decompressor::init() { + finished_ = false; + auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); + VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); + return Status::OK(); +} + +Status LZ4Decompressor::reset() { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + if (ctx_ == nullptr) { + return Status::Invalid("LZ4 decompression context is null."); + } + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; + return Status::OK(); +#else + if (ctx_ != nullptr) { + LZ4F_freeDecompressionContext(ctx_); + } + return init(); +#endif +} + +Expected LZ4Decompressor::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto inputSize = static_cast(inputLength); + auto outputSize = static_cast(outputLength); + + auto ret = LZ4F_decompress( + ctx_, output, &outputSize, input, &inputSize, nullptr /* options */); + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(ret), lz4Error("LZ4 decompress failed: ", ret)); + finished_ = (ret == 0); + return DecompressResult{ + static_cast(inputSize), + static_cast(outputSize), + (inputSize == 0 && outputSize == 0)}; +} + +bool LZ4Decompressor::isFinished() { + return finished_; +} + +Lz4CodecBase::Lz4CodecBase(int32_t compressionLevel) + : compressionLevel_( + compressionLevel == kUseDefaultCompressionLevel + ? 
kLz4DefaultCompressionLevel + : compressionLevel) {} + +int32_t Lz4CodecBase::minimumCompressionLevel() const { + return kLz4MinCompressionLevel; +} + +int32_t Lz4CodecBase::maximumCompressionLevel() const { +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + return kLegacyLz4MaxCompressionLevel; +#else + return LZ4F_compressionLevel_max(); +#endif +} + +int32_t Lz4CodecBase::defaultCompressionLevel() const { + return kLz4DefaultCompressionLevel; +} + +int32_t Lz4CodecBase::compressionLevel() const { + return compressionLevel_; +} + +CompressionKind Lz4CodecBase::compressionKind() const { + return CompressionKind::CompressionKind_LZ4; +} + +Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel), + prefs_(defaultPreferences(compressionLevel_)) {} + +uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { + return static_cast( + LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); +} + +Expected Lz4FrameCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto ret = LZ4F_compressFrame( + output, + static_cast(outputLength), + input, + static_cast(inputLength), + &prefs_); + VELOX_RETURN_UNEXPECTED_IF( + LZ4F_isError(ret), lz4Error("Lz4 compression failure: ", ret)); + return static_cast(ret); +} + +Expected Lz4FrameCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + return makeStreamingDecompressor().then( + [&](const auto& decompressor) -> Expected { + uint64_t bytesWritten = 0; + while (!decompressor->isFinished() && inputLength != 0) { + auto maybeResult = decompressor->decompress( + input, inputLength, output, outputLength); + VELOX_RETURN_UNEXPECTED(maybeResult); + + const auto& result = maybeResult.value(); + input += result.bytesRead; + inputLength -= result.bytesRead; + output += result.bytesWritten; + outputLength -= result.bytesWritten; + bytesWritten += result.bytesWritten; + VELOX_RETURN_UNEXPECTED_IF( + result.outputTooSmall, + Status::IOError("Lz4 decompression buffer too small.")); + } + VELOX_RETURN_UNEXPECTED_IF( + !decompressor->isFinished() || inputLength != 0, + Status::IOError( + "Lz4 compressed input contains less than one frame.")); + return bytesWritten; + }); +} + +Expected> +Lz4FrameCodec::makeStreamingCompressor() { + auto ptr = std::make_shared(compressionLevel_); + VELOX_RETURN_UNEXPECTED_NOT_OK(ptr->init()); + return ptr; +} + +Expected> +Lz4FrameCodec::makeStreamingDecompressor() { + auto ptr = std::make_shared(); + VELOX_RETURN_UNEXPECTED_NOT_OK(ptr->init()); + return ptr; +} + +Lz4RawCodec::Lz4RawCodec(int32_t compressionLevel) + : Lz4CodecBase(compressionLevel) {} + +uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { + return static_cast( + LZ4_compressBound(static_cast(inputLength))); +} + +Expected Lz4RawCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + uint64_t compressedSize; +#ifdef LZ4HC_CLEVEL_MIN + constexpr int32_t kMinHcClevel = LZ4HC_CLEVEL_MIN; +#else // For older versions of the lz4 library. 
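+  // LZ4HC_CLEVEL_MIN is not defined by older lz4 releases; assume 3, the
+  // value it was later given. Levels below it use the fast (non-HC) path.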
+ constexpr int32_t kMinHcClevel = 3; +#endif + if (compressionLevel_ < kMinHcClevel) { + compressedSize = LZ4_compress_default( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + } else { + compressedSize = LZ4_compress_HC( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength), + compressionLevel_); + } + VELOX_RETURN_UNEXPECTED_IF( + compressedSize == 0, Status::IOError("Lz4 compression failure.")); + return static_cast(compressedSize); +} + +Expected Lz4RawCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + auto decompressedSize = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output), + static_cast(inputLength), + static_cast(outputLength)); + VELOX_RETURN_UNEXPECTED_IF( + decompressedSize < 0, Status::IOError("Lz4 decompression failure.")); + return static_cast(decompressedSize); +} + +Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} + +uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { + return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); +} + +Expected Lz4HadoopCodec::compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + VELOX_RETURN_UNEXPECTED_IF( + outputLength < kPrefixLength, + Status::IOError( + "Output buffer too small for Lz4HadoopCodec compression.")); + + return Lz4RawCodec::compress( + input, + inputLength, + output + kPrefixLength, + outputLength - kPrefixLength) + .then([&](const auto& compressedSize) { + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4RawCodec. + const uint32_t decompressedLength = + folly::Endian::big(static_cast(inputLength)); + const uint32_t compressedLength = + folly::Endian::big(static_cast(compressedSize)); + folly::storeUnaligned(output, decompressedLength); + folly::storeUnaligned(output + sizeof(uint32_t), compressedLength); + return kPrefixLength + compressedSize; + }); +} + +Expected Lz4HadoopCodec::decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + uint64_t decompressedSize; + if (tryDecompressHadoop( + input, inputLength, output, outputLength, decompressedSize)) { + return decompressedSize; + } + // Fall back on raw LZ4 codec (for files produces by earlier versions of + // Parquet C++). 
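+  // Such files hold plain LZ4 blocks without the 8-byte size prefix, so the
+  // raw decompression path handles them.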
+ return Lz4RawCodec::decompress(input, inputLength, output, outputLength); +} + +int32_t Lz4HadoopCodec::minimumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::maximumCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +int32_t Lz4HadoopCodec::defaultCompressionLevel() const { + return kUseDefaultCompressionLevel; +} + +Expected Lz4HadoopCodec::decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) { + return Lz4RawCodec::decompress(input, inputLength, output, outputLength); +} + +std::unique_ptr makeLz4FrameCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4RawCodec(int32_t compressionLevel) { + return std::make_unique(compressionLevel); +} + +std::unique_ptr makeLz4HadoopCodec() { + return std::make_unique(); +} +} // namespace facebook::velox::common diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h new file mode 100644 index 000000000000..0d0f238d979d --- /dev/null +++ b/velox/common/compression/Lz4Compression.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include "velox/common/compression/Compression.h" +#include "velox/common/compression/HadoopCompressionFormat.h" + +namespace facebook::velox::common { + +struct Lz4CodecOptions : CodecOptions { + enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; + + Lz4CodecOptions( + Lz4CodecOptions::Type type, + int32_t compressionLevel = kUseDefaultCompressionLevel) + : CodecOptions(compressionLevel), type(type) {} + + Lz4CodecOptions::Type type; +}; + +class Lz4CodecBase : public Codec { + public: + explicit Lz4CodecBase(int32_t compressionLevel); + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + CompressionKind compressionKind() const override; + + protected: + const int32_t compressionLevel_; +}; + +class Lz4FrameCodec : public Lz4CodecBase { + public: + explicit Lz4FrameCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected> makeStreamingCompressor() + override; + + Expected> makeStreamingDecompressor() + override; + + protected: + const LZ4F_preferences_t prefs_; +}; + +class Lz4RawCodec : public Lz4CodecBase { + public: + explicit Lz4RawCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; +}; + +/// The Hadoop Lz4Codec source code can be found here: +/// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc +class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { + public: + Lz4HadoopCodec(); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + int32_t minimumCompressionLevel() const override; + + int32_t maximumCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + private: + Expected decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; +}; + +// Lz4 frame format codec. +std::unique_ptr makeLz4FrameCodec( + int32_t compressionLevel = kUseDefaultCompressionLevel); + +// Lz4 "raw" format codec. +std::unique_ptr makeLz4RawCodec( + int32_t compressionLevel = kUseDefaultCompressionLevel); + +// Lz4 "Hadoop" format codec (Lz4 raw codec prefixed with lengths header). 
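+// Each compressed block is preceded by two big-endian uint32 values: the
+// decompressed size and the compressed size (see HadoopCompressionFormat).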
+std::unique_ptr makeLz4HadoopCodec(); +} // namespace facebook::velox::common diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 59dfc3e43947..7be796a62225 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -14,14 +14,316 @@ * limitations under the License. */ +#include +#include +#include +#include +#include +#include + #include -#include "velox/common/base/VeloxException.h" +#include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/compression/Compression.h" +#include "velox/common/compression/Lz4Compression.h" namespace facebook::velox::common { +namespace { + +void throwsNotOk(const Status& status) { + VELOX_USER_FAIL("{}", status.message()); +} + +const std::shared_ptr kDefaultCodecOptions = + std::make_shared(); + +struct TestParams { + CompressionKind compressionKind; + std::shared_ptr codecOptions; + + explicit TestParams( + common::CompressionKind compressionKind, + std::shared_ptr codecOptions = kDefaultCodecOptions) + : compressionKind(compressionKind), + codecOptions(std::move(codecOptions)) {} +}; + +std::vector generateLz4TestParams() { + std::vector params; + for (auto type : + {Lz4CodecOptions::kLz4Raw, + Lz4CodecOptions::kLz4Frame, + Lz4CodecOptions::kLz4Hadoop}) { + params.emplace_back( + CompressionKind::CompressionKind_LZ4, + std::make_shared(type)); + } + return params; +} + +std::vector makeRandomData(size_t n) { + std::vector data(n); + std::default_random_engine engine(42); + std::uniform_int_distribution dist(0, 255); + std::generate(data.begin(), data.end(), [&]() { return dist(engine); }); + return data; +} + +std::vector makeCompressibleData(size_t size) { + std::string baseData = "The quick brown fox jumps over the lazy dog"; + auto repeats = static_cast(1 + size / baseData.size()); + + std::vector data(baseData.size() * repeats); + for (int i = 0; i < repeats; ++i) { + std::memcpy( + data.data() + i * baseData.size(), baseData.data(), baseData.size()); + } + data.resize(size); + return data; +} + +std::function makeRandomInputSize() { + std::default_random_engine engine(42); + std::uniform_int_distribution sizeDistribution(10, 40); + return [=]() mutable -> uint64_t { return sizeDistribution(engine); }; +} + +// Check roundtrip of one-shot compression and decompression functions. +void checkCodecRoundtrip( + Codec* c1, + Codec* c2, + const std::vector& data) { + auto maxCompressedLen = + static_cast(c1->maxCompressedLength(data.size())); + std::vector compressed(maxCompressedLen); + std::vector decompressed(data.size()); + + // Compress with codec c1. + auto compressionLength = + c1->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen) + .thenOrThrow(folly::identity, throwsNotOk); + compressed.resize(compressionLength); + + // Decompress with codec c2. + auto decompressedLength = c2->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size()) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_EQ(data, decompressed); + ASSERT_EQ(data.size(), decompressedLength); +} + +// Use same codec for both compression and decompression. +void checkCodecRoundtrip( + const std::unique_ptr& codec, + const std::vector& data) { + checkCodecRoundtrip(codec.get(), codec.get(), data); +} + +// Compress with codec c1 and decompress with codec c2. 
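+// Used to verify cross-format compatibility, e.g. that the LZ4 Hadoop codec
+// can read back data produced by the LZ4 raw codec.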
+void checkCodecRoundtrip( + const std::unique_ptr& c1, + const std::unique_ptr& c2, + const std::vector& data) { + checkCodecRoundtrip(c1.get(), c2.get(), data); +} + +void streamingCompress( + const std::shared_ptr& compressor, + const std::vector& uncompressed, + std::vector& compressed) { + const uint8_t* input = uncompressed.data(); + uint64_t remaining = uncompressed.size(); + uint64_t compressedSize = 0; + compressed.resize(10); + bool doFlush = false; + + // Generate small random input buffer size. + auto randomInputSize = makeRandomInputSize(); + + // Continue decompressing until consuming all compressed data . + while (remaining > 0) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, randomInputSize()); + auto outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + + // Compress once. + auto compressResult = + compressor->compress(input, inputLength, output, outputLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_LE(compressResult.bytesRead, inputLength); + ASSERT_LE(compressResult.bytesWritten, outputLength); + + // Update result. + compressedSize += compressResult.bytesWritten; + input += compressResult.bytesRead; + remaining -= compressResult.bytesRead; + + // Grow compressed buffer if it's too small. + if (compressResult.outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + + // Once every two iterations, do a flush. + if (doFlush) { + bool outputTooSmall; + do { + outputLength = compressed.size() - compressedSize; + output = compressed.data() + compressedSize; + auto flushResult = compressor->flush(output, outputLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_LE(flushResult.bytesWritten, outputLength); + compressedSize += flushResult.bytesWritten; + + outputTooSmall = flushResult.outputTooSmall; + if (outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (outputTooSmall); + } + doFlush = !doFlush; + } + + // End the compressed stream. + { + bool outputTooSmall; + do { + int64_t outputLength = compressed.size() - compressedSize; + uint8_t* output = compressed.data() + compressedSize; + auto endResult = compressor->end(output, outputLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_LE(endResult.bytesWritten, outputLength); + compressedSize += endResult.bytesWritten; + + outputTooSmall = endResult.outputTooSmall; + if (outputTooSmall) { + compressed.resize(compressed.capacity() * 2); + } + } while (outputTooSmall); + } + compressed.resize(compressedSize); +} + +void streamingDecompress( + const std::shared_ptr& decompressor, + const std::vector& compressed, + std::vector& decompressed) { + const uint8_t* input = compressed.data(); + uint64_t remaining = compressed.size(); + uint64_t decompressedSize = 0; + decompressed.resize(10); + + // Generate small random input buffer size. + auto ramdomInputSize = makeRandomInputSize(); + + // Continue decompressing until finishes. + while (!decompressor->isFinished()) { + // Feed a small amount each time. + auto inputLength = std::min(remaining, ramdomInputSize()); + auto outputLength = decompressed.size() - decompressedSize; + uint8_t* output = decompressed.data() + decompressedSize; + + // Decompress once. 
+ auto decompressResult = + decompressor->decompress(input, inputLength, output, outputLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_LE(decompressResult.bytesRead, inputLength); + ASSERT_LE(decompressResult.bytesWritten, outputLength); + ASSERT_TRUE( + decompressResult.outputTooSmall || decompressResult.bytesWritten > 0 || + decompressResult.bytesRead > 0) + << "Decompression not progressing anymore"; + + // Update decompressResult. + decompressedSize += decompressResult.bytesWritten; + input += decompressResult.bytesRead; + remaining -= decompressResult.bytesRead; + + // Grow decompressed buffer if it's too small. + if (decompressResult.outputTooSmall) { + decompressed.resize(decompressed.capacity() * 2); + } + } + ASSERT_TRUE(decompressor->isFinished()); + ASSERT_EQ(remaining, 0); + decompressed.resize(decompressedSize); +} + +std::shared_ptr makeStreamingCompressor(Codec* codec) { + return codec->makeStreamingCompressor().thenOrThrow( + folly::identity, throwsNotOk); +} + +std::shared_ptr makeStreamingDecompressor(Codec* codec) { + return codec->makeStreamingDecompressor().thenOrThrow( + folly::identity, throwsNotOk); +} + +// Check the streaming compressor against one-shot decompression. +void checkStreamingCompressor(Codec* codec, const std::vector& data) { + // Run streaming compression. + std::vector compressed; + const auto& compressor = makeStreamingCompressor(codec); + streamingCompress(compressor, data, compressed); + + // Check decompressing the compressed data. + std::vector decompressed(data.size()); + ASSERT_NO_THROW(codec->decompress( + compressed.data(), + compressed.size(), + decompressed.data(), + decompressed.size())); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming decompressor against one-shot compression. +void checkStreamingDecompressor( + Codec* codec, + const std::vector& data) { + // Create compressed data. + auto maxCompressedLen = codec->maxCompressedLength(data.size()); + std::vector compressed(maxCompressedLen); + auto compressedLength = + codec + ->compress( + data.data(), data.size(), compressed.data(), maxCompressedLen) + .thenOrThrow(folly::identity, throwsNotOk); + compressed.resize(compressedLength); + + // Run streaming decompression. + std::vector decompressed; + const auto& decompressor = makeStreamingDecompressor(codec); + streamingDecompress(decompressor, compressed, decompressed); + + // Check the decompressed data. + ASSERT_EQ(data.size(), decompressed.size()); + ASSERT_EQ(data, decompressed); +} + +// Check the streaming compressor and decompressor together. 
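+// Streams the input through the compressor, then through the decompressor,
+// and verifies the output matches the original data.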
+void checkStreamingRoundtrip( + const std::shared_ptr& compressor, + const std::shared_ptr& decompressor, + const std::vector& data) { + std::vector compressed; + streamingCompress(compressor, data, compressed); + std::vector decompressed; + streamingDecompress(decompressor, compressed, decompressed); + ASSERT_EQ(data, decompressed); +} + +void checkStreamingRoundtrip(Codec* codec, const std::vector& data) { + checkStreamingRoundtrip( + makeStreamingCompressor(codec), makeStreamingDecompressor(codec), data); +} +} // namespace + class CompressionTest : public testing::Test {}; TEST_F(CompressionTest, testCompressionNames) { @@ -31,6 +333,7 @@ TEST_F(CompressionTest, testCompressionNames) { EXPECT_EQ("lzo", compressionKindToString(CompressionKind_LZO)); EXPECT_EQ("lz4", compressionKindToString(CompressionKind_LZ4)); EXPECT_EQ("zstd", compressionKindToString(CompressionKind_ZSTD)); + EXPECT_EQ("gzip", compressionKindToString(CompressionKind_GZIP)); EXPECT_EQ( "unknown - 99", compressionKindToString(static_cast(99))); @@ -59,4 +362,154 @@ TEST_F(CompressionTest, stringToCompressionKind) { VELOX_ASSERT_THROW( stringToCompressionKind("bz2"), "Not support compression kind bz2"); } + +class CodecTest : public ::testing::TestWithParam { + protected: + static CompressionKind getCompressionKind() { + return GetParam().compressionKind; + } + + static const CodecOptions& getCodecOptions() { + return *GetParam().codecOptions; + } + + static std::unique_ptr makeCodec() { + return Codec::create(getCompressionKind(), getCodecOptions()) + .thenOrThrow( + [](auto codec) { return codec; }, + [](const Status& invalid) { + VELOX_FAIL("Failed to create codec: {}", invalid); + }); + } +}; + +TEST_P(CodecTest, specifyCompressionLevel) { + std::vector data = makeRandomData(2000); + const auto kind = getCompressionKind(); + if (!Codec::isAvailable(kind)) { + // Support for this codec hasn't been built. 
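+    // Codec::create() should surface a clear error rather than return a
+    // codec.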
+ VELOX_ASSERT_THROW( + Codec::create(kind, kUseDefaultCompressionLevel), + "Support for codec '" + compressionKindToString(kind) + + "' is either not built or not implemented."); + return; + } + auto codecDefault = + Codec::create(kind).thenOrThrow(folly::identity, throwsNotOk); + checkCodecRoundtrip(codecDefault, data); + + for (const auto& compressionLevel : + {codecDefault->defaultCompressionLevel(), + codecDefault->minimumCompressionLevel(), + codecDefault->maximumCompressionLevel()}) { + auto codec = Codec::create(kind, compressionLevel) + .thenOrThrow(folly::identity, throwsNotOk); + checkCodecRoundtrip(codec, data); + } +} + +TEST_P(CodecTest, getUncompressedLength) { + auto codec = makeCodec(); + auto inputLength = 100; + auto input = makeRandomData(inputLength); + std::vector compressed(codec->maxCompressedLength(input.size())); + auto compressedLength = + codec + ->compress( + input.data(), inputLength, compressed.data(), compressed.size()) + .thenOrThrow(folly::identity, throwsNotOk); + compressed.resize(compressedLength); + + if (Codec::supportsGetUncompressedLength(getCompressionKind())) { + ASSERT_EQ( + codec->getUncompressedLength(compressed.data(), compressedLength), + inputLength); + } else { + ASSERT_EQ( + codec->getUncompressedLength(compressed.data(), compressedLength), + std::nullopt); + } +} + +TEST_P(CodecTest, codecRoundtrip) { + auto codec = makeCodec(); + for (int dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(codec, makeRandomData(dataSize)); + checkCodecRoundtrip(codec, makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingCompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressor) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); + checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingRoundtrip) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + for (auto dataSize : {0, 10, 10000, 100000}) { + auto codec = makeCodec(); + checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); + checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); + } +} + +TEST_P(CodecTest, streamingDecompressorReuse) { + if (!Codec::supportsStreamingCompression(getCompressionKind())) { + return; + } + + auto codec = makeCodec(); + const auto& decompressor = makeStreamingDecompressor(codec.get()); + checkStreamingRoundtrip( + makeStreamingCompressor(codec.get()), decompressor, makeRandomData(100)); + + // StreamingDecompressor::reset() should allow reusing decompressor for a + // new stream. + ASSERT_TRUE(decompressor->reset().ok()); + checkStreamingRoundtrip( + makeStreamingCompressor(codec.get()), decompressor, makeRandomData(200)); +} + +INSTANTIATE_TEST_SUITE_P( + TestLz4, + CodecTest, + ::testing::ValuesIn(generateLz4TestParams())); + +TEST(CodecLZ4HadoopTest, compatibility) { + // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. 
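+  // Lz4HadoopCodec::decompress() falls back to raw LZ4 when the 8-byte size
+  // prefix is absent, so data compressed with the raw codec must round-trip.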
+ auto c1 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Raw}) + .thenOrThrow([](auto codec) { return codec; }, throwsNotOk); + auto c2 = Codec::create( + CompressionKind::CompressionKind_LZ4, + Lz4CodecOptions{Lz4CodecOptions::kLz4Hadoop}) + .thenOrThrow([](auto codec) { return codec; }, throwsNotOk); + + for (auto dataSize : {0, 10, 10000, 100000}) { + checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); + } +} } // namespace facebook::velox::common diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index c2de52a9d7b2..77d7c0725dd0 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -847,8 +847,12 @@ void writeHeader(char* buffer, size_t compressedSize, bool original) { buffer[2] = static_cast(compressedSize >> 15); } -size_t -compress(char* buf, size_t size, char* output, size_t offset, Codec& codec) { +size_t compress( + char* buf, + size_t size, + char* output, + size_t offset, + folly::io::Codec& codec) { auto ioBuf = folly::IOBuf::wrapBuffer(buf, size); auto compressed = codec.compress(ioBuf.get()); auto str = compressed->moveToFbString(); @@ -873,7 +877,7 @@ class TestSeek : public ::testing::Test { kind, std::move(input), bufferSize, *pool_, "Test Decompression"); } - void runTest(Codec& codec, CompressionKind kind) { + void runTest(folly::io::Codec& codec, CompressionKind kind) { constexpr size_t inputSize = 1024; constexpr size_t outputSize = 4096; char output[outputSize]; @@ -914,7 +918,7 @@ class TestSeek : public ::testing::Test { } static void prepareTestData( - Codec& codec, + folly::io::Codec& codec, char* input1, char* input2, size_t inputSize, From 59ad345ab1167838f9c17f8c0438bd0bb6cc8252 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 27 Feb 2025 09:34:12 +0000 Subject: [PATCH 2/4] refine comments for outputTooSmall --- velox/common/compression/Compression.cpp | 8 ++++---- velox/common/compression/Compression.h | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index fc4eedc760e5..16151a4e17c6 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -115,7 +115,7 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { bool Codec::supportsStreamingCompression(CompressionKind kind) { switch (kind) { #ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind::CompressionKind_LZ4: + case CompressionKind_LZ4: return true; #endif default: @@ -145,7 +145,7 @@ Expected> Codec::create( std::unique_ptr codec; switch (kind) { #ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind::CompressionKind_LZ4: + case CompressionKind_LZ4: if (auto options = dynamic_cast(&codecOptions)) { switch (options->type) { case Lz4CodecOptions::kLz4Frame: @@ -185,10 +185,10 @@ Expected> Codec::create( bool Codec::isAvailable(CompressionKind kind) { switch (kind) { - case CompressionKind::CompressionKind_NONE: + case CompressionKind_NONE: return true; #ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind::CompressionKind_LZ4: + case CompressionKind_LZ4: return true; #endif default: diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 925b5e27891b..b0bb7bb81aec 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -72,8 +72,8 @@ class StreamingCompressor { }; /// Compress 
some input. - /// If CompressResult.outputTooSmall is true on return, then a larger output - /// buffer should be supplied. + /// If CompressResult.outputTooSmall is true on return, compress() should be + /// called again with a larger output buffer. virtual Expected compress( const uint8_t* input, uint64_t inputLength, @@ -82,15 +82,15 @@ class StreamingCompressor { /// Flush part of the compressed output. /// If FlushResult.outputTooSmall is true on return, flush() should be called - /// again with a larger buffer. + /// again with a larger output buffer. virtual Expected flush( uint8_t* output, uint64_t outputLength) = 0; /// End compressing, doing whatever is necessary to end the stream. /// If EndResult.outputTooSmall is true on return, end() should be called - /// again with a larger buffer. Otherwise, the StreamingCompressor should not - /// be used anymore. end() will flush the compressed output. + /// again with a larger output buffer. Otherwise, the StreamingCompressor + /// should not be used anymore. end() will flush the compressed output. virtual Expected end(uint8_t* output, uint64_t outputLength) = 0; }; @@ -105,8 +105,8 @@ class StreamingDecompressor { }; /// Decompress some input. - /// If outputTooSmall is true on return, a larger output buffer needs - /// to be supplied. + /// If DecompressResult.outputTooSmall is true on return, decompress() should + /// be called again with a larger output buffer. virtual Expected decompress( const uint8_t* input, uint64_t inputLength, From 09600c77c62494dc9623d3b22b82077c0ba8c9b5 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 27 Feb 2025 13:41:14 +0000 Subject: [PATCH 3/4] address comments & refine test --- velox/common/compression/Compression.cpp | 30 +++------ velox/common/compression/Compression.h | 41 +++++++----- velox/common/compression/Lz4Compression.cpp | 16 ++++- velox/common/compression/Lz4Compression.h | 10 ++- .../compression/tests/CompressionTest.cpp | 63 ++++++++++++------- 5 files changed, 95 insertions(+), 65 deletions(-) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 16151a4e17c6..08f4df055eae 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -112,17 +112,6 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { return false; } -bool Codec::supportsStreamingCompression(CompressionKind kind) { - switch (kind) { -#ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind_LZ4: - return true; -#endif - default: - return false; - } -} - bool Codec::supportsCompressFixedLength(CompressionKind kind) { // TODO: Return true if it's supported by compression kind. return false; @@ -133,9 +122,6 @@ Expected> Codec::create( const CodecOptions& codecOptions) { if (!isAvailable(kind)) { auto name = compressionKindToString(kind); - VELOX_RETURN_UNEXPECTED_IF( - folly::StringPiece({name}).startsWith("unknown"), - Status::Invalid("Unrecognized codec: ", name)); return folly::makeUnexpected(Status::Invalid( "Support for codec '{}' is either not built or not implemented.", name)); @@ -158,9 +144,10 @@ Expected> Codec::create( codec = makeLz4HadoopCodec(); break; } + } else { + // By default, create LZ4 Frame codec. + codec = makeLz4FrameCodec(compressionLevel); } - // By default, create LZ4 Frame codec. 
- codec = makeLz4FrameCodec(compressionLevel); break; #endif default: @@ -185,8 +172,6 @@ Expected> Codec::create( bool Codec::isAvailable(CompressionKind kind) { switch (kind) { - case CompressionKind_NONE: - return true; #ifdef VELOX_ENABLE_COMPRESSION_LZ4 case CompressionKind_LZ4: return true; @@ -196,10 +181,11 @@ bool Codec::isAvailable(CompressionKind kind) { } } -std::optional Codec::getUncompressedLength( +Expected Codec::getUncompressedLength( const uint8_t* input, uint64_t inputLength) const { - return std::nullopt; + return folly::makeUnexpected(Status::Invalid( + "getUncompressedLength is unsupported with {} format.", name())); } Expected Codec::compressFixedLength( @@ -211,6 +197,10 @@ Expected Codec::compressFixedLength( Status::Invalid("'{}' doesn't support fixed-length compression", name())); } +bool Codec::supportsStreamingCompression() const { + return false; +} + Expected> Codec::makeStreamingCompressor() { return folly::makeUnexpected(Status::Invalid( diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index b0bb7bb81aec..c29d2196e0a6 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -73,7 +73,7 @@ class StreamingCompressor { /// Compress some input. /// If CompressResult.outputTooSmall is true on return, compress() should be - /// called again with a larger output buffer. + /// called again with a larger output buffer, such as doubling its size. virtual Expected compress( const uint8_t* input, uint64_t inputLength, @@ -82,15 +82,16 @@ class StreamingCompressor { /// Flush part of the compressed output. /// If FlushResult.outputTooSmall is true on return, flush() should be called - /// again with a larger output buffer. + /// again with a larger output buffer, such as doubling its size. virtual Expected flush( uint8_t* output, uint64_t outputLength) = 0; - /// End compressing, doing whatever is necessary to end the stream. + /// End compressing, doing whatever is necessary to end the stream, and + /// flushing the compressed output. /// If EndResult.outputTooSmall is true on return, end() should be called - /// again with a larger output buffer. Otherwise, the StreamingCompressor - /// should not be used anymore. end() will flush the compressed output. + /// again with a larger output buffer, such as doubling its size. + /// Otherwise, the StreamingCompressor should not be used anymore. virtual Expected end(uint8_t* output, uint64_t outputLength) = 0; }; @@ -106,7 +107,7 @@ class StreamingDecompressor { /// Decompress some input. /// If DecompressResult.outputTooSmall is true on return, decompress() should - /// be called again with a larger output buffer. + /// be called again with a larger output buffer, such as doubling its size. virtual Expected decompress( const uint8_t* input, uint64_t inputLength, @@ -154,9 +155,6 @@ class Codec { /// compressed length. static bool supportsCompressFixedLength(CompressionKind kind); - // Return true if indicated kind supports creating streaming de/compressor. - static bool supportsStreamingCompression(CompressionKind kind); - /// Return the smallest supported compression level. /// If the codec doesn't support compression level, /// `kUseDefaultCompressionLevel` will be returned. @@ -214,19 +212,28 @@ class Codec { // Maximum compressed length of given input length. virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; - /// Retrieves the actual uncompressed length of data using the specified - /// compression library. 
- /// Note: This functionality is not universally supported by all compression - /// libraries. If not supported, `std::nullopt` will be returned. - virtual std::optional getUncompressedLength( + /// Retrieves the uncompressed length of the given compressed data using the + /// specified compression library. + /// If the input data is corrupted, returns `Unexpected` with + /// `Status::IOError`. Not all compression libraries support this + /// functionality. Use supportsGetUncompressedLength() to check before + /// calling. If unsupported, returns `Unexpected` with `Status::Invalid`. + virtual Expected getUncompressedLength( const uint8_t* input, uint64_t inputLength) const; - // Create a streaming compressor instance. + // Return true if indicated kind supports creating streaming de/compressor. + virtual bool supportsStreamingCompression() const; + + /// Create a streaming compressor instance. + /// Use supportsStreamingCompression() to check before calling. + /// If unsupported, returns `Unexpected` with `Status::Invalid`. virtual Expected> makeStreamingCompressor(); - // Create a streaming compressor instance. + /// Create a streaming decompressor instance. + /// Use supportsStreamingCompression() to check before calling. + /// If unsupported, returns `Unexpected` with `Status::Invalid`. virtual Expected> makeStreamingDecompressor(); @@ -237,7 +244,7 @@ class Codec { virtual int32_t compressionLevel() const; // The name of this Codec's compression type. - std::string name() const; + virtual std::string name() const; private: // Initializes the codec's resources. diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp index 463e2963b634..8a991521114d 100644 --- a/velox/common/compression/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -230,7 +230,7 @@ Status LZ4Compressor::compressBegin( return Status::OK(); } -Status common::LZ4Decompressor::init() { +Status LZ4Decompressor::init() { finished_ = false; auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); @@ -304,7 +304,7 @@ int32_t Lz4CodecBase::compressionLevel() const { } CompressionKind Lz4CodecBase::compressionKind() const { - return CompressionKind::CompressionKind_LZ4; + return CompressionKind_LZ4; } Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) @@ -363,6 +363,10 @@ Expected Lz4FrameCodec::decompress( }); } +bool Lz4FrameCodec::supportsStreamingCompression() const { + return true; +} + Expected> Lz4FrameCodec::makeStreamingCompressor() { auto ptr = std::make_shared(compressionLevel_); @@ -430,6 +434,10 @@ Expected Lz4RawCodec::decompress( return static_cast(decompressedSize); } +std::string Lz4RawCodec::name() const { + return "lz4_raw"; +} + Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { @@ -491,6 +499,10 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const { return kUseDefaultCompressionLevel; } +std::string Lz4HadoopCodec::name() const { + return "lz4_hadoop"; +} + Expected Lz4HadoopCodec::decompressInternal( const uint8_t* input, uint64_t inputLength, diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h index 0d0f238d979d..1b4edc015ec0 100644 --- a/velox/common/compression/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -29,11 +29,11 @@ struct Lz4CodecOptions : CodecOptions { enum Type { 
kLz4Frame, kLz4Raw, kLz4Hadoop }; Lz4CodecOptions( - Lz4CodecOptions::Type type, + Type type, int32_t compressionLevel = kUseDefaultCompressionLevel) : CodecOptions(compressionLevel), type(type) {} - Lz4CodecOptions::Type type; + Type type; }; class Lz4CodecBase : public Codec { @@ -72,6 +72,8 @@ class Lz4FrameCodec : public Lz4CodecBase { uint8_t* output, uint64_t outputLength) override; + bool supportsStreamingCompression() const override; + Expected> makeStreamingCompressor() override; @@ -99,6 +101,8 @@ class Lz4RawCodec : public Lz4CodecBase { uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; + + std::string name() const override; }; /// The Hadoop Lz4Codec source code can be found here: @@ -127,6 +131,8 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { int32_t defaultCompressionLevel() const override; + std::string name() const override; + private: Expected decompressInternal( const uint8_t* input, diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 7be796a62225..3a4fe3891031 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -57,9 +57,10 @@ std::vector generateLz4TestParams() { Lz4CodecOptions::kLz4Frame, Lz4CodecOptions::kLz4Hadoop}) { params.emplace_back( - CompressionKind::CompressionKind_LZ4, - std::make_shared(type)); + CompressionKind_LZ4, std::make_shared(type)); } + // Add default CodecOptions. + params.emplace_back(CompressionKind_LZ4); return params; } @@ -386,14 +387,8 @@ class CodecTest : public ::testing::TestWithParam { TEST_P(CodecTest, specifyCompressionLevel) { std::vector data = makeRandomData(2000); const auto kind = getCompressionKind(); - if (!Codec::isAvailable(kind)) { - // Support for this codec hasn't been built. - VELOX_ASSERT_THROW( - Codec::create(kind, kUseDefaultCompressionLevel), - "Support for codec '" + compressionKindToString(kind) + - "' is either not built or not implemented."); - return; - } + ASSERT_TRUE(Codec::isAvailable(kind)); + auto codecDefault = Codec::create(kind).thenOrThrow(folly::identity, throwsNotOk); checkCodecRoundtrip(codecDefault, data); @@ -421,14 +416,22 @@ TEST_P(CodecTest, getUncompressedLength) { compressed.resize(compressedLength); if (Codec::supportsGetUncompressedLength(getCompressionKind())) { - ASSERT_EQ( - codec->getUncompressedLength(compressed.data(), compressedLength), - inputLength); + auto uncompressedLength = + codec->getUncompressedLength(compressed.data(), compressedLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_EQ(uncompressedLength, inputLength); } else { - ASSERT_EQ( - codec->getUncompressedLength(compressed.data(), compressedLength), - std::nullopt); + VELOX_ASSERT_ERROR_STATUS( + codec->getUncompressedLength(compressed.data(), compressedLength) + .error(), + StatusCode::kInvalid, + fmt::format( + "getUncompressedLength is unsupported with {} format.", + codec->name())); } + + // TODO: For codecs that support getUncompressedLength(), verify the error + // message for corrupted data. 
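+  //
+  // A minimal commented-out sketch of what that verification might look like
+  // (editorial addition; the truncation trick and the Status::code() check
+  // are assumptions, not part of this patch):
+  //
+  //   if (Codec::supportsGetUncompressedLength(getCompressionKind())) {
+  //     auto corrupted = compressed;
+  //     corrupted.resize(corrupted.size() / 2);
+  //     auto result =
+  //         codec->getUncompressedLength(corrupted.data(), corrupted.size());
+  //     ASSERT_TRUE(result.hasError());
+  //     ASSERT_EQ(result.error().code(), StatusCode::kIOError);
+  //   }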
} TEST_P(CodecTest, codecRoundtrip) { @@ -440,47 +443,47 @@ TEST_P(CodecTest, codecRoundtrip) { } TEST_P(CodecTest, streamingCompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingDecompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingRoundtrip) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingDecompressorReuse) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } - auto codec = makeCodec(); const auto& decompressor = makeStreamingDecompressor(codec.get()); checkStreamingRoundtrip( makeStreamingCompressor(codec.get()), decompressor, makeRandomData(100)); @@ -512,4 +515,16 @@ TEST(CodecLZ4HadoopTest, compatibility) { checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); } } + +TEST(CodecTestInvalid, invalidKind) { + CompressionKind kind = CompressionKind_NONE; + ASSERT_FALSE(Codec::isAvailable(kind)); + + VELOX_ASSERT_ERROR_STATUS( + Codec::create(kind, kUseDefaultCompressionLevel).error(), + StatusCode::kInvalid, + fmt::format( + "Support for codec '{}' is either not built or not implemented.", + compressionKindToString(kind))); +} } // namespace facebook::velox::common From 767b97dfd88a9dea591e3357015142837bd7c1a3 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Tue, 4 Mar 2025 22:03:17 +0000 Subject: [PATCH 4/4] address comments --- velox/common/compression/Compression.cpp | 12 +- velox/common/compression/Compression.h | 174 ++++++++++-------- velox/common/compression/Lz4Compression.cpp | 137 ++++++++++++-- velox/common/compression/Lz4Compression.h | 119 +----------- .../compression/tests/CompressionTest.cpp | 12 +- velox/dwio/dwrf/reader/ReaderBase.h | 2 +- 6 files changed, 239 insertions(+), 217 deletions(-) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 08f4df055eae..8473b4fe119a 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -131,9 +131,9 @@ Expected> Codec::create( std::unique_ptr codec; switch (kind) { #ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind_LZ4: + case CompressionKind_LZ4: { if (auto options = dynamic_cast(&codecOptions)) { - switch (options->type) { + switch (options->lz4Type) { case Lz4CodecOptions::kLz4Frame: codec = makeLz4FrameCodec(compressionLevel); break; @@ -148,7 +148,7 @@ Expected> Codec::create( // By default, create LZ4 Frame codec. 
codec = makeLz4FrameCodec(compressionLevel); } - break; + } break; #endif default: break; @@ -214,10 +214,6 @@ Codec::makeStreamingDecompressor() { } int32_t Codec::compressionLevel() const { - return kUseDefaultCompressionLevel; -} - -std::string Codec::name() const { - return compressionKindToString(compressionKind()); + return kDefaultCompressionLevel; } } // namespace facebook::velox::common diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index c29d2196e0a6..efbd35ea05d6 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -46,90 +46,30 @@ std::string compressionKindToString(CompressionKind kind); CompressionKind stringToCompressionKind(const std::string& kind); -constexpr uint64_t DEFAULT_COMPRESSION_BLOCK_SIZE = 256 * 1024; +static constexpr uint64_t kDefaultCompressionBlockSize = 256 * 1024; -static constexpr int32_t kUseDefaultCompressionLevel = +static constexpr int32_t kDefaultCompressionLevel = std::numeric_limits::min(); -class StreamingCompressor { - public: - virtual ~StreamingCompressor() = default; - - struct CompressResult { - uint64_t bytesRead; - uint64_t bytesWritten; - bool outputTooSmall; - }; - - struct FlushResult { - uint64_t bytesWritten; - bool outputTooSmall; - }; - - struct EndResult { - uint64_t bytesWritten; - bool outputTooSmall; - }; - - /// Compress some input. - /// If CompressResult.outputTooSmall is true on return, compress() should be - /// called again with a larger output buffer, such as doubling its size. - virtual Expected compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - /// Flush part of the compressed output. - /// If FlushResult.outputTooSmall is true on return, flush() should be called - /// again with a larger output buffer, such as doubling its size. - virtual Expected flush( - uint8_t* output, - uint64_t outputLength) = 0; - - /// End compressing, doing whatever is necessary to end the stream, and - /// flushing the compressed output. - /// If EndResult.outputTooSmall is true on return, end() should be called - /// again with a larger output buffer, such as doubling its size. - /// Otherwise, the StreamingCompressor should not be used anymore. - virtual Expected end(uint8_t* output, uint64_t outputLength) = 0; -}; - -class StreamingDecompressor { - public: - virtual ~StreamingDecompressor() = default; - - struct DecompressResult { - uint64_t bytesRead; - uint64_t bytesWritten; - bool outputTooSmall; - }; - - /// Decompress some input. - /// If DecompressResult.outputTooSmall is true on return, decompress() should - /// be called again with a larger output buffer, such as doubling its size. - virtual Expected decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) = 0; - - // Return whether the compressed stream is finished. - virtual bool isFinished() = 0; - - // Reinitialize decompressor, making it ready for a new compressed stream. - virtual Status reset() = 0; -}; +class StreamingCompressor; +class StreamingDecompressor; struct CodecOptions { int32_t compressionLevel; - CodecOptions(int32_t compressionLevel = kUseDefaultCompressionLevel) + CodecOptions(int32_t compressionLevel = kDefaultCompressionLevel) : compressionLevel(compressionLevel) {} virtual ~CodecOptions() = default; }; +/// Codec interface for compression and decompression. 
+/// The Codec class provides a common interface for various compression +/// algorithms to support one-shot compression and decompression. +/// +/// For codecs that support streaming compression and decompression, the +/// makeStreamingCompressor() and makeStreamingDecompressor() functions can be +/// used to create streaming compressor and decompressor instances. class Codec { public: virtual ~Codec() = default; @@ -158,12 +98,12 @@ class Codec { /// Return the smallest supported compression level. /// If the codec doesn't support compression level, /// `kUseDefaultCompressionLevel` will be returned. - virtual int32_t minimumCompressionLevel() const = 0; + virtual int32_t minCompressionLevel() const = 0; /// Return the largest supported compression level. /// If the codec doesn't support compression level, /// `kUseDefaultCompressionLevel` will be returned. - virtual int32_t maximumCompressionLevel() const = 0; + virtual int32_t maxCompressionLevel() const = 0; /// Return the default compression level. /// If the codec doesn't support compression level, @@ -244,12 +184,96 @@ class Codec { virtual int32_t compressionLevel() const; // The name of this Codec's compression type. - virtual std::string name() const; + virtual std::string_view name() const = 0; private: // Initializes the codec's resources. virtual Status init(); }; + +/// Base class for streaming compressors. Unlike one-shot compression, streaming +/// compression can compress data with arbitrary length and write the compressed +/// data through multiple calls to compress(). +/// The caller is responsible for providing a sufficiently large output buffer +/// for compress(), flush(), and finalize(), and checking the `outputTooSmall` +/// from the returned result. If `outputTooSmall` is true, the caller should +/// provide a larger output buffer and call the corresponding function again. +class StreamingCompressor { + public: + virtual ~StreamingCompressor() = default; + + struct CompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct FlushResult { + uint64_t bytesWritten; + bool outputTooSmall; + }; + + struct EndResult : FlushResult {}; + + /// Compress some input. + /// If CompressResult.outputTooSmall is true on return, compress() should be + /// called again with a larger output buffer, such as doubling its size. + virtual Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + /// Flush part of the compressed output. + /// If FlushResult.outputTooSmall is true on return, flush() should be called + /// again with a larger output buffer, such as doubling its size. + virtual Expected flush( + uint8_t* output, + uint64_t outputLength) = 0; + + /// End compressing, doing whatever is necessary to end the stream, and + /// flushing the compressed output. + /// If EndResult.outputTooSmall is true on return, end() should be called + /// again with a larger output buffer, such as doubling its size. + /// Otherwise, the StreamingCompressor should not be used anymore. + virtual Expected finalize( + uint8_t* output, + uint64_t outputLength) = 0; +}; + +/// Base class for streaming decompressors. Streaming decompression can process +/// data with arbitrary length and write the decompressed data through multiple +/// calls to decompress(). +/// The caller is responsible for providing a sufficiently large output buffer +/// for decompress(), and checking the `outputTooSmall` in the returned result. 
+/// If `outputTooSmall` is true, the caller should provide a larger output +/// buffer and call decompress() again. +class StreamingDecompressor { + public: + virtual ~StreamingDecompressor() = default; + + struct DecompressResult { + uint64_t bytesRead; + uint64_t bytesWritten; + bool outputTooSmall; + }; + + /// Decompress some input. + /// If DecompressResult.outputTooSmall is true on return, decompress() should + /// be called again with a larger output buffer, such as doubling its size. + virtual Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) = 0; + + // Return whether the compressed stream is finished. + virtual bool isFinished() = 0; + + // Reinitialize decompressor, making it ready for a new compressed stream. + virtual Status reset() = 0; +}; + } // namespace facebook::velox::common template <> diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp index 8a991521114d..aa925d05978d 100644 --- a/velox/common/compression/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -27,9 +27,7 @@ constexpr int32_t kLz4MinCompressionLevel = 1; constexpr int32_t kLegacyLz4MaxCompressionLevel = 12; #endif -static inline Status lz4Error( - const char* prefixMessage, - LZ4F_errorCode_t errorCode) { +Status lz4Error(const char* prefixMessage, LZ4F_errorCode_t errorCode) { return Status::IOError(prefixMessage, LZ4F_getErrorName(errorCode)); } @@ -46,6 +44,115 @@ LZ4F_preferences_t defaultPreferences(int32_t compressionLevel) { } } // namespace +class Lz4CodecBase : public Codec { + public: + explicit Lz4CodecBase(int32_t compressionLevel); + + int32_t minCompressionLevel() const override; + + int32_t maxCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + int32_t compressionLevel() const override; + + CompressionKind compressionKind() const override; + + protected: + const int32_t compressionLevel_; +}; + +class Lz4FrameCodec : public Lz4CodecBase { + public: + explicit Lz4FrameCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + bool supportsStreamingCompression() const override; + + Expected> makeStreamingCompressor() + override; + + Expected> makeStreamingDecompressor() + override; + + std::string_view name() const override { + return "lz4"; + } + + protected: + const LZ4F_preferences_t prefs_; +}; + +class Lz4RawCodec : public Lz4CodecBase { + public: + explicit Lz4RawCodec(int32_t compressionLevel); + + uint64_t maxCompressedLength(uint64_t inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + std::string_view name() const override; +}; + +/// The Hadoop Lz4Codec source code can be found here: +/// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc +class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { + public: + Lz4HadoopCodec(); + + uint64_t maxCompressedLength(uint64_t 
inputLength) override; + + Expected compress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + Expected decompress( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; + + int32_t minCompressionLevel() const override; + + int32_t maxCompressionLevel() const override; + + int32_t defaultCompressionLevel() const override; + + std::string_view name() const override; + + private: + Expected decompressInternal( + const uint8_t* input, + uint64_t inputLength, + uint8_t* output, + uint64_t outputLength) override; +}; + class LZ4Compressor : public StreamingCompressor { public: explicit LZ4Compressor(int32_t compressionLevel); @@ -62,7 +169,7 @@ class LZ4Compressor : public StreamingCompressor { Expected flush(uint8_t* output, uint64_t outputLength) override; - Expected end(uint8_t* output, uint64_t outputLength) override; + Expected finalize(uint8_t* output, uint64_t outputLength) override; protected: Status @@ -184,7 +291,7 @@ Expected LZ4Compressor::flush( return FlushResult{bytesWritten, false}; } -Expected LZ4Compressor::end( +Expected LZ4Compressor::finalize( uint8_t* output, uint64_t outputLength) { auto outputSize = static_cast(outputLength); @@ -279,15 +386,15 @@ bool LZ4Decompressor::isFinished() { Lz4CodecBase::Lz4CodecBase(int32_t compressionLevel) : compressionLevel_( - compressionLevel == kUseDefaultCompressionLevel + compressionLevel == kDefaultCompressionLevel ? kLz4DefaultCompressionLevel : compressionLevel) {} -int32_t Lz4CodecBase::minimumCompressionLevel() const { +int32_t Lz4CodecBase::minCompressionLevel() const { return kLz4MinCompressionLevel; } -int32_t Lz4CodecBase::maximumCompressionLevel() const { +int32_t Lz4CodecBase::maxCompressionLevel() const { #if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) return kLegacyLz4MaxCompressionLevel; #else @@ -434,7 +541,7 @@ Expected Lz4RawCodec::decompress( return static_cast(decompressedSize); } -std::string Lz4RawCodec::name() const { +std::string_view Lz4RawCodec::name() const { return "lz4_raw"; } @@ -487,19 +594,19 @@ Expected Lz4HadoopCodec::decompress( return Lz4RawCodec::decompress(input, inputLength, output, outputLength); } -int32_t Lz4HadoopCodec::minimumCompressionLevel() const { - return kUseDefaultCompressionLevel; +int32_t Lz4HadoopCodec::minCompressionLevel() const { + return kDefaultCompressionLevel; } -int32_t Lz4HadoopCodec::maximumCompressionLevel() const { - return kUseDefaultCompressionLevel; +int32_t Lz4HadoopCodec::maxCompressionLevel() const { + return kDefaultCompressionLevel; } int32_t Lz4HadoopCodec::defaultCompressionLevel() const { - return kUseDefaultCompressionLevel; + return kDefaultCompressionLevel; } -std::string Lz4HadoopCodec::name() const { +std::string_view Lz4HadoopCodec::name() const { return "lz4_hadoop"; } diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h index 1b4edc015ec0..7043f1a82235 100644 --- a/velox/common/compression/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -26,128 +26,23 @@ namespace facebook::velox::common { struct Lz4CodecOptions : CodecOptions { - enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; + enum Lz4Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; Lz4CodecOptions( - Type type, - int32_t compressionLevel = kUseDefaultCompressionLevel) - : CodecOptions(compressionLevel), type(type) {} + Lz4Type lz4Type, + int32_t compressionLevel = kDefaultCompressionLevel) + : CodecOptions(compressionLevel), 
lz4Type(lz4Type) {} - Type type; -}; - -class Lz4CodecBase : public Codec { - public: - explicit Lz4CodecBase(int32_t compressionLevel); - - int32_t minimumCompressionLevel() const override; - - int32_t maximumCompressionLevel() const override; - - int32_t defaultCompressionLevel() const override; - - int32_t compressionLevel() const override; - - CompressionKind compressionKind() const override; - - protected: - const int32_t compressionLevel_; -}; - -class Lz4FrameCodec : public Lz4CodecBase { - public: - explicit Lz4FrameCodec(int32_t compressionLevel); - - uint64_t maxCompressedLength(uint64_t inputLength) override; - - Expected compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - Expected decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - bool supportsStreamingCompression() const override; - - Expected> makeStreamingCompressor() - override; - - Expected> makeStreamingDecompressor() - override; - - protected: - const LZ4F_preferences_t prefs_; -}; - -class Lz4RawCodec : public Lz4CodecBase { - public: - explicit Lz4RawCodec(int32_t compressionLevel); - - uint64_t maxCompressedLength(uint64_t inputLength) override; - - Expected compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - Expected decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - std::string name() const override; -}; - -/// The Hadoop Lz4Codec source code can be found here: -/// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc -class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { - public: - Lz4HadoopCodec(); - - uint64_t maxCompressedLength(uint64_t inputLength) override; - - Expected compress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - Expected decompress( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; - - int32_t minimumCompressionLevel() const override; - - int32_t maximumCompressionLevel() const override; - - int32_t defaultCompressionLevel() const override; - - std::string name() const override; - - private: - Expected decompressInternal( - const uint8_t* input, - uint64_t inputLength, - uint8_t* output, - uint64_t outputLength) override; + Lz4Type lz4Type; }; // Lz4 frame format codec. std::unique_ptr makeLz4FrameCodec( - int32_t compressionLevel = kUseDefaultCompressionLevel); + int32_t compressionLevel = kDefaultCompressionLevel); // Lz4 "raw" format codec. std::unique_ptr makeLz4RawCodec( - int32_t compressionLevel = kUseDefaultCompressionLevel); + int32_t compressionLevel = kDefaultCompressionLevel); // Lz4 "Hadoop" format codec (Lz4 raw codec prefixed with lengths header). 
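+// Editorial note (not part of this patch): a Hadoop-framed buffer prefixes
+// the raw LZ4 payload with two 4-byte big-endian length fields, which is the
+// header that HadoopCompressionFormat reads and writes (layout per the Hadoop
+// Lz4Codec linked in Lz4Compression.cpp):
+//
+//   | uncompressed length (4B, BE) | compressed length (4B, BE) | raw LZ4 data |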
std::unique_ptr makeLz4HadoopCodec(); diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 3a4fe3891031..e8efeace6bf9 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -52,12 +52,12 @@ struct TestParams { std::vector generateLz4TestParams() { std::vector params; - for (auto type : + for (auto lz4Type : {Lz4CodecOptions::kLz4Raw, Lz4CodecOptions::kLz4Frame, Lz4CodecOptions::kLz4Hadoop}) { params.emplace_back( - CompressionKind_LZ4, std::make_shared(type)); + CompressionKind_LZ4, std::make_shared(lz4Type)); } // Add default CodecOptions. params.emplace_back(CompressionKind_LZ4); @@ -197,7 +197,7 @@ void streamingCompress( do { int64_t outputLength = compressed.size() - compressedSize; uint8_t* output = compressed.data() + compressedSize; - auto endResult = compressor->end(output, outputLength) + auto endResult = compressor->finalize(output, outputLength) .thenOrThrow(folly::identity, throwsNotOk); ASSERT_LE(endResult.bytesWritten, outputLength); compressedSize += endResult.bytesWritten; @@ -395,8 +395,8 @@ TEST_P(CodecTest, specifyCompressionLevel) { for (const auto& compressionLevel : {codecDefault->defaultCompressionLevel(), - codecDefault->minimumCompressionLevel(), - codecDefault->maximumCompressionLevel()}) { + codecDefault->minCompressionLevel(), + codecDefault->maxCompressionLevel()}) { auto codec = Codec::create(kind, compressionLevel) .thenOrThrow(folly::identity, throwsNotOk); checkCodecRoundtrip(codec, data); @@ -521,7 +521,7 @@ TEST(CodecTestInvalid, invalidKind) { ASSERT_FALSE(Codec::isAvailable(kind)); VELOX_ASSERT_ERROR_STATUS( - Codec::create(kind, kUseDefaultCompressionLevel).error(), + Codec::create(kind, kDefaultCompressionLevel).error(), StatusCode::kInvalid, fmt::format( "Support for codec '{}' is either not built or not implemented.", diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index d43c660dc1ab..561d88e56841 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -177,7 +177,7 @@ class ReaderBase { uint64_t compressionBlockSize() const { return postScript_->hasCompressionBlockSize() ? postScript_->compressionBlockSize() - : common::DEFAULT_COMPRESSION_BLOCK_SIZE; + : common::kDefaultCompressionBlockSize; } common::CompressionKind compressionKind() const {
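
Illustrative usage sketch (editorial addition, not part of the patch): one way a
caller could exercise the codec API introduced above, combining the one-shot and
streaming paths. The function name, buffer sizes, and the use of Expected::value()
for error handling are assumptions made for brevity; production code would
propagate the Status instead of throwing.

    #include <vector>
    #include "velox/common/compression/Compression.h"
    #include "velox/common/compression/Lz4Compression.h"

    using namespace facebook::velox::common;

    // Round-trips 'input' with the one-shot API, then compresses it again with
    // the streaming API, growing the output buffer whenever outputTooSmall is
    // reported.
    void lz4RoundtripExample(const std::vector<uint8_t>& input) {
      auto codec =
          Codec::create(
              CompressionKind_LZ4, Lz4CodecOptions{Lz4CodecOptions::kLz4Frame})
              .value();

      // One-shot compression into a buffer sized by maxCompressedLength().
      std::vector<uint8_t> compressed(codec->maxCompressedLength(input.size()));
      const auto compressedSize =
          codec
              ->compress(
                  input.data(), input.size(), compressed.data(), compressed.size())
              .value();
      std::vector<uint8_t> decompressed(input.size());
      codec
          ->decompress(
              compressed.data(),
              compressedSize,
              decompressed.data(),
              decompressed.size())
          .value();

      if (!codec->supportsStreamingCompression()) {
        return;
      }

      // Streaming compression: feed the input incrementally and double the
      // output buffer whenever the compressor reports outputTooSmall.
      auto compressor = codec->makeStreamingCompressor().value();
      std::vector<uint8_t> output(4096);
      uint64_t consumed = 0;
      uint64_t written = 0;
      while (consumed < input.size()) {
        auto result = compressor
                          ->compress(
                              input.data() + consumed,
                              input.size() - consumed,
                              output.data() + written,
                              output.size() - written)
                          .value();
        consumed += result.bytesRead;
        written += result.bytesWritten;
        if (result.outputTooSmall) {
          output.resize(output.size() * 2);
        }
      }
      while (true) {
        auto result =
            compressor->finalize(output.data() + written, output.size() - written)
                .value();
        written += result.bytesWritten;
        if (!result.outputTooSmall) {
          break;
        }
        output.resize(output.size() * 2);
      }
      output.resize(written);
    }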