# Patch summary (text before the first "diff --git" header is not part of any hunk).
#
# Adds an identical helper, buffersToIOBuf(), to CompactRowSerializer.cpp and
# UnsafeRowSerializer.cpp and uses it in the compression branch of flush().
#
# buffersToIOBuf(buffers):
#   - wraps each buffer's bytes with folly::IOBuf::wrapBuffer() and links the
#     results, in input order, into one IOBuf chain;
#   - returns a null unique_ptr when 'buffers' is empty (the loop never runs);
#   - per the folly documentation wrapBuffer() neither copies nor takes
#     ownership, so the chain is only valid while 'buffers' is alive. flush()
#     consumes it immediately while buffers_ is still held.
#
# flush(): the chain is handed to codec_->compress() in place of the removed
# code, which first wrote every buffer into an IOBufOutputStream.
#
# NOTE(review): template arguments (<folly::IOBuf>, <BufferPtr>, <char>) are
# inferred from usage -- confirm against the tree before applying.
# NOTE(review): codec_->compress() would receive nullptr for an empty
# buffers_; presumably the earlier branches of flush() rule that out -- verify.
# NOTE(review): the helper is duplicated verbatim in both files; consider
# moving it to a shared location.
diff --git a/velox/serializers/CompactRowSerializer.cpp b/velox/serializers/CompactRowSerializer.cpp
index b2471e09fbea..0730ca546e33 100644
--- a/velox/serializers/CompactRowSerializer.cpp
+++ b/velox/serializers/CompactRowSerializer.cpp
@@ -39,6 +39,21 @@ VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
   return *options;
 }
 
+std::unique_ptr<folly::IOBuf> buffersToIOBuf(
+    const std::vector<BufferPtr>& buffers) {
+  std::unique_ptr<folly::IOBuf> iobuf;
+  for (const auto& buffer : buffers) {
+    auto newBuf =
+        folly::IOBuf::wrapBuffer(buffer->asMutable<char>(), buffer->size());
+    if (iobuf) {
+      iobuf->prev()->appendChain(std::move(newBuf));
+    } else {
+      iobuf = std::move(newBuf);
+    }
+  }
+  return iobuf;
+}
+
 struct CompactRowHeader {
   int32_t uncompressedSize;
   int32_t compressedSize;
@@ -181,8 +196,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
     return CompactRowHeader::size() + codec_->maxCompressedLength(size);
   }
 
-  // The serialization format is | uncompressedSize | compressedSize | data |
-  // when compressed.
+  // The serialization format is | uncompressedSize | compressedSize |
+  // compressed | data | when compressed.
   void flush(OutputStream* stream) override {
     constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
     const auto size = uncompressedSize();
@@ -195,12 +210,8 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
       ++stats_.numCompressionSkipped;
     } else {
       // Compress the buffer if satisfied condition.
-      IOBufOutputStream out(
-          *pool_, nullptr, buffers_.size() * sizeof(std::string_view));
-      for (const auto& buffer : buffers_) {
-        out.write(buffer->asMutable<char>(), buffer->size());
-      }
-      const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
+      const auto toCompress = buffersToIOBuf(buffers_);
+      const auto compressedBuffer = codec_->compress(toCompress.get());
       const auto compressedSize = compressedBuffer->length();
       stats_.compressionInputBytes += size;
       stats_.compressedBytes += compressedSize;
diff --git a/velox/serializers/UnsafeRowSerializer.cpp b/velox/serializers/UnsafeRowSerializer.cpp
index 4c46a16c39f3..28ad994b84d9 100644
--- a/velox/serializers/UnsafeRowSerializer.cpp
+++ b/velox/serializers/UnsafeRowSerializer.cpp
@@ -38,6 +38,21 @@ VectorSerde::Options toValidOptions(const VectorSerde::Options* options) {
   return *options;
 }
 
+std::unique_ptr<folly::IOBuf> buffersToIOBuf(
+    const std::vector<BufferPtr>& buffers) {
+  std::unique_ptr<folly::IOBuf> iobuf;
+  for (const auto& buffer : buffers) {
+    auto newBuf =
+        folly::IOBuf::wrapBuffer(buffer->asMutable<char>(), buffer->size());
+    if (iobuf) {
+      iobuf->prev()->appendChain(std::move(newBuf));
+    } else {
+      iobuf = std::move(newBuf);
+    }
+  }
+  return iobuf;
+}
+
 // The compressedSize is equal to uncompressedSize when not compressed.
 struct UnsafeRowHeader {
   int32_t uncompressedSize;
@@ -160,8 +175,8 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
     return UnsafeRowHeader::size() + codec_->maxCompressedLength(size);
   }
 
-  // The serialization format is | uncompressedSize | compressedSize | data |
-  // when compressed.
+  // The serialization format is | uncompressedSize | compressedSize |
+  // compressed | data | when compressed.
   void flush(OutputStream* stream) override {
     constexpr int32_t kMaxCompressionAttemptsToSkip = 30;
     const auto size = uncompressedSize();
@@ -174,12 +189,8 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
       ++stats_.numCompressionSkipped;
     } else {
       // Compress the buffer if satisfied condition.
-      IOBufOutputStream out(
-          *pool_, nullptr, buffers_.size() * sizeof(std::string_view));
-      for (const auto& buffer : buffers_) {
-        out.write(buffer->asMutable<char>(), buffer->size());
-      }
-      const auto compressedBuffer = codec_->compress(out.getIOBuf().get());
+      const auto toCompress = buffersToIOBuf(buffers_);
+      const auto compressedBuffer = codec_->compress(toCompress.get());
       const auto compressedSize = compressedBuffer->length();
       stats_.compressionInputBytes += size;
       stats_.compressedBytes += compressedSize;