From 5e077909f912dea6902b6ddf07ec24639134e757 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 19 Mar 2024 20:14:26 -0700 Subject: [PATCH] Table scan cleanup (#9156) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9156 Reviewed By: kevinwilfong Differential Revision: D55086413 Pulled By: xiaoxmeng fbshipit-source-id: 60c557f7635e7eb498e17e46f1a15534d862b8bf --- velox/connectors/Connector.h | 19 +++++----- velox/connectors/hive/HiveDataSource.cpp | 6 +-- velox/exec/TableScan.cpp | 48 ++++++++++++------------ velox/exec/TableScan.h | 32 ++++++++-------- velox/exec/Task.cpp | 8 ++-- velox/exec/tests/TableScanTest.cpp | 2 +- 6 files changed, 56 insertions(+), 59 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 3e8027ea99a6..b591603e9c31 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -207,18 +207,17 @@ class DataSource { virtual std::unordered_map runtimeStats() = 0; - // Returns true if 'this' has initiated all the prefetch this will - // initiate. This means that the caller should schedule next splits - // to prefetch in the background. false if the source does not - // prefetch. + /// Returns true if 'this' has initiated all the prefetch this will initiate. + /// This means that the caller should schedule next splits to prefetch in the + /// background. false if the source does not prefetch. virtual bool allPrefetchIssued() const { return false; } - // Initializes this from 'source'. 'source' is effectively moved - // into 'this' Adaptation like dynamic filters stay in effect but - // the parts dealing with open files, prefetched data etc. are moved. 'source' - // is freed after the move. + /// Initializes this from 'source'. 'source' is effectively moved into 'this' + /// Adaptation like dynamic filters stay in effect but the parts dealing with + /// open files, prefetched data etc. are moved. 'source' is freed after the + /// move. virtual void setFromDataSource(std::unique_ptr /*source*/) { VELOX_UNSUPPORTED("setFromDataSource"); } @@ -392,7 +391,7 @@ class Connector { const std::string& scanId, int32_t loadQuantum); - virtual folly::Executor* FOLLY_NULLABLE executor() const { + virtual folly::Executor* executor() const { return nullptr; } @@ -422,7 +421,7 @@ class ConnectorFactory { virtual std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* FOLLY_NULLABLE executor = nullptr) = 0; + folly::Executor* executor = nullptr) = 0; private: const std::string name_; diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 7fb869d8abdf..6093b44910fb 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -178,11 +178,11 @@ std::unique_ptr HiveDataSource::createSplitReader() { } void HiveDataSource::addSplit(std::shared_ptr split) { - VELOX_CHECK( - split_ == nullptr, + VELOX_CHECK_NULL( + split_, "Previous split has not been processed yet. Call next to process the split."); split_ = std::dynamic_pointer_cast(split); - VELOX_CHECK(split_, "Wrong type of split"); + VELOX_CHECK_NOT_NULL(split_, "Wrong type of split"); VLOG(1) << "Adding split " << split_->toString(); diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 43ef8ef9074f..56355bb650f2 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -23,8 +23,6 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { -std::atomic TableScan::ioWaitNanos_; - TableScan::TableScan( int32_t operatorId, DriverCtx* driverCtx, @@ -74,14 +72,14 @@ RowVectorPtr TableScan::getOutput() { // w/o producing a result. In this case we return with the Yield blocking // reason and an already fulfilled future. curStatus_ = "getOutput: task->shouldStop"; - if (this->driverCtx_->task->shouldStop() != StopReason::kNone or - (getOutputTimeLimitMs_ != 0 and + if ((driverCtx_->task->shouldStop() != StopReason::kNone) || + ((getOutputTimeLimitMs_ != 0) && (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) { blockingReason_ = BlockingReason::kYield; blockingFuture_ = ContinueFuture{folly::Unit{}}; // A point for test code injection. TestValue::adjust( - "facebook::velox::exec::TableScan::getOutput::bail", this); + "facebook::velox::exec::TableScan::getOutput::yield", this); return nullptr; } @@ -103,19 +101,19 @@ RowVectorPtr TableScan::getOutput() { if (!split.hasConnectorSplit()) { noMoreSplits_ = true; - pendingDynamicFilters_.clear(); + dynamicFilters_.clear(); if (dataSource_) { curStatus_ = "getOutput: noMoreSplits_=1, updating stats_"; - auto connectorStats = dataSource_->runtimeStats(); + const auto connectorStats = dataSource_->runtimeStats(); auto lockedStats = stats_.wlock(); for (const auto& [name, counter] : connectorStats) { if (name == "ioWaitNanos") { ioWaitNanos_ += counter.value - lastIoWaitNanos_; lastIoWaitNanos_ = counter.value; } - if (UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) { - lockedStats->runtimeStats.insert( - std::make_pair(name, RuntimeMetric(counter.unit))); + if (FOLLY_UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) { + lockedStats->runtimeStats.emplace( + name, RuntimeMetric(counter.unit)); } else { VELOX_CHECK_EQ( lockedStats->runtimeStats.at(name).unit, counter.unit); @@ -148,7 +146,7 @@ RowVectorPtr TableScan::getOutput() { tableHandle_, columnHandles_, connectorQueryCtx_.get()); - for (const auto& entry : pendingDynamicFilters_) { + for (const auto& entry : dynamicFilters_) { dataSource_->addDynamicFilter(entry.first, entry.second); } } @@ -167,8 +165,8 @@ RowVectorPtr TableScan::getOutput() { if (connectorSplit->dataSource) { curStatus_ = "getOutput: preloaded split"; ++numPreloadedSplits_; - // The AsyncSource returns a unique_ptr to a shared_ptr. The - // unique_ptr will be nullptr if there was a cancellation. + // The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr + // will be nullptr if there was a cancellation. numReadyPreloadedSplits_ += connectorSplit->dataSource->hasValue(); auto preparedDataSource = connectorSplit->dataSource->move(); stats_.wlock()->getOutputTiming.add( @@ -187,7 +185,7 @@ RowVectorPtr TableScan::getOutput() { ++stats_.wlock()->numSplits; curStatus_ = "getOutput: dataSource_->estimatedRowSize"; - auto estimatedRowSize = dataSource_->estimatedRowSize(); + const auto estimatedRowSize = dataSource_->estimatedRowSize(); readBatchSize_ = estimatedRowSize == connector::DataSource::kUnknownRowSize ? outputBatchRows() @@ -201,6 +199,7 @@ RowVectorPtr TableScan::getOutput() { if (operatorCtx_->task()->isCancelled()) { return nullptr; } + ExceptionContextSetter exceptionContext( {[](VeloxException::Type /*exceptionType*/, auto* debugString) { return *static_cast(debugString); @@ -235,8 +234,8 @@ RowVectorPtr TableScan::getOutput() { curStatus_ = "getOutput: updating stats_.rawInput"; lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); - auto data = dataOptional.value(); - if (data) { + RowVectorPtr data = dataOptional.value(); + if (data != nullptr) { if (data->size() > 0) { lockedStats->addInputVector(data->estimateFlatSize(), data->size()); constexpr int kMaxSelectiveBatchSizeMultiplier = 4; @@ -285,7 +284,7 @@ void TableScan::preload(std::shared_ptr split) { ctx = operatorCtx_->createConnectorQueryCtx( split->connectorId, planNodeId(), connectorPool_), task = operatorCtx_->task(), - pendingDynamicFilters = pendingDynamicFilters_, + dynamicFilters = dynamicFilters_, split]() -> std::unique_ptr { if (task->isCancelled()) { return nullptr; @@ -298,20 +297,21 @@ void TableScan::preload(std::shared_ptr split) { }, &debugString}); - auto ptr = connector->createDataSource(type, table, columns, ctx.get()); + auto dataSource = + connector->createDataSource(type, table, columns, ctx.get()); if (task->isCancelled()) { return nullptr; } - for (const auto& entry : pendingDynamicFilters) { - ptr->addDynamicFilter(entry.first, entry.second); + for (const auto& entry : dynamicFilters) { + dataSource->addDynamicFilter(entry.first, entry.second); } - ptr->addSplit(split); - return ptr; + dataSource->addSplit(split); + return dataSource; }); } void TableScan::checkPreload() { - auto executor = connector_->executor(); + auto* executor = connector_->executor(); if (maxSplitPreloadPerDriver_ == 0 || !executor || !connector_->supportsSplitPreload()) { return; @@ -345,7 +345,7 @@ void TableScan::addDynamicFilter( if (dataSource_) { dataSource_->addDynamicFilter(outputChannel, filter); } - pendingDynamicFilters_.emplace(outputChannel, filter); + dynamicFilters_.emplace(outputChannel, filter); } } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index 1902e090bae7..5a846f8fa4bd 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -57,20 +57,19 @@ class TableScan : public SourceOperator { } private: - // Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching - // splits is appropriate. The preloader will be applied to the - // 'first 'maxPreloadSplits' of the Tasks's split queue for 'this' - // when getting splits. + // Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is + // appropriate. The preloader will be applied to the 'first 'maxPreloadSplits' + // of the Task's split queue for 'this' when getting splits. void checkPreload(); - // Sets 'split->dataSource' to be a Asyncsource that makes a - // DataSource to read 'split'. This source will be prepared in the - // background on the executor of the connector. If the DataSource is - // needed before prepare is done, it will be made when needed. + // Sets 'split->dataSource' to be an AsyncSource that makes a DataSource to + // read 'split'. This source will be prepared in the background on the + // executor of the connector. If the DataSource is needed before prepare is + // done, it will be made when needed. void preload(std::shared_ptr split); // Process-wide IO wait time. - static std::atomic ioWaitNanos_; + inline static std::atomic ioWaitNanos_; const std::shared_ptr tableHandle_; const std:: @@ -88,17 +87,16 @@ class TableScan : public SourceOperator { bool noMoreSplits_ = false; // Dynamic filters to add to the data source when it gets created. std::unordered_map> - pendingDynamicFilters_; + dynamicFilters_; int32_t maxPreloadedSplits_{0}; const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async - // preload. The callback's lifetime is the lifetime of 'this'. This - // callback can schedule preloads on an executor. These preloads may - // outlive the Task and therefore need to capture a shared_ptr to - // it. + // Callback passed to getSplitOrFuture() for triggering async preload. The + // callback's lifetime is the lifetime of 'this'. This callback can schedule + // preloads on an executor. These preloads may outlive the Task and therefore + // need to capture a shared_ptr to it. std::function&)> splitPreloader_{nullptr}; @@ -111,8 +109,8 @@ class TableScan : public SourceOperator { int32_t readBatchSize_; int32_t maxReadBatchSize_; - // Exits getOutput() method after this many milliseconds. - // Zero means 'no limit'. + // Exits getOutput() method after this many milliseconds. Zero means 'no + // limit'. size_t getOutputTimeLimitMs_{0}; double maxFilteringRatio_{0}; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 0c233aa0c633..3b0e2840a8b3 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1438,15 +1438,15 @@ exec::Split Task::getSplitLocked( int32_t maxPreloadSplits, const ConnectorSplitPreloadFunc& preload) { int32_t readySplitIndex = -1; - if (maxPreloadSplits) { + if (maxPreloadSplits > 0) { for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits; ++i) { auto& connectorSplit = splitsStore.splits[i].connectorSplit; if (!connectorSplit->dataSource) { - // Initializes split->dataSource + // Initializes split->dataSource. preload(connectorSplit); } else if ( - readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) { + (readySplitIndex == -1) && (connectorSplit->dataSource->hasValue())) { readySplitIndex = i; } } @@ -1454,7 +1454,7 @@ exec::Split Task::getSplitLocked( if (readySplitIndex == -1) { readySplitIndex = 0; } - assert(!splitsStore.splits.empty()); + VELOX_CHECK(!splitsStore.splits.empty()); auto split = std::move(splitsStore.splits[readySplitIndex]); splitsStore.splits.erase(splitsStore.splits.begin() + readySplitIndex); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 2f9984cb3cab..7f0abc9e7a9e 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -491,7 +491,7 @@ DEBUG_ONLY_TEST_F(TableScanTest, timeLimitInGetOutput) { // Count how many times we bailed from getOutput. size_t numBailed{0}; SCOPED_TESTVALUE_SET( - "facebook::velox::exec::TableScan::getOutput::bail", + "facebook::velox::exec::TableScan::getOutput::yield", std::function( ([&](const TableScan* /*tableScan*/) { ++numBailed; })));