diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 56355bb650f28..395049b27737b 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -57,7 +57,28 @@ folly::dynamic TableScan::toJson() const { return ret; } +bool TableScan::shouldYield(size_t startTimeMs) const { + // Check task-level yield signal, driver-level yield signal and table scan + // output processing time limit. + // + // NOTE: if the task is being paused, then we shall continue execution as we + // won't yield the driver thread but simply spinning (with on-thread time + // sleep) until the task has been resumed. + return (driverCtx_->task->shouldStop() == StopReason::kYield || + driverCtx_->driver->shouldYield() || + ((getOutputTimeLimitMs_ != 0) && + (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) && + !driverCtx_->task->pauseRequested(); +} + +bool TableScan::shouldStop() const { + const auto blockingReason = driverCtx_->task->shouldStop(); + return blockingReason != StopReason::kNone && + blockingReason != StopReason::kYield; +} + RowVectorPtr TableScan::getOutput() { + SuspendedSection suspendedSection(driverCtx_->driver); auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); if (noMoreSplits_) { @@ -72,9 +93,7 @@ RowVectorPtr TableScan::getOutput() { // w/o producing a result. In this case we return with the Yield blocking // reason and an already fulfilled future. curStatus_ = "getOutput: task->shouldStop"; - if ((driverCtx_->task->shouldStop() != StopReason::kNone) || - ((getOutputTimeLimitMs_ != 0) && - (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) { + if (shouldYield(startTimeMs) || shouldStop()) { blockingReason_ = BlockingReason::kYield; blockingFuture_ = ContinueFuture{folly::Unit{}}; // A point for test code injection. @@ -137,7 +156,7 @@ RowVectorPtr TableScan::getOutput() { connectorSplit->connectorId, "Got splits with different connector IDs"); - if (!dataSource_) { + if (dataSource_ == nullptr) { curStatus_ = "getOutput: creating dataSource_"; connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx( connectorSplit->connectorId, planNodeId(), connectorPool_); @@ -162,7 +181,7 @@ RowVectorPtr TableScan::getOutput() { }, &debugString_}); - if (connectorSplit->dataSource) { + if (connectorSplit->dataSource != nullptr) { curStatus_ = "getOutput: preloaded split"; ++numPreloadedSplits_; // The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index 5a846f8fa4bd4..3fd55aee82c5e 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -57,6 +57,14 @@ class TableScan : public SourceOperator { } private: + // Checks if this table scan operator needs to yield before processing the + // next split. + bool shouldYield(size_t startTimeMs) const; + + // Checks if this table scan operator needs to stop because the task has been + // terminated. + bool shouldStop() const; + // Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is // appropriate. The preloader will be applied to the 'first 'maxPreloadSplits' // of the Task's split queue for 'this' when getting splits.