Skip to content

Commit

Permalink
Fix bug in DwrfReader prefetch (facebookincubator#6726)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#6726

What was happening:

- Each prefetch would load its stripe using `currentStripe` instead of the index of the stripe actually being prefetched
- That index is used to look up index streams in the metadata cache, so whenever an index stream was present in the cache, the wrong stripe index was used and an incorrect stream was loaded

This diff fixes that, and adds a regression test.

Reviewed By: Yuhta

Differential Revision: D49419959

fbshipit-source-id: 375337f63f5216d18a40190bf6cf131f1aaeb0b9
  • Loading branch information
Patrick Sullivan authored and facebook-github-bot committed Sep 27, 2023
1 parent 42bb236 commit 4e3a709
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ DwrfRowReader::FetchResult DwrfRowReader::fetch(uint32_t stripeIndex) {
options_,
getReader().getFooter().stripes(stripeIndex).offset(),
*this,
currentStripe);
stripeIndex);

auto scanSpec = options_.getScanSpec().get();
auto requestedType = getColumnSelector().getSchemaWithId();
Expand Down
115 changes: 115 additions & 0 deletions velox/dwio/dwrf/test/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,87 @@ void verifyFlatMapReading(
verifyFlatMapReading(rowReader, seeks, expectedBatchSize, numBatches);
}

/*
Verifies contents of dict_encoded_strings.orc
schema:
struct {
  int_column int,
  string_column string,
  string_column_2 string
}

The row reader is expected to produce rows with 4 children (the assertions
below require it); only the two string columns at child index 1 and 2 are
inspected. Three batches are read: 100 rows, 100 rows and 3 rows.

All structural checks (row count, child count, non-null casts) are fatal
ASSERTs so that a malformed batch stops the verification instead of being
indexed out of bounds or dereferenced as null.
*/
void verifyCachedIndexStreamReads(DwrfRowReader* rowReader) {
  ASSERT_TRUE(rowReader != nullptr);
  VectorPtr batch;

  // Stripe 1
  ASSERT_TRUE(rowReader->next(100, batch));
  ASSERT_TRUE(batch != nullptr);
  auto root = batch->as<RowVector>();
  ASSERT_TRUE(root != nullptr);
  // Rows 0..99 are indexed below, so the batch must hold all of them.
  ASSERT_EQ(root->size(), 100);
  ASSERT_EQ(root->childrenSize(), 4);
  auto stringCol1 = root->childAt(1)->as<SimpleVector<StringView>>();
  auto stringCol2 = root->childAt(2)->as<SimpleVector<StringView>>();
  ASSERT_TRUE(stringCol1 != nullptr);
  ASSERT_TRUE(stringCol2 != nullptr);

  for (int i = 0; i < 50; i++) {
    ASSERT_EQ(stringCol1->valueAt(i), "baz");
    ASSERT_EQ(stringCol2->valueAt(i), "abcdefghijklmnop");
  }

  ASSERT_EQ(stringCol1->valueAt(50), "zax");
  ASSERT_EQ(stringCol2->valueAt(50), "unique");

  ASSERT_EQ(stringCol1->valueAt(51), "zax");
  ASSERT_EQ(stringCol2->valueAt(51), "different");

  ASSERT_EQ(stringCol1->valueAt(52), "zax");
  ASSERT_EQ(stringCol2->valueAt(52), "special");

  for (int i = 53; i < 100; i++) {
    ASSERT_EQ(stringCol1->valueAt(i), "baz");
    ASSERT_EQ(stringCol2->valueAt(i), "abcdefghijklmnop");
  }

  // Stripe 2
  ASSERT_TRUE(rowReader->next(100, batch));
  ASSERT_TRUE(batch != nullptr);
  root = batch->as<RowVector>();
  ASSERT_TRUE(root != nullptr);
  ASSERT_EQ(root->size(), 100);
  ASSERT_EQ(root->childrenSize(), 4);
  stringCol1 = root->childAt(1)->as<SimpleVector<StringView>>();
  stringCol2 = root->childAt(2)->as<SimpleVector<StringView>>();
  ASSERT_TRUE(stringCol1 != nullptr);
  ASSERT_TRUE(stringCol2 != nullptr);

  for (int i = 0; i < 50; i++) {
    ASSERT_EQ(stringCol1->valueAt(i), "ee");
    ASSERT_EQ(stringCol2->valueAt(i), "pomelo");
  }

  ASSERT_EQ(stringCol1->valueAt(50), "craz");
  ASSERT_EQ(stringCol2->valueAt(50), "unique");

  ASSERT_EQ(stringCol1->valueAt(51), "doop");
  ASSERT_EQ(stringCol2->valueAt(51), "different");

  ASSERT_EQ(stringCol1->valueAt(52), "hello");
  ASSERT_EQ(stringCol2->valueAt(52), "special");

  // NOTE(review): the first half of this stripe expects "ee" in
  // string_column while this half expects "baz" -- kept exactly as in the
  // original expectations; confirm against the data file.
  for (int i = 53; i < 100; i++) {
    ASSERT_EQ(stringCol1->valueAt(i), "baz");
    ASSERT_EQ(stringCol2->valueAt(i), "pomelo");
  }

  // Stripe 3
  ASSERT_TRUE(rowReader->next(100, batch));
  ASSERT_TRUE(batch != nullptr);
  root = batch->as<RowVector>();
  ASSERT_TRUE(root != nullptr);
  ASSERT_EQ(root->size(), 3);
  ASSERT_EQ(root->childrenSize(), 4);
  stringCol1 = root->childAt(1)->as<SimpleVector<StringView>>();
  stringCol2 = root->childAt(2)->as<SimpleVector<StringView>>();
  ASSERT_TRUE(stringCol1 != nullptr);
  ASSERT_TRUE(stringCol2 != nullptr);

  ASSERT_EQ(stringCol1->valueAt(0), "craz");
  ASSERT_EQ(stringCol2->valueAt(0), "dog");

  ASSERT_EQ(stringCol1->valueAt(1), "doop");
  ASSERT_EQ(stringCol2->valueAt(1), "cat");

  ASSERT_EQ(stringCol1->valueAt(2), "hello");
  ASSERT_EQ(stringCol2->valueAt(2), "chicken");
}

