Skip to content

Commit

Permalink
PlanNodeStats add addInputTiming, getOutputTiming and finishTiming (#…
Browse files Browse the repository at this point in the history
…10986)

Summary:
New planNodeStats output
```
-- Project[4][expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))] -> c0:INTEGER, p1:BIGINT, p2:BIGINT
      Output: 2000 rows (154.34KB, 20 batches), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 2.00KB, Memory allocations: 40, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
   -- HashJoin[3][INNER c0=u_c0] -> c0:INTEGER, c1:BIGINT, u_c1:BIGINT
      Output: 2000 rows (136.23KB, 20 batches), Cpu time: 508.74us, Blocked wall time: 0ns, Peak memory: 88.50KB, Memory allocations: 7, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
      HashBuild: Input: 100 rows (1.31KB, 1 batches), Output: 0 rows (0B, 0 batches), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 68.00KB, Memory allocations: 2, Threads: 1, CPU breakdown: I/O/F(40.18us/1.59us/0ns)
      HashProbe: Input: 2000 rows (118.12KB, 20 batches), Output: 2000 rows (136.23KB, 20 batches), Cpu time: 466.97us, Blocked wall time: 0ns, Peak memory: 20.50KB, Memory allocations: 5, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
      -- TableScan[2][table: hive_table] -> c0:INTEGER, c1:BIGINT
         Input: 2000 rows (118.12KB, 20 batches), Raw Input: 20480 rows (72.79KB), Output: 2000 rows (118.12KB, 20 batches), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 80.38KB, Memory allocations: 262, Threads: 1, Splits: 20, DynamicFilter producer plan nodes: 3, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
      -- Project[1][expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])] -> u_c0:INTEGER, u_c1:BIGINT
         Output: 100 rows (1.31KB, 1 batches), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
         -- Values[0][100 rows in 1 vectors] -> c0:INTEGER, c1:BIGINT
            Input: 0 rows (0B, 0 batches), Output: 100 rows (1.31KB, 1 batches), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Memory allocations: 0, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
```

Pull Request resolved: #10986

Reviewed By: kevinwilfong

Differential Revision: D64335324

Pulled By: kgpai

fbshipit-source-id: 286e0cc01db9eb420ff92965397bde139495745a
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Oct 15, 2024
1 parent 4682e76 commit c30c8f9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 42 deletions.
46 changes: 26 additions & 20 deletions velox/docs/develop/debugging/print-plan-with-stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ memory usage for each plan node.
.. code-block::
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 695.33us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 320.15us, Blocked wall time: 117.00us, Peak memory: 2.00MB
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 114.15us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 206.01us, Blocked wall time: 117.00us, Peak memory: 1.00MB, Threads: 1
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 117.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 117.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 4.08ms, Blocked wall time: 5.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 5.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 17.99us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 5.38us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
With includeCustomStats flag enabled, printPlanWithStats adds operator-specific
statistics for each plan node, e.g. number of distinct values for the join key,
Expand All @@ -105,19 +105,19 @@ Here is the output for the join query from above.
.. code-block::
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 1.11ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
dataSourceLazyWallNanos sum: 473.00us, count: 20, min: 11.00us, max: 96.00us
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 533.54us, Blocked wall time: 223.00us, Peak memory: 2.00MB
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 208.57us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 223.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
distinctKey0 sum: 101, count: 1, min: 101, max: 101
queuedWallNanos sum: 125.00us, count: 1, min: 125.00us, max: 125.00us
rangeKey0 sum: 200, count: 1, min: 200, max: 200
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 324.97us, Blocked wall time: 223.00us, Peak memory: 1.00MB, Threads: 1
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 223.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
queuedWallNanos sum: 24.00us, count: 1, min: 24.00us, max: 24.00us
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 5.50ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
dataSourceWallNanos sum: 2.52ms, count: 40, min: 12.00us, max: 250.00us
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
Expand All @@ -136,9 +136,9 @@ Here is the output for the join query from above.
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 21.50us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 12.14us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
And this is the output for the aggregation query from above.

