From 08528e3231e60be1849c87b13a6fe679ec24789e Mon Sep 17 00:00:00 2001 From: Daniel Munoz Date: Wed, 1 May 2024 08:21:32 -0700 Subject: [PATCH] Create UnitLoader::onSeek Summary: https://github.com/facebookincubator/velox/pull/9656 So loaders can prefetch the stripe. Reviewed By: Yuhta, helfman Differential Revision: D56681720 fbshipit-source-id: 9aa9ea1bc014ffe074260c48ba90883cb8824820 --- velox/dwio/common/OnDemandUnitLoader.cpp | 16 +++-- velox/dwio/common/UnitLoader.h | 5 ++ .../common/tests/OnDemandUnitLoaderTests.cpp | 66 +++++++++++++++++++ .../tests/utils/UnitLoaderTestTools.cpp | 30 +++++++-- .../common/tests/utils/UnitLoaderTestTools.h | 2 + velox/dwio/dwrf/reader/DwrfReader.cpp | 3 + 6 files changed, 113 insertions(+), 9 deletions(-) diff --git a/velox/dwio/common/OnDemandUnitLoader.cpp b/velox/dwio/common/OnDemandUnitLoader.cpp index a15998002cc8..ee21a2b44233 100644 --- a/velox/dwio/common/OnDemandUnitLoader.cpp +++ b/velox/dwio/common/OnDemandUnitLoader.cpp @@ -58,10 +58,18 @@ class OnDemandUnitLoader : public UnitLoader { return *loadUnits_[unit]; } - void onRead( - uint32_t /* unit */, - uint64_t /* rowOffsetInUnit */, - uint64_t /* rowCount */) override {} + void onRead(uint32_t unit, uint64_t rowOffsetInUnit, uint64_t /* rowCount */) + override { + VELOX_CHECK_LT(unit, loadUnits_.size(), "Unit out of range"); + VELOX_CHECK_LT( + rowOffsetInUnit, loadUnits_[unit]->getNumRows(), "Row out of range"); + } + + void onSeek(uint32_t unit, uint64_t rowOffsetInUnit) override { + VELOX_CHECK_LT(unit, loadUnits_.size(), "Unit out of range"); + VELOX_CHECK_LE( + rowOffsetInUnit, loadUnits_[unit]->getNumRows(), "Row out of range"); + } private: std::vector> loadUnits_; diff --git a/velox/dwio/common/UnitLoader.h b/velox/dwio/common/UnitLoader.h index f536a2d5eef1..3ea9653d521f 100644 --- a/velox/dwio/common/UnitLoader.h +++ b/velox/dwio/common/UnitLoader.h @@ -49,8 +49,13 @@ class UnitLoader { virtual LoadUnit& getLoadedUnit(uint32_t unit) = 0; // Reader reports progress calling this method + // The call must be done **after** getLoadedUnit for unit virtual void onRead(uint32_t unit, uint64_t rowOffsetInUnit, uint64_t rowCount) = 0; + + // Reader reports seek calling this method. + // The call must be done **before** getLoadedUnit for the new unit + virtual void onSeek(uint32_t unit, uint64_t rowOffsetInUnit) = 0; }; class UnitLoaderFactory { diff --git a/velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp b/velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp index 4775303057cb..492b2517712b 100644 --- a/velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp +++ b/velox/dwio/common/tests/OnDemandUnitLoaderTests.cpp @@ -95,6 +95,72 @@ TEST(OnDemandUnitLoaderTests, LoadsCorrectlyWithNoCallback) { EXPECT_EQ(readerMock.unitsLoaded(), std::vector({false, false, true})); } +TEST(OnDemandUnitLoaderTests, CanSeek) { + size_t blockedOnIoCount = 0; + OnDemandUnitLoaderFactory factory([&](auto) { ++blockedOnIoCount; }); + ReaderMock readerMock{{10, 20, 30}, {0, 0, 0}, factory}; + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({false, false, false})); + EXPECT_EQ(blockedOnIoCount, 0); + + EXPECT_NO_THROW(readerMock.seek(10);); + + EXPECT_TRUE(readerMock.read(3)); // Unit: 1, rows: 0-2, load(1) + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({false, true, false})); + EXPECT_EQ(blockedOnIoCount, 1); + + EXPECT_NO_THROW(readerMock.seek(0);); + + EXPECT_TRUE(readerMock.read(3)); // Unit: 0, rows: 0-2, load(0), unload(1) + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({true, false, false})); + EXPECT_EQ(blockedOnIoCount, 2); + + EXPECT_NO_THROW(readerMock.seek(30);); + + EXPECT_TRUE(readerMock.read(3)); // Unit: 2, rows: 0-2, load(2), unload(0) + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({false, false, true})); + EXPECT_EQ(blockedOnIoCount, 3); + + EXPECT_NO_THROW(readerMock.seek(5);); + + EXPECT_TRUE(readerMock.read(5)); // Unit: 0, rows: 5-9, load(0), unload(1) + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({true, false, false})); + EXPECT_EQ(blockedOnIoCount, 4); +} + +TEST(OnDemandUnitLoaderTests, SeekOutOfRangeReaderError) { + size_t blockedOnIoCount = 0; + OnDemandUnitLoaderFactory factory([&](auto) { ++blockedOnIoCount; }); + ReaderMock readerMock{{10, 20, 30}, {0, 0, 0}, factory}; + EXPECT_EQ(readerMock.unitsLoaded(), std::vector({false, false, false})); + EXPECT_EQ(blockedOnIoCount, 0); + readerMock.seek(59); + + readerMock.seek(60); + + EXPECT_THAT( + [&]() { readerMock.seek(61); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Can't seek to possition 61 in file. Must be up to 60.")))); +} + +TEST(OnDemandUnitLoaderTests, SeekOutOfRange) { + OnDemandUnitLoaderFactory factory(nullptr); + std::vector unitsLoaded(getUnitsLoadedWithFalse(1)); + std::vector> units; + units.push_back(std::make_unique(10, 0, unitsLoaded, 0)); + + auto unitLoader = factory.create(std::move(units)); + + unitLoader->onSeek(0, 10); + + EXPECT_THAT( + [&]() { unitLoader->onSeek(0, 11); }, + Throws(Property( + &facebook::velox::VeloxRuntimeError::message, + HasSubstr("Row out of range")))); +} + TEST(OnDemandUnitLoaderTests, UnitOutOfRange) { OnDemandUnitLoaderFactory factory(nullptr); std::vector unitsLoaded(getUnitsLoadedWithFalse(1)); diff --git a/velox/dwio/common/tests/utils/UnitLoaderTestTools.cpp b/velox/dwio/common/tests/utils/UnitLoaderTestTools.cpp index 716327e30149..ed1e9417d48b 100644 --- a/velox/dwio/common/tests/utils/UnitLoaderTestTools.cpp +++ b/velox/dwio/common/tests/utils/UnitLoaderTestTools.cpp @@ -42,6 +42,28 @@ bool ReaderMock::read(uint64_t maxRows) { return true; } +void ReaderMock::seek(uint64_t rowNumber) { + uint64_t totalRows = 0; + uint64_t rowsLeft = rowNumber; + for (size_t unit = 0; unit < rowsPerUnit_.size(); ++unit) { + const uint64_t rowCount = rowsPerUnit_[unit]; + if (rowsLeft < rowCount) { + currentUnit_ = unit; + currentRowInUnit_ = rowsLeft; + loader_->onSeek(currentUnit_, currentRowInUnit_); + return; + } + rowsLeft -= rowCount; + totalRows += rowCount; + } + VELOX_CHECK_EQ( + rowsLeft, + 0, + "Can't seek to possition {} in file. Must be up to {}.", + rowNumber, + totalRows); +} + bool ReaderMock::loadUnit() { VELOX_CHECK(currentRowInUnit_ <= rowsPerUnit_[currentUnit_]); if (currentRowInUnit_ == rowsPerUnit_[currentUnit_]) { @@ -51,11 +73,9 @@ bool ReaderMock::loadUnit() { return false; } } - if (currentRowInUnit_ == 0) { - auto& unit = loader_->getLoadedUnit(currentUnit_); - auto& unitMock = dynamic_cast(unit); - VELOX_CHECK(unitMock.isLoaded()); - } + auto& unit = loader_->getLoadedUnit(currentUnit_); + auto& unitMock = dynamic_cast(unit); + VELOX_CHECK(unitMock.isLoaded()); return true; } diff --git a/velox/dwio/common/tests/utils/UnitLoaderTestTools.h b/velox/dwio/common/tests/utils/UnitLoaderTestTools.h index e7760e75fd49..f606d7db71f1 100644 --- a/velox/dwio/common/tests/utils/UnitLoaderTestTools.h +++ b/velox/dwio/common/tests/utils/UnitLoaderTestTools.h @@ -78,6 +78,8 @@ class ReaderMock { bool read(uint64_t maxRows); + void seek(uint64_t rowNumber); + std::vector unitsLoaded() const { return {unitsLoaded_.begin(), unitsLoaded_.end()}; } diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 258c09bb7f1b..483bde0be4a0 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -357,6 +357,9 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { currentRowInStripe_ = rowNumber - firstRowOfStripe_[currentStripe_]; previousRow_ = rowNumber; + const auto loadUnitIdx = currentStripe_ - firstStripe_; + unitLoader_->onSeek(loadUnitIdx, currentRowInStripe_); + if (currentStripe_ != previousStripe) { // Different stripe. Let's load the new stripe. currentUnit_ = nullptr;