Skip to content

Commit

Permalink
only apply for BufferInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 13, 2024
1 parent 7cfade1 commit 5dfe257
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
25 changes: 25 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ std::string_view BufferInputStream::nextView(int32_t size) {
reinterpret_cast<char*>(current_->buffer) + position, viewSize);
}

std::unique_ptr<folly::IOBuf> BufferInputStream::readBytes(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
if (size == 0) {
return nullptr;
}
std::unique_ptr<folly::IOBuf> result;
for (;;) {
const int32_t availableBytes = current_->size - current_->position;
const int32_t readBytes = std::min(availableBytes, size);
auto newBuf = folly::IOBuf::wrapBuffer(
current_->buffer + current_->position, readBytes);
if (result) {
result->prev()->appendChain(std::move(newBuf));
} else {
result = std::move(newBuf);
}
size -= readBytes;
current_->position += readBytes;
if (size == 0) {
return result;
}
nextRange();
}
}

void BufferInputStream::skip(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes");
for (;;) {
Expand Down
2 changes: 2 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ class BufferInputStream : public ByteInputStream {

void readBytes(uint8_t* bytes, int32_t size) override;

std::unique_ptr<folly::IOBuf> readBytes(int32_t size);

std::string_view nextView(int32_t size) override;

void skip(int32_t size) override;
Expand Down
38 changes: 38 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,44 @@ TEST_P(InputByteStreamTest, inputStream) {
ASSERT_TRUE(byteStream->atEnd());
}

TEST_P(InputByteStreamTest, iobuf) {
if (!GetParam()) {
return;
}
const auto streamSize = 4096;
std::vector<ByteRange> byteRanges;
std::uint8_t buffer[streamSize];
for (auto i = 0; i < streamSize; ++i) {
buffer[i] = i % 256;
}
byteRanges.push_back(ByteRange{buffer, streamSize, 0});

std::uint8_t buffer2[streamSize];
for (auto i = 0; i < streamSize; ++i) {
buffer2[i] = i % 13;
}
byteRanges.push_back(ByteRange{buffer2, streamSize, 0});

auto byteStream = createStream(byteRanges);
auto bufferStream = dynamic_cast<BufferInputStream*>(byteStream.get());
for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
}
}

for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 13);
}
}
ASSERT_TRUE(bufferStream->atEnd());
}

TEST_P(InputByteStreamTest, emptyInputStreamError) {
if (GetParam()) {
VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream");
Expand Down
35 changes: 16 additions & 19 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4360,28 +4360,25 @@ void readTopColumns(
&source, childTypes, resultOffset, nullptr, 0, pool, opts, children);
}

std::unique_ptr<folly::IOBuf> uncompressBuffer(
std::unique_ptr<folly::IOBuf> uncompressStream(
ByteInputStream* source,
const PrestoHeader& header,
common::CompressionKind compressionKind) {
common::CompressionKind compressionKind,
memory::MemoryPool& pool) {
const auto codec = common::compressionKindToCodec(compressionKind);
std::unique_ptr<folly::IOBuf> iobuf;
int32_t readCount = 0;
while (readCount < header.compressedSize) {
const auto remaining = header.compressedSize - readCount;
auto view = source->nextView(remaining);
readCount += view.size();
auto newBuf = folly::IOBuf::wrapBuffer(view.data(), view.size());
if (iobuf) {
iobuf->prev()->appendChain(std::move(newBuf));
} else {
iobuf = std::move(newBuf);
}
}

VELOX_DCHECK_EQ(readCount, header.compressedSize);
if (dynamic_cast<BufferInputStream*>(source)) {
// If the source is a BufferInputStream, we can avoid copying the data
// by creating an IOBuf from the underlying buffer.
const auto bufferSource = dynamic_cast<BufferInputStream*>(source);
const auto iobuf = bufferSource->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
return codec->uncompress(compressBuf.get(), header.uncompressedSize);
}
} // namespace

Expand Down Expand Up @@ -4436,7 +4433,7 @@ void PrestoVectorSerde::deserialize(
readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions);
} else {
auto uncompress =
uncompressBuffer(source, header, prestoOptions.compressionKind);
uncompressStream(source, header, prestoOptions.compressionKind, *pool);
auto uncompressedSource = std::make_unique<BufferInputStream>(
byteRangesFromIOBuf(uncompress.get()));
readTopColumns(
Expand Down

0 comments on commit 5dfe257

Please sign in to comment.