diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index b0ccb52c22c0..06b705d7b023 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -43,11 +43,14 @@ class DataSource; // as a RowVectorPtr, potentially after processing pushdowns. struct ConnectorSplit { const std::string connectorId; + const int64_t splitWeight{0}; std::unique_ptr> dataSource; - explicit ConnectorSplit(const std::string& _connectorId) - : connectorId(_connectorId) {} + explicit ConnectorSplit( + const std::string& _connectorId, + int64_t _splitWeight = 0) + : connectorId(_connectorId), splitWeight(_splitWeight) {} virtual ~ConnectorSplit() {} diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 15be998884b0..10fa9206ec2d 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -50,8 +50,9 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - const std::unordered_map& _serdeParameters = {}) - : ConnectorSplit(connectorId), + const std::unordered_map& _serdeParameters = {}, + int64_t _splitWeight = 0) + : ConnectorSplit(connectorId, _splitWeight), filePath(_filePath), fileFormat(_fileFormat), start(_start), diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 775d79613191..6f09aaf5a9dd 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -50,7 +50,7 @@ bool Exchange::getSplits(ContinueFuture* future) { noMoreSplits_ = true; if (atEnd_) { operatorCtx_->task()->multipleSplitsFinished( - stats_.rlock()->numSplits); + false, stats_.rlock()->numSplits, 0); recordExchangeClientStats(); } return false; @@ -83,7 +83,7 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) { if (!currentPages_.empty() || atEnd_) { if (atEnd_ && noMoreSplits_) { const auto numSplits = stats_.rlock()->numSplits; - operatorCtx_->task()->multipleSplitsFinished(numSplits); + operatorCtx_->task()->multipleSplitsFinished(false, numSplits, 0); } recordExchangeClientStats(); return BlockingReason::kNotBlocked; diff --git a/velox/exec/Merge.cpp b/velox/exec/Merge.cpp index a5ef800232f3..dcca6d28e2c4 100644 --- a/velox/exec/Merge.cpp +++ b/velox/exec/Merge.cpp @@ -355,7 +355,7 @@ BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) { } // TODO Delay this call until all input data has been processed. operatorCtx_->task()->multipleSplitsFinished( - remoteSourceTaskIds_.size()); + false, remoteSourceTaskIds_.size(), 0); return BlockingReason::kNotBlocked; } } else { diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index d23c0c5f15fa..08ec4129f5af 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -127,8 +127,13 @@ RowVectorPtr TableScan::getOutput() { } const auto& connectorSplit = split.connectorSplit; + currentSplitWeight_ = connectorSplit->splitWeight; needNewSplit_ = false; + // A point for test code injection. + TestValue::adjust( + "facebook::velox::exec::TableScan::getOutput::gotSplit", this); + VELOX_CHECK_EQ( connector_->connectorId(), connectorSplit->connectorId, @@ -261,7 +266,7 @@ RowVectorPtr TableScan::getOutput() { } curStatus_ = "getOutput: task->splitFinished"; - driverCtx_->task->splitFinished(); + driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; } } diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index debd8f8d8b38..42f14eb52baf 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -80,6 +80,7 @@ class TableScan : public SourceOperator { memory::MemoryPool* const connectorPool_; ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_; + int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index ced22d540cc6..35b64ffcc533 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -134,7 +134,8 @@ void buildSplitStates( // Not all leaf nodes require splits. ValuesNode doesn't. Check if this plan // node requires splits. if (planNode->requiresSplits()) { - splitStateMap[planNode->id()]; + splitStateMap[planNode->id()].sourceIsTableScan = + (dynamic_cast(planNode) != nullptr); } return; } @@ -1220,6 +1221,11 @@ std::unique_ptr Task::addSplitLocked( if (split.connectorSplit) { VELOX_CHECK_NULL(split.connectorSplit->dataSource); + if (splitsState.sourceIsTableScan) { + ++taskStats_.numQueuedTableScanSplits; + taskStats_.queuedTableScanSplitWeights += + split.connectorSplit->splitWeight; + } } if (!split.hasGroup()) { @@ -1359,10 +1365,13 @@ BlockingReason Task::getSplitOrFuture( exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { std::lock_guard l(mutex_); + auto& splitsState = getPlanNodeSplitsStateLocked(planNodeId); return getSplitOrFutureLocked( - getPlanNodeSplitsStateLocked(planNodeId).groupSplitsStores[splitGroupId], + splitsState.sourceIsTableScan, + splitsState.groupSplitsStores[splitGroupId], split, future, maxPreloadSplits, @@ -1370,11 +1379,13 @@ BlockingReason Task::getSplitOrFuture( } BlockingReason Task::getSplitOrFutureLocked( + bool forTableScan, SplitsStore& splitsStore, exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { if (splitsStore.splits.empty()) { if (splitsStore.noMoreSplits) { return BlockingReason::kNotBlocked; @@ -1386,23 +1397,26 @@ BlockingReason Task::getSplitOrFutureLocked( return BlockingReason::kWaitForSplit; } - split = getSplitLocked(splitsStore, maxPreloadSplits, preload); + split = getSplitLocked(forTableScan, splitsStore, maxPreloadSplits, preload); return BlockingReason::kNotBlocked; } exec::Split Task::getSplitLocked( + bool forTableScan, SplitsStore& splitsStore, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { int32_t readySplitIndex = -1; if (maxPreloadSplits) { for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits; ++i) { - auto& split = splitsStore.splits[i].connectorSplit; - if (!split->dataSource) { + auto& connectorSplit = splitsStore.splits[i].connectorSplit; + if (!connectorSplit->dataSource) { // Initializes split->dataSource - preload(split); - } else if (readySplitIndex == -1 && split->dataSource->hasValue()) { + preload(connectorSplit); + } else if ( + readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) { readySplitIndex = i; } } @@ -1416,6 +1430,13 @@ exec::Split Task::getSplitLocked( --taskStats_.numQueuedSplits; ++taskStats_.numRunningSplits; + if (forTableScan && split.connectorSplit) { + --taskStats_.numQueuedTableScanSplits; + ++taskStats_.numRunningTableScanSplits; + taskStats_.queuedTableScanSplitWeights -= split.connectorSplit->splitWeight; + taskStats_.runningTableScanSplitWeights += + split.connectorSplit->splitWeight; + } taskStats_.lastSplitStartTimeMs = getCurrentTimeMs(); if (taskStats_.firstSplitStartTimeMs == 0) { taskStats_.firstSplitStartTimeMs = taskStats_.lastSplitStartTimeMs; @@ -1424,19 +1445,30 @@ exec::Split Task::getSplitLocked( return split; } -void Task::splitFinished() { +void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; --taskStats_.numRunningSplits; + if (fromTableScan) { + --taskStats_.numRunningTableScanSplits; + taskStats_.runningTableScanSplitWeights -= splitWeight; + } if (isAllSplitsFinishedLocked()) { taskStats_.executionEndTimeMs = getCurrentTimeMs(); } } -void Task::multipleSplitsFinished(int32_t numSplits) { +void Task::multipleSplitsFinished( + bool fromTableScan, + int32_t numSplits, + int64_t splitsWeight) { std::lock_guard l(mutex_); taskStats_.numFinishedSplits += numSplits; taskStats_.numRunningSplits -= numSplits; + if (fromTableScan) { + taskStats_.numRunningTableScanSplits -= numSplits; + taskStats_.runningTableScanSplitWeights -= splitsWeight; + } if (isAllSplitsFinishedLocked()) { taskStats_.executionEndTimeMs = getCurrentTimeMs(); } @@ -1847,7 +1879,8 @@ ContinueFuture Task::terminate(TaskState terminalState) { std::vector splits; for (auto& [groupId, store] : splitState.groupSplitsStores) { while (!store.splits.empty()) { - splits.emplace_back(getSplitLocked(store, 0, nullptr)); + splits.emplace_back(getSplitLocked( + splitState.sourceIsTableScan, store, 0, nullptr)); } } if (!splits.empty()) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index cb4a8507f13a..a8ed9ddc775d 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -360,12 +360,15 @@ class Task : public std::enable_shared_from_this { exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits = 0, - std::function)> preload = - nullptr); + const std::function)>& + preload = nullptr); - void splitFinished(); + void splitFinished(bool fromTableScan, int64_t splitWeight); - void multipleSplitsFinished(int32_t numSplits); + void multipleSplitsFinished( + bool fromTableScan, + int32_t numSplits, + int64_t splitsWeight); /// Adds a MergeSource for the specified splitGroupId and planNodeId. std::shared_ptr addLocalMergeSource( @@ -784,19 +787,22 @@ class Task : public std::enable_shared_from_this { /// Retrieve a split or split future from the given split store structure. BlockingReason getSplitOrFutureLocked( + bool forTableScan, SplitsStore& splitsStore, exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits = 0, - std::function)> preload = - nullptr); + const std::function)>& + preload = nullptr); /// Returns next split from the store. The caller must ensure the store is not /// empty. exec::Split getSplitLocked( + bool forTableScan, SplitsStore& splitsStore, int32_t maxPreloadSplits, - std::function)> preload); + const std::function)>& + preload); // Creates for the given split group and fills up the 'SplitGroupState' // structure, which stores inter-operator state (local exchange, bridges). diff --git a/velox/exec/TaskStats.h b/velox/exec/TaskStats.h index be5475b37788..85e7690d6c50 100644 --- a/velox/exec/TaskStats.h +++ b/velox/exec/TaskStats.h @@ -52,6 +52,12 @@ struct TaskStats { int32_t numQueuedSplits{0}; std::unordered_set completedSplitGroups; + /// Table scan split stats. + int32_t numRunningTableScanSplits{0}; + int32_t numQueuedTableScanSplits{0}; + int64_t runningTableScanSplitWeights{0}; + int64_t queuedTableScanSplitWeights{0}; + /// The subscript is given by each Operator's /// DriverCtx::pipelineId. This is a sum total reflecting fully /// processed Splits for Drivers of this pipeline. diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 5ceec37289bb..a266902cd4cc 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -64,6 +64,9 @@ struct SplitsStore { /// Structure contains the current info on splits for a particular plan node. struct SplitsState { + /// True if the source node is a table scan. + bool sourceIsTableScan{false}; + /// Plan node-wide 'no more splits'. bool noMoreSplits{false}; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 76485ebb37fa..b1fe42f504e9 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -14,17 +14,20 @@ * limitations under the License. */ #include "velox/exec/TableScan.h" +#include #include "velox/common/base/Fs.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/tests/utils/DataFiles.h" +#include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/LocalExchangeSource.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/type/Timestamp.h" @@ -54,6 +57,8 @@ class TableScanTest : public virtual HiveConnectorTestBase { protected: void SetUp() override { HiveConnectorTestBase::SetUp(); + exec::ExchangeSource::factories().clear(); + exec::ExchangeSource::registerFactory(createLocalExchangeSource); } static void SetUpTestCase() { @@ -68,8 +73,9 @@ class TableScanTest : public virtual HiveConnectorTestBase { return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); } - exec::Split makeHiveSplit(std::string path) { - return exec::Split(makeHiveConnectorSplit(std::move(path))); + exec::Split makeHiveSplit(std::string path, int64_t splitWeight = 0) { + return exec::Split(makeHiveConnectorSplit( + std::move(path), 0, std::numeric_limits::max(), splitWeight)); } std::shared_ptr assertQuery( @@ -1369,6 +1375,152 @@ TEST_F(TableScanTest, waitForSplit) { duckDbQueryRunner_); } +TEST_F(TableScanTest, tableScanSplitsAndWeights) { + // Create 10 data files for 10 splits. + const size_t numSplits{10}; + const auto filePaths = makeFilePaths(numSplits); + auto vectors = makeVectors(numSplits, 100); + for (auto i = 0; i < numSplits; i++) { + writeToFile(filePaths[i]->path, vectors[i]); + } + + // Set the table scan operators wait twice: + // First, before acquiring a split and then after. + std::atomic_uint32_t numAcquiredSplits{0}; + std::shared_mutex pauseTableScan; + std::shared_mutex pauseSplitProcessing; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput", + std::function( + ([&](const TableScan* /*tableScan*/) { + pauseTableScan.lock_shared(); + pauseTableScan.unlock_shared(); + }))); + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput::gotSplit", + std::function( + ([&](const TableScan* /*tableScan*/) { + ++numAcquiredSplits; + pauseSplitProcessing.lock_shared(); + pauseSplitProcessing.unlock_shared(); + }))); + // This will stop table scan operators from proceeding reading from the + // acquired splits. + pauseTableScan.lock(); + pauseSplitProcessing.lock(); + + // Prepare leaf task for the remote exchange node to pull data from. + auto leafTaskId = "local://leaf-0"; + auto leafPlan = PlanBuilder() + .values(vectors) + .partitionedOutput({}, 1, {"c0", "c1", "c2"}) + .planNode(); + std::unordered_map config; + auto queryCtx = std::make_shared( + executor_.get(), core::QueryConfig(std::move(config))); + core::PlanFragment planFragment{leafPlan}; + Consumer consumer = nullptr; + auto leafTask = Task::create( + leafTaskId, + core::PlanFragment{leafPlan}, + 0, + std::move(queryCtx), + std::move(consumer)); + leafTask->start(4); + + // Main task plan with table scan and remote exchange. + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId, exchangeNodeId; + auto planNode = PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator, pool_.get()) + .exchange(leafPlan->outputType()) + .capturePlanNodeId(exchangeNodeId) + // .values(vectors) + // .partitionedOutput({}, 1, {"c0", "c1", "c2"}) + .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"}) + .planNode(), + "", + {"t1"}, + core::JoinType::kAnti) + .planNode(); + + // Create task, cursor, start the task and supply the table scan splits. + const int32_t numDrivers = 6; + CursorParameters params; + params.planNode = planNode; + params.maxDrivers = numDrivers; + auto cursor = TaskCursor::create(params); + cursor->start(); + auto task = cursor->task(); + int64_t totalSplitWeights{0}; + for (auto fileIndex = 0; fileIndex < numSplits; ++fileIndex) { + const int64_t splitWeight = fileIndex * 10 + 1; + totalSplitWeights += splitWeight; + auto split = makeHiveSplit(filePaths[fileIndex]->path, splitWeight); + task->addSplit(scanNodeId, std::move(split)); + } + task->noMoreSplits(scanNodeId); + // Manage remote exchange splits. + task->addSplit( + exchangeNodeId, + exec::Split(std::make_shared(leafTaskId))); + task->noMoreSplits(exchangeNodeId); + + // Check the task stats. + auto stats = task->taskStats(); + EXPECT_EQ(stats.numRunningTableScanSplits, 0); + EXPECT_EQ(stats.numQueuedTableScanSplits, numSplits); + EXPECT_EQ(stats.runningTableScanSplitWeights, 0); + EXPECT_EQ(stats.queuedTableScanSplitWeights, totalSplitWeights); + EXPECT_EQ(stats.numTotalSplits, numSplits + 1); + + // Let all the operators proceed to acquire splits. + pauseTableScan.unlock(); + + // Wait till 6 out of 10 splits are acquired by the operators in 6 threads + while (numAcquiredSplits < numDrivers) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Check the task stats. + int64_t runningSplitWeights{0}; + for (auto i = 0; i < numAcquiredSplits; ++i) { + runningSplitWeights += i * 10 + 1; + } + stats = task->taskStats(); + const auto queuedSplitWeights = totalSplitWeights - runningSplitWeights; + EXPECT_EQ(stats.numRunningTableScanSplits, numDrivers); + EXPECT_EQ(stats.numQueuedTableScanSplits, numSplits - numDrivers); + EXPECT_EQ(stats.runningTableScanSplitWeights, runningSplitWeights); + EXPECT_EQ(stats.queuedTableScanSplitWeights, queuedSplitWeights); + + // Let all the operators proceed. + pauseSplitProcessing.unlock(); + + // Finish the task. + std::vector result; + while (cursor->moveNext()) { + result.push_back(cursor->current()); + } + EXPECT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + EXPECT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); + + // Check task stats again. + stats = task->taskStats(); + EXPECT_EQ(stats.numRunningTableScanSplits, 0); + EXPECT_EQ(stats.numQueuedTableScanSplits, 0); + EXPECT_EQ(stats.runningTableScanSplitWeights, 0); + EXPECT_EQ(stats.queuedTableScanSplitWeights, 0); + EXPECT_EQ(numAcquiredSplits, numSplits); +} + TEST_F(TableScanTest, splitOffsetAndLength) { auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index 70d7c630abc9..ece0d3545bdf 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -180,10 +180,12 @@ std::shared_ptr HiveConnectorTestBase::makeHiveConnectorSplit( const std::string& filePath, uint64_t start, - uint64_t length) { + uint64_t length, + int64_t splitWeight) { return HiveConnectorSplitBuilder(filePath) .start(start) .length(length) + .splitWeight(splitWeight) .build(); } diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 34ad0be17713..8f7a03fe6cff 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -71,7 +71,8 @@ class HiveConnectorTestBase : public OperatorTestBase { static std::shared_ptr makeHiveConnectorSplit( const std::string& filePath, uint64_t start = 0, - uint64_t length = std::numeric_limits::max()); + uint64_t length = std::numeric_limits::max(), + int64_t splitWeight = 0); /// Split file at path 'filePath' into 'splitCount' splits. If not local file, /// file size can be given as 'externalSize'. @@ -197,6 +198,11 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { + splitWeight_ = splitWeight; + return *this; + } + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { fileFormat_ = format; return *this; @@ -220,6 +226,9 @@ class HiveConnectorSplitBuilder { } std::shared_ptr build() const { + static const std::unordered_map customSplitInfo; + static const std::shared_ptr extraFileInfo; + static const std::unordered_map serdeParameters; return std::make_shared( connectorId_, filePath_.find("/") == 0 ? "file:" + filePath_ : filePath_, @@ -227,7 +236,11 @@ class HiveConnectorSplitBuilder { start_, length_, partitionKeys_, - tableBucketNumber_); + tableBucketNumber_, + customSplitInfo, + extraFileInfo, + serdeParameters, + splitWeight_); } private: @@ -238,6 +251,7 @@ class HiveConnectorSplitBuilder { std::unordered_map> partitionKeys_; std::optional tableBucketNumber_; std::string connectorId_ = kHiveConnectorId; + int64_t splitWeight_{0}; }; } // namespace facebook::velox::exec::test