Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 14, 2024
1 parent 5dfe257 commit 7c14239
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 32 deletions.
20 changes: 20 additions & 0 deletions velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,26 @@ void FileInputStream::readBytes(uint8_t* bytes, int32_t size) {
}
}

std::unique_ptr<folly::IOBuf> FileInputStream::readBytes(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
if (size == 0) {
return nullptr;
}

VELOX_CHECK_LE(
size, remainingSize(), "Read past the end of input file {}", fileSize_);
if (current_->availableBytes() >= size) {
auto iobuf =
folly::IOBuf::wrapBuffer(current_->buffer + current_->position, size);
current_->position += size;
return iobuf;
}
auto iobuf = folly::IOBuf::create(size);
iobuf->append(size);
readBytes(iobuf->writableData(), size);
return iobuf;
}

std::string_view FileInputStream::nextView(int32_t size) {
VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes");
if (remainingSize() == 0) {
Expand Down
2 changes: 2 additions & 0 deletions velox/common/file/FileInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class FileInputStream : public ByteInputStream {

void readBytes(uint8_t* bytes, int32_t size) override;

std::unique_ptr<folly::IOBuf> readBytes(int32_t size) override;

std::string_view nextView(int32_t size) override;

std::string toString() const override;
Expand Down
44 changes: 44 additions & 0 deletions velox/common/file/tests/FileInputStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,47 @@ TEST_F(FileInputStreamTest, stats) {
ASSERT_GT(byteStream->stats().readTimeNs, 0);
}
}

TEST_F(FileInputStreamTest, iobuf) {
struct {
size_t streamSize;
size_t bufferSize;

std::string debugString() const {
return fmt::format(
"streamSize {}, bufferSize {}", streamSize, bufferSize);
}
} testSettings[] = {
{4096, 1024}, {4096, 4096}, {4096, 8192}, {4096, 4096 + 1024}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto byteStream = createStream(testData.streamSize, testData.bufferSize);
// Read iobuf from buffer.
int offset = 0;
auto iobuf = byteStream->readBytes(3);
ASSERT_EQ(iobuf->length(), 3);
for (; offset < 3; ++offset) {
ASSERT_EQ(iobuf->data()[offset], offset % 256);
}

// Read iobuf from buffer from position 3.
iobuf = byteStream->readBytes(3);
ASSERT_EQ(iobuf->length(), 3);
for (; offset < 6; ++offset) {
ASSERT_EQ(iobuf->data()[offset - 3], offset % 256);
}

// Read iobuf from next buffer.
while (offset < testData.streamSize) {
const auto readBytes =
std::min(testData.streamSize / 8, testData.streamSize - offset);
auto iobuf = byteStream->readBytes(readBytes);
for (int i = 0; i < readBytes; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
}
}
ASSERT_TRUE(byteStream->atEnd());
}
}
2 changes: 1 addition & 1 deletion velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::unique_ptr<folly::IOBuf> BufferInputStream::readBytes(int32_t size) {
auto newBuf = folly::IOBuf::wrapBuffer(
current_->buffer + current_->position, readBytes);
if (result) {
result->prev()->appendChain(std::move(newBuf));
result->appendToChain(std::move(newBuf));
} else {
result = std::move(newBuf);
}
Expand Down
6 changes: 5 additions & 1 deletion velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class ByteInputStream {
readBytes(reinterpret_cast<uint8_t*>(data), size);
}

virtual std::unique_ptr<folly::IOBuf> readBytes(int32_t size) = 0;

/// Returns a view over the read buffer for up to 'size' next bytes. The size
/// of the value may be less if the current byte range ends within 'size'
/// bytes from the current position. The size will be 0 if at end.
Expand Down Expand Up @@ -191,7 +193,9 @@ class BufferInputStream : public ByteInputStream {

void readBytes(uint8_t* bytes, int32_t size) override;

std::unique_ptr<folly::IOBuf> readBytes(int32_t size);
// We can avoid copying the data by creating an IOBuf from the underlying
// buffer.
std::unique_ptr<folly::IOBuf> readBytes(int32_t size) override;

std::string_view nextView(int32_t size) override;

Expand Down
29 changes: 11 additions & 18 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,6 @@ TEST_P(InputByteStreamTest, inputStream) {
}

TEST_P(InputByteStreamTest, iobuf) {
if (!GetParam()) {
return;
}
const auto streamSize = 4096;
std::vector<ByteRange> byteRanges;
std::uint8_t buffer[streamSize];
Expand All @@ -450,23 +447,19 @@ TEST_P(InputByteStreamTest, iobuf) {
byteRanges.push_back(ByteRange{buffer2, streamSize, 0});

auto byteStream = createStream(byteRanges);
auto bufferStream = dynamic_cast<BufferInputStream*>(byteStream.get());
for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
}
}

for (int offset = 0; offset < streamSize;) {
auto iobuf = bufferStream->readBytes(streamSize / 8);
ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8);
for (int i = 0; i < streamSize / 8; ++i, ++offset) {
ASSERT_EQ(iobuf->data()[i], offset % 13);
for (int offset = 0; offset < streamSize * 2;) {
const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset);
auto iobuf = byteStream->readBytes(readBytes);
ASSERT_EQ(iobuf->computeChainDataLength(), readBytes);
for (int i = 0; i < readBytes; ++i, ++offset) {
if (offset < streamSize) {
ASSERT_EQ(iobuf->data()[i], offset % 256);
} else {
ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13);
}
}
}
ASSERT_TRUE(bufferStream->atEnd());
ASSERT_TRUE(byteStream->atEnd());
}

TEST_P(InputByteStreamTest, emptyInputStreamError) {
Expand Down
14 changes: 2 additions & 12 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4366,19 +4366,9 @@ std::unique_ptr<folly::IOBuf> uncompressStream(
common::CompressionKind compressionKind,
memory::MemoryPool& pool) {
const auto codec = common::compressionKindToCodec(compressionKind);
if (dynamic_cast<BufferInputStream*>(source)) {
// If the source is a BufferInputStream, we can avoid copying the data
// by creating an IOBuf from the underlying buffer.
const auto bufferSource = dynamic_cast<BufferInputStream*>(source);
const auto iobuf = bufferSource->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
auto compressBuf = folly::IOBuf::create(header.compressedSize);
source->readBytes(compressBuf->writableData(), header.compressedSize);
compressBuf->append(header.compressedSize);
const auto iobuf = source->readBytes(header.compressedSize);
// Process chained uncompressed results IOBufs.
return codec->uncompress(compressBuf.get(), header.uncompressedSize);
return codec->uncompress(iobuf.get(), header.uncompressedSize);
}
} // namespace

Expand Down

0 comments on commit 7c14239

Please sign in to comment.