From fd10d67b8e127f56888e3395a4918d50ad3e75f4 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Fri, 12 Apr 2024 19:08:44 -0700 Subject: [PATCH] [WIP] Put table scan in suspended state for getOutput processing --- velox/exec/Driver.cpp | 5 ++++- velox/exec/TableScan.cpp | 29 ++++++++++++++++++++++++----- velox/exec/TableScan.h | 8 ++++++++ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8d8fe4d475303..0e97970968c31 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -1007,7 +1007,10 @@ SuspendedSection::SuspendedSection(Driver* driver) : driver_(driver) { SuspendedSection::~SuspendedSection() { if (driver_->task()->leaveSuspended(driver_->state()) != StopReason::kNone) { - VELOX_FAIL("Terminate detected when leaving suspended section"); + LOG(WARNING) + << "Terminate detected when leaving suspended section for driver " + << driver_->driverCtx()->driverId << " from task " + << driver_->task()->taskId(); } } diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 56355bb650f28..865d514ef6fb8 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -57,7 +57,28 @@ folly::dynamic TableScan::toJson() const { return ret; } +bool TableScan::shouldYield(size_t startTimeMs) const { + // Checks task-level yield signal, driver-level yield signal and table scan + // output processing time limit. + // + // NOTE: if the task is being paused, then we shall continue execution as we + // won't yield the driver thread but simply spin (with on-thread time + // sleep) until the task has been resumed. 
+ return (driverCtx_->task->shouldStop() == StopReason::kYield || + driverCtx_->driver->shouldYield() || + ((getOutputTimeLimitMs_ != 0) && + (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) && + !driverCtx_->task->pauseRequested(); +} + +bool TableScan::shouldStop() const { + const auto blockingReason = driverCtx_->task->shouldStop(); + return blockingReason != StopReason::kNone && + blockingReason != StopReason::kYield; +} + RowVectorPtr TableScan::getOutput() { + SuspendedSection suspendedSection(driverCtx_->driver); auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); if (noMoreSplits_) { @@ -72,9 +93,7 @@ RowVectorPtr TableScan::getOutput() { // w/o producing a result. In this case we return with the Yield blocking // reason and an already fulfilled future. curStatus_ = "getOutput: task->shouldStop"; - if ((driverCtx_->task->shouldStop() != StopReason::kNone) || - ((getOutputTimeLimitMs_ != 0) && - (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) { + if (shouldYield(startTimeMs) || shouldStop()) { blockingReason_ = BlockingReason::kYield; blockingFuture_ = ContinueFuture{folly::Unit{}}; // A point for test code injection. @@ -137,7 +156,7 @@ RowVectorPtr TableScan::getOutput() { connectorSplit->connectorId, "Got splits with different connector IDs"); - if (!dataSource_) { + if (dataSource_ == nullptr) { curStatus_ = "getOutput: creating dataSource_"; connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx( connectorSplit->connectorId, planNodeId(), connectorPool_); @@ -162,7 +181,7 @@ RowVectorPtr TableScan::getOutput() { }, &debugString_}); - if (connectorSplit->dataSource) { + if (connectorSplit->dataSource != nullptr) { curStatus_ = "getOutput: preloaded split"; ++numPreloadedSplits_; // The AsyncSource returns a unique_ptr to a shared_ptr. 
The unique_ptr diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index 5a846f8fa4bd4..3fd55aee82c5e 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -57,6 +57,14 @@ class TableScan : public SourceOperator { } private: + // Checks if this table scan operator needs to yield before processing the + // next split. + bool shouldYield(size_t startTimeMs) const; + + // Checks if this table scan operator needs to stop because the task has been + // terminated. + bool shouldStop() const; + // Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is // appropriate. The preloader will be applied to the 'first 'maxPreloadSplits' // of the Task's split queue for 'this' when getting splits.