Skip to content

Commit

Permalink
Fix integer overflow while skipping on a stream (facebookincubator#11477
Browse files Browse the repository at this point in the history
)

Summary: Pull Request resolved: facebookincubator#11477

Reviewed By: xiaoxmeng, spershin

Differential Revision: D65632868
  • Loading branch information
Yuhta authored and facebook-github-bot committed Nov 8, 2024
1 parent 31aa90c commit b92be4a
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 40 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/common/SeekableInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {
virtual bool SkipInt64(int64_t count) = 0;

bool Skip(int32_t count) final override {
return SkipInt64(count);
VELOX_FAIL("Use SkipInt64 instead: {}", count);
}

void readFully(char* buffer, size_t bufferSize);
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,9 +675,9 @@ class SelectiveColumnReader {
// returned as the null flags of the vector in getValues().
bool returnReaderNulls_ = false;
// Total writable bytes in 'rawStringBuffer_'.
int32_t rawStringSize_ = 0;
int64_t rawStringSize_ = 0;
// Number of written bytes in 'rawStringBuffer_'.
uint32_t rawStringUsed_ = 0;
int64_t rawStringUsed_ = 0;

// True if last read() added any nulls.
bool anyNulls_ = false;
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/StreamUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ inline void skipBytes(
return;
}
numBytes -= bufferEnd - bufferStart;
input->Skip(numBytes);
input->SkipInt64(numBytes);
bufferStart = bufferEnd;
}

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/ByteRLE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ void ByteRleDecoder::skipBytes(size_t count) {
count -= skipSize;
}
if (count > 0) {
inputStream_->Skip(count);
inputStream_->SkipInt64(count);
}
}