// Value-parameterized fixture used by the TEST_P(TestFlatMapReader, ...) cases
// that follow; the test parameter is a bool and the fixture adds no state of
// its own.
class TestFlatMapReader : public TestWithParam<bool> {};

TEST_P(TestFlatMapReader, testReadFlatMapEmptyMap) {
Expand Down Expand Up @@ -592,6 +673,40 @@ TEST(TestRowReaderPrefetch, testParallelPrefetchNoPreload) {
expectedBatchSize.size());
}

// Regression test: prefetches every stripe of dict_encoded_strings.orc up
// front and then reads the file back, checking that each stripe's data is the
// one belonging to that stripe (see verifyCachedIndexStreamReads).
TEST(TestRowReaderPrefetch, prefetchWithCachedIndexStream) {
  // Number of prefetch units exercised; verifyCachedIndexStreamReads reads
  // exactly this many batches.
  constexpr size_t kNumStripes = 3;

  ReaderOptions readerOpts{getDefaultPool().get()};
  // NOTE(review): presumably these two settings keep the file from being
  // preloaded as a whole and keep the initial footer read small, so that
  // index streams are served through the metadata cache -- confirm.
  readerOpts.setFilePreloadThreshold(0);
  readerOpts.setDirectorySizeGuess(4);
  RowReaderOptions rowReaderOpts;

  std::shared_ptr<const RowType> requestedType = std::dynamic_pointer_cast<
      const RowType>(HiveTypeParser().parse(
      "struct<int_column:int,string_column:string,string_column_2:string,ds:string>"));
  ASSERT_TRUE(requestedType != nullptr);
  rowReaderOpts.select(std::make_shared<ColumnSelector>(requestedType));
  rowReaderOpts.setEagerFirstStripeLoad(false);

  auto reader = DwrfReader::create(
      createFileBufferedInput(
          getExampleFilePath("dict_encoded_strings.orc"),
          readerOpts.getMemoryPool()),
      readerOpts);
  auto rowReaderOwner = reader->createRowReader(rowReaderOpts);
  auto rowReader = dynamic_cast<DwrfRowReader*>(rowReaderOwner.get());
  ASSERT_TRUE(rowReader != nullptr);

  // Fail the test cleanly instead of throwing or indexing out of range if the
  // reader exposes no prefetch units or fewer than expected.
  auto units = rowReader->prefetchUnits();
  ASSERT_TRUE(units.has_value());
  ASSERT_GE(units->size(), kNumStripes);

  std::vector<DwrfRowReader::FetchResult> prefetches;
  prefetches.reserve(kNumStripes);
  for (size_t i = 0; i < kNumStripes; ++i) {
    prefetches.emplace_back((*units)[i].prefetch());
  }

  for (auto& fetchResult : prefetches) {
    ASSERT_EQ(DwrfRowReader::FetchResult::kFetched, fetchResult);
  }
  verifyCachedIndexStreamReads(rowReader);
}

// This test just verifies read correctness with the eager first stripe load
// config off for regression purposes. It does not ensure the first stripe is
// not loaded before we explicitly prefetch or start reading.
Expand Down
Binary file not shown.

0 comments on commit 4e3a709

Please sign in to comment.