diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index e4d554ceb62c..a5aaaa4d3391 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -559,7 +559,8 @@ impl CrossJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } CrossJoinStreamState::BuildBatches(_) => { - handle_state!(self.build_batches()) + let poll = handle_state!(self.build_batches()); + self.join_metrics.baseline.record_poll(poll) } }; } @@ -632,7 +633,6 @@ impl CrossJoinStream { } self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); return Ok(StatefulStreamResult::Ready(Some(batch))); } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 770399290dca..1f8ea305117a 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1403,10 +1403,12 @@ impl HashJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } HashJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch()) + let poll = handle_state!(self.process_probe_batch()); + self.join_metrics.baseline.record_poll(poll) } HashJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + let poll = handle_state!(self.process_unmatched_build_batch()); + self.join_metrics.baseline.record_poll(poll) } HashJoinStreamState::Completed => Poll::Ready(None), }; @@ -1582,7 +1584,6 @@ impl HashJoinStream { }; self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(result.num_rows()); timer.done(); if next_offset.is_none() { @@ -1639,7 +1640,6 @@ impl HashJoinStream { self.join_metrics.input_rows.add(batch.num_rows()); self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); } timer.done(); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index fcc1107a0e26..8e4ffe313355 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -825,7 +825,8 @@ impl NestedLoopJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } NestedLoopJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch()) + let poll = handle_state!(self.process_probe_batch()); + self.join_metrics.baseline.record_poll(poll) } NestedLoopJoinStreamState::ExhaustedProbeSide => { handle_state!(self.process_unmatched_build_batch()) @@ -912,7 +913,6 @@ impl NestedLoopJoinStream { } self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); Ok(StatefulStreamResult::Ready(Some(batch))) } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index c5f7087ac195..4249e479c978 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::joins::SharedBitmapBuilder; -use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; use crate::projection::ProjectionExec; use crate::{ ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics, @@ -1196,6 +1196,7 @@ fn append_probe_indices_in_order( /// Metrics for build & probe joins #[derive(Clone, Debug)] pub(crate) struct BuildProbeJoinMetrics { + pub(crate) baseline: BaselineMetrics, /// Total time for collecting build-side of join pub(crate) build_time: metrics::Time, /// Number of batches consumed by build-side @@ -1212,12 +1213,31 @@ pub(crate) struct BuildProbeJoinMetrics { pub(crate) input_rows: metrics::Count, /// Number of batches produced by this operator pub(crate) output_batches: metrics::Count, - /// Number of rows produced by this operator - pub(crate) output_rows: metrics::Count, +} + +// This Drop implementation updates the elapsed compute part of the metrics. +// +// Why is this in a Drop? +// - We keep track of build_time and join_time separately, but baseline metrics have +// a total elapsed_compute time. Instead of remembering to update both the metrics +// at the same time, we chose to update elapsed_compute once at the end - summing up +// both the parts. +// +// How does this work? +// - The elapsed_compute `Time` is represented by an `Arc`. So even when +// this `BuildProbeJoinMetrics` is dropped, the elapsed_compute is usable through the +// Arc reference. +impl Drop for BuildProbeJoinMetrics { + fn drop(&mut self) { + self.baseline.elapsed_compute().add(&self.build_time); + self.baseline.elapsed_compute().add(&self.join_time); + } } impl BuildProbeJoinMetrics { pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { + let baseline = BaselineMetrics::new(metrics, partition); + let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition); let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition); @@ -1239,8 +1259,6 @@ impl BuildProbeJoinMetrics { let output_batches = MetricBuilder::new(metrics).counter("output_batches", partition); - let output_rows = MetricBuilder::new(metrics).output_rows(partition); - Self { build_time, build_input_batches, @@ -1250,7 +1268,7 @@ impl BuildProbeJoinMetrics { input_batches, input_rows, output_batches, - output_rows, + baseline, } } }