Skip to content

Commit

Permalink
add reset to ExecutionPlan, make sure execute() is called on join inp…
Browse files Browse the repository at this point in the history
…uts immediately.
  • Loading branch information
Jackson Newhouse committed Apr 12, 2024
1 parent 1fa25ae commit cadc2ff
Show file tree
Hide file tree
Showing 49 changed files with 343 additions and 78 deletions.
4 changes: 4 additions & 0 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,8 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ impl ExecutionPlan for ArrowExec {
/// Returns a clone of the `projected_statistics` stored on this node.
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's metrics and then returns `Ok(())`.
/// No child plan is touched here.
fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

pub struct ArrowOpener {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ impl ExecutionPlan for AvroExec {
/// Returns `Some` wrapping the result of `clone_inner()` on this node's
/// metrics (presumably a snapshot of the current values; confirm against
/// the metrics type).
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's metrics and then returns `Ok(())`.
/// No child plan is touched here.
fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

#[cfg(feature = "avro")]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ impl ExecutionPlan for CsvExec {
/// Returns `Some` wrapping the result of `clone_inner()` on this node's
/// metrics (presumably a snapshot of the current values; confirm against
/// the metrics type).
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's metrics and then returns `Ok(())`.
/// No child plan is touched here.
fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A Config for [`CsvOpener`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ impl ExecutionPlan for NdJsonExec {
/// Returns `Some` wrapping the result of `clone_inner()` on this node's
/// metrics (presumably a snapshot of the current values; confirm against
/// the metrics type).
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's metrics and then returns `Ok(())`.
/// No child plan is touched here.
fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ impl ExecutionPlan for ParquetExec {
/// Returns a clone of the `projected_statistics` stored on this node.
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's metrics and then returns `Ok(())`.
/// No child plan is touched here.
fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// Implements [`FileOpener`] for a parquet file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,10 @@ pub(crate) mod tests {
/// Forwards to the input plan and returns its statistics unchanged.
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

/// Reset hook for this plan node.
///
/// Delegates to the input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.input.reset()
}
}

pub(crate) fn schema() -> SchemaRef {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ impl ExecutionPlan for OutputRequirementExec {
/// Forwards to the input plan and returns its statistics unchanged.
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

/// Reset hook for this plan node.
///
/// Delegates to the input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.input.reset()
}
}

impl PhysicalOptimizerRule for OutputRequirements {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2647,6 +2647,10 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}

/// Reset hook for this test plan.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

// Produces an execution plan where the schema is mismatched from
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ impl ExecutionPlan for StatisticsExec {
/// Returns a clone of the `stats` value stored on this node.
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

pub mod object_store;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ impl ExecutionPlan for UnboundedExec {
batch: self.batch.clone(),
}))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Debug)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ impl ExecutionPlan for CustomExecutionPlan {
.collect(),
})
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl ExecutionPlan for CustomPlan {
// but we want to test the filter pushdown not the CBOs
Ok(Statistics::new_unknown(&self.schema()))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Clone)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl ExecutionPlan for StatisticsValidation {
/// Returns a clone of the `stats` value stored on this node.
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,6 @@ impl DisplayAs for TopKExec {
}
}

#[async_trait]
impl ExecutionPlan for TopKExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -502,6 +501,10 @@ impl ExecutionPlan for TopKExec {
// better statistics inference could be provided
Ok(Statistics::new_unknown(&self.schema()))
}

/// Reset hook for this plan node.
///
/// Delegates to the input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.input.reset()
}
}

// A very specialized TopK implementation
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,11 @@ impl ExecutionPlan for AggregateExec {
}
}
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's own metrics first, then delegates to the
/// input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

fn create_schema(
Expand Down Expand Up @@ -1707,6 +1712,10 @@ mod tests {
None,
))
}

/// Reset hook for this test plan.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

/// A stream using the demo data. If initialized as new, it will first yield to the runtime before returning records
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

/// Reset hook for this plan node.
///
/// Delegates to the input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.input.reset()
}
}

/// Creates the output of AnalyzeExec as a RecordBatch
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
/// Forwards to the input plan and returns its statistics unchanged.
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's own metrics first, then delegates to the
/// input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

