From 9711ddfb393b0f4655212cf3cc5cde3ed8661a35 Mon Sep 17 00:00:00 2001 From: Mahadevuni Naveen Kumar Date: Thu, 5 Oct 2023 18:13:08 +0530 Subject: [PATCH] Move LZ4 codec from arrow to velox parquet writer --- .../parquet/writer/arrow/util/CMakeLists.txt | 4 +- .../parquet/writer/arrow/util/Compression.cpp | 10 - .../writer/arrow/util/CompressionLZ4.cpp | 622 ++++++++++++++++++ 3 files changed, 624 insertions(+), 12 deletions(-) create mode 100644 velox/dwio/parquet/writer/arrow/util/CompressionLZ4.cpp diff --git a/velox/dwio/parquet/writer/arrow/util/CMakeLists.txt b/velox/dwio/parquet/writer/arrow/util/CMakeLists.txt index 2b4288d7e58c8..42c746a99fb66 100644 --- a/velox/dwio/parquet/writer/arrow/util/CMakeLists.txt +++ b/velox/dwio/parquet/writer/arrow/util/CMakeLists.txt @@ -15,7 +15,7 @@ add_library( velox_dwio_arrow_parquet_writer_util_lib Compression.cpp CompressionSnappy.cpp Hashing.cpp Crc32.cpp - CompressionZstd.cpp) + CompressionZstd.cpp CompressionLZ4.cpp) target_link_libraries(velox_dwio_arrow_parquet_writer_util_lib parquet arrow - Snappy::snappy zstd::zstd) + Snappy::snappy zstd::zstd lz4::lz4) diff --git a/velox/dwio/parquet/writer/arrow/util/Compression.cpp b/velox/dwio/parquet/writer/arrow/util/Compression.cpp index 4d3533877e9fa..5cc212173986b 100644 --- a/velox/dwio/parquet/writer/arrow/util/Compression.cpp +++ b/velox/dwio/parquet/writer/arrow/util/Compression.cpp @@ -192,19 +192,13 @@ Result> Codec::Create( break; } case Compression::LZ4: -#ifdef ARROW_WITH_LZ4 codec = internal::MakeLz4RawCodec(compression_level); -#endif break; case Compression::LZ4_FRAME: -#ifdef ARROW_WITH_LZ4 codec = internal::MakeLz4FrameCodec(compression_level); -#endif break; case Compression::LZ4_HADOOP: -#ifdef ARROW_WITH_LZ4 codec = internal::MakeLz4HadoopRawCodec(); -#endif break; case Compression::ZSTD: codec = internal::MakeZSTDCodec(compression_level); @@ -256,11 +250,7 @@ bool Codec::IsAvailable(Compression::type codec_type) { case Compression::LZ4: case Compression::LZ4_FRAME: case Compression::LZ4_HADOOP: -#ifdef ARROW_WITH_LZ4 return true; -#else - return false; -#endif case Compression::ZSTD: return true; case Compression::BZ2: diff --git a/velox/dwio/parquet/writer/arrow/util/CompressionLZ4.cpp b/velox/dwio/parquet/writer/arrow/util/CompressionLZ4.cpp new file mode 100644 index 0000000000000..1b433c5797b09 --- /dev/null +++ b/velox/dwio/parquet/writer/arrow/util/CompressionLZ4.cpp @@ -0,0 +1,622 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache Arrow. + +#include "velox/dwio/parquet/writer/arrow/util/CompressionInternal.h" + +#include + +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/endian.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/ubsan.h" + +using std::size_t; + +namespace facebook::velox::parquet::arrow::util::internal { +namespace { + +constexpr int kLz4MinCompressionLevel = 1; + +static Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) { + return Status::IOError(prefix_msg, LZ4F_getErrorName(ret)); +} + +static LZ4F_preferences_t DefaultPreferences() { + LZ4F_preferences_t prefs; + memset(&prefs, 0, sizeof(prefs)); + return prefs; +} + +static LZ4F_preferences_t PreferencesWithCompressionLevel( + int compression_level) { + LZ4F_preferences_t prefs = DefaultPreferences(); + prefs.compressionLevel = compression_level; + return prefs; +} + +// ---------------------------------------------------------------------- +// Lz4 frame decompressor implementation + +class LZ4Decompressor : public Decompressor { + public: + LZ4Decompressor() {} + + ~LZ4Decompressor() override { + if (ctx_ != nullptr) { + ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); + } + } + + Status Init() { + LZ4F_errorCode_t ret; + finished_ = false; + + ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 init failed: "); + } else { + return Status::OK(); + } + } + + Status Reset() override { +#if defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER >= 10800 + // LZ4F_resetDecompressionContext appeared in 1.8.0 + DCHECK_NE(ctx_, nullptr); + LZ4F_resetDecompressionContext(ctx_); + finished_ = false; + return Status::OK(); +#else + if (ctx_ != nullptr) { + ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_)); + } + return Init(); +#endif + } + + Result Decompress( + int64_t input_len, + const uint8_t* input, + int64_t output_len, + uint8_t* output) override { + auto src = input; + auto dst = output; + auto src_size = static_cast(input_len); + auto dst_capacity = static_cast(output_len); + size_t ret; + + ret = LZ4F_decompress( + ctx_, dst, &dst_capacity, src, &src_size, nullptr /* options */); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 decompress failed: "); + } + finished_ = (ret == 0); + return DecompressResult{ + static_cast(src_size), + static_cast(dst_capacity), + (src_size == 0 && dst_capacity == 0)}; + } + + bool IsFinished() override { + return finished_; + } + + protected: + LZ4F_decompressionContext_t ctx_ = nullptr; + bool finished_; +}; + +// ---------------------------------------------------------------------- +// Lz4 frame compressor implementation + +class LZ4Compressor : public Compressor { + public: + explicit LZ4Compressor(int compression_level) + : compression_level_(compression_level) {} + + ~LZ4Compressor() override { + if (ctx_ != nullptr) { + ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_)); + } + } + + Status Init() { + LZ4F_errorCode_t ret; + prefs_ = PreferencesWithCompressionLevel(compression_level_); + first_time_ = true; + + ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 init failed: "); + } else { + return Status::OK(); + } + } + +#define BEGIN_COMPRESS(dst, dst_capacity, output_too_small) \ + if (first_time_) { \ + if (dst_capacity < LZ4F_HEADER_SIZE_MAX) { \ + /* Output too small to write LZ4F header */ \ + return (output_too_small); \ + } \ + ret = LZ4F_compressBegin(ctx_, dst, dst_capacity, &prefs_); \ + if (LZ4F_isError(ret)) { \ + return LZ4Error(ret, "LZ4 compress begin failed: "); \ + } \ + first_time_ = false; \ + dst += ret; \ + dst_capacity -= ret; \ + bytes_written += static_cast(ret); \ + } + + Result Compress( + int64_t input_len, + const uint8_t* input, + int64_t output_len, + uint8_t* output) override { + auto src = input; + auto dst = output; + auto src_size = static_cast(input_len); + auto dst_capacity = static_cast(output_len); + size_t ret; + int64_t bytes_written = 0; + + BEGIN_COMPRESS(dst, dst_capacity, (CompressResult{0, 0})); + + if (dst_capacity < LZ4F_compressBound(src_size, &prefs_)) { + // Output too small to compress into + return CompressResult{0, bytes_written}; + } + ret = LZ4F_compressUpdate( + ctx_, dst, dst_capacity, src, src_size, nullptr /* options */); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 compress update failed: "); + } + bytes_written += static_cast(ret); + DCHECK_LE(bytes_written, output_len); + return CompressResult{input_len, bytes_written}; + } + + Result Flush(int64_t output_len, uint8_t* output) override { + auto dst = output; + auto dst_capacity = static_cast(output_len); + size_t ret; + int64_t bytes_written = 0; + + BEGIN_COMPRESS(dst, dst_capacity, (FlushResult{0, true})); + + if (dst_capacity < LZ4F_compressBound(0, &prefs_)) { + // Output too small to flush into + return FlushResult{bytes_written, true}; + } + + ret = LZ4F_flush(ctx_, dst, dst_capacity, nullptr /* options */); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 flush failed: "); + } + bytes_written += static_cast(ret); + DCHECK_LE(bytes_written, output_len); + return FlushResult{bytes_written, false}; + } + + Result End(int64_t output_len, uint8_t* output) override { + auto dst = output; + auto dst_capacity = static_cast(output_len); + size_t ret; + int64_t bytes_written = 0; + + BEGIN_COMPRESS(dst, dst_capacity, (EndResult{0, true})); + + if (dst_capacity < LZ4F_compressBound(0, &prefs_)) { + // Output too small to end frame into + return EndResult{bytes_written, true}; + } + + ret = LZ4F_compressEnd(ctx_, dst, dst_capacity, nullptr /* options */); + if (LZ4F_isError(ret)) { + return LZ4Error(ret, "LZ4 end failed: "); + } + bytes_written += static_cast(ret); + DCHECK_LE(bytes_written, output_len); + return EndResult{bytes_written, false}; + } + +#undef BEGIN_COMPRESS + + protected: + int compression_level_; + LZ4F_compressionContext_t ctx_ = nullptr; + LZ4F_preferences_t prefs_; + bool first_time_; +}; + +// ---------------------------------------------------------------------- +// Lz4 frame codec implementation + +class Lz4FrameCodec : public Codec { + public: + explicit Lz4FrameCodec(int compression_level) + : compression_level_( + compression_level == kUseDefaultCompressionLevel + ? kLz4DefaultCompressionLevel + : compression_level), + prefs_(PreferencesWithCompressionLevel(compression_level_)) {} + + int64_t MaxCompressedLen( + int64_t input_len, + const uint8_t* ARROW_ARG_UNUSED(input)) override { + return static_cast( + LZ4F_compressFrameBound(static_cast(input_len), &prefs_)); + } + + Result Compress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + auto output_len = LZ4F_compressFrame( + output_buffer, + static_cast(output_buffer_len), + input, + static_cast(input_len), + &prefs_); + if (LZ4F_isError(output_len)) { + return LZ4Error(output_len, "Lz4 compression failure: "); + } + return static_cast(output_len); + } + + Result Decompress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + ARROW_ASSIGN_OR_RAISE(auto decomp, MakeDecompressor()); + + int64_t total_bytes_written = 0; + while (!decomp->IsFinished() && input_len != 0) { + ARROW_ASSIGN_OR_RAISE( + auto res, + decomp->Decompress( + input_len, input, output_buffer_len, output_buffer)); + input += res.bytes_read; + input_len -= res.bytes_read; + output_buffer += res.bytes_written; + output_buffer_len -= res.bytes_written; + total_bytes_written += res.bytes_written; + if (res.need_more_output) { + return Status::IOError("Lz4 decompression buffer too small"); + } + } + if (!decomp->IsFinished()) { + return Status::IOError( + "Lz4 compressed input contains less than one frame"); + } + if (input_len != 0) { + return Status::IOError( + "Lz4 compressed input contains more than one frame"); + } + return total_bytes_written; + } + + Result> MakeCompressor() override { + auto ptr = std::make_shared(compression_level_); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> MakeDecompressor() override { + auto ptr = std::make_shared(); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Compression::type compression_type() const override { + return Compression::LZ4_FRAME; + } + int minimum_compression_level() const override { + return kLz4MinCompressionLevel; + } +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + int maximum_compression_level() const override { + return 12; + } +#else + int maximum_compression_level() const override { + return LZ4F_compressionLevel_max(); + } +#endif + int default_compression_level() const override { + return kLz4DefaultCompressionLevel; + } + + int compression_level() const override { + return compression_level_; + } + + protected: + const int compression_level_; + const LZ4F_preferences_t prefs_; +}; + +// ---------------------------------------------------------------------- +// Lz4 "raw" codec implementation + +class Lz4Codec : public Codec { + public: + explicit Lz4Codec(int compression_level) + : compression_level_( + compression_level == kUseDefaultCompressionLevel + ? kLz4DefaultCompressionLevel + : compression_level) {} + + Result Decompress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + int64_t decompressed_size = LZ4_decompress_safe( + reinterpret_cast(input), + reinterpret_cast(output_buffer), + static_cast(input_len), + static_cast(output_buffer_len)); + if (decompressed_size < 0) { + return Status::IOError("Corrupt Lz4 compressed data."); + } + return decompressed_size; + } + + int64_t MaxCompressedLen( + int64_t input_len, + const uint8_t* ARROW_ARG_UNUSED(input)) override { + return LZ4_compressBound(static_cast(input_len)); + } + + Result Compress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + int64_t output_len; +#ifdef LZ4HC_CLEVEL_MIN + constexpr int min_hc_clevel = LZ4HC_CLEVEL_MIN; +#else // For older versions of the lz4 library + constexpr int min_hc_clevel = 3; +#endif + if (compression_level_ < min_hc_clevel) { + output_len = LZ4_compress_default( + reinterpret_cast(input), + reinterpret_cast(output_buffer), + static_cast(input_len), + static_cast(output_buffer_len)); + } else { + output_len = LZ4_compress_HC( + reinterpret_cast(input), + reinterpret_cast(output_buffer), + static_cast(input_len), + static_cast(output_buffer_len), + compression_level_); + } + if (output_len == 0) { + return Status::IOError("Lz4 compression failure."); + } + return output_len; + } + + Result> MakeCompressor() override { + return Status::NotImplemented( + "Streaming compression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); + } + + Result> MakeDecompressor() override { + return Status::NotImplemented( + "Streaming decompression unsupported with LZ4 raw format. " + "Try using LZ4 frame format instead."); + } + + Compression::type compression_type() const override { + return Compression::LZ4; + } + int minimum_compression_level() const override { + return kLz4MinCompressionLevel; + } +#if (defined(LZ4_VERSION_NUMBER) && LZ4_VERSION_NUMBER < 10800) + int maximum_compression_level() const override { + return 12; + } +#else + int maximum_compression_level() const override { + return LZ4F_compressionLevel_max(); + } +#endif + int default_compression_level() const override { + return kLz4DefaultCompressionLevel; + } + + protected: + int compression_level_; +}; + +// ---------------------------------------------------------------------- +// Lz4 Hadoop "raw" codec implementation + +class Lz4HadoopCodec : public Lz4Codec { + public: + Lz4HadoopCodec() : Lz4Codec(kUseDefaultCompressionLevel) {} + + Result Decompress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + const int64_t decompressed_size = + TryDecompressHadoop(input_len, input, output_buffer_len, output_buffer); + if (decompressed_size != kNotHadoop) { + return decompressed_size; + } + // Fall back on raw LZ4 codec (for files produces by earlier versions of + // Parquet C++) + return Lz4Codec::Decompress( + input_len, input, output_buffer_len, output_buffer); + } + + int64_t MaxCompressedLen( + int64_t input_len, + const uint8_t* ARROW_ARG_UNUSED(input)) override { + return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr); + } + + Result Compress( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) override { + if (output_buffer_len < kPrefixLength) { + return Status::Invalid( + "Output buffer too small for Lz4HadoopCodec compression"); + } + + ARROW_ASSIGN_OR_RAISE( + int64_t output_len, + Lz4Codec::Compress( + input_len, + input, + output_buffer_len - kPrefixLength, + output_buffer + kPrefixLength)); + + // Prepend decompressed size in bytes and compressed size in bytes + // to be compatible with Hadoop Lz4Codec + const uint32_t decompressed_size = + bit_util::ToBigEndian(static_cast(input_len)); + const uint32_t compressed_size = + bit_util::ToBigEndian(static_cast(output_len)); + ::arrow::util::SafeStore(output_buffer, decompressed_size); + ::arrow::util::SafeStore(output_buffer + sizeof(uint32_t), compressed_size); + + return kPrefixLength + output_len; + } + + Result> MakeCompressor() override { + return Status::NotImplemented( + "Streaming compression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); + } + + Result> MakeDecompressor() override { + return Status::NotImplemented( + "Streaming decompression unsupported with LZ4 Hadoop raw format. " + "Try using LZ4 frame format instead."); + } + + Compression::type compression_type() const override { + return Compression::LZ4_HADOOP; + } + + protected: + // Offset starting at which page data can be read/written + static const int64_t kPrefixLength = sizeof(uint32_t) * 2; + + static const int64_t kNotHadoop = -1; + + int64_t TryDecompressHadoop( + int64_t input_len, + const uint8_t* input, + int64_t output_buffer_len, + uint8_t* output_buffer) { + // Parquet files written with the Hadoop Lz4Codec use their own framing. + // The input buffer can contain an arbitrary number of "frames", each + // with the following structure: + // - bytes 0..3: big-endian uint32_t representing the frame decompressed + // size + // - bytes 4..7: big-endian uint32_t representing the frame compressed size + // - bytes 8...: frame compressed data + // + // The Hadoop Lz4Codec source code can be found here: + // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc + int64_t total_decompressed_size = 0; + + while (input_len >= kPrefixLength) { + const uint32_t expected_decompressed_size = + bit_util::FromBigEndian(::arrow::util::SafeLoadAs(input)); + const uint32_t expected_compressed_size = bit_util::FromBigEndian( + ::arrow::util::SafeLoadAs(input + sizeof(uint32_t))); + input += kPrefixLength; + input_len -= kPrefixLength; + + if (input_len < expected_compressed_size) { + // Not enough bytes for Hadoop "frame" + return kNotHadoop; + } + if (output_buffer_len < expected_decompressed_size) { + // Not enough bytes to hold advertised output => probably not Hadoop + return kNotHadoop; + } + // Try decompressing and compare with expected decompressed length + auto maybe_decompressed_size = Lz4Codec::Decompress( + expected_compressed_size, input, output_buffer_len, output_buffer); + if (!maybe_decompressed_size.ok() || + *maybe_decompressed_size != expected_decompressed_size) { + return kNotHadoop; + } + input += expected_compressed_size; + input_len -= expected_compressed_size; + output_buffer += expected_decompressed_size; + output_buffer_len -= expected_decompressed_size; + total_decompressed_size += expected_decompressed_size; + } + + if (input_len == 0) { + return total_decompressed_size; + } else { + return kNotHadoop; + } + } + + int minimum_compression_level() const override { + return kUseDefaultCompressionLevel; + } + int maximum_compression_level() const override { + return kUseDefaultCompressionLevel; + } + int default_compression_level() const override { + return kUseDefaultCompressionLevel; + } +}; + +} // namespace + +std::unique_ptr MakeLz4FrameCodec(int compression_level) { + return std::make_unique(compression_level); +} + +std::unique_ptr MakeLz4HadoopRawCodec() { + return std::make_unique(); +} + +std::unique_ptr MakeLz4RawCodec(int compression_level) { + return std::make_unique(compression_level); +} + +} // namespace facebook::velox::parquet::arrow::util::internal