diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index fde48baab388..0ba7f260aa0d 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -451,30 +451,19 @@ TEST_P(InputByteStreamTest, iobuf) { auto byteStream = createStream(byteRanges); auto bufferStream = dynamic_cast(byteStream.get()); - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 256); - } - } - - for (int offset = 0; offset < streamSize;) { - auto iobuf = bufferStream->readBytes(streamSize / 8); - ASSERT_EQ(iobuf->computeChainDataLength(), streamSize / 8); - for (int i = 0; i < streamSize / 8; ++i, ++offset) { - ASSERT_EQ(iobuf->data()[i], offset % 13); + for (int offset = 0; offset < streamSize * 2;) { + const int readBytes = std::min(streamSize / 11, streamSize * 2 - offset); + auto iobuf = bufferStream->readBytes(readBytes); + ASSERT_EQ(iobuf->computeChainDataLength(), readBytes); + for (int i = 0; i < readBytes; ++i, ++offset) { + if (offset < streamSize) { + ASSERT_EQ(iobuf->data()[i], offset % 256); + } else { + ASSERT_EQ(iobuf->data()[i], (offset - streamSize) % 13); + } } } - ASSERT_TRUE(bufferStream->atEnd()); -} - -TEST_P(InputByteStreamTest, emptyInputStreamError) { - if (GetParam()) { - VELOX_ASSERT_THROW(createStream({}), "Empty BufferInputStream"); - } else { - VELOX_ASSERT_THROW(createStream({}), "(0 vs. 0) Empty FileInputStream"); - } + ASSERT_TRUE(byteStream->atEnd()); } TEST_P(InputByteStreamTest, remainingSize) { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 24631b93c0aa..85630b9dfab2 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -4348,13 +4348,11 @@ void readTopColumns( std::unique_ptr uncompressStream( ByteInputStream* source, const PrestoHeader& header, - common::CompressionKind compressionKind, - memory::MemoryPool& pool) { + common::CompressionKind compressionKind) { const auto codec = common::compressionKindToCodec(compressionKind); - if (dynamic_cast(source)) { + if (auto* bufferSource = dynamic_cast(source)) { // If the source is a BufferInputStream, we can avoid copying the data // by creating an IOBuf from the underlying buffer. - const auto bufferSource = dynamic_cast(source); const auto iobuf = bufferSource->readBytes(header.compressedSize); // Process chained uncompressed results IOBufs. return codec->uncompress(iobuf.get(), header.uncompressedSize); @@ -4418,7 +4416,7 @@ void PrestoVectorSerde::deserialize( readTopColumns(*source, type, pool, *result, resultOffset, prestoOptions); } else { auto uncompress = - uncompressStream(source, header, prestoOptions.compressionKind, *pool); + uncompressStream(source, header, prestoOptions.compressionKind); auto uncompressedSource = std::make_unique( byteRangesFromIOBuf(uncompress.get())); readTopColumns( diff --git a/velox/serializers/RowSerializer.h b/velox/serializers/RowSerializer.h index ae062b658fd7..e209e126f5a5 100644 --- a/velox/serializers/RowSerializer.h +++ b/velox/serializers/RowSerializer.h @@ -288,14 +288,7 @@ class RowDeserializer { if (header.compressed) { VELOX_DCHECK_NE( compressionKind, common::CompressionKind::CompressionKind_NONE); - auto compressBuf = folly::IOBuf::create(header.compressedSize); - source->readBytes(compressBuf->writableData(), header.compressedSize); - compressBuf->append(header.compressedSize); - - // Process chained uncompressed results IOBufs. - const auto codec = common::compressionKindToCodec(compressionKind); - uncompressedBuf = - codec->uncompress(compressBuf.get(), header.uncompressedSize); + uncompressedBuf = uncompressStream(source, header, compressionKind); } std::unique_ptr uncompressedStream; @@ -352,6 +345,25 @@ class RowDeserializer { rowBuffer.append(rowFragment.data(), rowFragment.size()); } } + + static std::unique_ptr uncompressStream( + ByteInputStream* source, + const detail::RowHeader& header, + common::CompressionKind compressionKind) { + const auto codec = common::compressionKindToCodec(compressionKind); + if (auto* bufferSource = dynamic_cast(source)) { + // If the source is a BufferInputStream, we can avoid copying the data + // by creating an IOBuf from the underlying buffer. + const auto iobuf = bufferSource->readBytes(header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(iobuf.get(), header.uncompressedSize); + } + auto compressBuf = folly::IOBuf::create(header.compressedSize); + source->readBytes(compressBuf->writableData(), header.compressedSize); + compressBuf->append(header.compressedSize); + // Process chained uncompressed results IOBufs. + return codec->uncompress(compressBuf.get(), header.uncompressedSize); + } }; } // namespace facebook::velox::serializer