Skip to content

Commit

Permalink
[WIP]Put table scan in suspensions state when read
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 13, 2024
1 parent 15be2c8 commit 4c30900
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
29 changes: 24 additions & 5 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,28 @@ folly::dynamic TableScan::toJson() const {
return ret;
}

bool TableScan::shouldYield(size_t startTimeMs) const {
  // A yield is wanted when any one of the following holds (checked in this
  // order, stopping at the first hit):
  //   1. the task reports StopReason::kYield,
  //   2. the driver itself asks to yield,
  //   3. a non-zero output time limit is configured and the time elapsed since
  //      'startTimeMs' has reached it.
  const bool yieldWanted =
      driverCtx_->task->shouldStop() == StopReason::kYield ||
      driverCtx_->driver->shouldYield() ||
      (getOutputTimeLimitMs_ != 0 &&
       (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_);
  if (!yieldWanted) {
    return false;
  }

  // NOTE: a pending task pause overrides the yield. In that case the driver
  // thread is not given up; execution continues and simply spins (with
  // on-thread sleep) until the task has been resumed.
  return !driverCtx_->task->pauseRequested();
}

bool TableScan::shouldStop() const {
  // Only stop reasons other than "none" and "yield" count as a stop here; the
  // yield case is handled separately by shouldYield().
  switch (driverCtx_->task->shouldStop()) {
    case StopReason::kNone:
    case StopReason::kYield:
      return false;
    default:
      return true;
  }
}

RowVectorPtr TableScan::getOutput() {
SuspendedSection suspendedSection(driverCtx_->driver);
auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; });

if (noMoreSplits_) {
Expand All @@ -72,9 +93,7 @@ RowVectorPtr TableScan::getOutput() {
// w/o producing a result. In this case we return with the Yield blocking
// reason and an already fulfilled future.
curStatus_ = "getOutput: task->shouldStop";
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
((getOutputTimeLimitMs_ != 0) &&
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
if (shouldYield(startTimeMs) || shouldStop()) {
blockingReason_ = BlockingReason::kYield;
blockingFuture_ = ContinueFuture{folly::Unit{}};
// A point for test code injection.
Expand Down Expand Up @@ -137,7 +156,7 @@ RowVectorPtr TableScan::getOutput() {
connectorSplit->connectorId,
"Got splits with different connector IDs");

if (!dataSource_) {
if (dataSource_ == nullptr) {
curStatus_ = "getOutput: creating dataSource_";
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
connectorSplit->connectorId, planNodeId(), connectorPool_);
Expand All @@ -162,7 +181,7 @@ RowVectorPtr TableScan::getOutput() {
},
&debugString_});

if (connectorSplit->dataSource) {
if (connectorSplit->dataSource != nullptr) {
curStatus_ = "getOutput: preloaded split";
++numPreloadedSplits_;
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class TableScan : public SourceOperator {
}

private:
// Checks if this table scan operator needs to yield before processing the
// next split.
bool shouldYield(size_t startTimeMs) const;

// Checks if this table scan operator needs to stop because the task has been
// terminated.
bool shouldStop() const;

// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
// appropriate. The preloader will be applied to the first 'maxPreloadSplits'
// of the Task's split queue for 'this' when getting splits.
Expand Down

0 comments on commit 4c30900

Please sign in to comment.