diff --git a/velox/dwio/common/SeekableInputStream.h b/velox/dwio/common/SeekableInputStream.h index ab402753b8c56..08a433e2dbd19 100644 --- a/velox/dwio/common/SeekableInputStream.h +++ b/velox/dwio/common/SeekableInputStream.h @@ -45,7 +45,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { virtual bool SkipInt64(int64_t count) = 0; bool Skip(int32_t count) final override { - return SkipInt64(count); + VELOX_FAIL("Use SkipInt64 instead"); } void readFully(char* buffer, size_t bufferSize); diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 438bb469bc0b5..b5219763b4473 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -675,9 +675,9 @@ class SelectiveColumnReader { // returned as the null flags of the vector in getValues(). bool returnReaderNulls_ = false; // Total writable bytes in 'rawStringBuffer_'. - int32_t rawStringSize_ = 0; + int64_t rawStringSize_ = 0; // Number of written bytes in 'rawStringBuffer_'. - uint32_t rawStringUsed_ = 0; + int64_t rawStringUsed_ = 0; // True if last read() added any nulls. bool anyNulls_ = false; diff --git a/velox/dwio/common/StreamUtil.h b/velox/dwio/common/StreamUtil.h index 3c8d774b43471..0a0a2b2b0e866 100644 --- a/velox/dwio/common/StreamUtil.h +++ b/velox/dwio/common/StreamUtil.h @@ -37,7 +37,7 @@ inline void skipBytes( return; } numBytes -= bufferEnd - bufferStart; - input->Skip(numBytes); + input->SkipInt64(numBytes); bufferStart = bufferEnd; } diff --git a/velox/dwio/dwrf/common/ByteRLE.cpp b/velox/dwio/dwrf/common/ByteRLE.cpp index c07f96f87c7c8..6d2793c484f0f 100644 --- a/velox/dwio/dwrf/common/ByteRLE.cpp +++ b/velox/dwio/dwrf/common/ByteRLE.cpp @@ -375,7 +375,7 @@ void ByteRleDecoder::skipBytes(size_t count) { count -= skipSize; } if (count > 0) { - inputStream_->Skip(count); + inputStream_->SkipInt64(count); } } diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index ddd5283d1d4bc..f3cce0365b0ad 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -47,7 +47,7 @@ SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( uint64_t SelectiveStringDirectColumnReader::skip(uint64_t numValues) { numValues = SelectiveColumnReader::skip(numValues); - dwio::common::ensureCapacity(lengths_, numValues, memoryPool_); + dwio::common::ensureCapacity(lengths_, numValues, memoryPool_); lengthDecoder_->nextLengths(lengths_->asMutable(), numValues); rawLengths_ = lengths_->as(); for (auto i = 0; i < numValues; ++i) { @@ -60,10 +60,10 @@ uint64_t SelectiveStringDirectColumnReader::skip(uint64_t numValues) { void SelectiveStringDirectColumnReader::extractCrossBuffers( const int32_t* lengths, - const int32_t* starts, + const int64_t* starts, int32_t rowIndex, int32_t numValues) { - int32_t current = 0; + int64_t current = 0; bool scatter = !outerNonNullRows_.empty(); for (auto i = 0; i < numValues; ++i) { auto gap = starts[i] - current; @@ -92,8 +92,8 @@ void SelectiveStringDirectColumnReader::extractCrossBuffers( } } -inline int32_t -rangeSum(const uint32_t* rows, int32_t start, int32_t begin, int32_t end) { +inline int64_t +rangeSum(const uint32_t* rows, int64_t start, int32_t begin, int32_t end) { for (auto i = begin; i < end; ++i) { start += rows[i]; } @@ -104,10 +104,10 @@ inline void SelectiveStringDirectColumnReader::makeSparseStarts( int32_t startRow, const int32_t* rows, int32_t numRows, - int32_t* starts) { + int64_t* starts) { auto previousRow = lengthIndex_; int32_t i = 0; - int32_t startOffset = 0; + int64_t startOffset = 0; for (; i < numRows; ++i) { int targetRow = rows[startRow + i]; startOffset = rangeSum(rawLengths_, startOffset, previousRow, targetRow); @@ -121,7 +121,7 @@ void SelectiveStringDirectColumnReader::extractNSparse( const int32_t* rows, int32_t row, int32_t numValues) { - int32_t starts[8]; + int64_t starts[8]; if (numValues == 8 && (outerNonNullRows_.empty() ? try8Consecutive(0, rows, row) : try8Consecutive(0, rows, row))) { @@ -224,7 +224,7 @@ bool SelectiveStringDirectColumnReader::try8ConsecutiveSmall( template inline bool SelectiveStringDirectColumnReader::try8Consecutive( - int32_t start, + int64_t start, const int32_t* rows, int32_t row) { // If we haven't read in a buffer yet, or there is not enough data left. This @@ -317,7 +317,7 @@ void SelectiveStringDirectColumnReader::extractSparse( numRows, 8, [&](int32_t row) { - int32_t start = rangeSum(rawLengths_, 0, lengthIndex_, rows[row]); + auto start = rangeSum(rawLengths_, 0, lengthIndex_, rows[row]); lengthIndex_ = rows[row]; auto lengths = reinterpret_cast(rawLengths_) + lengthIndex_; @@ -327,7 +327,7 @@ void SelectiveStringDirectColumnReader::extractSparse( : try8Consecutive(start, rows, row)) { return; } - int32_t starts[8]; + int64_t starts[8]; for (auto i = 0; i < 8; ++i) { starts[i] = start; start += lengths[i]; diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index 4eb75fdd0f764..8da1e77401d27 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -68,7 +68,7 @@ class SelectiveStringDirectColumnReader void extractCrossBuffers( const int32_t* lengths, - const int32_t* starts, + const int64_t* starts, int32_t rowIndex, int32_t numValues); @@ -76,14 +76,14 @@ class SelectiveStringDirectColumnReader int32_t startRow, const int32_t* rows, int32_t numRows, - int32_t* starts); + int64_t* starts); inline void extractNSparse(const int32_t* rows, int32_t row, int numRows); void extractSparse(const int32_t* rows, int32_t numRows); template - bool try8Consecutive(int32_t start, const int32_t* rows, int32_t row); + bool try8Consecutive(int64_t start, const int32_t* rows, int32_t row); template bool diff --git a/velox/dwio/dwrf/reader/StripeMetadataCache.h b/velox/dwio/dwrf/reader/StripeMetadataCache.h index eaa9d671f9861..4a7a1125610dd 100644 --- a/velox/dwio/dwrf/reader/StripeMetadataCache.h +++ b/velox/dwio/dwrf/reader/StripeMetadataCache.h @@ -72,7 +72,7 @@ class StripeMetadataCache { auto clone = reinterpret_cast(input_.get()) ->clone(); - clone->Skip(offset); + clone->SkipInt64(offset); clone->setRemainingBytes(offsets_[index + 1] - offset); return clone; } diff --git a/velox/dwio/dwrf/reader/StripeStream.cpp b/velox/dwio/dwrf/reader/StripeStream.cpp index 0ad5d7fc5125d..3a243d7cad181 100644 --- a/velox/dwio/dwrf/reader/StripeStream.cpp +++ b/velox/dwio/dwrf/reader/StripeStream.cpp @@ -354,7 +354,7 @@ StripeStreamsImpl::getIndexStreamFromCache( const auto length = info.getLength(); if (auto* cacheInput = dynamic_cast(indexBase.get())) { - cacheInput->Skip(offset); + cacheInput->SkipInt64(offset); cacheInput->setRemainingBytes(length); return indexBase; } diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp index f6c076cb993b5..c545bc6ee93ce 100644 --- a/velox/dwio/dwrf/test/CacheInputTest.cpp +++ b/velox/dwio/dwrf/test/CacheInputTest.cpp @@ -490,7 +490,7 @@ TEST_F(CacheTest, window) { // We make a second stream that ranges over a subset of the range of the first // one. auto clone = cacheInput->clone(); - clone->Skip(100); + clone->SkipInt64(100); clone->setRemainingBytes(kMB); auto previousRead = ioStats_->rawBytesRead(); EXPECT_TRUE(clone->Next(&buffer, &size)); diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index 1f9fac33ab0a0..fa2ee0259aa2b 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -2621,3 +2621,47 @@ TEST_F(TestReader, selectiveFlatMapFastPathAllInlinedStringKeys) { ASSERT_EQ(rowReader->next(10, batch), 2); assertEqualVectors(batch, row); } + +TEST_F(TestReader, skipLongString) { + // c0 in long_string.dwrf has 25 rows of 200,000,000 character long strings, + // whose values are repeated 'a' to 'y' respectively. + auto input = std::make_unique( + std::make_shared(getExampleFilePath("long_string.dwrf")), + *pool()); + dwio::common::ReaderOptions readerOpts(pool()); + readerOpts.setFileFormat(FileFormat::DWRF); + auto reader = DwrfReader::create(std::move(input), readerOpts); + auto spec = std::make_shared(""); + spec->addField("c0", 0); + spec->getOrCreateChild("c1")->setFilter( + std::make_unique(true, false)); + RowReaderOptions rowReaderOpts; + rowReaderOpts.setScanSpec(spec); + VectorPtr batch = BaseVector::create(ROW({"c0"}, {VARCHAR()}), 0, pool()); + auto validate = [](const VectorPtr& batch) { + ASSERT_EQ(batch->size(), 1); + auto string = batch->asChecked() + ->childAt(0) + ->loadedVector() + ->asChecked>() + ->valueAt(0); + ASSERT_EQ(string.size(), 200'000'000); + for (char c : string) { + ASSERT_EQ(c, 'y'); + } + }; + { + SCOPED_TRACE("Skip"); + auto rowReader = reader->createRowReader(rowReaderOpts); + ASSERT_EQ(rowReader->next(24, batch), 24); + ASSERT_EQ(batch->size(), 0); + ASSERT_EQ(rowReader->next(2, batch), 1); + validate(batch); + } + { + SCOPED_TRACE("Filter"); + auto rowReader = reader->createRowReader(rowReaderOpts); + ASSERT_EQ(rowReader->next(26, batch), 25); + validate(batch); + } +} diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index 8d7c9f5b80ab9..fcba33d80cfdc 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -157,14 +157,14 @@ TEST_F(DecompressionTest, testArraySkip) { ASSERT_EQ(true, stream.Next(&ptr, &len)); EXPECT_EQ(bytes.data(), static_cast(ptr)); EXPECT_EQ(20, len); - ASSERT_EQ(true, !stream.Skip(-10)); - ASSERT_EQ(true, stream.Skip(80)); + ASSERT_EQ(true, !stream.SkipInt64(-10)); + ASSERT_EQ(true, stream.SkipInt64(80)); ASSERT_EQ(true, stream.Next(&ptr, &len)); EXPECT_EQ(bytes.data() + 100, static_cast(ptr)); EXPECT_EQ(20, len); - ASSERT_EQ(true, stream.Skip(80)); + ASSERT_EQ(true, stream.SkipInt64(80)); ASSERT_EQ(true, !stream.Next(&ptr, &len)); - ASSERT_EQ(true, !stream.Skip(181)); + ASSERT_EQ(true, !stream.SkipInt64(181)); EXPECT_EQ("SeekableArrayInputStream 200 of 200", stream.getName()); } @@ -181,11 +181,11 @@ TEST_F(DecompressionTest, testArrayCombo) { EXPECT_EQ(20, len); stream.BackUp(10); EXPECT_EQ(10, stream.ByteCount()); - stream.Skip(4); + stream.SkipInt64(4); EXPECT_EQ(14, stream.ByteCount()); ASSERT_EQ(true, stream.Next(&ptr, &len)); EXPECT_EQ(bytes.data() + 14, static_cast(ptr)); - EXPECT_EQ(true, !stream.Skip(320)); + EXPECT_EQ(true, !stream.SkipInt64(320)); EXPECT_EQ(200, stream.ByteCount()); EXPECT_EQ(true, !stream.Next(&ptr, &len)); } @@ -234,14 +234,14 @@ TEST_F(DecompressionTest, testFileSkip) { ASSERT_EQ(true, stream.Next(&ptr, &len)); checkBytes(static_cast(ptr), len, 0); EXPECT_EQ(20, len); - ASSERT_EQ(true, !stream.Skip(-10)); - ASSERT_EQ(true, stream.Skip(80)); + ASSERT_EQ(true, !stream.SkipInt64(-10)); + ASSERT_EQ(true, stream.SkipInt64(80)); ASSERT_EQ(true, stream.Next(&ptr, &len)); checkBytes(static_cast(ptr), len, 100); EXPECT_EQ(20, len); - ASSERT_EQ(true, !stream.Skip(80)); + ASSERT_EQ(true, !stream.SkipInt64(80)); ASSERT_EQ(true, !stream.Next(&ptr, &len)); - ASSERT_EQ(true, !stream.Skip(181)); + ASSERT_EQ(true, !stream.SkipInt64(181)); EXPECT_EQ(std::string(simpleFile) + " from 0 for 200", stream.getName()); } @@ -254,11 +254,11 @@ TEST_F(DecompressionTest, testFileCombo) { EXPECT_EQ(20, len); stream.BackUp(10); EXPECT_EQ(10, stream.ByteCount()); - stream.Skip(4); + stream.SkipInt64(4); EXPECT_EQ(14, stream.ByteCount()); ASSERT_EQ(true, stream.Next(&ptr, &len)); checkBytes(static_cast(ptr), len, 14); - EXPECT_EQ(true, !stream.Skip(320)); + EXPECT_EQ(true, !stream.SkipInt64(320)); EXPECT_EQ(200, stream.ByteCount()); EXPECT_EQ(true, !stream.Next(&ptr, &len)); } @@ -600,7 +600,7 @@ TEST_F(DecompressionTest, testSkipZlib) { int32_t length; ASSERT_EQ(true, result->Next(&ptr, &length)); ASSERT_EQ(2, length); - result->Skip(2); + result->SkipInt64(2); ASSERT_EQ(true, result->Next(&ptr, &length)); ASSERT_EQ(3, length); EXPECT_EQ(4, static_cast(ptr)[0]); @@ -611,7 +611,7 @@ TEST_F(DecompressionTest, testSkipZlib) { ASSERT_EQ(2, length); EXPECT_EQ(5, static_cast(ptr)[0]); EXPECT_EQ(6, static_cast(ptr)[1]); - result->Skip(8); + result->SkipInt64(8); ASSERT_EQ(true, result->Next(&ptr, &length)); ASSERT_EQ(2, length); EXPECT_EQ(15, static_cast(ptr)[0]); @@ -773,8 +773,9 @@ TEST_F(DecompressionTest, testSkipSnappy) { const void* data; int32_t length; // skip 1/2; in 2 jumps - ASSERT_TRUE(result->Skip(static_cast(((N / 2) - 2) * sizeof(int)))); - ASSERT_TRUE(result->Skip(static_cast(2 * sizeof(int)))); + ASSERT_TRUE( + result->SkipInt64(static_cast(((N / 2) - 2) * sizeof(int)))); + ASSERT_TRUE(result->SkipInt64(static_cast(2 * sizeof(int)))); ASSERT_TRUE(result->Next(&data, &length)); ASSERT_EQ((N / 2) * sizeof(int), length); for (int32_t i = N / 2; i < N; ++i) { @@ -820,8 +821,8 @@ TEST_F(DecompressionTest, testDelayedSkip) { bufByteSize); const void* data; int32_t length; - ASSERT_TRUE(result->Skip(bufByteSize / 2)); - ASSERT_TRUE(result->Skip(bufByteSize / 2)); + ASSERT_TRUE(result->SkipInt64(bufByteSize / 2)); + ASSERT_TRUE(result->SkipInt64(bufByteSize / 2)); ASSERT_TRUE(result->Next(&data, &length)); ASSERT_EQ(length, bufByteSize); auto* dataAsInt = reinterpret_cast(data); diff --git a/velox/dwio/dwrf/test/examples/long_string.dwrf b/velox/dwio/dwrf/test/examples/long_string.dwrf new file mode 100644 index 0000000000000..6c5af38fd9208 Binary files /dev/null and b/velox/dwio/dwrf/test/examples/long_string.dwrf differ