From 42940c8bb790e354be124a1f059ed02f6cf28f1e Mon Sep 17 00:00:00 2001
From: Pedro Eugenio Rocha Pedreira
Date: Fri, 27 Sep 2024 15:57:26 -0700
Subject: [PATCH] Properly report lazy loaded inputBytes (#11097)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/11097

Whenever there is lazy loading, TableScan comes out with zero input and
output bytes; the bytes are instead attributed to the operator that actually
loaded the lazy vector. Use the existing mechanism to record these stats
locally and periodically credit them to the correct source operator.

Reviewed By: xiaoxmeng

Differential Revision: D63414708

fbshipit-source-id: d1bbc64fc7ec1ed952593aaeece0b02daed60038
---
 velox/common/time/CpuWallTimer.cpp          |  2 +
 velox/common/time/CpuWallTimer.h            | 34 +++++++++++------
 velox/exec/Driver.cpp                       | 26 +++++++++++--
 velox/exec/Driver.h                         | 13 +++----
 velox/exec/Operator.h                       |  1 +
 velox/exec/tests/PrintPlanWithStatsTest.cpp |  3 ++
 velox/exec/tests/TableScanTest.cpp          |  7 +++-
 velox/vector/LazyVector.cpp                 | 41 ++++++++++++++++-----
 velox/vector/LazyVector.h                   |  2 +
 velox/vector/tests/LazyVectorTest.cpp       |  6 ++-
 10 files changed, 99 insertions(+), 36 deletions(-)

diff --git a/velox/common/time/CpuWallTimer.cpp b/velox/common/time/CpuWallTimer.cpp
index 7a1e137ba91f..e21944b05d1c 100644
--- a/velox/common/time/CpuWallTimer.cpp
+++ b/velox/common/time/CpuWallTimer.cpp
@@ -17,6 +17,7 @@
 #include "velox/common/time/CpuWallTimer.h"
 
 namespace facebook::velox {
+
 CpuWallTimer::CpuWallTimer(CpuWallTiming& timing) : timing_(timing) {
   ++timing_.count;
   cpuTimeStart_ = process::threadCpuNanos();
@@ -29,4 +30,5 @@ CpuWallTimer::~CpuWallTimer() {
       std::chrono::steady_clock::now() - wallTimeStart_);
   timing_.wallNanos += duration.count();
 }
+
 } // namespace facebook::velox
diff --git a/velox/common/time/CpuWallTimer.h b/velox/common/time/CpuWallTimer.h
index f60f23c19dc9..6562364a2942 100644
--- a/velox/common/time/CpuWallTimer.h
+++ b/velox/common/time/CpuWallTimer.h
@@ -57,19 +57,14 @@ class CpuWallTimer {
   CpuWallTiming& timing_;
 };
 
-// Keeps track of elapsed CPU and wall time from construction time.
-// Composes delta CpuWallTiming upon destruction and passes it to the user
-// callback, where it can be added to the user's CpuWallTiming using
-// CpuWallTiming::add().
-template <typename F>
-class DeltaCpuWallTimer {
+/// Keeps track of elapsed CPU and wall time from construction time.
+class DeltaCpuWallTimeStopWatch {
  public:
-  explicit DeltaCpuWallTimer(F&& func)
+  explicit DeltaCpuWallTimeStopWatch()
       : wallTimeStart_(std::chrono::steady_clock::now()),
-        cpuTimeStart_(process::threadCpuNanos()),
-        func_(std::move(func)) {}
+        cpuTimeStart_(process::threadCpuNanos()) {}
 
-  ~DeltaCpuWallTimer() {
+  CpuWallTiming elapsed() const {
     // NOTE: End the cpu-time timing first, and then end the wall-time timing,
     // so as to avoid the counter-intuitive phenomenon that the final calculated
     // cpu-time is slightly larger than the wall-time.
@@ -78,8 +73,7 @@ class DeltaCpuWallTimer {
         std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::steady_clock::now() - wallTimeStart_)
             .count();
-    const CpuWallTiming deltaTiming{1, wallTimeDuration, cpuTimeDuration};
-    func_(deltaTiming);
+    return CpuWallTiming{1, wallTimeDuration, cpuTimeDuration};
   }
 
  private:
@@ -87,6 +81,22 @@ class DeltaCpuWallTimer {
   // counting earlier than cpu-time.
   const std::chrono::steady_clock::time_point wallTimeStart_;
   const uint64_t cpuTimeStart_;
+};
+
+/// Composes delta CpuWallTiming upon destruction and passes it to the user
+/// callback, where it can be added to the user's CpuWallTiming using
+/// CpuWallTiming::add().
+template <typename F>
+class DeltaCpuWallTimer {
+ public:
+  explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {}
+
+  ~DeltaCpuWallTimer() {
+    func_(timer_.elapsed());
+  }
+
+ private:
+  DeltaCpuWallTimeStopWatch timer_;
   F func_;
 };
 
diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp
index b818e17d2c00..d730d39e8d81 100644
--- a/velox/exec/Driver.cpp
+++ b/velox/exec/Driver.cpp
@@ -399,20 +399,22 @@ size_t OpCallStatusRaw::callDuration() const {
       : fmt::format("null::{}", operatorMethod);
 }
 
-CpuWallTiming Driver::processLazyTiming(
+CpuWallTiming Driver::processLazyIoStats(
     Operator& op,
     const CpuWallTiming& timing) {
   if (&op == operators_[0].get()) {
     return timing;
   }
   auto lockStats = op.stats().wlock();
+
+  // Checks and tries to update cpu time from lazy loads.
   auto it = lockStats->runtimeStats.find(LazyVector::kCpuNanos);
   if (it == lockStats->runtimeStats.end()) {
     // Return early if no lazy activity. Lazy CPU and wall times are recorded
     // together, checking one is enough.
     return timing;
   }
-  int64_t cpu = it->second.sum;
+  const int64_t cpu = it->second.sum;
   auto cpuDelta = std::max<int64_t>(0, cpu - lockStats->lastLazyCpuNanos);
   if (cpuDelta == 0) {
     // Return early if no change. Checking one counter is enough. If this did
@@ -421,15 +423,29 @@ CpuWallTiming Driver::processLazyTiming(
     return timing;
   }
   lockStats->lastLazyCpuNanos = cpu;
+
+  // Checks and tries to update wall time from lazy loads.
   int64_t wallDelta = 0;
   it = lockStats->runtimeStats.find(LazyVector::kWallNanos);
   if (it != lockStats->runtimeStats.end()) {
-    int64_t wall = it->second.sum;
+    const int64_t wall = it->second.sum;
     wallDelta = std::max<int64_t>(0, wall - lockStats->lastLazyWallNanos);
     if (wallDelta > 0) {
       lockStats->lastLazyWallNanos = wall;
     }
   }
+
+  // Checks and tries to update input bytes from lazy loads.
+  int64_t inputBytesDelta = 0;
+  it = lockStats->runtimeStats.find(LazyVector::kInputBytes);
+  if (it != lockStats->runtimeStats.end()) {
+    const int64_t inputBytes = it->second.sum;
+    inputBytesDelta = inputBytes - lockStats->lastLazyInputBytes;
+    if (inputBytesDelta > 0) {
+      lockStats->lastLazyInputBytes = inputBytes;
+    }
+  }
+
   lockStats.unlock();
   cpuDelta = std::min<int64_t>(cpuDelta, timing.cpuNanos);
   wallDelta = std::min<int64_t>(wallDelta, timing.wallNanos);
@@ -439,6 +455,8 @@ CpuWallTiming Driver::processLazyTiming(
       static_cast<uint64_t>(wallDelta),
       static_cast<uint64_t>(cpuDelta),
   });
+  lockStats->inputBytes += inputBytesDelta;
+  lockStats->outputBytes += inputBytesDelta;
   return CpuWallTiming{
       1,
       timing.wallNanos - wallDelta,
@@ -1016,7 +1034,7 @@ void Driver::withDeltaCpuWallTimer(
   // opTimingMember upon destruction of the timer when withDeltaCpuWallTimer
   // ends.
   // The timer is created on the stack to avoid heap allocation.
   auto f = [op, opTimingMember, this](const CpuWallTiming& elapsedTime) {
-    auto elapsedSelfTime = processLazyTiming(*op, elapsedTime);
+    auto elapsedSelfTime = processLazyIoStats(*op, elapsedTime);
     op->stats().withWLock([&](auto& lockedStats) {
       (lockedStats.*opTimingMember).add(elapsedSelfTime);
     });
diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h
index 6563f204b17f..a1f9d087321d 100644
--- a/velox/exec/Driver.h
+++ b/velox/exec/Driver.h
@@ -532,13 +532,12 @@ class Driver : public std::enable_shared_from_this<Driver> {
       TimingMemberPtr opTimingMember,
       Func&& opFunction);
 
-  // Adjusts 'timing' by removing the lazy load wall and CPU times
-  // accrued since last time timing information was recorded for
-  // 'op'. The accrued lazy load times are credited to the source
-  // operator of 'this'. The per-operator runtimeStats for lazy load
-  // are left in place to reflect which operator triggered the load
-  // but these do not bias the op's timing.
-  CpuWallTiming processLazyTiming(Operator& op, const CpuWallTiming& timing);
+  // Adjusts 'timing' by removing the lazy load wall time, CPU time, and input
+  // bytes accrued since the last time timing was recorded for 'op'. The accrued
+  // lazy load times and bytes are credited to the source operator of 'this'.
+  // The per-operator runtimeStats for lazy load remain in place to show which
+  // operator triggered the load, but they do not bias the op's own timing.
+  CpuWallTiming processLazyIoStats(Operator& op, const CpuWallTiming& timing);
 
   inline void validateOperatorOutputResult(
       const RowVectorPtr& result,
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index 1c56a3bfb294..711a59da0c71 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -177,6 +177,7 @@ struct OperatorStats {
   // Last recorded values for lazy loading times for loads triggered by 'this'.
   int64_t lastLazyCpuNanos{0};
   int64_t lastLazyWallNanos{0};
+  int64_t lastLazyInputBytes{0};
 
   // Total null keys processed by the operator.
   // Currently populated only by HashJoin/HashBuild.
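
Note on the CpuWallTimer.h refactoring above: the old DeltaCpuWallTimer is split
into a measurement-only DeltaCpuWallTimeStopWatch plus a thin callback wrapper,
so callers that only need the numbers (such as LazyIoStatsRecorder further
below) can read the delta directly via elapsed() instead of receiving it
through a destructor callback. The following standalone sketch mirrors that
split outside of Velox; it is illustrative rather than Velox code: CpuWallTiming
is simplified, and threadCpuNanos() here is a POSIX stand-in for Velox's
process::threadCpuNanos().

#include <chrono>
#include <cstdint>
#include <ctime>
#include <iostream>
#include <utility>

// Simplified stand-in for velox::CpuWallTiming.
struct CpuWallTiming {
  uint64_t count{0};
  uint64_t wallNanos{0};
  uint64_t cpuNanos{0};

  void add(const CpuWallTiming& other) {
    count += other.count;
    wallNanos += other.wallNanos;
    cpuNanos += other.cpuNanos;
  }
};

// Stand-in for process::threadCpuNanos(), using the POSIX thread CPU clock.
uint64_t threadCpuNanos() {
  timespec ts;
  clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
  return ts.tv_sec * 1'000'000'000ULL + ts.tv_nsec;
}

// Measurement-only piece: starts both clocks at construction and returns the
// delta on demand, without owning any callback.
class DeltaCpuWallTimeStopWatch {
 public:
  DeltaCpuWallTimeStopWatch()
      : wallTimeStart_(std::chrono::steady_clock::now()),
        cpuTimeStart_(threadCpuNanos()) {}

  CpuWallTiming elapsed() const {
    // Read cpu-time first so the reported cpu-time never exceeds wall-time.
    const uint64_t cpuNanos = threadCpuNanos() - cpuTimeStart_;
    const uint64_t wallNanos =
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now() - wallTimeStart_)
            .count();
    return CpuWallTiming{1, wallNanos, cpuNanos};
  }

 private:
  const std::chrono::steady_clock::time_point wallTimeStart_;
  const uint64_t cpuTimeStart_;
};

// Callback-owning piece: composes the stopwatch and hands the delta to the
// user callback on destruction, mirroring the split introduced by this patch.
template <typename F>
class DeltaCpuWallTimer {
 public:
  explicit DeltaCpuWallTimer(F&& func) : func_(std::move(func)) {}
  ~DeltaCpuWallTimer() { func_(timer_.elapsed()); }

 private:
  DeltaCpuWallTimeStopWatch timer_;
  F func_;
};

int main() {
  CpuWallTiming total;
  {
    DeltaCpuWallTimer timer(
        [&](const CpuWallTiming& delta) { total.add(delta); });
    volatile uint64_t sink = 0;
    for (int i = 0; i < 1'000'000; ++i) {
      sink += i;  // some work to time
    }
  }
  std::cout << "wall=" << total.wallNanos << "ns cpu=" << total.cpuNanos
            << "ns\n";
  return 0;
}

Keeping the stopwatch as a plain value member means the composed timer still
lives on the stack and pays for the callback only at destruction.
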
diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 9b52725c3fee..3539c301341b 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -152,6 +152,7 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {{"-- Project\\[4\\]\\[expressions: \\(c0:INTEGER, ROW\\[\"c0\"\\]\\), \\(p1:BIGINT, plus\\(ROW\\[\"c1\"\\],1\\)\\), \\(p2:BIGINT, plus\\(ROW\\[\"c1\"\\],ROW\\[\"u_c1\"\\]\\)\\)\\] -> c0:INTEGER, p1:BIGINT, p2:BIGINT"}, {" Output: 2000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos[ ]* sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes[ ]* sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, @@ -269,6 +270,7 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {{"-- Aggregation\\[1\\]\\[PARTIAL \\[c5\\] a0 := max\\(ROW\\[\"c0\"\\]\\), a1 := sum\\(ROW\\[\"c1\"\\]\\), a2 := sum\\(ROW\\[\"c2\"\\]\\), a3 := sum\\(ROW\\[\"c3\"\\]\\), a4 := sum\\(ROW\\[\"c4\"\\]\\)\\] -> c5:VARCHAR, a0:BIGINT, a1:BIGINT, a2:BIGINT, a3:DOUBLE, a4:DOUBLE"}, {" Output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" distinctKey0\\s+sum: .+, count: 1, min: .+, max: .+"}, {" hashtable.capacity\\s+sum: 1252, count: 1, min: 1252, max: 1252"}, @@ -345,6 +347,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"}, {" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1"}, {" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, + {" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"}, {" dataSourceLazyWallNanos\\s+sum: .+, count: .+, min: .+, max: .+"}, {" numWrittenFiles\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index db96a5089460..85643fd4134d 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -3815,7 +3815,12 @@ TEST_F(TableScanTest, structLazy) { .project({"cardinality(c2.c0)"}) .planNode(); - assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + auto task = assertQuery(op, {filePath}, "select c0 % 3 from tmp"); + + // Ensure lazy stats are attributed to table scan. 
+  const auto stats = task->taskStats();
+  EXPECT_GT(stats.pipelineStats[0].operatorStats[0].inputBytes, 0);
+  EXPECT_GT(stats.pipelineStats[0].operatorStats[0].outputBytes, 0);
 }
 
 TEST_F(TableScanTest, interleaveLazyEager) {
diff --git a/velox/vector/LazyVector.cpp b/velox/vector/LazyVector.cpp
index d0cc09c3d59e..a614ead8452c 100644
--- a/velox/vector/LazyVector.cpp
+++ b/velox/vector/LazyVector.cpp
@@ -23,16 +23,37 @@
 #include "velox/vector/SelectivityVector.h"
 
 namespace facebook::velox {
-
 namespace {
-void writeIOTiming(const CpuWallTiming& delta) {
-  addThreadLocalRuntimeStat(
-      LazyVector::kWallNanos,
-      RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
-  addThreadLocalRuntimeStat(
-      LazyVector::kCpuNanos,
-      RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
-}
+
+// Convenience class that records cpu and wall time from construction and
+// updates thread-local stats at destruction, including the input bytes of the
+// vector passed as a parameter.
+class LazyIoStatsRecorder {
+ public:
+  explicit LazyIoStatsRecorder(VectorPtr* vector) : vector_(vector) {}
+
+  ~LazyIoStatsRecorder() {
+    const auto delta = timer_.elapsed();
+    addThreadLocalRuntimeStat(
+        LazyVector::kWallNanos,
+        RuntimeCounter(delta.wallNanos, RuntimeCounter::Unit::kNanos));
+    addThreadLocalRuntimeStat(
+        LazyVector::kCpuNanos,
+        RuntimeCounter(delta.cpuNanos, RuntimeCounter::Unit::kNanos));
+
+    if (*vector_) {
+      addThreadLocalRuntimeStat(
+          LazyVector::kInputBytes,
+          RuntimeCounter(
+              (*vector_)->estimateFlatSize(), RuntimeCounter::Unit::kBytes));
+    }
+  }
+
+ private:
+  DeltaCpuWallTimeStopWatch timer_;
+  VectorPtr* vector_;
+};
+
 } // namespace
 
 void VectorLoader::load(
@@ -41,7 +62,7 @@ void VectorLoader::load(
     vector_size_t resultSize,
     VectorPtr* result) {
   {
-    DeltaCpuWallTimer timer([&](auto& delta) { writeIOTiming(delta); });
+    LazyIoStatsRecorder recorder(result);
     loadInternal(rows, hook, resultSize, result);
   }
   if (hook) {
diff --git a/velox/vector/LazyVector.h b/velox/vector/LazyVector.h
index 30354af615ab..3957002a1d31 100644
--- a/velox/vector/LazyVector.h
+++ b/velox/vector/LazyVector.h
@@ -224,6 +224,8 @@ class LazyVector : public BaseVector {
  public:
   static constexpr const char* kCpuNanos = "dataSourceLazyCpuNanos";
   static constexpr const char* kWallNanos = "dataSourceLazyWallNanos";
+  static constexpr const char* kInputBytes = "dataSourceLazyInputBytes";
+
   LazyVector(
       velox::memory::MemoryPool* pool,
       TypePtr type,
diff --git a/velox/vector/tests/LazyVectorTest.cpp b/velox/vector/tests/LazyVectorTest.cpp
index 2ad3cbc8e446..aee10a50bc42 100644
--- a/velox/vector/tests/LazyVectorTest.cpp
+++ b/velox/vector/tests/LazyVectorTest.cpp
@@ -639,9 +639,11 @@ TEST_F(LazyVectorTest, runtimeStats) {
   std::sort(stats.begin(), stats.end(), [](auto& x, auto& y) {
     return x.first < y.first;
   });
-  ASSERT_EQ(stats.size(), 2);
+  ASSERT_EQ(stats.size(), 3);
   ASSERT_EQ(stats[0].first, LazyVector::kCpuNanos);
   ASSERT_GE(stats[0].second.value, 0);
-  ASSERT_EQ(stats[1].first, LazyVector::kWallNanos);
+  ASSERT_EQ(stats[1].first, LazyVector::kInputBytes);
   ASSERT_GE(stats[1].second.value, 0);
+  ASSERT_EQ(stats[2].first, LazyVector::kWallNanos);
+  ASSERT_GE(stats[2].second.value, 0);
 }
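
Note on the overall attribution flow: VectorLoader::load() records the
cumulative dataSourceLazyCpuNanos, dataSourceLazyWallNanos, and
dataSourceLazyInputBytes counters into the runtime stats of whichever operator
happened to trigger the lazy load, and Driver::processLazyIoStats() then
periodically moves the not-yet-credited delta of those counters to the source
operator (TableScan), so each byte is counted exactly once. A standalone sketch
of that bookkeeping follows; the struct and function names are illustrative,
not Velox APIs.

#include <cstdint>
#include <iostream>

// Minimal stand-ins for the relevant OperatorStats fields.
struct SourceOperatorStats {
  int64_t inputBytes{0};
  int64_t outputBytes{0};
};

struct DownstreamOperatorStats {
  int64_t lazyInputBytesSum{0};   // cumulative dataSourceLazyInputBytes
  int64_t lastLazyInputBytes{0};  // portion already credited upstream
};

// Credits the not-yet-credited portion of the lazy input bytes to the source
// operator, in the spirit of Driver::processLazyIoStats in this patch.
void creditLazyInputBytes(
    DownstreamOperatorStats& op,
    SourceOperatorStats& scan) {
  const int64_t delta = op.lazyInputBytesSum - op.lastLazyInputBytes;
  if (delta > 0) {
    op.lastLazyInputBytes = op.lazyInputBytesSum;
    // Lazily loaded bytes count as both input and output of the scan.
    scan.inputBytes += delta;
    scan.outputBytes += delta;
  }
}

int main() {
  DownstreamOperatorStats project;  // operator that triggered the lazy load
  SourceOperatorStats scan;         // TableScan, the rightful owner

  project.lazyInputBytesSum = 1024;  // first batch of lazy loads
  creditLazyInputBytes(project, scan);

  project.lazyInputBytesSum += 512;  // later loads; only the delta is new
  creditLazyInputBytes(project, scan);

  std::cout << scan.inputBytes << "\n";  // prints 1536, counted exactly once
  return 0;
}

Because the transfer compares the cumulative counter against a last-credited
high-water mark, running it again with no new lazy loads credits nothing, which
is what makes the periodic application safe.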