Expand All @@ -147,18 +147,18 @@ And this is the output for the aggregation query from above.
.. code-block::
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.83ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 810.13us, Blocked wall time: 25.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 25.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
`printPlanWithStats(*plan, task->taskStats(), true)` includes custom statistics:

.. code-block::
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.65ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 759.00us, Blocked wall time: 30.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 30.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
dataSourceLazyWallNanos sum: 1.07ms, count: 7, min: 92.00us, max: 232.00us
dataSourceWallNanos sum: 329.00us, count: 2, min: 48.00us, max: 281.00us
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000
Expand Down Expand Up @@ -189,7 +189,7 @@ their estimated sizes, cpu time, blocked wall time, and the number of threads us
.. code-block::
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 5.50ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
printPlanWithStats shows output rows and
Expand All @@ -215,7 +215,13 @@ information is shown for all plan nodes.

.. code-block::
Cpu time: 5.50ms, Peak memory: 1.00MB
Cpu time: 8.89ms, Peak memory: 1.00MB
A breakdown of CPU time into addInput, getOutput and finish stages of the operator is also available. I/O/F below is a shortcut for addInput/getOutput/finish.

.. code-block::
CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
Some operators like TableScan and HashProbe may be blocked waiting for splits or
hash tables. Velox records the total wall time an operator was blocked and
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
outputBytes += stats.outputBytes;
outputVectors += stats.outputVectors;

addInputTiming.add(stats.addInputTiming);
getOutputTiming.add(stats.getOutputTiming);
finishTiming.add(stats.finishTiming);
cpuWallTiming.add(stats.addInputTiming);
cpuWallTiming.add(stats.getOutputTiming);
cpuWallTiming.add(stats.finishTiming);
Expand Down Expand Up @@ -123,6 +126,13 @@ std::string PlanNodeStats::toString(bool includeInputStats) const {
<< folly::join(',', dynamicFilterStats.producerNodeIds);
}

out << ", CPU breakdown: I/O/F "
<< fmt::format(
"({}/{}/{})",
succinctNanos(addInputTiming.cpuNanos),
succinctNanos(getOutputTiming.cpuNanos),
succinctNanos(finishTiming.cpuNanos));

return out.str();
}

Expand Down Expand Up @@ -163,6 +173,9 @@ folly::dynamic toPlanStatsJson(const facebook::velox::exec::TaskStats& stats) {
stat["outputRows"] = operatorStat.second->outputRows;
stat["outputVectors"] = operatorStat.second->outputVectors;
stat["outputBytes"] = operatorStat.second->outputBytes;
stat["addInputTiming"] = operatorStat.second->addInputTiming.toString();
stat["getOutputTiming"] = operatorStat.second->getOutputTiming.toString();
stat["finishTiming"] = operatorStat.second->finishTiming.toString();
stat["cpuWallTiming"] = operatorStat.second->cpuWallTiming.toString();
stat["blockedWallNanos"] = operatorStat.second->blockedWallNanos;
stat["peakMemoryBytes"] = operatorStat.second->peakMemoryBytes;
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/PlanNodeStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ struct PlanNodeStats {
/// Sum of output bytes for all corresponding operators.
uint64_t outputBytes{0};

// Sum of CPU, scheduled and wall times for addInput call for all
// corresponding operators.
CpuWallTiming addInputTiming;

// Sum of CPU, scheduled and wall times for noMoreInput call for all
// corresponding operators.
CpuWallTiming finishTiming;

// Sum of CPU, scheduled and wall times for getOutput call for all
// corresponding operators.
CpuWallTiming getOutputTiming;

/// Sum of CPU, scheduled and wall times for all corresponding operators. For
/// each operator, timing of addInput, getOutput and finish calls are added
/// up.
Expand Down
Loading

0 comments on commit c30c8f9

Please sign in to comment.