From 7cfade13b7d388ddae85ca1ec5fea4e49ed10480 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 12 Dec 2024 11:06:32 +0000 Subject: [PATCH] feat: Optimize PrestoSerializer compress buffer --- velox/serializers/PrestoSerializer.cpp | 33 +++++++++++++++++++------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 7e09c038a4bd..bb26a3adc2f7 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4359,6 +4359,30 @@ void readTopColumns( readColumns( &source, childTypes, resultOffset, nullptr, 0, pool, opts, children); } + +std::unique_ptr uncompressBuffer( + ByteInputStream* source, + const PrestoHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + std::unique_ptr iobuf; + int32_t readCount = 0; + while (readCount < header.compressedSize) { + const auto remaining = header.compressedSize - readCount; + auto view = source->nextView(remaining); + readCount += view.size(); + auto newBuf = folly::IOBuf::wrapBuffer(view.data(), view.size()); + if (iobuf) { + iobuf->prev()->appendChain(std::move(newBuf)); + } else { + iobuf = std::move(newBuf); + } + } + + VELOX_DCHECK_EQ(readCount, header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); +} } // namespace void PrestoVectorSerde::deserialize( @@ -4369,8 +4393,6 @@ void PrestoVectorSerde::deserialize( vector_size_t resultOffset, const Options* options) { const auto prestoOptions = toPrestoOptions(options); - const auto codec = - common::compressionKindToCodec(prestoOptions.compressionKind); auto maybeHeader = PrestoHeader::read(source); VELOX_CHECK( maybeHeader.hasValue(), @@ -4413,13 +4435,8 @@ void PrestoVectorSerde::deserialize( if (!isCompressedBitSet(header.pageCodecMarker)) { readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. auto uncompress = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressBuffer(source, header, prestoOptions.compressionKind); auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); readTopColumns(