From 46f36f4bf8815cf189877679dde63f644d2cc1aa Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Sun, 6 Oct 2024 16:11:09 -0700 Subject: [PATCH] Fix Presto serde bug when deserializing large payload --- velox/common/base/CMakeLists.txt | 1 + velox/common/base/IOUtils.cpp | 36 ++++++++++ velox/common/base/IOUtils.h | 9 +++ velox/common/base/tests/CMakeLists.txt | 1 + velox/common/base/tests/IOUtilsTest.cpp | 65 +++++++++++++++++++ velox/serializers/PrestoSerializer.cpp | 9 +-- .../tests/PrestoSerializerTest.cpp | 9 +++ 7 files changed, 126 insertions(+), 4 deletions(-) create mode 100644 velox/common/base/IOUtils.cpp create mode 100644 velox/common/base/tests/IOUtilsTest.cpp diff --git a/velox/common/base/CMakeLists.txt b/velox/common/base/CMakeLists.txt index cfbfdc90ea669..62a4679e1e582 100644 --- a/velox/common/base/CMakeLists.txt +++ b/velox/common/base/CMakeLists.txt @@ -29,6 +29,7 @@ velox_add_library( BitUtil.cpp Counters.cpp Fs.cpp + IOUtils.cpp PeriodicStatsReporter.cpp RandomUtil.cpp RawVector.cpp diff --git a/velox/common/base/IOUtils.cpp b/velox/common/base/IOUtils.cpp new file mode 100644 index 0000000000000..04e0a05637eb9 --- /dev/null +++ b/velox/common/base/IOUtils.cpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/IOUtils.h" +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox::common { + +std::vector byteRangesFromIOBuf(folly::IOBuf* iobuf) { + if (iobuf == nullptr) { + return {}; + } + std::vector byteRanges; + auto* current = iobuf; + do { + byteRanges.push_back( + {current->writableData(), (int32_t)current->length(), 0}); + current = current->next(); + } while (current != iobuf); + return byteRanges; +} + +} // namespace facebook::velox::common diff --git a/velox/common/base/IOUtils.h b/velox/common/base/IOUtils.h index 281c97ed36f71..f349d69d96cbe 100644 --- a/velox/common/base/IOUtils.h +++ b/velox/common/base/IOUtils.h @@ -18,7 +18,16 @@ #include #include +#include + +namespace facebook::velox { +struct ByteRange; +} + namespace facebook::velox::common { + +std::vector byteRangesFromIOBuf(folly::IOBuf* iobuf); + struct OutputByteStream { explicit OutputByteStream(char* data, int32_t offset = 0) : data_(data), offset_{offset} {} diff --git a/velox/common/base/tests/CMakeLists.txt b/velox/common/base/tests/CMakeLists.txt index 9a10ec270b86d..469ff70265af4 100644 --- a/velox/common/base/tests/CMakeLists.txt +++ b/velox/common/base/tests/CMakeLists.txt @@ -22,6 +22,7 @@ add_executable( ConcurrentCounterTest.cpp ExceptionTest.cpp FsTest.cpp + IOUtilsTest.cpp RangeTest.cpp RawVectorTest.cpp RuntimeMetricsTest.cpp diff --git a/velox/common/base/tests/IOUtilsTest.cpp b/velox/common/base/tests/IOUtilsTest.cpp new file mode 100644 index 0000000000000..f8158a9cbfb6f --- /dev/null +++ b/velox/common/base/tests/IOUtilsTest.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/base/IOUtils.h" +#include "velox/common/memory/ByteStream.h" + +namespace facebook::velox::common { + +class IOUtilsTest : public testing::Test { + public: + std::unique_ptr createIOBuf(uint64_t size) { + auto buf = folly::IOBuf::create(size); + auto* writableData = buf->writableData(); + std::memset(writableData, '6', size); + buf->append(size); + return buf; + } +}; + +TEST_F(IOUtilsTest, iobufConsume) { + // Empty buffer test + auto emptyBufList = byteRangesFromIOBuf(nullptr); + ASSERT_TRUE(emptyBufList.empty()); + + // Single buffer test + const uint64_t bufCapacity = 1024; + auto iobuf = createIOBuf(bufCapacity); + auto oneBufList = byteRangesFromIOBuf(iobuf.get()); + ASSERT_EQ(oneBufList.size(), 1); + ASSERT_EQ(oneBufList[0].size, bufCapacity); + + // Multiple buffer test + const uint64_t numChainedBuf = 64; + auto head = createIOBuf(bufCapacity); + uint32_t count{1}; + folly::IOBuf* cur = head.get(); + while (count < numChainedBuf) { + cur->insertAfterThisOne(createIOBuf(bufCapacity)); + cur = cur->next(); + count++; + } + + auto rangeList = byteRangesFromIOBuf(head.get()); + ASSERT_EQ(rangeList.size(), numChainedBuf); + for (const auto& range : rangeList) { + ASSERT_EQ(range.size, bufCapacity); + } +} + +} // namespace facebook::velox::common diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 4036e47912667..f5ad87d1eb368 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -20,6 +20,7 @@ #include #include "velox/common/base/Crc.h" +#include "velox/common/base/IOUtils.h" #include "velox/common/base/RawVector.h" #include "velox/common/memory/ByteStream.h" #include "velox/vector/BiasVector.h" @@ -4220,12 +4221,12 @@ void PrestoVectorSerde::deserialize( auto compressBuf = folly::IOBuf::create(header.compressedSize); source->readBytes(compressBuf->writableData(), header.compressedSize); compressBuf->append(header.compressedSize); + + // Process chained uncompressed results IOBufs. auto uncompress = codec->uncompress(compressBuf.get(), header.uncompressedSize); - ByteRange byteRange{ - uncompress->writableData(), (int32_t)uncompress->length(), 0}; - auto uncompressedSource = - std::make_unique(std::vector{byteRange}); + auto uncompressedSource = std::make_unique( + common::byteRangesFromIOBuf(uncompress.get())); readTopColumns( *uncompressedSource, type, pool, *result, resultOffset, prestoOptions); } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 9f4817a00c76b..d195ca5f79a29 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -775,6 +775,15 @@ TEST_P(PrestoSerializerTest, basic) { testRoundTrip(rowVector); } +TEST_P(PrestoSerializerTest, basicLarge) { + const vector_size_t numRows = 80'000; + auto rowVector = makeRowVector( + {makeFlatVector(numRows, [](vector_size_t row) { return row; }), + makeFlatVector( + numRows, [](vector_size_t row) { return std::string(1024, 'x'); })}); + testRoundTrip(rowVector); +} + /// Test serialization of a dictionary vector that adds nulls to the base /// vector. TEST_P(PrestoSerializerTest, dictionaryWithExtraNulls) {