From 18e1c45d48ab723fa17d29d97a658f028e1ff1e8 Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Tue, 5 Mar 2024 15:49:24 -0800 Subject: [PATCH] add reset to ExecutionPlan, make sure execute() is called on join inputs immediately. --- .../examples/custom_datasource.rs | 4 ++ .../datasource/physical_plan/arrow_file.rs | 5 ++ .../core/src/datasource/physical_plan/avro.rs | 5 ++ .../core/src/datasource/physical_plan/csv.rs | 5 ++ .../core/src/datasource/physical_plan/json.rs | 5 ++ .../datasource/physical_plan/parquet/mod.rs | 5 ++ .../enforce_distribution.rs | 4 ++ .../physical_optimizer/output_requirements.rs | 4 ++ datafusion/core/src/physical_planner.rs | 4 ++ datafusion/core/src/test/mod.rs | 4 ++ datafusion/core/src/test_util/mod.rs | 4 ++ datafusion/core/tests/custom_sources.rs | 4 ++ .../provider_filter_pushdown.rs | 4 ++ .../tests/custom_sources_cases/statistics.rs | 4 ++ .../tests/user_defined/user_defined_plan.rs | 5 +- .../physical-plan/src/aggregates/mod.rs | 9 +++ datafusion/physical-plan/src/analyze.rs | 4 ++ .../physical-plan/src/coalesce_batches.rs | 5 ++ .../physical-plan/src/coalesce_partitions.rs | 5 ++ datafusion/physical-plan/src/display.rs | 4 ++ datafusion/physical-plan/src/empty.rs | 4 ++ datafusion/physical-plan/src/explain.rs | 4 ++ datafusion/physical-plan/src/filter.rs | 5 ++ datafusion/physical-plan/src/insert.rs | 4 ++ .../physical-plan/src/joins/cross_join.rs | 46 ++++++------ .../physical-plan/src/joins/hash_join.rs | 70 ++++++++++--------- .../src/joins/nested_loop_join.rs | 49 ++++++------- .../src/joins/sort_merge_join.rs | 6 ++ .../src/joins/symmetric_hash_join.rs | 6 ++ datafusion/physical-plan/src/joins/utils.rs | 2 + datafusion/physical-plan/src/lib.rs | 2 + datafusion/physical-plan/src/limit.rs | 10 +++ datafusion/physical-plan/src/memory.rs | 4 ++ datafusion/physical-plan/src/metrics/mod.rs | 5 ++ .../physical-plan/src/placeholder_row.rs | 4 ++ datafusion/physical-plan/src/projection.rs | 5 ++ .../physical-plan/src/recursive_query.rs | 7 ++ .../physical-plan/src/repartition/mod.rs | 5 ++ .../physical-plan/src/sorts/partial_sort.rs | 4 ++ datafusion/physical-plan/src/sorts/sort.rs | 5 ++ .../src/sorts/sort_preserving_merge.rs | 5 ++ datafusion/physical-plan/src/streaming.rs | 6 +- datafusion/physical-plan/src/test/exec.rs | 26 +++++++ datafusion/physical-plan/src/union.rs | 16 +++++ datafusion/physical-plan/src/unnest.rs | 5 ++ datafusion/physical-plan/src/values.rs | 4 ++ .../src/windows/bounded_window_agg_exec.rs | 5 ++ .../src/windows/window_agg_exec.rs | 5 ++ datafusion/physical-plan/src/work_table.rs | 10 +++ 49 files changed, 339 insertions(+), 83 deletions(-) diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 69f9c9530e871..98f11d4257873 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -270,4 +270,8 @@ impl ExecutionPlan for CustomExec { None, )?)) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index f6c310fb5da15..ebf387e9a305e 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -170,6 +170,11 @@ impl ExecutionPlan for ArrowExec { fn statistics(&self) -> Result { Ok(self.projected_statistics.clone()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + Ok(()) + } } pub struct ArrowOpener { diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index e448bf39f4272..ff0b33bb68215 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -155,6 +155,11 @@ impl ExecutionPlan for AvroExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + Ok(()) + } } #[cfg(feature = "avro")] diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 04959c7904a9a..4cc2e8685d3fc 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -236,6 +236,11 @@ impl ExecutionPlan for CsvExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + Ok(()) + } } /// A Config for [`CsvOpener`] diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index c033c4b89891b..ebbaee34bfeea 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -189,6 +189,11 @@ impl ExecutionPlan for NdJsonExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + Ok(()) + } } /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 171f7cdc59a42..55d942c596730 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -417,6 +417,11 @@ impl ExecutionPlan for ParquetExec { fn statistics(&self) -> Result { Ok(self.projected_statistics.clone()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + Ok(()) + } } /// Implements [`FileOpener`] for a parquet file diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4f8806a685923..db877fad134ff 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1439,6 +1439,10 @@ pub(crate) mod tests { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.input.reset() + } } pub(crate) fn schema() -> SchemaRef { diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 4d03840d3dd31..ac5eea3f8549e 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -186,6 +186,10 @@ impl ExecutionPlan for OutputRequirementExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.input.reset() + } } impl PhysicalOptimizerRule for OutputRequirements { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 463d0cde82828..accf0b4973a08 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2610,6 +2610,10 @@ mod tests { ) -> Result { unimplemented!("NoOpExecutionPlan::execute"); } + + fn reset(&self) -> Result<()> { + Ok(()) + } } // Produces an execution plan where the schema is mismatched from diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index ed5aa15e291b5..9150199222827 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -437,6 +437,10 @@ impl ExecutionPlan for StatisticsExec { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } pub mod object_store; diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 282b0f7079ee2..22c003cf2132f 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -309,6 +309,10 @@ impl ExecutionPlan for UnboundedExec { batch: self.batch.clone(), })) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[derive(Debug)] diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index a9ea5cc2a35c8..820bedf418665 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -174,6 +174,10 @@ impl ExecutionPlan for CustomExecutionPlan { .collect(), }) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[async_trait] diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index e374abd6e8915..937749700a9bf 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -124,6 +124,10 @@ impl ExecutionPlan for CustomPlan { // but we want to test the filter pushdown not the CBOs Ok(Statistics::new_unknown(&self.schema())) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[derive(Clone)] diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index f0985f5546543..17447d2631806 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -165,6 +165,10 @@ impl ExecutionPlan for StatisticsValidation { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } fn init_ctx(stats: Statistics, schema: Schema) -> Result { diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 29708c4422cac..817b532fdc4ce 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -434,7 +434,6 @@ impl DisplayAs for TopKExec { } } -#[async_trait] impl ExecutionPlan for TopKExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -494,6 +493,10 @@ impl ExecutionPlan for TopKExec { // better statistics inference could be provided Ok(Statistics::new_unknown(&self.schema())) } + + fn reset(&self) -> Result<()> { + self.input.reset() + } } // A very specialized TopK implementation diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 1563624305584..a277526c6e316 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -754,6 +754,11 @@ impl ExecutionPlan for AggregateExec { } } } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } fn create_schema( @@ -1695,6 +1700,10 @@ mod tests { None, )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// A stream using the demo data. If inited as new, it will first yield to runtime before returning records diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 4f1578e220ddd..2b153e613cc28 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -199,6 +199,10 @@ impl ExecutionPlan for AnalyzeExec { futures::stream::once(output), ))) } + + fn reset(&self) -> Result<()> { + self.input.reset() + } } /// Creates the ouput of AnalyzeExec as a RecordBatch diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 09d1ea87ca370..1def5bf503f45 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -171,6 +171,11 @@ impl ExecutionPlan for CoalesceBatchesExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } struct CoalesceBatchesStream { diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index bfcff28535386..6625b75eb841a 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -170,6 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index ff106dceb974d..c3d2bac22fe5e 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -541,6 +541,10 @@ mod tests { )), } } + + fn reset(&self) -> datafusion_common::Result<()> { + Ok(()) + } } fn test_stats_display(exec: TestStatsExecPlan, show_stats: bool) { diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 41c8dbed14536..b337f426b71d8 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -136,6 +136,10 @@ impl ExecutionPlan for EmptyExec { None, )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index e4904ddd34100..82968280020a9 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -167,6 +167,10 @@ impl ExecutionPlan for ExplainExec { futures::stream::iter(vec![Ok(record_batch)]), ))) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// If this plan should be shown, given the previous plan that was diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 362fa10efc9f5..6f29794c9acc1 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -252,6 +252,11 @@ impl ExecutionPlan for FilterExec { column_statistics, }) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } /// This function ensures that all bounds in the `ExprBoundaries` vector are diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 81cdfd753fe69..a71efcfd62fda 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -278,6 +278,10 @@ impl ExecutionPlan for FileSinkExec { stream, ))) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// Create a output record batch with a count diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 938c9e4d343d6..9a48aa23598b6 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -20,9 +20,7 @@ use std::{any::Any, sync::Arc, task::Poll}; -use super::utils::{ - adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut, -}; +use super::utils::{adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceFut}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::DisplayAs; use crate::{ @@ -57,8 +55,6 @@ pub struct CrossJoinExec { pub right: Arc, /// The schema once the join is applied schema: SchemaRef, - /// Build-side data - left_fut: OnceAsync, /// Execution plan metrics metrics: ExecutionPlanMetricsSet, } @@ -81,7 +77,6 @@ impl CrossJoinExec { left, right, schema, - left_fut: Default::default(), metrics: ExecutionPlanMetricsSet::default(), } } @@ -97,15 +92,11 @@ impl CrossJoinExec { } } -/// Asynchronously collect the result of the left child -async fn load_left_input( +fn merge_stream( left: Arc, context: Arc, - metrics: BuildProbeJoinMetrics, - reservation: MemoryReservation, -) -> Result { +) -> Result { // merge all left parts into a single stream - let left_schema = left.schema(); let merge = if left.output_partitioning().partition_count() != 1 { Arc::new(CoalescePartitionsExec::new(left)) } else { @@ -113,6 +104,16 @@ async fn load_left_input( }; let stream = merge.execute(0, context)?; + Ok(stream) +} + +/// Asynchronously collect the result of the left child +async fn load_left_input( + stream: SendableRecordBatchStream, + metrics: BuildProbeJoinMetrics, + reservation: MemoryReservation, +) -> Result { + let left_schema = stream.schema(); // Load all batches and count the rows let (batches, num_rows, _, reservation) = stream .try_fold( @@ -239,14 +240,13 @@ impl ExecutionPlan for CrossJoinExec { let reservation = MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()); - let left_fut = self.left_fut.once(|| { - load_left_input( - self.left.clone(), - context, - join_metrics.clone(), - reservation, - ) - }); + let left_stream = merge_stream(self.left.clone(), context.clone())?; + + let left_fut = OnceFut::new(load_left_input( + left_stream, + join_metrics.clone(), + reservation, + )); Ok(Box::pin(CrossJoinStream { schema: self.schema.clone(), @@ -264,6 +264,12 @@ impl ExecutionPlan for CrossJoinExec { self.right.statistics()?, )) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.left.reset()?; + self.right.reset() + } } /// [left/right]_col_count are required in case the column statistics are None diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index cd8b17d135986..b15f104fae84d 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -43,10 +43,7 @@ use crate::{ }; use crate::{handle_state, DisplayAs}; -use super::{ - utils::{OnceAsync, OnceFut}, - PartitionMode, -}; +use super::{utils::OnceFut, PartitionMode}; use arrow::array::{ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, @@ -284,8 +281,6 @@ pub struct HashJoinExec { pub join_type: JoinType, /// The output schema for the join schema: SchemaRef, - /// Future that consumes left input and builds the hash table - left_fut: OnceAsync, /// Shared the `RandomState` for the hashing algorithm random_state: RandomState, /// Output order @@ -347,7 +342,6 @@ impl HashJoinExec { filter, join_type: *join_type, schema: Arc::new(schema), - left_fut: Default::default(), random_state, mode: partition_mode, metrics: ExecutionPlanMetricsSet::new(), @@ -600,30 +594,31 @@ impl ExecutionPlan for HashJoinExec { let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { - PartitionMode::CollectLeft => self.left_fut.once(|| { + PartitionMode::CollectLeft => { let reservation = MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); - collect_left_input( - None, + let left_stream = + coalesce_exec(None, self.left.clone(), context.clone())?; + OnceFut::new(collect_left_input( self.random_state.clone(), - self.left.clone(), + left_stream, on_left.clone(), - context.clone(), join_metrics.clone(), reservation, - ) - }), + )) + } PartitionMode::Partitioned => { let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); + let left_stream = + coalesce_exec(Some(partition), self.left.clone(), context.clone())?; + OnceFut::new(collect_left_input( - Some(partition), self.random_state.clone(), - self.left.clone(), + left_stream, on_left.clone(), - context.clone(), join_metrics.clone(), reservation, )) @@ -680,37 +675,46 @@ impl ExecutionPlan for HashJoinExec { &self.schema, ) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.left.reset()?; + self.right.reset()?; + Ok(()) + } +} + +fn coalesce_exec( + partition: Option, + exec: Arc, + context: Arc, +) -> Result { + let (left_input, left_input_partition) = if let Some(partition) = partition { + (exec, partition) + } else if exec.output_partitioning().partition_count() != 1 { + (Arc::new(CoalescePartitionsExec::new(exec)) as _, 0) + } else { + (exec, 0) + }; + left_input.execute(left_input_partition, context.clone()) } /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) async fn collect_left_input( - partition: Option, random_state: RandomState, - left: Arc, + left_stream: SendableRecordBatchStream, on_left: Vec, - context: Arc, metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, ) -> Result { - let schema = left.schema(); - - let (left_input, left_input_partition) = if let Some(partition) = partition { - (left, partition) - } else if left.output_partitioning().partition_count() != 1 { - (Arc::new(CoalescePartitionsExec::new(left)) as _, 0) - } else { - (left, 0) - }; - - // Depending on partition argument load single partition or whole left side in memory - let stream = left_input.execute(left_input_partition, context.clone())?; + let schema = left_stream.schema(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. let initial = (Vec::new(), 0, metrics, reservation); - let (batches, num_rows, metrics, mut reservation) = stream + let (batches, num_rows, metrics, mut reservation) = left_stream .try_fold(initial, |mut acc, batch| async { let batch_size = batch.get_array_memory_size(); // Reserve memory for incoming batch diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f89a2445fd070..803799e954c1e 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -30,7 +30,7 @@ use crate::joins::utils::{ build_join_schema, check_join_is_valid, estimate_join_statistics, get_anti_indices, get_final_indices_from_bit_map, get_semi_indices, partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, - OnceAsync, OnceFut, + OnceFut, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ @@ -87,8 +87,6 @@ pub struct NestedLoopJoinExec { pub(crate) join_type: JoinType, /// The schema once the join is applied schema: SchemaRef, - /// Build-side data - inner_table: OnceAsync, /// Information of index and left / right placement of columns column_indices: Vec, /// Execution metrics @@ -114,7 +112,6 @@ impl NestedLoopJoinExec { filter, join_type: *join_type, schema: Arc::new(schema), - inner_table: Default::default(), column_indices, metrics: Default::default(), }) @@ -238,29 +235,23 @@ impl ExecutionPlan for NestedLoopJoinExec { .register(context.memory_pool()); let (outer_table, inner_table) = if left_is_build_side(self.join_type) { + let left_stream = self.left.execute(0, context.clone())?; // left must be single partition - let inner_table = self.inner_table.once(|| { - load_specified_partition_of_input( - 0, - self.left.clone(), - context.clone(), - join_metrics.clone(), - load_reservation, - ) - }); + let inner_table = OnceFut::new(load_specified_partition_of_input( + left_stream, + join_metrics.clone(), + load_reservation, + )); let outer_table = self.right.execute(partition, context)?; (outer_table, inner_table) } else { + let right_stream = self.right.execute(0, context.clone())?; // right must be single partition - let inner_table = self.inner_table.once(|| { - load_specified_partition_of_input( - 0, - self.right.clone(), - context.clone(), - join_metrics.clone(), - load_reservation, - ) - }); + let inner_table = OnceFut::new(load_specified_partition_of_input( + right_stream, + join_metrics.clone(), + load_reservation, + )); let outer_table = self.left.execute(partition, context)?; (outer_table, inner_table) }; @@ -292,6 +283,12 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.schema, ) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.left.reset()?; + self.right.reset() + } } // For the nested loop join, different join type need the different distribution for @@ -321,13 +318,11 @@ fn distribution_from_join_type(join_type: &JoinType) -> Vec { /// Asynchronously collect the specified partition data of the input async fn load_specified_partition_of_input( - partition: usize, - input: Arc, - context: Arc, + stream: SendableRecordBatchStream, join_metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, ) -> Result { - let stream = input.execute(partition, context)?; + let schema = stream.schema(); // Load all batches and count the rows let (batches, num_rows, _, reservation) = stream @@ -350,7 +345,7 @@ async fn load_specified_partition_of_input( ) .await?; - let merged_batch = concat_batches(&input.schema(), &batches, num_rows)?; + let merged_batch = concat_batches(&schema, &batches, num_rows)?; Ok((merged_batch, reservation)) } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 7af614e534917..d4b6496ab32a5 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -384,6 +384,12 @@ impl ExecutionPlan for SortMergeJoinExec { &self.schema, ) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.left.reset()?; + self.right.reset() + } } /// Metrics for SortMergeJoinExec diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 42c7029edcc17..59fcd83e145bc 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -518,6 +518,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { reservation, })) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.left.reset()?; + self.right.reset() + } } /// A stream that issues [RecordBatch]es as they arrive from the right of the join. diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e6e3f83fd7e83..85cf1acc52ccf 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -733,6 +733,7 @@ pub fn build_join_schema( /// This is useful for joins where the results of one child are buffered in memory /// and shared across potentially multiple output partitions pub(crate) struct OnceAsync { + #[allow(dead_code)] fut: Mutex>>, } @@ -756,6 +757,7 @@ impl OnceAsync { /// /// If this is not the first call, will return a [`OnceFut`] referring /// to the same future as was returned by the first call + #[allow(dead_code)] pub(crate) fn once(&self, f: F) -> OnceFut where F: FnOnce() -> Fut, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 1c4a6ac0ecaf5..1ad2d194e24d6 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -444,6 +444,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn reset(&self) -> Result<()>; } /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index c31d5f62c7262..3b79ec978db50 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -261,6 +261,11 @@ impl ExecutionPlan for GlobalLimitExec { }; Ok(stats) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } /// LocalLimitExec applies a limit to a single partition @@ -429,6 +434,11 @@ impl ExecutionPlan for LocalLimitExec { }; Ok(stats) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } /// A Limit stream skips `skip` rows, and then fetch up to `fetch` rows. diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 7de474fda11c3..a3871a725ac2d 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -157,6 +157,10 @@ impl ExecutionPlan for MemoryExec { self.projection.clone(), )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } impl MemoryExec { diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index b2e0086f69e9a..7b50351751a0b 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -362,6 +362,11 @@ impl ExecutionPlanMetricsSet { let guard = self.inner.lock(); (*guard).clone() } + + pub fn reset(&self) { + let mut guard = self.inner.lock(); + *guard = MetricsSet::new(); + } } /// `name=value` pairs identifiying a metric. This concept is called various things diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 3ab3de62f37a7..029d1d8fab4c2 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -157,6 +157,10 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cc2ab62049ed5..2bd7955052978 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -249,6 +249,11 @@ impl ExecutionPlan for ProjectionExec { self.schema.clone(), )) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } /// If e is a direct column reference, returns the field level diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 614ab990ac49a..b951244d14c77 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -180,6 +180,13 @@ impl ExecutionPlan for RecursiveQueryExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.work_table.reset(); + self.static_term.reset()?; + self.recursive_term.reset() + } } impl DisplayAs for RecursiveQueryExec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 07693f747feec..06bde0aa99d9e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -623,6 +623,11 @@ impl ExecutionPlan for RepartitionExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 563a82f3ea7b3..f08cd201396ad 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -282,6 +282,10 @@ impl ExecutionPlan for PartialSortExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + todo!() + } } struct PartialSortStream { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 2d8237011fff6..1c453c4039234 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -922,6 +922,11 @@ impl ExecutionPlan for SortExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.metrics_set.reset(); + self.input.reset() + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f4b57e8bfb45c..90a3f66793145 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -261,6 +261,11 @@ impl ExecutionPlan for SortPreservingMergeExec { fn statistics(&self) -> Result { self.input.statistics() } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 8976820928311..25ca464e405a7 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -31,7 +31,6 @@ use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; -use async_trait::async_trait; use futures::stream::StreamExt; use log::debug; @@ -157,7 +156,6 @@ impl DisplayAs for StreamingTableExec { } } -#[async_trait] impl ExecutionPlan for StreamingTableExec { fn as_any(&self) -> &dyn Any { self @@ -219,4 +217,8 @@ impl ExecutionPlan for StreamingTableExec { None => stream, }) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 5a8ef2db77c28..b4ceb716bffe6 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -255,6 +255,10 @@ impl ExecutionPlan for MockExec { None, )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } fn clone_error(e: &DataFusionError) -> DataFusionError { @@ -378,6 +382,12 @@ impl ExecutionPlan for BarrierExec { None, )) } + + fn reset(&self) -> Result<()> { + Err(DataFusionError::NotImplemented( + "BarrierExec::reset".to_string(), + )) + } } /// A mock execution plan that errors on a call to execute @@ -453,6 +463,10 @@ impl ExecutionPlan for ErrorExec { ) -> Result { internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}") } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// A mock execution plan that simply returns the provided statistics @@ -533,6 +547,10 @@ impl ExecutionPlan for StatisticsExec { fn statistics(&self) -> Result { Ok(self.stats.clone()) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// Execution plan that emits streams that block forever. @@ -623,6 +641,10 @@ impl ExecutionPlan for BlockingExec { _refs: Arc::clone(&self.refs), })) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// A [`RecordBatchStream`] that is pending forever. @@ -756,6 +778,10 @@ impl ExecutionPlan for PanicExec { ready: false, })) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } /// A [`RecordBatchStream`] that yields every other batch and panics diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index d01ea55074498..2b2d2fc2253cc 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -288,6 +288,14 @@ impl ExecutionPlan for UnionExec { fn benefits_from_input_partitioning(&self) -> Vec { vec![false; self.children().len()] } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + for input in &self.inputs { + input.reset()?; + } + Ok(()) + } } /// Combines multiple input streams by interleaving them. @@ -467,6 +475,14 @@ impl ExecutionPlan for InterleaveExec { fn benefits_from_input_partitioning(&self) -> Vec { vec![false; self.children().len()] } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + for input in &self.inputs { + input.reset()?; + } + Ok(()) + } } /// If all the input partitions have the same Hash partition spec with the first_input_partition diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index b9e732c317af4..5a76ec97ac29f 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -158,6 +158,11 @@ impl ExecutionPlan for UnnestExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } #[derive(Clone, Debug)] diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index f82f7ea2f869a..2d2979a26815d 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -194,6 +194,10 @@ impl ExecutionPlan for ValuesExec { None, )) } + + fn reset(&self) -> Result<()> { + Ok(()) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 9d247d689c076..74848ac73c24f 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -324,6 +324,11 @@ impl ExecutionPlan for BoundedWindowAggExec { total_byte_size: Precision::Absent, }) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } /// Trait that specifies how we search for (or calculate) partitions. It has two diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 6c245f65ba4f9..01edc877ab6c8 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -264,6 +264,11 @@ impl ExecutionPlan for WindowAggExec { total_byte_size: Precision::Absent, }) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.input.reset() + } } fn create_schema( diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index c74a596f3dae7..810cff52f4830 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -63,6 +63,10 @@ impl WorkTable { pub(super) fn write(&self, input: Vec) { self.batches.lock().unwrap().replace(input); } + + pub(super) fn reset(&self) { + self.batches.lock().unwrap().take(); + } } /// A temporary "working table" operation where the input data will be @@ -186,6 +190,12 @@ impl ExecutionPlan for WorkTableExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn reset(&self) -> Result<()> { + self.metrics.reset(); + self.work_table.reset(); + Ok(()) + } } #[cfg(test)]