Skip to content

Commit

Permalink
[WIP] Put table scan in suspended state when reading
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 13, 2024
1 parent 15be2c8 commit 18e366b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
24 changes: 19 additions & 5 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,23 @@ folly::dynamic TableScan::toJson() const {
return ret;
}

// Decides whether the scan should yield. A yield is wanted when any of the
// following holds, checked in this order and stopping at the first hit:
//  - the task's stop reason is StopReason::kYield;
//  - the driver reports that it should yield;
//  - 'getOutputTimeLimitMs_' is non-zero and at least that many milliseconds
//    have elapsed since 'startTimeMs'.
// Even when a yield is wanted, the result is false if the task has a pause
// requested: a paused task shall continue execution instead of yielding.
bool TableScan::shouldYield(size_t startTimeMs) const {
  const auto& task = driverCtx_->task;

  bool yieldWanted = task->shouldStop() == StopReason::kYield;
  if (!yieldWanted) {
    yieldWanted = driverCtx_->driver->shouldYield();
  }
  if (!yieldWanted && getOutputTimeLimitMs_ != 0) {
    yieldWanted =
        (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_;
  }
  if (!yieldWanted) {
    return false;
  }

  // NOTE: the pause state is only consulted once a yield condition has fired.
  return !task->pauseRequested();
}

// Returns true when the task reports a stop reason other than
// StopReason::kNone and StopReason::kYield.
bool TableScan::shouldStop() const {
  const auto stopReason = driverCtx_->task->shouldStop();
  switch (stopReason) {
    case StopReason::kNone:
    case StopReason::kYield:
      return false;
    default:
      return true;
  }
}

RowVectorPtr TableScan::getOutput() {
SuspendedSection suspendedSection(driverCtx_->driver);
auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; });

if (noMoreSplits_) {
Expand All @@ -72,9 +88,7 @@ RowVectorPtr TableScan::getOutput() {
// w/o producing a result. In this case we return with the Yield blocking
// reason and an already fulfilled future.
curStatus_ = "getOutput: task->shouldStop";
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
((getOutputTimeLimitMs_ != 0) &&
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
if (shouldYield(startTimeMs) || shouldStop()) {
blockingReason_ = BlockingReason::kYield;
blockingFuture_ = ContinueFuture{folly::Unit{}};
// A point for test code injection.
Expand Down Expand Up @@ -137,7 +151,7 @@ RowVectorPtr TableScan::getOutput() {
connectorSplit->connectorId,
"Got splits with different connector IDs");

if (!dataSource_) {
if (dataSource_ == nullptr) {
curStatus_ = "getOutput: creating dataSource_";
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
connectorSplit->connectorId, planNodeId(), connectorPool_);
Expand All @@ -162,7 +176,7 @@ RowVectorPtr TableScan::getOutput() {
},
&debugString_});

if (connectorSplit->dataSource) {
if (connectorSplit->dataSource != nullptr) {
curStatus_ = "getOutput: preloaded split";
++numPreloadedSplits_;
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class TableScan : public SourceOperator {
}

private:
// Returns true if the scan should yield: the task's stop reason is kYield,
// the driver reports it should yield, or a non-zero 'getOutputTimeLimitMs_'
// has elapsed since 'startTimeMs'. Always returns false if the task has a
// pause requested.
bool shouldYield(size_t startTimeMs) const;

// Returns true if the task's stop reason is neither kNone nor kYield.
bool shouldStop() const;

// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
// appropriate. The preloader will be applied to the first 'maxPreloadSplits'
// of the Task's split queue for 'this' when getting splits.
Expand Down

0 comments on commit 18e366b

Please sign in to comment.