chore: refactor BuildProbeJoinMetrics to use BaselineMetrics #16500

Open: wants to merge 1 commit into base: main
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/cross_join.rs
@@ -559,7 +559,8 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
                 CrossJoinStreamState::BuildBatches(_) => {
-                    handle_state!(self.build_batches())
+                    let poll = handle_state!(self.build_batches());
+                    self.join_metrics.baseline.record_poll(poll)
                 }
             };
         }
@@ -632,7 +633,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
             }

             self.join_metrics.output_batches.add(1);
-            self.join_metrics.output_rows.add(batch.num_rows());
             return Ok(StatefulStreamResult::Ready(Some(batch)));
         }
     }
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -1403,10 +1403,12 @@ impl HashJoinStream {
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
                 HashJoinStreamState::ProcessProbeBatch(_) => {
-                    handle_state!(self.process_probe_batch())
+                    let poll = handle_state!(self.process_probe_batch());
+                    self.join_metrics.baseline.record_poll(poll)
                 }
                 HashJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    let poll = handle_state!(self.process_unmatched_build_batch());
+                    self.join_metrics.baseline.record_poll(poll)
                 }
                 HashJoinStreamState::Completed => Poll::Ready(None),
             };
@@ -1582,7 +1584,6 @@ impl HashJoinStream {
         };

         self.join_metrics.output_batches.add(1);
-        self.join_metrics.output_rows.add(result.num_rows());
         timer.done();

         if next_offset.is_none() {
@@ -1639,7 +1640,6 @@ impl HashJoinStream {
             self.join_metrics.input_rows.add(batch.num_rows());

             self.join_metrics.output_batches.add(1);
-            self.join_metrics.output_rows.add(batch.num_rows());
         }
         timer.done();

4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -825,7 +825,8 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(ready!(self.fetch_probe_batch(cx)))
                 }
                 NestedLoopJoinStreamState::ProcessProbeBatch(_) => {
-                    handle_state!(self.process_probe_batch())
+                    let poll = handle_state!(self.process_probe_batch());
+                    self.join_metrics.baseline.record_poll(poll)
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
                     handle_state!(self.process_unmatched_build_batch())
@@ -912,7 +913,6 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         }

         self.join_metrics.output_batches.add(1);
-        self.join_metrics.output_rows.add(batch.num_rows());
         Ok(StatefulStreamResult::Ready(Some(batch)))
     }
 }
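Note for readers: the three stream changes above share one pattern. The explicit `output_rows.add(...)` call is dropped, and the poll result is passed through `baseline.record_poll(poll)` before being returned. The following is a minimal, dependency-free sketch of that pattern, not DataFusion's implementation. `Batch`, `Baseline`, `BatchPoll`, and `rows_recorded` are hypothetical stand-ins introduced only for illustration, and the sketch assumes that `record_poll` counts rows for successfully produced batches and returns the poll value unchanged.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;

/// Hypothetical stand-in for a record batch: only the row count matters here.
pub struct Batch {
    pub rows: usize,
}

/// Hypothetical alias for the value a join stream returns from its poll loop.
pub type BatchPoll = Poll<Option<Result<Batch, String>>>;

/// Hypothetical, simplified stand-in for `BaselineMetrics`.
#[derive(Clone, Default)]
pub struct Baseline {
    output_rows: Arc<AtomicUsize>,
}

impl Baseline {
    /// Count rows for every successfully produced batch, then hand the
    /// poll result back unchanged so the caller can return it as-is.
    pub fn record_poll(&self, poll: BatchPoll) -> BatchPoll {
        if let Poll::Ready(Some(Ok(batch))) = &poll {
            self.output_rows.fetch_add(batch.rows, Ordering::Relaxed);
        }
        poll
    }

    /// Total number of rows recorded so far.
    pub fn output_rows(&self) -> usize {
        self.output_rows.load(Ordering::Relaxed)
    }
}

/// Feed a sequence of poll results through `record_poll` and return the
/// number of output rows that were recorded along the way.
pub fn rows_recorded(polls: Vec<BatchPoll>) -> usize {
    let baseline = Baseline::default();
    for poll in polls {
        let _ = baseline.record_poll(poll);
    }
    baseline.output_rows()
}
```

In this sketch, `Pending`, error, and end-of-stream results pass through without touching the counter, so the row count lives in one place rather than at every `return` site in each join operator.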
30 changes: 24 additions & 6 deletions datafusion/physical-plan/src/joins/utils.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};

 use crate::joins::SharedBitmapBuilder;
-use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
+use crate::metrics::{self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
 use crate::projection::ProjectionExec;
 use crate::{
     ColumnStatistics, ExecutionPlan, ExecutionPlanProperties, Partitioning, Statistics,
@@ -1196,6 +1196,7 @@ fn append_probe_indices_in_order(
 /// Metrics for build & probe joins
 #[derive(Clone, Debug)]
 pub(crate) struct BuildProbeJoinMetrics {
+    pub(crate) baseline: BaselineMetrics,
     /// Total time for collecting build-side of join
     pub(crate) build_time: metrics::Time,
     /// Number of batches consumed by build-side
@@ -1212,12 +1213,31 @@ pub(crate) struct BuildProbeJoinMetrics {
     pub(crate) input_rows: metrics::Count,
     /// Number of batches produced by this operator
     pub(crate) output_batches: metrics::Count,
-    /// Number of rows produced by this operator
-    pub(crate) output_rows: metrics::Count,
 }

+// This Drop implementation updates the elapsed compute part of the metrics.
+//
+// Why is this in a Drop?
+// - We keep track of build_time and join_time separately, but baseline metrics have
+// a total elapsed_compute time. Instead of remembering to update both the metrics
+// at the same time, we chose to update elapsed_compute once at the end - summing up
+// both the parts.
+//
+// How does this work?
+// - The elapsed_compute `Time` is represented by an `Arc<AtomicUsize>`. So even when
+// this `BuildProbeJoinMetrics` is dropped, the elapsed_compute is usable through the
+// Arc reference.
+impl Drop for BuildProbeJoinMetrics {
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's the best way but I guess it's okay to merge 🤔 If we forget to count a specific time period in some join operator, we can fix it in the future.

Could you also add a brief comment to explain this drop?

Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This is not intuitive at all. When I came back to this PR to act on your suggestions, it took me a while to remember how this works :)

I have added a comment for now. I would be open to other ways of handling this!

+    fn drop(&mut self) {
+        self.baseline.elapsed_compute().add(&self.build_time);
+        self.baseline.elapsed_compute().add(&self.join_time);
+    }
+}
+
 impl BuildProbeJoinMetrics {
     pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let baseline = BaselineMetrics::new(metrics, partition);
+
         let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);

         let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);
@@ -1239,8 +1259,6 @@ impl BuildProbeJoinMetrics {
         let output_batches =
             MetricBuilder::new(metrics).counter("output_batches", partition);

-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
         Self {
             build_time,
             build_input_batches,
@@ -1250,7 +1268,7 @@ impl BuildProbeJoinMetrics {
             input_batches,
             input_rows,
             output_batches,
-            output_rows,
+            baseline,
         }
     }
 }
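Note for readers: the `Drop` comment in `utils.rs` explains that `elapsed_compute` is backed by a shared, reference-counted counter, so a total written during `drop` stays visible to whoever else holds a handle. The mechanism can be sketched without DataFusion as follows. `Time`, `JoinMetrics`, and `total_after_drop` are hypothetical stand-ins introduced only for illustration. The real `metrics::Time` API is assumed to behave similarly but is not reproduced here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `metrics::Time`: a shared nanosecond counter.
/// Cloning it yields another handle to the same underlying value.
#[derive(Clone, Default)]
pub struct Time {
    nanos: Arc<AtomicUsize>,
}

impl Time {
    /// Add a raw number of nanoseconds to this counter.
    pub fn add_nanos(&self, n: usize) {
        self.nanos.fetch_add(n, Ordering::Relaxed);
    }

    /// Add the current value of another counter to this one.
    pub fn add(&self, other: &Time) {
        self.add_nanos(other.value());
    }

    /// Read the current value.
    pub fn value(&self) -> usize {
        self.nanos.load(Ordering::Relaxed)
    }
}

/// Hypothetical stand-in for `BuildProbeJoinMetrics`, reduced to three timers.
pub struct JoinMetrics {
    pub elapsed_compute: Time,
    pub build_time: Time,
    pub join_time: Time,
}

impl Drop for JoinMetrics {
    /// Fold both partial timers into the shared total exactly once,
    /// when this metrics value goes away.
    fn drop(&mut self) {
        self.elapsed_compute.add(&self.build_time);
        self.elapsed_compute.add(&self.join_time);
    }
}

/// Record some build and join time, drop the metrics, and read the total
/// through a handle that outlives the dropped value.
pub fn total_after_drop(build_nanos: usize, join_nanos: usize) -> usize {
    let elapsed = Time::default();
    let metrics = JoinMetrics {
        elapsed_compute: elapsed.clone(),
        build_time: Time::default(),
        join_time: Time::default(),
    };
    metrics.build_time.add_nanos(build_nanos);
    metrics.join_time.add_nanos(join_nanos);
    drop(metrics); // runs the Drop impl above
    elapsed.value()
}
```

One caveat this sketch does not model: the struct in the diff derives `Clone`, and in Rust every clone runs its own `drop`. If clones share the same underlying counters, each dropped clone would add `build_time` and `join_time` to the total again. Whether that happens depends on how the join operators clone the metrics, which this diff does not show.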