Skip to content

Commit

Permalink
Fix Presto serde bug when deserializing large payload
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 7, 2024
1 parent 96944d5 commit 46f36f4
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 4 deletions.
1 change: 1 addition & 0 deletions velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ velox_add_library(
BitUtil.cpp
Counters.cpp
Fs.cpp
IOUtils.cpp
PeriodicStatsReporter.cpp
RandomUtil.cpp
RawVector.cpp
Expand Down
36 changes: 36 additions & 0 deletions velox/common/base/IOUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/ByteStream.h"

namespace facebook::velox::common {

/// Builds the list of ByteRanges that cover the data held by every buffer of
/// the IOBuf chain starting at 'iobuf', in chain order.
///
/// The returned ranges point directly into the memory owned by the IOBuf
/// chain (no bytes are copied), so the chain must outlive any use of the
/// result. Every range starts with its read position set to 0.
///
/// @param iobuf Head of the (circular) IOBuf chain; may be nullptr.
/// @return An empty vector if 'iobuf' is nullptr. Otherwise at least one
/// range per chain element; an element whose length does not fit in the
/// int32_t size of a range is split into several consecutive ranges instead
/// of being silently truncated by a narrowing cast.
std::vector<ByteRange> byteRangesFromIOBuf(folly::IOBuf* iobuf) {
  if (iobuf == nullptr) {
    return {};
  }

  // Largest number of bytes a single range can describe: the size stored in
  // each range is an int32_t.
  constexpr uint64_t kMaxRangeSize = 0x7fffffff;

  std::vector<ByteRange> byteRanges;
  // One range per chain element is the common case; reserve for it up front
  // to avoid repeated reallocation on long chains.
  byteRanges.reserve(iobuf->countChainElements());

  auto* current = iobuf;
  do {
    auto* data = current->writableData();
    uint64_t remaining = current->length();
    // do/while so that a zero-length element still yields one (empty) range,
    // exactly as before.
    do {
      const uint64_t chunkSize =
          remaining < kMaxRangeSize ? remaining : kMaxRangeSize;
      byteRanges.push_back({data, static_cast<int32_t>(chunkSize), 0});
      data += chunkSize;
      remaining -= chunkSize;
    } while (remaining > 0);
    // The chain is circular: next() of the last element is the head again.
    current = current->next();
  } while (current != iobuf);
  return byteRanges;
}

} // namespace facebook::velox::common
9 changes: 9 additions & 0 deletions velox/common/base/IOUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@
#include <cstdint>
#include <cstring>

#include <folly/io/IOBuf.h>

namespace facebook::velox {
struct ByteRange;
}

namespace facebook::velox::common {

/// Returns ByteRanges covering the data of every buffer in the IOBuf chain
/// headed by 'iobuf', in chain order. Returns an empty vector if 'iobuf' is
/// nullptr. The ranges reference the buffers' memory directly rather than
/// copying it, so the IOBuf chain must stay alive while the ranges are used.
std::vector<ByteRange> byteRangesFromIOBuf(folly::IOBuf* iobuf);

struct OutputByteStream {
explicit OutputByteStream(char* data, int32_t offset = 0)
: data_(data), offset_{offset} {}
Expand Down
1 change: 1 addition & 0 deletions velox/common/base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_executable(
ConcurrentCounterTest.cpp
ExceptionTest.cpp
FsTest.cpp
IOUtilsTest.cpp
RangeTest.cpp
RawVectorTest.cpp
RuntimeMetricsTest.cpp
Expand Down
65 changes: 65 additions & 0 deletions velox/common/base/tests/IOUtilsTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>

#include "velox/common/base/IOUtils.h"
#include "velox/common/memory/ByteStream.h"

namespace facebook::velox::common {

/// Test fixture with a helper for building filled folly::IOBuf instances.
class IOUtilsTest : public testing::Test {
 public:
  /// Creates an IOBuf via folly::IOBuf::create(size), fills its first 'size'
  /// bytes with the character '6' and then marks those bytes as valid data so
  /// that length() reports 'size'.
  std::unique_ptr<folly::IOBuf> createIOBuf(uint64_t size) {
    std::unique_ptr<folly::IOBuf> filled = folly::IOBuf::create(size);
    std::memset(filled->writableData(), '6', size);
    filled->append(size);
    return filled;
  }
};

// Verifies byteRangesFromIOBuf() for a null input, a single buffer and a chain
// of buffers: the number of ranges and the size of each range must match the
// buffers that were supplied.
TEST_F(IOUtilsTest, iobufConsume) {
  const uint64_t bufCapacity = 1024;
  const uint64_t numChainedBuf = 64;

  // Null input: no ranges are produced.
  const auto noRanges = byteRangesFromIOBuf(nullptr);
  ASSERT_TRUE(noRanges.empty());

  // A standalone buffer maps to exactly one range of the same size.
  auto single = createIOBuf(bufCapacity);
  const auto singleRanges = byteRangesFromIOBuf(single.get());
  ASSERT_EQ(singleRanges.size(), 1);
  ASSERT_EQ(singleRanges[0].size, bufCapacity);

  // Build a chain of 'numChainedBuf' buffers by repeatedly inserting a new
  // buffer after the current tail. The head counts as the first element.
  auto head = createIOBuf(bufCapacity);
  folly::IOBuf* tail = head.get();
  for (uint32_t linked = 1; linked < numChainedBuf; ++linked) {
    tail->insertAfterThisOne(createIOBuf(bufCapacity));
    tail = tail->next();
  }

  // Every element of the chain must be reported with its full size.
  const auto chainRanges = byteRangesFromIOBuf(head.get());
  ASSERT_EQ(chainRanges.size(), numChainedBuf);
  for (size_t i = 0; i < chainRanges.size(); ++i) {
    ASSERT_EQ(chainRanges[i].size, bufCapacity);
  }
}

} // namespace facebook::velox::common
9 changes: 5 additions & 4 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/lang/Bits.h>

#include "velox/common/base/Crc.h"
#include "velox/common/base/IOUtils.h"
#include "velox/common/base/RawVector.h"
#include "velox/common/memory/ByteStream.h"
#include "velox/vector/BiasVector.h"
Expand Down Expand Up @@ -4220,12 +4221,12 @@ void PrestoVectorSerde::deserialize(
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);

// Process chained uncompressed results IOBufs.
auto uncompress =
codec->uncompress(compressBuf.get(), header.uncompressedSize);
ByteRange byteRange{
uncompress->writableData(), (int32_t)uncompress->length(), 0};
auto uncompressedSource =
std::make_unique<BufferInputStream>(std::vector<ByteRange>{byteRange});
auto uncompressedSource = std::make_unique<BufferInputStream>(
common::byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
*uncompressedSource, type, pool, *result, resultOffset, prestoOptions);
}
Expand Down
9 changes: 9 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,15 @@ TEST_P(PrestoSerializerTest, basic) {
testRoundTrip(rowVector);
}

// Round-trips a large row vector: 80'000 rows, each carrying an int64 equal to
// its row number and a 1024-character string of 'x'.
TEST_P(PrestoSerializerTest, basicLarge) {
  const vector_size_t numRows = 80'000;

  const auto rowNumberAt = [](vector_size_t row) { return row; };
  const auto wideStringAt = [](vector_size_t /*row*/) {
    return std::string(1024, 'x');
  };

  auto rowVector = makeRowVector(
      {makeFlatVector<int64_t>(numRows, rowNumberAt),
       makeFlatVector<std::string>(numRows, wideStringAt)});
  testRoundTrip(rowVector);
}

/// Test serialization of a dictionary vector that adds nulls to the base
/// vector.
TEST_P(PrestoSerializerTest, dictionaryWithExtraNulls) {
Expand Down

0 comments on commit 46f36f4

Please sign in to comment.