struct CoalesceBatchesStream {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
/// Forwards to the input plan and returns its statistics unchanged.
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's own metrics first, then delegates to the
/// input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ mod tests {
Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())),
}
}

/// Reset hook for this test plan.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> datafusion_common::Result<()> {
Ok(())
}
}

fn test_stats_display(exec: TestStatsExecPlan, show_stats: bool) {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ impl ExecutionPlan for EmptyExec {
None,
))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl ExecutionPlan for ExplainExec {
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
fn reset(&self) -> Result<()> {
Ok(())
}
}

/// If this plan should be shown, given the previous plan that was
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ impl ExecutionPlan for FilterExec {
/// Computes this node's statistics by calling `Self::statistics_helper`
/// with the input plan, the predicate, and the configured
/// `default_selectivity`.
fn statistics(&self) -> Result<Statistics> {
Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
}

/// Reset hook for this plan node.
///
/// Calls `reset()` on this node's own metrics first, then delegates to the
/// input plan's `reset()` and returns its result.
fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

/// This function ensures that all bounds in the `ExprBoundaries` vector are
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ impl ExecutionPlan for FileSinkExec {
stream,
)))
}

/// Reset hook for this plan node.
///
/// Performs no work and always returns `Ok(())`.
///
/// NOTE(review): unlike other pass-through operators in this change, this
/// does not call `reset()` on any child plan or metrics. If this node owns
/// an input plan, confirm that skipping it here is intentional.
fn reset(&self) -> Result<()> {
Ok(())
}
}

/// Create an output record batch with a count
Expand Down
44 changes: 26 additions & 18 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::{any::Any, sync::Arc, task::Poll};

use super::utils::{
adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut,
adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceFut,
};
use crate::coalesce_batches::concat_batches;
use crate::coalesce_partitions::CoalescePartitionsExec;
Expand Down Expand Up @@ -58,8 +58,6 @@ pub struct CrossJoinExec {
pub right: Arc<dyn ExecutionPlan>,
/// The schema once the join is applied
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
Expand All @@ -83,7 +81,6 @@ impl CrossJoinExec {
left,
right,
schema,
left_fut: Default::default(),
metrics: ExecutionPlanMetricsSet::default(),
cache,
}
Expand Down Expand Up @@ -137,22 +134,28 @@ impl CrossJoinExec {
}
}

/// Asynchronously collect the result of the left child
async fn load_left_input(
fn merge_stream(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
) -> Result<JoinLeftData> {
) -> Result<SendableRecordBatchStream> {
// merge all left parts into a single stream
let left_schema = left.schema();
let merge = if left.output_partitioning().partition_count() != 1 {
Arc::new(CoalescePartitionsExec::new(left))
} else {
left
};
let stream = merge.execute(0, context)?;

Ok(stream)
}

/// Asynchronously collect the result of the left child
async fn load_left_input(
stream: SendableRecordBatchStream,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
) -> Result<JoinLeftData> {
let left_schema = stream.schema();
// Load all batches and count the rows
let (batches, num_rows, _, reservation) = stream
.try_fold(
Expand Down Expand Up @@ -244,14 +247,13 @@ impl ExecutionPlan for CrossJoinExec {
let reservation =
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool());

let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
reservation,
)
});
let left_stream = merge_stream(self.left.clone(), context.clone())?;

let left_fut = OnceFut::new(load_left_input(
left_stream,
join_metrics.clone(),
reservation,
));

Ok(Box::pin(CrossJoinStream {
schema: self.schema.clone(),
Expand All @@ -269,6 +271,12 @@ impl ExecutionPlan for CrossJoinExec {
self.right.statistics()?,
))
}

/// Reset hook for the cross join.
///
/// Calls `reset()` on this node's own metrics, then resets the left child
/// and finally the right child. An error from the left child is returned
/// immediately via `?`, in which case the right child is not reset.
fn reset(&self) -> Result<()> {
self.metrics.reset();
self.left.reset()?;
self.right.reset()
}
}

/// [left/right]_col_count are required in case the column statistics are None
Expand Down
Loading

0 comments on commit cadc2ff

Please sign in to comment.