Skip to content

Commit

Permalink
Use splits weights. (facebookincubator#8822)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#8822

HiveConnectorSplit has splitWeight member.
It is being used by Presto coordinator to schedule splits while monitoring how busy Tasks are with running and queued splits.
This change adds book keeping for such weights.
We update them only for TableScans, as this is what Presto Java does.
In addition, we add numRunningTableScanSplits and numQueuedTableScanSplits, which are also updated only for TableScans.

After this change is in, we will need to update presto_cpp to use and report the new stats.

Reviewed By: pranjalssh

Differential Revision: D54049212

fbshipit-source-id: 9dfeaa6885185e8aa94e4477dcee91c8932851a7
  • Loading branch information
Sergey Pershin authored and facebook-github-bot committed Feb 26, 2024
1 parent 3e4a26a commit 52ad7f5
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 33 deletions.
7 changes: 5 additions & 2 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ class DataSource;
// as a RowVectorPtr, potentially after processing pushdowns.
struct ConnectorSplit {
const std::string connectorId;
const int64_t splitWeight{0};

std::unique_ptr<AsyncSource<DataSource>> dataSource;

explicit ConnectorSplit(const std::string& _connectorId)
: connectorId(_connectorId) {}
explicit ConnectorSplit(
const std::string& _connectorId,
int64_t _splitWeight = 0)
: connectorId(_connectorId), splitWeight(_splitWeight) {}

virtual ~ConnectorSplit() {}

Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _serdeParameters = {})
: ConnectorSplit(connectorId),
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
int64_t _splitWeight = 0)
: ConnectorSplit(connectorId, _splitWeight),
filePath(_filePath),
fileFormat(_fileFormat),
start(_start),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ bool Exchange::getSplits(ContinueFuture* future) {
noMoreSplits_ = true;
if (atEnd_) {
operatorCtx_->task()->multipleSplitsFinished(
stats_.rlock()->numSplits);
false, stats_.rlock()->numSplits, 0);
recordExchangeClientStats();
}
return false;
Expand Down Expand Up @@ -83,7 +83,7 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) {
if (!currentPages_.empty() || atEnd_) {
if (atEnd_ && noMoreSplits_) {
const auto numSplits = stats_.rlock()->numSplits;
operatorCtx_->task()->multipleSplitsFinished(numSplits);
operatorCtx_->task()->multipleSplitsFinished(false, numSplits, 0);
}
recordExchangeClientStats();
return BlockingReason::kNotBlocked;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) {
}
// TODO Delay this call until all input data has been processed.
operatorCtx_->task()->multipleSplitsFinished(
remoteSourceTaskIds_.size());
false, remoteSourceTaskIds_.size(), 0);
return BlockingReason::kNotBlocked;
}
} else {
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,13 @@ RowVectorPtr TableScan::getOutput() {
}

const auto& connectorSplit = split.connectorSplit;
currentSplitWeight_ = connectorSplit->splitWeight;
needNewSplit_ = false;

// A point for test code injection.
TestValue::adjust(
"facebook::velox::exec::TableScan::getOutput::gotSplit", this);

VELOX_CHECK_EQ(
connector_->connectorId(),
connectorSplit->connectorId,
Expand Down Expand Up @@ -261,7 +266,7 @@ RowVectorPtr TableScan::getOutput() {
}

curStatus_ = "getOutput: task->splitFinished";
driverCtx_->task->splitFinished();
driverCtx_->task->splitFinished(true, currentSplitWeight_);
needNewSplit_ = true;
}
}
Expand Down
1 change: 1 addition & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TableScan : public SourceOperator {
memory::MemoryPool* const connectorPool_;
ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()};
BlockingReason blockingReason_;
int64_t currentSplitWeight_{0};
bool needNewSplit_ = true;
std::shared_ptr<connector::Connector> connector_;
std::shared_ptr<connector::ConnectorQueryCtx> connectorQueryCtx_;
Expand Down
59 changes: 46 additions & 13 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ void buildSplitStates(
// Not all leaf nodes require splits. ValuesNode doesn't. Check if this plan
// node requires splits.
if (planNode->requiresSplits()) {
splitStateMap[planNode->id()];
splitStateMap[planNode->id()].sourceIsTableScan =
(dynamic_cast<const core::TableScanNode*>(planNode) != nullptr);
}
return;
}
Expand Down Expand Up @@ -1220,6 +1221,11 @@ std::unique_ptr<ContinuePromise> Task::addSplitLocked(

if (split.connectorSplit) {
VELOX_CHECK_NULL(split.connectorSplit->dataSource);
if (splitsState.sourceIsTableScan) {
++taskStats_.numQueuedTableScanSplits;
taskStats_.queuedTableScanSplitWeights +=
split.connectorSplit->splitWeight;
}
}

if (!split.hasGroup()) {
Expand Down Expand Up @@ -1359,22 +1365,27 @@ BlockingReason Task::getSplitOrFuture(
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload) {
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
std::lock_guard<std::timed_mutex> l(mutex_);
auto& splitsState = getPlanNodeSplitsStateLocked(planNodeId);
return getSplitOrFutureLocked(
getPlanNodeSplitsStateLocked(planNodeId).groupSplitsStores[splitGroupId],
splitsState.sourceIsTableScan,
splitsState.groupSplitsStores[splitGroupId],
split,
future,
maxPreloadSplits,
preload);
}

BlockingReason Task::getSplitOrFutureLocked(
bool forTableScan,
SplitsStore& splitsStore,
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload) {
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
if (splitsStore.splits.empty()) {
if (splitsStore.noMoreSplits) {
return BlockingReason::kNotBlocked;
Expand All @@ -1386,23 +1397,26 @@ BlockingReason Task::getSplitOrFutureLocked(
return BlockingReason::kWaitForSplit;
}

split = getSplitLocked(splitsStore, maxPreloadSplits, preload);
split = getSplitLocked(forTableScan, splitsStore, maxPreloadSplits, preload);
return BlockingReason::kNotBlocked;
}

exec::Split Task::getSplitLocked(
bool forTableScan,
SplitsStore& splitsStore,
int32_t maxPreloadSplits,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload) {
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload) {
int32_t readySplitIndex = -1;
if (maxPreloadSplits) {
for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits;
++i) {
auto& split = splitsStore.splits[i].connectorSplit;
if (!split->dataSource) {
auto& connectorSplit = splitsStore.splits[i].connectorSplit;
if (!connectorSplit->dataSource) {
// Initializes split->dataSource
preload(split);
} else if (readySplitIndex == -1 && split->dataSource->hasValue()) {
preload(connectorSplit);
} else if (
readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) {
readySplitIndex = i;
}
}
Expand All @@ -1416,6 +1430,13 @@ exec::Split Task::getSplitLocked(

--taskStats_.numQueuedSplits;
++taskStats_.numRunningSplits;
if (forTableScan && split.connectorSplit) {
--taskStats_.numQueuedTableScanSplits;
++taskStats_.numRunningTableScanSplits;
taskStats_.queuedTableScanSplitWeights -= split.connectorSplit->splitWeight;
taskStats_.runningTableScanSplitWeights +=
split.connectorSplit->splitWeight;
}
taskStats_.lastSplitStartTimeMs = getCurrentTimeMs();
if (taskStats_.firstSplitStartTimeMs == 0) {
taskStats_.firstSplitStartTimeMs = taskStats_.lastSplitStartTimeMs;
Expand All @@ -1424,19 +1445,30 @@ exec::Split Task::getSplitLocked(
return split;
}

void Task::splitFinished() {
void Task::splitFinished(bool fromTableScan, int64_t splitWeight) {
std::lock_guard<std::timed_mutex> l(mutex_);
++taskStats_.numFinishedSplits;
--taskStats_.numRunningSplits;
if (fromTableScan) {
--taskStats_.numRunningTableScanSplits;
taskStats_.runningTableScanSplitWeights -= splitWeight;
}
if (isAllSplitsFinishedLocked()) {
taskStats_.executionEndTimeMs = getCurrentTimeMs();
}
}

void Task::multipleSplitsFinished(int32_t numSplits) {
void Task::multipleSplitsFinished(
bool fromTableScan,
int32_t numSplits,
int64_t splitsWeight) {
std::lock_guard<std::timed_mutex> l(mutex_);
taskStats_.numFinishedSplits += numSplits;
taskStats_.numRunningSplits -= numSplits;
if (fromTableScan) {
taskStats_.numRunningTableScanSplits -= numSplits;
taskStats_.runningTableScanSplitWeights -= splitsWeight;
}
if (isAllSplitsFinishedLocked()) {
taskStats_.executionEndTimeMs = getCurrentTimeMs();
}
Expand Down Expand Up @@ -1847,7 +1879,8 @@ ContinueFuture Task::terminate(TaskState terminalState) {
std::vector<exec::Split> splits;
for (auto& [groupId, store] : splitState.groupSplitsStores) {
while (!store.splits.empty()) {
splits.emplace_back(getSplitLocked(store, 0, nullptr));
splits.emplace_back(getSplitLocked(
splitState.sourceIsTableScan, store, 0, nullptr));
}
}
if (!splits.empty()) {
Expand Down
20 changes: 13 additions & 7 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,15 @@ class Task : public std::enable_shared_from_this<Task> {
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits = 0,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload =
nullptr);
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload = nullptr);

void splitFinished();
void splitFinished(bool fromTableScan, int64_t splitWeight);

void multipleSplitsFinished(int32_t numSplits);
void multipleSplitsFinished(
bool fromTableScan,
int32_t numSplits,
int64_t splitsWeight);

/// Adds a MergeSource for the specified splitGroupId and planNodeId.
std::shared_ptr<MergeSource> addLocalMergeSource(
Expand Down Expand Up @@ -784,19 +787,22 @@ class Task : public std::enable_shared_from_this<Task> {

/// Retrieve a split or split future from the given split store structure.
BlockingReason getSplitOrFutureLocked(
bool forTableScan,
SplitsStore& splitsStore,
exec::Split& split,
ContinueFuture& future,
int32_t maxPreloadSplits = 0,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload =
nullptr);
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload = nullptr);

/// Returns next split from the store. The caller must ensure the store is not
/// empty.
exec::Split getSplitLocked(
bool forTableScan,
SplitsStore& splitsStore,
int32_t maxPreloadSplits,
std::function<void(std::shared_ptr<connector::ConnectorSplit>)> preload);
const std::function<void(std::shared_ptr<connector::ConnectorSplit>)>&
preload);

// Creates for the given split group and fills up the 'SplitGroupState'
// structure, which stores inter-operator state (local exchange, bridges).
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/TaskStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ struct TaskStats {
int32_t numQueuedSplits{0};
std::unordered_set<int32_t> completedSplitGroups;

/// Table scan split stats.
int32_t numRunningTableScanSplits{0};
int32_t numQueuedTableScanSplits{0};
int64_t runningTableScanSplitWeights{0};
int64_t queuedTableScanSplitWeights{0};

/// The subscript is given by each Operator's
/// DriverCtx::pipelineId. This is a sum total reflecting fully
/// processed Splits for Drivers of this pipeline.
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct SplitsStore {

/// Structure contains the current info on splits for a particular plan node.
struct SplitsState {
/// True if the source node is a table scan.
bool sourceIsTableScan{false};

/// Plan node-wide 'no more splits'.
bool noMoreSplits{false};

Expand Down
Loading

0 comments on commit 52ad7f5

Please sign in to comment.