Skip to content

Commit

Permalink
Table scan cleanup (facebookincubator#9156)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookincubator#9156

Reviewed By: kevinwilfong

Differential Revision: D55086413

Pulled By: xiaoxmeng

fbshipit-source-id: 60c557f7635e7eb498e17e46f1a15534d862b8bf
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Mar 20, 2024
1 parent 21491d7 commit 5e07790
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 59 deletions.
19 changes: 9 additions & 10 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,17 @@ class DataSource {

virtual std::unordered_map<std::string, RuntimeCounter> runtimeStats() = 0;

// Returns true if 'this' has initiated all the prefetch this will
// initiate. This means that the caller should schedule next splits
// to prefetch in the background. false if the source does not
// prefetch.
/// Returns true if 'this' has initiated all the prefetch this will initiate.
/// This means that the caller should schedule next splits to prefetch in the
/// background. Returns false if the source does not prefetch.
virtual bool allPrefetchIssued() const {
// Default implementation: unconditionally returns false. The method is
// virtual, so a derived data source can override it.
return false;
}

// Initializes this from 'source'. 'source' is effectively moved
// into 'this' Adaptation like dynamic filters stay in effect but
// the parts dealing with open files, prefetched data etc. are moved. 'source'
// is freed after the move.
/// Initializes this from 'source'. 'source' is effectively moved into 'this'.
/// Adaptation like dynamic filters stay in effect but the parts dealing with
/// open files, prefetched data etc. are moved. 'source' is freed after the
/// move.
virtual void setFromDataSource(std::unique_ptr<DataSource> /*source*/) {
// Default implementation: the parameter is left unnamed because it is not
// used here; the body only invokes VELOX_UNSUPPORTED.
// NOTE(review): presumably VELOX_UNSUPPORTED raises an "unsupported" error -
// confirm against the macro's definition.
VELOX_UNSUPPORTED("setFromDataSource");
}
Expand Down Expand Up @@ -392,7 +391,7 @@ class Connector {
const std::string& scanId,
int32_t loadQuantum);

virtual folly::Executor* FOLLY_NULLABLE executor() const {
virtual folly::Executor* executor() const {
return nullptr;
}

Expand Down Expand Up @@ -422,7 +421,7 @@ class ConnectorFactory {
virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE executor = nullptr) = 0;
folly::Executor* executor = nullptr) = 0;

private:
const std::string name_;
Expand Down
6 changes: 3 additions & 3 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK(
split_ == nullptr,
VELOX_CHECK_NULL(
split_,
"Previous split has not been processed yet. Call next to process the split.");
split_ = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
VELOX_CHECK(split_, "Wrong type of split");
VELOX_CHECK_NOT_NULL(split_, "Wrong type of split");

VLOG(1) << "Adding split " << split_->toString();

Expand Down
48 changes: 24 additions & 24 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {

std::atomic<uint64_t> TableScan::ioWaitNanos_;

TableScan::TableScan(
int32_t operatorId,
DriverCtx* driverCtx,
Expand Down Expand Up @@ -74,14 +72,14 @@ RowVectorPtr TableScan::getOutput() {
// w/o producing a result. In this case we return with the Yield blocking
// reason and an already fulfilled future.
curStatus_ = "getOutput: task->shouldStop";
if (this->driverCtx_->task->shouldStop() != StopReason::kNone or
(getOutputTimeLimitMs_ != 0 and
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
((getOutputTimeLimitMs_ != 0) &&
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
blockingReason_ = BlockingReason::kYield;
blockingFuture_ = ContinueFuture{folly::Unit{}};
// A point for test code injection.
TestValue::adjust(
"facebook::velox::exec::TableScan::getOutput::bail", this);
"facebook::velox::exec::TableScan::getOutput::yield", this);
return nullptr;
}

Expand All @@ -103,19 +101,19 @@ RowVectorPtr TableScan::getOutput() {

if (!split.hasConnectorSplit()) {
noMoreSplits_ = true;
pendingDynamicFilters_.clear();
dynamicFilters_.clear();
if (dataSource_) {
curStatus_ = "getOutput: noMoreSplits_=1, updating stats_";
auto connectorStats = dataSource_->runtimeStats();
const auto connectorStats = dataSource_->runtimeStats();
auto lockedStats = stats_.wlock();
for (const auto& [name, counter] : connectorStats) {
if (name == "ioWaitNanos") {
ioWaitNanos_ += counter.value - lastIoWaitNanos_;
lastIoWaitNanos_ = counter.value;
}
if (UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) {
lockedStats->runtimeStats.insert(
std::make_pair(name, RuntimeMetric(counter.unit)));
if (FOLLY_UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) {
lockedStats->runtimeStats.emplace(
name, RuntimeMetric(counter.unit));
} else {
VELOX_CHECK_EQ(
lockedStats->runtimeStats.at(name).unit, counter.unit);
Expand Down Expand Up @@ -148,7 +146,7 @@ RowVectorPtr TableScan::getOutput() {
tableHandle_,
columnHandles_,
connectorQueryCtx_.get());
for (const auto& entry : pendingDynamicFilters_) {
for (const auto& entry : dynamicFilters_) {
dataSource_->addDynamicFilter(entry.first, entry.second);
}
}
Expand All @@ -167,8 +165,8 @@ RowVectorPtr TableScan::getOutput() {
if (connectorSplit->dataSource) {
curStatus_ = "getOutput: preloaded split";
++numPreloadedSplits_;
// The AsyncSource returns a unique_ptr to a shared_ptr. The
// unique_ptr will be nullptr if there was a cancellation.
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
// will be nullptr if there was a cancellation.
numReadyPreloadedSplits_ += connectorSplit->dataSource->hasValue();
auto preparedDataSource = connectorSplit->dataSource->move();
stats_.wlock()->getOutputTiming.add(
Expand All @@ -187,7 +185,7 @@ RowVectorPtr TableScan::getOutput() {
++stats_.wlock()->numSplits;

curStatus_ = "getOutput: dataSource_->estimatedRowSize";
auto estimatedRowSize = dataSource_->estimatedRowSize();
const auto estimatedRowSize = dataSource_->estimatedRowSize();
readBatchSize_ =
estimatedRowSize == connector::DataSource::kUnknownRowSize
? outputBatchRows()
Expand All @@ -201,6 +199,7 @@ RowVectorPtr TableScan::getOutput() {
if (operatorCtx_->task()->isCancelled()) {
return nullptr;
}

ExceptionContextSetter exceptionContext(
{[](VeloxException::Type /*exceptionType*/, auto* debugString) {
return *static_cast<std::string*>(debugString);
Expand Down Expand Up @@ -235,8 +234,8 @@ RowVectorPtr TableScan::getOutput() {
curStatus_ = "getOutput: updating stats_.rawInput";
lockedStats->rawInputPositions = dataSource_->getCompletedRows();
lockedStats->rawInputBytes = dataSource_->getCompletedBytes();
auto data = dataOptional.value();
if (data) {
RowVectorPtr data = dataOptional.value();
if (data != nullptr) {
if (data->size() > 0) {
lockedStats->addInputVector(data->estimateFlatSize(), data->size());
constexpr int kMaxSelectiveBatchSizeMultiplier = 4;
Expand Down Expand Up @@ -285,7 +284,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
ctx = operatorCtx_->createConnectorQueryCtx(
split->connectorId, planNodeId(), connectorPool_),
task = operatorCtx_->task(),
pendingDynamicFilters = pendingDynamicFilters_,
dynamicFilters = dynamicFilters_,
split]() -> std::unique_ptr<connector::DataSource> {
if (task->isCancelled()) {
return nullptr;
Expand All @@ -298,20 +297,21 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
},
&debugString});

auto ptr = connector->createDataSource(type, table, columns, ctx.get());
auto dataSource =
connector->createDataSource(type, table, columns, ctx.get());
if (task->isCancelled()) {
return nullptr;
}
for (const auto& entry : pendingDynamicFilters) {
ptr->addDynamicFilter(entry.first, entry.second);
for (const auto& entry : dynamicFilters) {
dataSource->addDynamicFilter(entry.first, entry.second);
}
ptr->addSplit(split);
return ptr;
dataSource->addSplit(split);
return dataSource;
});
}

void TableScan::checkPreload() {
auto executor = connector_->executor();
auto* executor = connector_->executor();
if (maxSplitPreloadPerDriver_ == 0 || !executor ||
!connector_->supportsSplitPreload()) {
return;
Expand Down Expand Up @@ -345,7 +345,7 @@ void TableScan::addDynamicFilter(
if (dataSource_) {
dataSource_->addDynamicFilter(outputChannel, filter);
}
pendingDynamicFilters_.emplace(outputChannel, filter);
dynamicFilters_.emplace(outputChannel, filter);
}

} // namespace facebook::velox::exec
32 changes: 15 additions & 17 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,19 @@ class TableScan : public SourceOperator {
}

private:
// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching
// splits is appropriate. The preloader will be applied to the
// 'first 'maxPreloadSplits' of the Tasks's split queue for 'this'
// when getting splits.
// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
// appropriate. The preloader will be applied to the first 'maxPreloadSplits'
// splits of the Task's split queue for 'this' when getting splits.
void checkPreload();

// Sets 'split->dataSource' to be a Asyncsource that makes a
// DataSource to read 'split'. This source will be prepared in the
// background on the executor of the connector. If the DataSource is
// needed before prepare is done, it will be made when needed.
// Sets 'split->dataSource' to be an AsyncSource that makes a DataSource to
// read 'split'. This source will be prepared in the background on the
// executor of the connector. If the DataSource is needed before prepare is
// done, it will be made when needed.
void preload(std::shared_ptr<connector::ConnectorSplit> split);

// Process-wide IO wait time.
static std::atomic<uint64_t> ioWaitNanos_;
inline static std::atomic<uint64_t> ioWaitNanos_;

const std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
const std::
Expand All @@ -88,17 +87,16 @@ class TableScan : public SourceOperator {
bool noMoreSplits_ = false;
// Dynamic filters to add to the data source when it gets created.
std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
pendingDynamicFilters_;
dynamicFilters_;

int32_t maxPreloadedSplits_{0};

const int32_t maxSplitPreloadPerDriver_{0};

// Callback passed to getSplitOrFuture() for triggering async
// preload. The callback's lifetime is the lifetime of 'this'. This
// callback can schedule preloads on an executor. These preloads may
// outlive the Task and therefore need to capture a shared_ptr to
// it.
// Callback passed to getSplitOrFuture() for triggering async preload. The
// callback's lifetime is the lifetime of 'this'. This callback can schedule
// preloads on an executor. These preloads may outlive the Task and therefore
// need to capture a shared_ptr to it.
std::function<void(const std::shared_ptr<connector::ConnectorSplit>&)>
splitPreloader_{nullptr};

Expand All @@ -111,8 +109,8 @@ class TableScan : public SourceOperator {
int32_t readBatchSize_;
int32_t maxReadBatchSize_;

// Exits getOutput() method after this many milliseconds.
// Zero means 'no limit'.
// Exits getOutput() method after this many milliseconds. Zero means 'no
// limit'.
size_t getOutputTimeLimitMs_{0};

double maxFilteringRatio_{0};
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1438,23 +1438,23 @@ exec::Split Task::getSplitLocked(
int32_t maxPreloadSplits,
const ConnectorSplitPreloadFunc& preload) {
int32_t readySplitIndex = -1;
if (maxPreloadSplits) {
if (maxPreloadSplits > 0) {
for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits;
++i) {
auto& connectorSplit = splitsStore.splits[i].connectorSplit;
if (!connectorSplit->dataSource) {
// Initializes split->dataSource
// Initializes split->dataSource.
preload(connectorSplit);
} else if (
readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) {
(readySplitIndex == -1) && (connectorSplit->dataSource->hasValue())) {
readySplitIndex = i;
}
}
}
if (readySplitIndex == -1) {
readySplitIndex = 0;
}
assert(!splitsStore.splits.empty());
VELOX_CHECK(!splitsStore.splits.empty());
auto split = std::move(splitsStore.splits[readySplitIndex]);
splitsStore.splits.erase(splitsStore.splits.begin() + readySplitIndex);

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ DEBUG_ONLY_TEST_F(TableScanTest, timeLimitInGetOutput) {
// Count how many times we bailed from getOutput.
size_t numBailed{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::TableScan::getOutput::bail",
"facebook::velox::exec::TableScan::getOutput::yield",
std::function<void(const TableScan*)>(
([&](const TableScan* /*tableScan*/) { ++numBailed; })));

Expand Down

0 comments on commit 5e07790

Please sign in to comment.