Skip to content

Commit

Permalink
Measure isBlocked and isFinished operator calls (#10923)
Browse files (browse the repository at this point in the history)
Summary: Pull Request resolved: #10923

Reviewed By: kevinwilfong

Differential Revision: D63659424

Pulled By: xiaoxmeng

fbshipit-source-id: 3f0081fbef6d523f8cd8bee371cf8eab82407b0a
  • Loading branch information
yingsu00 authored and facebook-github-bot committed Oct 3, 2024
1 parent 716dfc6 commit f4ca3c6
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 46 deletions.
99 changes: 54 additions & 45 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,23 +556,28 @@ StopReason Driver::runInternal(
return blockDriver(self, i, std::move(future), blockingState, guard);
}

CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});

if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(self, i, std::move(future), blockingState, guard);
}

if (i < numOperators - 1) {
Operator* nextOp = operators_[i + 1].get();

CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(
self, i + 1, std::move(future), blockingState, guard);
Expand All @@ -587,27 +592,24 @@ StopReason Driver::runInternal(
if (needsInput) {
uint64_t resultBytes = 0;
RowVectorPtr intermediateResult;
{
withDeltaCpuWallTimer(op, &OperatorStats::getOutputTiming, [&]() {
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::getOutput",
op);
CALL_OPERATOR(
intermediateResult = op->getOutput(),
op,
curOperatorId_,
kOpMethodGetOutput);
if (intermediateResult) {
validateOperatorOutputResult(intermediateResult, *op);
resultBytes = intermediateResult->estimateFlatSize();
{
auto lockedStats = op->stats().wlock();
lockedStats->addOutputVector(
resultBytes, intermediateResult->size());
}
withDeltaCpuWallTimer(op, &OperatorStats::getOutputTiming, [&]() {
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::getOutput", op);
CALL_OPERATOR(
intermediateResult = op->getOutput(),
op,
curOperatorId_,
kOpMethodGetOutput);
if (intermediateResult) {
validateOperatorOutputResult(intermediateResult, *op);
resultBytes = intermediateResult->estimateFlatSize();
{
auto lockedStats = op->stats().wlock();
lockedStats->addOutputVector(
resultBytes, intermediateResult->size());
}
});
}
}
});
pushdownFilters(i);
if (intermediateResult) {
withDeltaCpuWallTimer(op, &OperatorStats::addInputTiming, [&]() {
Expand Down Expand Up @@ -643,21 +645,26 @@ StopReason Driver::runInternal(
// is not blocked and empty, this is finished. If this is
// not the source, just try to get output from the one
// before.
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(
self, i, std::move(future), blockingState, guard);
}

bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
withDeltaCpuWallTimer(
op, &OperatorStats::finishTiming, [this, &nextOp]() {
Expand Down Expand Up @@ -704,11 +711,13 @@ StopReason Driver::runInternal(
}

bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
guard.notThrown();
close();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ void OperatorStats::add(const OperatorStats& other) {

finishTiming.add(other.finishTiming);

isBlockedTiming.add(other.isBlockedTiming);

backgroundTiming.add(other.backgroundTiming);

memoryStats.add(other.memoryStats);
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ struct OperatorStats {
/// read.
int64_t numSplits{0};

CpuWallTiming isBlockedTiming;

/// Bytes read from raw source, e.g. compressed file or network connection.
uint64_t rawInputBytes = 0;
uint64_t rawInputPositions = 0;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
cpuWallTiming.add(stats.addInputTiming);
cpuWallTiming.add(stats.getOutputTiming);
cpuWallTiming.add(stats.finishTiming);
cpuWallTiming.add(stats.isBlockedTiming);

backgroundTiming.add(stats.backgroundTiming);

Expand Down
5 changes: 4 additions & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(32 * i, operatorStats.outputBytes);
EXPECT_EQ(3 * i, operatorStats.outputPositions);
EXPECT_EQ(i, operatorStats.outputVectors);
EXPECT_EQ(2 * (i + 1), operatorStats.isBlockedTiming.count);
EXPECT_EQ(0, operatorStats.finishTiming.count);
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

Expand All @@ -1236,7 +1237,9 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(32 * numBatches, operatorStats.outputBytes);
EXPECT_EQ(3 * numBatches, operatorStats.outputPositions);
EXPECT_EQ(numBatches, operatorStats.outputVectors);
EXPECT_EQ(1, operatorStats.finishTiming.count);
// isBlocked() should be called at least twice for each batch.
EXPECT_LE(2 * numBatches, operatorStats.isBlockedTiming.count);
EXPECT_EQ(2, operatorStats.finishTiming.count);
// No operators with background CPU time yet.
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

Expand Down

0 comments on commit f4ca3c6

Please sign in to comment.