Skip to content

Commit 6fb44a6

Browse files
committed
chore: refactor BuildProbeJoinMetrics to use BaselineMetrics
Closes #16495 Here's an example of an `explain analyze` of a hash join showing these metrics: ``` [(WatchID@0, WatchID@0)], metrics=[output_rows=100, elapsed_compute=2.313624ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=100, output_batches=1, build_mem_used=3688, build_time=865.832µs, join_time=1.369875ms] ``` Notice `output_rows=100, elapsed_compute=2.313624ms` in the above.
1 parent 85eebcd commit 6fb44a6

File tree

4 files changed

+32
-14
lines changed

4 files changed

+32
-14
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,8 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
559559
handle_state!(ready!(self.fetch_probe_batch(cx)))
560560
}
561561
CrossJoinStreamState::BuildBatches(_) => {
562-
handle_state!(self.build_batches())
562+
let poll = handle_state!(self.build_batches());
563+
self.join_metrics.baseline.record_poll(poll)
563564
}
564565
};
565566
}
@@ -632,7 +633,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
632633
}
633634

634635
self.join_metrics.output_batches.add(1);
635-
self.join_metrics.output_rows.add(batch.num_rows());
636636
return Ok(StatefulStreamResult::Ready(Some(batch)));
637637
}
638638
}

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,10 +1403,12 @@ impl HashJoinStream {
14031403
handle_state!(ready!(self.fetch_probe_batch(cx)))
14041404
}
14051405
HashJoinStreamState::ProcessProbeBatch(_) => {
1406-
handle_state!(self.process_probe_batch())
1406+
let poll = handle_state!(self.process_probe_batch());
1407+
self.join_metrics.baseline.record_poll(poll)
14071408
}
14081409
HashJoinStreamState::ExhaustedProbeSide => {
1409-
handle_state!(self.process_unmatched_build_batch())
1410+
let poll = handle_state!(self.process_unmatched_build_batch());
1411+
self.join_metrics.baseline.record_poll(poll)
14101412
}
14111413
HashJoinStreamState::Completed => Poll::Ready(None),
14121414
};
@@ -1582,7 +1584,6 @@ impl HashJoinStream {
15821584
};
15831585

15841586
self.join_metrics.output_batches.add(1);
1585-
self.join_metrics.output_rows.add(result.num_rows());
15861587
timer.done();
15871588

15881589
if next_offset.is_none() {
@@ -1639,7 +1640,6 @@ impl HashJoinStream {
16391640
self.join_metrics.input_rows.add(batch.num_rows());
16401641

16411642
self.join_metrics.output_batches.add(1);
1642-
self.join_metrics.output_rows.add(batch.num_rows());
16431643
}
16441644
timer.done();
16451645

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,8 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
825825
handle_state!(ready!(self.fetch_probe_batch(cx)))
826826
}
827827
NestedLoopJoinStreamState::ProcessProbeBatch(_) => {
828-
handle_state!(self.process_probe_batch())
828+
let poll = handle_state!(self.process_probe_batch());
829+
self.join_metrics.baseline.record_poll(poll)
829830
}
830831
NestedLoopJoinStreamState::ExhaustedProbeSide => {
831832
handle_state!(self.process_unmatched_build_batch())
@@ -912,7 +913,6 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
912913
}
913914

914915
self.join_metrics.output_batches.add(1);
915-
self.join_metrics.output_rows.add(batch.num_rows());
916916
Ok(StatefulStreamResult::Ready(Some(batch)))
917917
}
918918
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626
use std::task::{Context, Poll};
2727

2828
use crate::joins::SharedBitmapBuilder;
29-
use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
29+
use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
3030
use crate::projection::ProjectionExec;
3131
use crate::{
3232
ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
@@ -1196,6 +1196,7 @@ fn append_probe_indices_in_order(
11961196
/// Metrics for build & probe joins
11971197
#[derive(Clone, Debug)]
11981198
pub(crate) struct BuildProbeJoinMetrics {
1199+
pub(crate) baseline: BaselineMetrics,
11991200
/// Total time for collecting build-side of join
12001201
pub(crate) build_time: metrics::Time,
12011202
/// Number of batches consumed by build-side
@@ -1212,12 +1213,31 @@ pub(crate) struct BuildProbeJoinMetrics {
12121213
pub(crate) input_rows: metrics::Count,
12131214
/// Number of batches produced by this operator
12141215
pub(crate) output_batches: metrics::Count,
1215-
/// Number of rows produced by this operator
1216-
pub(crate) output_rows: metrics::Count,
1216+
}
1217+
1218+
// This Drop implementation updates the elapsed compute part of the metrics.
1219+
//
1220+
// Why is this in a Drop?
1221+
// - We keep track of build_time and join_time separately, but baseline metrics have
1222+
// a total elapsed_compute time. Instead of remembering to update both the metrics
1223+
// at the same time, we chose to update elapsed_compute once at the end - summing up
1224+
// both the parts.
1225+
//
1226+
// How does this work?
1227+
// - The elapsed_compute `Time` is represented by an `Arc<AtomicUsize>`. So even when
1228+
// this `BuildProbeJoinMetrics` is dropped, the elapsed_compute is usable through the
1229+
// Arc reference.
1230+
impl Drop for BuildProbeJoinMetrics {
1231+
fn drop(&mut self) {
1232+
self.baseline.elapsed_compute().add(&self.build_time);
1233+
self.baseline.elapsed_compute().add(&self.join_time);
1234+
}
12171235
}
12181236

12191237
impl BuildProbeJoinMetrics {
12201238
pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
1239+
let baseline = BaselineMetrics::new(metrics, partition);
1240+
12211241
let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
12221242

12231243
let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);
@@ -1239,8 +1259,6 @@ impl BuildProbeJoinMetrics {
12391259
let output_batches =
12401260
MetricBuilder::new(metrics).counter("output_batches", partition);
12411261

1242-
let output_rows = MetricBuilder::new(metrics).output_rows(partition);
1243-
12441262
Self {
12451263
build_time,
12461264
build_input_batches,
@@ -1250,7 +1268,7 @@ impl BuildProbeJoinMetrics {
12501268
input_batches,
12511269
input_rows,
12521270
output_batches,
1253-
output_rows,
1271+
baseline,
12541272
}
12551273
}
12561274
}

0 commit comments

Comments
 (0)