Expand Down
22 changes: 11 additions & 11 deletions velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader(

uint64_t SelectiveStringDirectColumnReader::skip(uint64_t numValues) {
numValues = SelectiveColumnReader::skip(numValues);
dwio::common::ensureCapacity<int64_t>(lengths_, numValues, memoryPool_);
dwio::common::ensureCapacity<uint32_t>(lengths_, numValues, memoryPool_);
lengthDecoder_->nextLengths(lengths_->asMutable<int32_t>(), numValues);
rawLengths_ = lengths_->as<uint32_t>();
for (auto i = 0; i < numValues; ++i) {
Expand All @@ -60,10 +60,10 @@ uint64_t SelectiveStringDirectColumnReader::skip(uint64_t numValues) {

void SelectiveStringDirectColumnReader::extractCrossBuffers(
const int32_t* lengths,
const int32_t* starts,
const int64_t* starts,
int32_t rowIndex,
int32_t numValues) {
int32_t current = 0;
int64_t current = 0;
bool scatter = !outerNonNullRows_.empty();
for (auto i = 0; i < numValues; ++i) {
auto gap = starts[i] - current;
Expand Down Expand Up @@ -92,8 +92,8 @@ void SelectiveStringDirectColumnReader::extractCrossBuffers(
}
}

inline int32_t
rangeSum(const uint32_t* rows, int32_t start, int32_t begin, int32_t end) {
inline int64_t
rangeSum(const uint32_t* rows, int64_t start, int32_t begin, int32_t end) {
for (auto i = begin; i < end; ++i) {
start += rows[i];
}
Expand All @@ -104,10 +104,10 @@ inline void SelectiveStringDirectColumnReader::makeSparseStarts(
int32_t startRow,
const int32_t* rows,
int32_t numRows,
int32_t* starts) {
int64_t* starts) {
auto previousRow = lengthIndex_;
int32_t i = 0;
int32_t startOffset = 0;
int64_t startOffset = 0;
for (; i < numRows; ++i) {
int targetRow = rows[startRow + i];
startOffset = rangeSum(rawLengths_, startOffset, previousRow, targetRow);
Expand All @@ -121,7 +121,7 @@ void SelectiveStringDirectColumnReader::extractNSparse(
const int32_t* rows,
int32_t row,
int32_t numValues) {
int32_t starts[8];
int64_t starts[8];
if (numValues == 8 &&
(outerNonNullRows_.empty() ? try8Consecutive<false, true>(0, rows, row)
: try8Consecutive<true, true>(0, rows, row))) {
Expand Down Expand Up @@ -224,7 +224,7 @@ bool SelectiveStringDirectColumnReader::try8ConsecutiveSmall(

template <bool scatter, bool sparse>
inline bool SelectiveStringDirectColumnReader::try8Consecutive(
int32_t start,
int64_t start,
const int32_t* rows,
int32_t row) {
// If we haven't read in a buffer yet, or there is not enough data left. This
Expand Down Expand Up @@ -317,7 +317,7 @@ void SelectiveStringDirectColumnReader::extractSparse(
numRows,
8,
[&](int32_t row) {
int32_t start = rangeSum(rawLengths_, 0, lengthIndex_, rows[row]);
auto start = rangeSum(rawLengths_, 0, lengthIndex_, rows[row]);
lengthIndex_ = rows[row];
auto lengths =
reinterpret_cast<const int32_t*>(rawLengths_) + lengthIndex_;
Expand All @@ -327,7 +327,7 @@ void SelectiveStringDirectColumnReader::extractSparse(
: try8Consecutive<true, false>(start, rows, row)) {
return;
}
int32_t starts[8];
int64_t starts[8];
for (auto i = 0; i < 8; ++i) {
starts[i] = start;
start += lengths[i];
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,22 @@ class SelectiveStringDirectColumnReader

void extractCrossBuffers(
const int32_t* lengths,
const int32_t* starts,
const int64_t* starts,
int32_t rowIndex,
int32_t numValues);

inline void makeSparseStarts(
int32_t startRow,
const int32_t* rows,
int32_t numRows,
int32_t* starts);
int64_t* starts);

inline void extractNSparse(const int32_t* rows, int32_t row, int numRows);

void extractSparse(const int32_t* rows, int32_t numRows);

template <bool scatter, bool skip>
bool try8Consecutive(int32_t start, const int32_t* rows, int32_t row);
bool try8Consecutive(int64_t start, const int32_t* rows, int32_t row);

template <bool kScatter, bool kGreaterThan4>
bool
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/StripeMetadataCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class StripeMetadataCache {

auto clone = reinterpret_cast<dwio::common::CacheInputStream*>(input_.get())
->clone();
clone->Skip(offset);
clone->SkipInt64(offset);
clone->setRemainingBytes(offsets_[index + 1] - offset);
return clone;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/StripeStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ StripeStreamsImpl::getIndexStreamFromCache(
const auto length = info.getLength();
if (auto* cacheInput =
dynamic_cast<dwio::common::CacheInputStream*>(indexBase.get())) {
cacheInput->Skip(offset);
cacheInput->SkipInt64(offset);
cacheInput->setRemainingBytes(length);
return indexBase;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/CacheInputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ TEST_F(CacheTest, window) {
// We make a second stream that ranges over a subset of the range of the first
// one.
auto clone = cacheInput->clone();
clone->Skip(100);
clone->SkipInt64(100);
clone->setRemainingBytes(kMB);
auto previousRead = ioStats_->rawBytesRead();
EXPECT_TRUE(clone->Next(&buffer, &size));
Expand Down
44 changes: 44 additions & 0 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2621,3 +2621,47 @@ TEST_F(TestReader, selectiveFlatMapFastPathAllInlinedStringKeys) {
ASSERT_EQ(rowReader->next(10, batch), 2);
assertEqualVectors(batch, row);
}

TEST_F(TestReader, skipLongString) {
// c0 in long_string.dwrf has 25 rows of 200,000,000 character long strings,
// whose values are repeated 'a' to 'y' respectively.
auto input = std::make_unique<BufferedInput>(
std::make_shared<LocalReadFile>(getExampleFilePath("long_string.dwrf")),
*pool());
dwio::common::ReaderOptions readerOpts(pool());
readerOpts.setFileFormat(FileFormat::DWRF);
auto reader = DwrfReader::create(std::move(input), readerOpts);
auto spec = std::make_shared<common::ScanSpec>("<root>");
spec->addField("c0", 0);
spec->getOrCreateChild("c1")->setFilter(
std::make_unique<common::BoolValue>(true, false));
RowReaderOptions rowReaderOpts;
rowReaderOpts.setScanSpec(spec);
VectorPtr batch = BaseVector::create(ROW({"c0"}, {VARCHAR()}), 0, pool());
auto validate = [](const VectorPtr& batch) {
ASSERT_EQ(batch->size(), 1);
auto string = batch->asChecked<RowVector>()
->childAt(0)
->loadedVector()
->asChecked<SimpleVector<StringView>>()
->valueAt(0);
ASSERT_EQ(string.size(), 200'000'000);
for (char c : string) {
ASSERT_EQ(c, 'y');
}
};
{
SCOPED_TRACE("Skip");
auto rowReader = reader->createRowReader(rowReaderOpts);
ASSERT_EQ(rowReader->next(24, batch), 24);
ASSERT_EQ(batch->size(), 0);
ASSERT_EQ(rowReader->next(2, batch), 1);
validate(batch);
}
{
SCOPED_TRACE("Filter");
auto rowReader = reader->createRowReader(rowReaderOpts);
ASSERT_EQ(rowReader->next(26, batch), 25);
validate(batch);
}
}
37 changes: 19 additions & 18 deletions velox/dwio/dwrf/test/TestDecompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ TEST_F(DecompressionTest, testArraySkip) {
ASSERT_EQ(true, stream.Next(&ptr, &len));
EXPECT_EQ(bytes.data(), static_cast<const char*>(ptr));
EXPECT_EQ(20, len);
ASSERT_EQ(true, !stream.Skip(-10));
ASSERT_EQ(true, stream.Skip(80));
ASSERT_EQ(true, !stream.SkipInt64(-10));
ASSERT_EQ(true, stream.SkipInt64(80));
ASSERT_EQ(true, stream.Next(&ptr, &len));
EXPECT_EQ(bytes.data() + 100, static_cast<const char*>(ptr));
EXPECT_EQ(20, len);
ASSERT_EQ(true, stream.Skip(80));
ASSERT_EQ(true, stream.SkipInt64(80));
ASSERT_EQ(true, !stream.Next(&ptr, &len));
ASSERT_EQ(true, !stream.Skip(181));
ASSERT_EQ(true, !stream.SkipInt64(181));
EXPECT_EQ("SeekableArrayInputStream 200 of 200", stream.getName());
}

Expand All @@ -181,11 +181,11 @@ TEST_F(DecompressionTest, testArrayCombo) {
EXPECT_EQ(20, len);
stream.BackUp(10);
EXPECT_EQ(10, stream.ByteCount());
stream.Skip(4);
stream.SkipInt64(4);
EXPECT_EQ(14, stream.ByteCount());
ASSERT_EQ(true, stream.Next(&ptr, &len));
EXPECT_EQ(bytes.data() + 14, static_cast<const char*>(ptr));
EXPECT_EQ(true, !stream.Skip(320));
EXPECT_EQ(true, !stream.SkipInt64(320));
EXPECT_EQ(200, stream.ByteCount());
EXPECT_EQ(true, !stream.Next(&ptr, &len));
}
Expand Down Expand Up @@ -234,14 +234,14 @@ TEST_F(DecompressionTest, testFileSkip) {
ASSERT_EQ(true, stream.Next(&ptr, &len));
checkBytes(static_cast<const char*>(ptr), len, 0);
EXPECT_EQ(20, len);
ASSERT_EQ(true, !stream.Skip(-10));
ASSERT_EQ(true, stream.Skip(80));
ASSERT_EQ(true, !stream.SkipInt64(-10));
ASSERT_EQ(true, stream.SkipInt64(80));
ASSERT_EQ(true, stream.Next(&ptr, &len));
checkBytes(static_cast<const char*>(ptr), len, 100);
EXPECT_EQ(20, len);
ASSERT_EQ(true, !stream.Skip(80));
ASSERT_EQ(true, !stream.SkipInt64(80));
ASSERT_EQ(true, !stream.Next(&ptr, &len));
ASSERT_EQ(true, !stream.Skip(181));
ASSERT_EQ(true, !stream.SkipInt64(181));
EXPECT_EQ(std::string(simpleFile) + " from 0 for 200", stream.getName());
}

Expand All @@ -254,11 +254,11 @@ TEST_F(DecompressionTest, testFileCombo) {
EXPECT_EQ(20, len);
stream.BackUp(10);
EXPECT_EQ(10, stream.ByteCount());
stream.Skip(4);
stream.SkipInt64(4);
EXPECT_EQ(14, stream.ByteCount());
ASSERT_EQ(true, stream.Next(&ptr, &len));
checkBytes(static_cast<const char*>(ptr), len, 14);
EXPECT_EQ(true, !stream.Skip(320));
EXPECT_EQ(true, !stream.SkipInt64(320));
EXPECT_EQ(200, stream.ByteCount());
EXPECT_EQ(true, !stream.Next(&ptr, &len));
}
Expand Down Expand Up @@ -600,7 +600,7 @@ TEST_F(DecompressionTest, testSkipZlib) {
int32_t length;
ASSERT_EQ(true, result->Next(&ptr, &length));
ASSERT_EQ(2, length);
result->Skip(2);
result->SkipInt64(2);
ASSERT_EQ(true, result->Next(&ptr, &length));
ASSERT_EQ(3, length);
EXPECT_EQ(4, static_cast<const char*>(ptr)[0]);
Expand All @@ -611,7 +611,7 @@ TEST_F(DecompressionTest, testSkipZlib) {
ASSERT_EQ(2, length);
EXPECT_EQ(5, static_cast<const char*>(ptr)[0]);
EXPECT_EQ(6, static_cast<const char*>(ptr)[1]);
result->Skip(8);
result->SkipInt64(8);
ASSERT_EQ(true, result->Next(&ptr, &length));
ASSERT_EQ(2, length);
EXPECT_EQ(15, static_cast<const char*>(ptr)[0]);
Expand Down Expand Up @@ -773,8 +773,9 @@ TEST_F(DecompressionTest, testSkipSnappy) {
const void* data;
int32_t length;
// skip 1/2; in 2 jumps
ASSERT_TRUE(result->Skip(static_cast<int32_t>(((N / 2) - 2) * sizeof(int))));
ASSERT_TRUE(result->Skip(static_cast<int32_t>(2 * sizeof(int))));
ASSERT_TRUE(
result->SkipInt64(static_cast<int32_t>(((N / 2) - 2) * sizeof(int))));
ASSERT_TRUE(result->SkipInt64(static_cast<int32_t>(2 * sizeof(int))));
ASSERT_TRUE(result->Next(&data, &length));
ASSERT_EQ((N / 2) * sizeof(int), length);
for (int32_t i = N / 2; i < N; ++i) {
Expand Down Expand Up @@ -820,8 +821,8 @@ TEST_F(DecompressionTest, testDelayedSkip) {
bufByteSize);
const void* data;
int32_t length;
ASSERT_TRUE(result->Skip(bufByteSize / 2));
ASSERT_TRUE(result->Skip(bufByteSize / 2));
ASSERT_TRUE(result->SkipInt64(bufByteSize / 2));
ASSERT_TRUE(result->SkipInt64(bufByteSize / 2));
ASSERT_TRUE(result->Next(&data, &length));
ASSERT_EQ(length, bufByteSize);
auto* dataAsInt = reinterpret_cast<const int*>(data);
Expand Down
Binary file added velox/dwio/dwrf/test/examples/long_string.dwrf
Binary file not shown.

0 comments on commit b92be4a

Please sign in to comment.