/*
 * Patch summary: declares a pure virtual IOBuf-returning readBytes(int32_t) on
 * ByteInputStream, implements it in FileInputStream, marks the existing
 * BufferInputStream version as an override, extends both stream tests, and
 * lets uncompressStream in PrestoSerializer.cpp call it without casts.
 */
diff --git a/velox/common/file/FileInputStream.cpp b/velox/common/file/FileInputStream.cpp index 56efdc4e1c3a..77bb3020d7d8 100644 --- a/velox/common/file/FileInputStream.cpp +++ b/velox/common/file/FileInputStream.cpp @@ -185,6 +185,26 @@ void FileInputStream::readBytes(uint8_t* bytes, int32_t size) { } } +std::unique_ptr FileInputStream::readBytes(int32_t size) { /* Returns the next size bytes as an IOBuf and advances the stream; size must be non-negative and no larger than remainingSize(), as enforced by the checks below. */ + VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes"); + if (size == 0) { + return nullptr; /* A zero-byte request produces no IOBuf at all, so callers get a null pointer and must not dereference it. */ + } + + VELOX_CHECK_LE( + size, remainingSize(), "Read past the end of input file {}", fileSize_); + if (current_->availableBytes() >= size) { /* The request fits inside the current range, so wrap those bytes in place rather than copying them. NOTE(review): presumably the returned IOBuf only borrows the stream buffer and is valid only until that buffer is reused; confirm the lifetime contract with callers. */ + auto iobuf = + folly::IOBuf::wrapBuffer(current_->buffer + current_->position, size); + current_->position += size; + return iobuf; + } + auto iobuf = folly::IOBuf::create(size); /* The request does not fit in the current range: create an IOBuf of size bytes and fill it through the pointer-based readBytes overload. */ + iobuf->append(size); + readBytes(iobuf->writableData(), size); + return iobuf; +} + std::string_view FileInputStream::nextView(int32_t size) { VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes"); if (remainingSize() == 0) { diff --git a/velox/common/file/FileInputStream.h b/velox/common/file/FileInputStream.h index 6daf9f84e109..fa3145f70897 100644 --- a/velox/common/file/FileInputStream.h +++ b/velox/common/file/FileInputStream.h @@ -55,6 +55,8 @@ class FileInputStream : public ByteInputStream { void readBytes(uint8_t* bytes, int32_t size) override; + std::unique_ptr readBytes(int32_t size) override; /* IOBuf-returning counterpart of the pointer-based readBytes declared just above. */ + std::string_view nextView(int32_t size) override; std::string toString() const override; diff --git a/velox/common/file/tests/FileInputStreamTest.cpp b/velox/common/file/tests/FileInputStreamTest.cpp index 639be297ba46..5cc6f65e20ba 100644 --- a/velox/common/file/tests/FileInputStreamTest.cpp +++ b/velox/common/file/tests/FileInputStreamTest.cpp @@ -112,3 +112,47 @@ TEST_F(FileInputStreamTest, stats) { ASSERT_GT(byteStream->stats().readTimeNs, 0); } } + +TEST_F(FileInputStreamTest, iobuf) { /* Checks that the IOBuf-returning readBytes yields the expected offset % 256 byte pattern for several stream size and buffer size combinations. */ + struct { + size_t streamSize; + size_t bufferSize; + + std::string 
debugString() const { + return fmt::format( + "streamSize {}, bufferSize {}", streamSize, bufferSize); + } + } testSettings[] = { + {4096, 1024}, {4096, 4096}, {4096, 8192}, {4096, 4096 + 1024}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto byteStream = createStream(testData.streamSize, testData.bufferSize); + // Read iobuf from buffer. + int offset = 0; + auto iobuf = byteStream->readBytes(3); + ASSERT_EQ(iobuf->length(), 3); + for (; offset < 3; ++offset) { + ASSERT_EQ(iobuf->data()[offset], offset % 256); + } + + // Read iobuf from buffer from position 3. + iobuf = byteStream->readBytes(3); + ASSERT_EQ(iobuf->length(), 3); + for (; offset < 6; ++offset) { + ASSERT_EQ(iobuf->data()[offset - 3], offset % 256); + } + + // Read iobuf from next buffer. + while (offset < testData.streamSize) { /* Consume the remainder of the stream in chunks of at most streamSize / 8 bytes; the iobuf declared inside this loop shadows the one declared earlier in the enclosing loop body. */ + const auto readBytes = + std::min(testData.streamSize / 8, testData.streamSize - offset); + auto iobuf = byteStream->readBytes(readBytes); + for (int i = 0; i < readBytes; ++i, ++offset) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } + } + ASSERT_TRUE(byteStream->atEnd()); + } +} diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp index d70886b5cef8..4735255ee136 100644 --- a/velox/common/memory/ByteStream.cpp +++ b/velox/common/memory/ByteStream.cpp @@ -184,7 +184,7 @@ std::unique_ptr BufferInputStream::readBytes(int32_t size) { auto newBuf = folly::IOBuf::wrapBuffer( current_->buffer + current_->position, readBytes); if (result) { - result->prev()->appendChain(std::move(newBuf)); + result->appendToChain(std::move(newBuf)); /* Link the newly wrapped piece into the chain headed by result. NOTE(review): presumably equivalent to the removed prev()->appendChain form; verify against the folly IOBuf API. */ } else { result = std::move(newBuf); } diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 931324f02f75..361cf9569c69 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -148,6 +148,8 @@ class ByteInputStream { readBytes(reinterpret_cast(data), size); } + virtual std::unique_ptr readBytes(int32_t 
size) = 0; /* Pure virtual: each concrete input stream supplies its own IOBuf-returning read. */ + + /// Returns a view over the read buffer for up to 'size' next bytes. The size /// of the value may be less if the current byte range ends within 'size' /// bytes from the current position. The size will be 0 if at end. @@ -191,7 +193,9 @@ class BufferInputStream : public ByteInputStream { void readBytes(uint8_t* bytes, int32_t size) override; - std::unique_ptr readBytes(int32_t size); + // We can avoid copying the data by creating an IOBuf from the underlying + // buffer. + std::unique_ptr readBytes(int32_t size) override; std::string_view nextView(int32_t size) override; diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index fde48baab388..4fa11e7e09a8 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -432,9 +432,6 @@ TEST_P(InputByteStreamTest, inputStream) { } TEST_P(InputByteStreamTest, iobuf) { - if (!GetParam()) { - return; - } const auto streamSize = 4096; std::vector byteRanges; std::uint8_t buffer[streamSize]; @@ -450,23 +447,19 @@ TEST_P(InputByteStreamTest, iobuf) { byteRanges.push_back(ByteRange{buffer2, streamSize, 0}); auto byteStream = createStream(byteRanges); - auto bufferStream = dynamic_cast(byteStream.get()); - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 256); - } - } - - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 13); + for (int offset = 0; offset < streamSize * 2;) { /* Single pass over streamSize * 2 bytes in chunks of streamSize / 11, a chunk size that does not divide streamSize evenly. */ + const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset); + auto iobuf = byteStream->readBytes(readBytes); + 
ASSERT_EQ(iobuf->computeChainDataLength(), readBytes); + for (int i = 0; i < readBytes; ++i, ++offset) { /* NOTE(review): data()[i] indexes only the IOBuf this pointer refers to; if a read that crosses a range boundary comes back as a chain (BufferInputStream::readBytes links pieces with appendToChain), presumably i can run past that first element. TODO confirm, for example by coalescing before indexing. */ + if (offset < streamSize) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } else { + ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13); + } } } - ASSERT_TRUE(bufferStream->atEnd()); + ASSERT_TRUE(byteStream->atEnd()); } TEST_P(InputByteStreamTest, emptyInputStreamError) { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index c48568e4eb2d..fb11b4f26a59 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4366,19 +4366,9 @@ std::unique_ptr uncompressStream( common::CompressionKind compressionKind, memory::MemoryPool& pool) { const auto codec = common::compressionKindToCodec(compressionKind); - if (dynamic_cast(source)) { - // If the source is a BufferInputStream, we can avoid copying the data - // by creating an IOBuf from the underlying buffer. - const auto bufferSource = dynamic_cast(source); - const auto iobuf = bufferSource->readBytes(header.compressedSize); - // Process chained uncompressed results IOBufs. - return codec->uncompress(iobuf.get(), header.uncompressedSize); - } - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); + const auto iobuf = source->readBytes(header.compressedSize); /* Goes through the virtual IOBuf-returning readBytes, replacing the removed BufferInputStream-specific cast and the manual copy path. NOTE(review): FileInputStream::readBytes returns nullptr for a zero size, so iobuf.get() may be null when header.compressedSize is 0; confirm the codec tolerates that. */ // Process chained uncompressed results IOBufs. - return codec->uncompress(compressBuf.get(), header.uncompressedSize); + return codec->uncompress(iobuf.get(), header.uncompressedSize); } } // namespace