Skip to content

Commit f162fd3

Browse files
authored
refactor: include metric output_batches into BaselineMetrics (#18491)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #17027 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> `output_batches` should be a common metric in all operators, thus should ideally be added to `BaselineMetrics` ``` > explain analyze select * from generate_series(1, 1000000) as t1(v1) order by v1 desc; +-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | SortExec: expr=[v1@0 DESC], preserve_partitioning=[false], metrics=[output_rows=1000000, elapsed_compute=535.320324ms, output_bytes=7.6 MB, output_batches=123, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0] | | | ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=1000000, elapsed_compute=208.379µs, output_bytes=7.7 MB, output_batches=123] | | | LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1000000, batch_size=8192], metrics=[output_rows=1000000, elapsed_compute=15.924291ms, output_bytes=7.7 MB, output_batches=123] | | | | 
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.492 second ``` ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Added `output_batches` into `BaselineMetrics` with `DEV` MetricType - Tracked through `record_poll()` API - Changes are similar to #18268 - Refactored `assert_metrics` macro to take multiple metrics strings for substring check - Added `output_bytes` and `output_batches` tracking in `TopK` operator - Added `baseline` metrics for `RepartitionExec` ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Added UT ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Changes in the `EXPLAIN ANALYZE` output, `output_batches` will be added to `metrics=[...]`
1 parent 28755b1 commit f162fd3

File tree

19 files changed

+157
-117
lines changed

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,9 @@ async fn explain_analyze_baseline_metrics() {
6161
assert_metrics!(
6262
&formatted,
6363
"AggregateExec: mode=Partial, gby=[]",
64-
"metrics=[output_rows=3, elapsed_compute="
65-
);
66-
assert_metrics!(
67-
&formatted,
68-
"AggregateExec: mode=Partial, gby=[]",
69-
"output_bytes="
64+
"metrics=[output_rows=3, elapsed_compute=",
65+
"output_bytes=",
66+
"output_batches=3"
7067
);
7168

7269
assert_metrics!(
@@ -75,59 +72,76 @@ async fn explain_analyze_baseline_metrics() {
7572
"reduction_factor=5.1% (5/99)"
7673
);
7774

78-
assert_metrics!(
79-
&formatted,
80-
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
81-
"metrics=[output_rows=5, elapsed_compute="
82-
);
83-
assert_metrics!(
84-
&formatted,
85-
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
86-
"output_bytes="
87-
);
88-
assert_metrics!(
89-
&formatted,
90-
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
91-
"metrics=[output_rows=99, elapsed_compute="
92-
);
75+
{
76+
let expected_batch_count_after_repartition =
77+
if cfg!(not(feature = "force_hash_collisions")) {
78+
"output_batches=3"
79+
} else {
80+
"output_batches=1"
81+
};
82+
83+
assert_metrics!(
84+
&formatted,
85+
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
86+
"metrics=[output_rows=5, elapsed_compute=",
87+
"output_bytes=",
88+
expected_batch_count_after_repartition
89+
);
90+
91+
assert_metrics!(
92+
&formatted,
93+
"RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3",
94+
"metrics=[output_rows=5, elapsed_compute=",
95+
"output_bytes=",
96+
expected_batch_count_after_repartition
97+
);
98+
99+
assert_metrics!(
100+
&formatted,
101+
"ProjectionExec: expr=[]",
102+
"metrics=[output_rows=5, elapsed_compute=",
103+
"output_bytes=",
104+
expected_batch_count_after_repartition
105+
);
106+
107+
assert_metrics!(
108+
&formatted,
109+
"CoalesceBatchesExec: target_batch_size=4096",
110+
"metrics=[output_rows=5, elapsed_compute",
111+
"output_bytes=",
112+
expected_batch_count_after_repartition
113+
);
114+
}
115+
93116
assert_metrics!(
94117
&formatted,
95118
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
96-
"output_bytes="
119+
"metrics=[output_rows=99, elapsed_compute=",
120+
"output_bytes=",
121+
"output_batches=1"
97122
);
123+
98124
assert_metrics!(
99125
&formatted,
100126
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
101127
"selectivity=99% (99/100)"
102128
);
103-
assert_metrics!(
104-
&formatted,
105-
"ProjectionExec: expr=[]",
106-
"metrics=[output_rows=5, elapsed_compute="
107-
);
108-
assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes=");
109-
assert_metrics!(
110-
&formatted,
111-
"CoalesceBatchesExec: target_batch_size=4096",
112-
"metrics=[output_rows=5, elapsed_compute"
113-
);
114-
assert_metrics!(
115-
&formatted,
116-
"CoalesceBatchesExec: target_batch_size=4096",
117-
"output_bytes="
118-
);
129+
119130
assert_metrics!(
120131
&formatted,
121132
"UnionExec",
122-
"metrics=[output_rows=3, elapsed_compute="
133+
"metrics=[output_rows=3, elapsed_compute=",
134+
"output_bytes=",
135+
"output_batches=3"
123136
);
124-
assert_metrics!(&formatted, "UnionExec", "output_bytes=");
137+
125138
assert_metrics!(
126139
&formatted,
127140
"WindowAggExec",
128-
"metrics=[output_rows=1, elapsed_compute="
141+
"metrics=[output_rows=1, elapsed_compute=",
142+
"output_bytes=",
143+
"output_batches=1"
129144
);
130-
assert_metrics!(&formatted, "WindowAggExec", "output_bytes=");
131145

132146
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
133147
use datafusion::physical_plan;
@@ -228,9 +242,13 @@ async fn explain_analyze_level() {
228242

229243
for (level, needle, should_contain) in [
230244
(ExplainAnalyzeLevel::Summary, "spill_count", false),
245+
(ExplainAnalyzeLevel::Summary, "output_batches", false),
231246
(ExplainAnalyzeLevel::Summary, "output_rows", true),
247+
(ExplainAnalyzeLevel::Summary, "output_bytes", true),
232248
(ExplainAnalyzeLevel::Dev, "spill_count", true),
233249
(ExplainAnalyzeLevel::Dev, "output_rows", true),
250+
(ExplainAnalyzeLevel::Dev, "output_bytes", true),
251+
(ExplainAnalyzeLevel::Dev, "output_batches", true),
234252
] {
235253
let plan = collect_plan(sql, level).await;
236254
assert_eq!(

datafusion/core/tests/sql/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,24 @@ use std::io::Write;
4040
use std::path::PathBuf;
4141
use tempfile::TempDir;
4242

43-
/// A macro to assert that some particular line contains two substrings
43+
/// A macro to assert that some particular line contains the given substrings
4444
///
45-
/// Usage: `assert_metrics!(actual, operator_name, metrics)`
45+
/// Usage: `assert_metrics!(actual, operator_name, metrics_1, metrics_2, ...)`
4646
macro_rules! assert_metrics {
47-
($ACTUAL: expr, $OPERATOR_NAME: expr, $METRICS: expr) => {
47+
($ACTUAL: expr, $OPERATOR_NAME: expr, $($METRICS: expr),+) => {
4848
let found = $ACTUAL
4949
.lines()
50-
.any(|line| line.contains($OPERATOR_NAME) && line.contains($METRICS));
50+
.any(|line| line.contains($OPERATOR_NAME) $( && line.contains($METRICS))+);
51+
52+
let mut metrics = String::new();
53+
$(metrics.push_str(format!(" '{}',", $METRICS).as_str());)+
54+
// remove the last `,` from the string
55+
metrics.pop();
56+
5157
assert!(
5258
found,
53-
"Can not find a line with both '{}' and '{}' in\n\n{}",
54-
$OPERATOR_NAME, $METRICS, $ACTUAL
59+
"Cannot find a line with operator name '{}' and metrics containing values {} in :\n\n{}",
60+
$OPERATOR_NAME, metrics, $ACTUAL
5561
);
5662
};
5763
}

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
650650
self.left_index += 1;
651651
}
652652

653-
self.join_metrics.output_batches.add(1);
654653
return Ok(StatefulStreamResult::Ready(Some(batch)));
655654
}
656655
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,6 @@ impl HashJoinStream {
494494
&self.column_indices,
495495
self.join_type,
496496
)?;
497-
self.join_metrics.output_batches.add(1);
498497
timer.done();
499498

500499
self.state = HashJoinStreamState::FetchProbeBatch;
@@ -597,7 +596,6 @@ impl HashJoinStream {
597596
)?
598597
};
599598

600-
self.join_metrics.output_batches.add(1);
601599
timer.done();
602600

603601
if next_offset.is_none() {
@@ -653,8 +651,6 @@ impl HashJoinStream {
653651
if let Ok(ref batch) = result {
654652
self.join_metrics.input_batches.add(1);
655653
self.join_metrics.input_rows.add(batch.num_rows());
656-
657-
self.join_metrics.output_batches.add(1);
658654
}
659655
timer.done();
660656

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,10 +1483,6 @@ impl NestedLoopJoinStream {
14831483
fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
14841484
if self.output_buffer.has_completed_batch() {
14851485
if let Some(batch) = self.output_buffer.next_completed_batch() {
1486-
// HACK: this is not part of `BaselineMetrics` yet, so update it
1487-
// manually
1488-
self.metrics.join_metrics.output_batches.add(1);
1489-
14901486
// Update output rows for selectivity metric
14911487
let output_rows = batch.num_rows();
14921488
self.metrics.selectivity.add_part(output_rows);

datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ pub(super) struct SortMergeJoinMetrics {
3131
input_batches: Count,
3232
/// Number of rows consumed by this operator
3333
input_rows: Count,
34-
/// Number of batches produced by this operator
35-
output_batches: Count,
3634
/// Execution metrics
3735
baseline_metrics: BaselineMetrics,
3836
/// Peak memory used for buffered data.
@@ -49,8 +47,6 @@ impl SortMergeJoinMetrics {
4947
let input_batches =
5048
MetricBuilder::new(metrics).counter("input_batches", partition);
5149
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
52-
let output_batches =
53-
MetricBuilder::new(metrics).counter("output_batches", partition);
5450
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
5551
let spill_metrics = SpillMetrics::new(metrics, partition);
5652

@@ -60,7 +56,6 @@ impl SortMergeJoinMetrics {
6056
join_time,
6157
input_batches,
6258
input_rows,
63-
output_batches,
6459
baseline_metrics,
6560
peak_mem_used,
6661
spill_metrics,
@@ -82,9 +77,6 @@ impl SortMergeJoinMetrics {
8277
pub fn input_rows(&self) -> Count {
8378
self.input_rows.clone()
8479
}
85-
pub fn output_batches(&self) -> Count {
86-
self.output_batches.clone()
87-
}
8880

8981
pub fn peak_mem_used(&self) -> Gauge {
9082
self.peak_mem_used.clone()

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use std::task::{Context, Poll};
3535

3636
use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
3737
use crate::joins::utils::{compare_join_arrays, JoinFilter};
38+
use crate::metrics::RecordOutput;
3839
use crate::spill::spill_manager::SpillManager;
3940
use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream};
4041

@@ -1462,10 +1463,7 @@ impl SortMergeJoinStream {
14621463
fn output_record_batch_and_reset(&mut self) -> Result<RecordBatch> {
14631464
let record_batch =
14641465
concat_batches(&self.schema, &self.staging_output_record_batches.batches)?;
1465-
self.join_metrics.output_batches().add(1);
1466-
self.join_metrics
1467-
.baseline_metrics()
1468-
.record_output(record_batch.num_rows());
1466+
(&record_batch).record_output(&self.join_metrics.baseline_metrics());
14691467
// If join filter exists, `self.output_size` is not accurate as we don't know the exact
14701468
// number of rows in the output record batch. If streamed row joined with buffered rows,
14711469
// once join filter is applied, the number of output rows may be more than 1.

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -682,8 +682,6 @@ pub struct StreamJoinMetrics {
682682
pub(crate) right: StreamJoinSideMetrics,
683683
/// Memory used by sides in bytes
684684
pub(crate) stream_memory_usage: metrics::Gauge,
685-
/// Number of batches produced by this operator
686-
pub(crate) output_batches: metrics::Count,
687685
/// Number of rows produced by this operator
688686
pub(crate) baseline_metrics: BaselineMetrics,
689687
}
@@ -709,13 +707,9 @@ impl StreamJoinMetrics {
709707
let stream_memory_usage =
710708
MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
711709

712-
let output_batches =
713-
MetricBuilder::new(metrics).counter("output_batches", partition);
714-
715710
Self {
716711
left,
717712
right,
718-
output_batches,
719713
stream_memory_usage,
720714
baseline_metrics: BaselineMetrics::new(metrics, partition),
721715
}

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,6 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
13761376
}
13771377
}
13781378
Some((batch, _)) => {
1379-
self.metrics.output_batches.add(1);
13801379
return self
13811380
.metrics
13821381
.baseline_metrics

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,8 +1327,6 @@ pub(crate) struct BuildProbeJoinMetrics {
13271327
pub(crate) input_batches: metrics::Count,
13281328
/// Number of rows consumed by probe-side this operator
13291329
pub(crate) input_rows: metrics::Count,
1330-
/// Number of batches produced by this operator
1331-
pub(crate) output_batches: metrics::Count,
13321330
}
13331331

13341332
// This Drop implementation updates the elapsed compute part of the metrics.
@@ -1372,9 +1370,6 @@ impl BuildProbeJoinMetrics {
13721370

13731371
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
13741372

1375-
let output_batches =
1376-
MetricBuilder::new(metrics).counter("output_batches", partition);
1377-
13781373
Self {
13791374
build_time,
13801375
build_input_batches,
@@ -1383,7 +1378,6 @@ impl BuildProbeJoinMetrics {
13831378
join_time,
13841379
input_batches,
13851380
input_rows,
1386-
output_batches,
13871381
baseline,
13881382
}
13891383
}

0 commit comments

Comments (0)