Skip to content

Commit

Permalink
Fix Presto serde bug when deserializing large payload (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#11177)

Summary:
The codec returns chained IOBufs when the payload is very large. We previously handled only the unchained case and were able to get away with it. This change fixes deserialization of large payloads and adds unit tests for it. The estimated buffer size needed to trigger this bug is around 80MB with GZIP, ZSTD and ZLIB.

Pull Request resolved: facebookincubator#11177

Reviewed By: xiaoxmeng

Differential Revision: D63962642

Pulled By: tanjialiang

fbshipit-source-id: cbaf2bb5518de786c69461b5f8d725732c9f6fe8
  • Loading branch information
tanjialiang authored and athmaja-n committed Jan 10, 2025
1 parent 8b94d92 commit d0dc6ef
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 4 deletions.
14 changes: 14 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@

namespace facebook::velox {

/// Returns one ByteRange per element of the IOBuf chain headed by 'iobuf', in
/// chain order. Each range starts at position 0 and refers directly to the
/// memory of the corresponding chain element; no bytes are copied. Returns an
/// empty vector if 'iobuf' is nullptr.
std::vector<ByteRange> byteRangesFromIOBuf(folly::IOBuf* iobuf) {
  if (iobuf == nullptr) {
    return {};
  }
  std::vector<ByteRange> byteRanges;
  // Allocate once up front so that long chains do not trigger repeated
  // vector reallocations while being walked.
  byteRanges.reserve(iobuf->countChainElements());
  // The chain is walked until next() leads back to the head element.
  auto* current = iobuf;
  do {
    // NOTE(review): length() is narrowed to the 32-bit ByteRange size; this
    // presumes no single chain element exceeds INT32_MAX bytes - confirm.
    byteRanges.push_back(
        {current->writableData(),
         static_cast<int32_t>(current->length()),
         0});
    current = current->next();
  } while (current != iobuf);
  return byteRanges;
}

/// Returns the number of bytes between 'position' and 'size', or 0 if
/// 'position' is at or past 'size'.
uint32_t ByteRange::availableBytes() const {
  const auto remaining = size - position;
  return remaining > 0 ? remaining : 0;
}
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct ByteRange {
std::string toString() const;
};

/// Returns one ByteRange per element of the IOBuf chain headed by 'iobuf', in
/// chain order. Each range starts at position 0 and points at the chain
/// element's own data rather than a copy. Returns an empty vector if 'iobuf'
/// is nullptr.
std::vector<ByteRange> byteRangesFromIOBuf(folly::IOBuf* iobuf);

class OutputStreamListener {
public:
virtual void onWrite(const char* /* s */, std::streamsize /* count */) {}
Expand Down
38 changes: 38 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,50 @@ class ByteStreamTest : public testing::Test {
return std::make_unique<StreamArena>(pool_.get());
}

/// Creates an unchained IOBuf holding 'size' bytes, each set to the
/// character '6'.
std::unique_ptr<folly::IOBuf> createIOBuf(uint64_t size) {
  auto iobuf = folly::IOBuf::create(size);
  // Fill the freshly allocated storage first, then extend the buffer's
  // length to cover the bytes just written.
  std::memset(iobuf->writableData(), '6', size);
  iobuf->append(size);
  return iobuf;
}

folly::Random::DefaultGenerator rng_;
std::unique_ptr<MemoryManager> memoryManager_;
MmapAllocator* mmapAllocator_;
std::shared_ptr<memory::MemoryPool> pool_;
};

// Verifies that byteRangesFromIOBuf() produces no ranges for a null IOBuf,
// one range for an unchained IOBuf, and one range per element for a chain.
TEST_F(ByteStreamTest, iobufConsume) {
  // A null IOBuf produces no ranges.
  auto noRanges = byteRangesFromIOBuf(nullptr);
  ASSERT_TRUE(noRanges.empty());

  // An unchained IOBuf produces exactly one range covering the whole buffer.
  const uint64_t bufCapacity = 1024;
  auto singleBuf = createIOBuf(bufCapacity);
  auto singleRanges = byteRangesFromIOBuf(singleBuf.get());
  ASSERT_EQ(singleRanges.size(), 1);
  ASSERT_EQ(singleRanges[0].size, bufCapacity);

  // Build a chain by repeatedly inserting a new buffer after the current
  // tail, then expect one range per chain element.
  const uint64_t numChainedBuf = 64;
  auto chainHead = createIOBuf(bufCapacity);
  folly::IOBuf* tail = chainHead.get();
  for (uint32_t linked = 1; linked < numChainedBuf; ++linked) {
    tail->insertAfterThisOne(createIOBuf(bufCapacity));
    tail = tail->next();
  }

  auto chainRanges = byteRangesFromIOBuf(chainHead.get());
  ASSERT_EQ(chainRanges.size(), numChainedBuf);
  for (size_t i = 0; i < chainRanges.size(); ++i) {
    ASSERT_EQ(chainRanges[i].size, bufCapacity);
  }
}

TEST_F(ByteStreamTest, outputStream) {
auto out = std::make_unique<IOBufOutputStream>(*pool_, nullptr, 10000);
std::stringstream referenceSStream;
Expand Down
9 changes: 5 additions & 4 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/lang/Bits.h>

#include "velox/common/base/Crc.h"
#include "velox/common/base/IOUtils.h"
#include "velox/common/base/RawVector.h"
#include "velox/common/memory/ByteStream.h"
#include "velox/vector/BiasVector.h"
Expand Down Expand Up @@ -4220,12 +4221,12 @@ void PrestoVectorSerde::deserialize(
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);

// Process chained uncompressed results IOBufs.
auto uncompress =
codec->uncompress(compressBuf.get(), header.uncompressedSize);
ByteRange byteRange{
uncompress->writableData(), (int32_t)uncompress->length(), 0};
auto uncompressedSource =
std::make_unique<BufferInputStream>(std::vector<ByteRange>{byteRange});
auto uncompressedSource = std::make_unique<BufferInputStream>(
byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
*uncompressedSource, type, pool, *result, resultOffset, prestoOptions);
}
Expand Down
9 changes: 9 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,15 @@ TEST_P(PrestoSerializerTest, basic) {
testRoundTrip(rowVector);
}

// Round-trips a row vector of 80,000 rows, each pairing a bigint with a
// 1024-character string, to exercise serde on a large payload.
TEST_P(PrestoSerializerTest, basicLarge) {
  const vector_size_t numRows = 80'000;
  const auto rowNumberAt = [](vector_size_t row) { return row; };
  const auto wideStringAt = [](vector_size_t /*row*/) {
    return std::string(1024, 'x');
  };
  auto rowVector = makeRowVector(
      {makeFlatVector<int64_t>(numRows, rowNumberAt),
       makeFlatVector<std::string>(numRows, wideStringAt)});
  testRoundTrip(rowVector);
}

/// Test serialization of a dictionary vector that adds nulls to the base
/// vector.
TEST_P(PrestoSerializerTest, dictionaryWithExtraNulls) {
Expand Down

0 comments on commit d0dc6ef

Please sign in to comment.