Skip to content

Commit

Permalink
add reset to ExecutionPlan, make sure execute() is called on join inputs immediately.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackson Newhouse committed Mar 6, 2024
1 parent bf6f83b commit 18e1c45
Show file tree
Hide file tree
Showing 49 changed files with 339 additions and 83 deletions.
4 changes: 4 additions & 0 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,8 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl ExecutionPlan for ArrowExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

pub struct ArrowOpener {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ impl ExecutionPlan for AvroExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

#[cfg(feature = "avro")]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ impl ExecutionPlan for CsvExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A Config for [`CsvOpener`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ impl ExecutionPlan for NdJsonExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ impl ExecutionPlan for ParquetExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_statistics.clone())
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
Ok(())
}
}

/// Implements [`FileOpener`] for a parquet file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,10 @@ pub(crate) mod tests {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

pub(crate) fn schema() -> SchemaRef {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ impl ExecutionPlan for OutputRequirementExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

impl PhysicalOptimizerRule for OutputRequirements {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,10 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

// Produces an execution plan where the schema is mismatched from
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ impl ExecutionPlan for StatisticsExec {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

pub mod object_store;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ impl ExecutionPlan for UnboundedExec {
batch: self.batch.clone(),
}))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Debug)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ impl ExecutionPlan for CustomExecutionPlan {
.collect(),
})
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ impl ExecutionPlan for CustomPlan {
// but we want to test the filter pushdown not the CBOs
Ok(Statistics::new_unknown(&self.schema()))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[derive(Clone)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ExecutionPlan for StatisticsValidation {
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ impl DisplayAs for TopKExec {
}
}

#[async_trait]
impl ExecutionPlan for TopKExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -494,6 +493,10 @@ impl ExecutionPlan for TopKExec {
// better statistics inference could be provided
Ok(Statistics::new_unknown(&self.schema()))
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

// A very specialized TopK implementation
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ impl ExecutionPlan for AggregateExec {
}
}
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

fn create_schema(
Expand Down Expand Up @@ -1695,6 +1700,10 @@ mod tests {
None,
))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// A stream using the demo data. If initialized as new, it will first yield to the runtime before returning records
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

fn reset(&self) -> Result<()> {
self.input.reset()
}
}

/// Creates the output of AnalyzeExec as a RecordBatch
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

struct CoalesceBatchesStream {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ mod tests {
)),
}
}

fn reset(&self) -> datafusion_common::Result<()> {
Ok(())
}
}

fn test_stats_display(exec: TestStatsExecPlan, show_stats: bool) {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl ExecutionPlan for EmptyExec {
None,
))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl ExecutionPlan for ExplainExec {
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// If this plan should be shown, given the previous plan that was
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ impl ExecutionPlan for FilterExec {
column_statistics,
})
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.input.reset()
}
}

/// This function ensures that all bounds in the `ExprBoundaries` vector are
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ impl ExecutionPlan for FileSinkExec {
stream,
)))
}

fn reset(&self) -> Result<()> {
Ok(())
}
}

/// Create a output record batch with a count
Expand Down
46 changes: 26 additions & 20 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

use std::{any::Any, sync::Arc, task::Poll};

use super::utils::{
adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut,
};
use super::utils::{adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceFut};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::DisplayAs;
use crate::{
Expand Down Expand Up @@ -57,8 +55,6 @@ pub struct CrossJoinExec {
pub right: Arc<dyn ExecutionPlan>,
/// The schema once the join is applied
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
}
Expand All @@ -81,7 +77,6 @@ impl CrossJoinExec {
left,
right,
schema,
left_fut: Default::default(),
metrics: ExecutionPlanMetricsSet::default(),
}
}
Expand All @@ -97,22 +92,28 @@ impl CrossJoinExec {
}
}

/// Asynchronously collect the result of the left child
async fn load_left_input(
fn merge_stream(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
) -> Result<JoinLeftData> {
) -> Result<SendableRecordBatchStream> {
// merge all left parts into a single stream
let left_schema = left.schema();
let merge = if left.output_partitioning().partition_count() != 1 {
Arc::new(CoalescePartitionsExec::new(left))
} else {
left
};
let stream = merge.execute(0, context)?;

Ok(stream)
}

/// Asynchronously collect the result of the left child
async fn load_left_input(
stream: SendableRecordBatchStream,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
) -> Result<JoinLeftData> {
let left_schema = stream.schema();
// Load all batches and count the rows
let (batches, num_rows, _, reservation) = stream
.try_fold(
Expand Down Expand Up @@ -239,14 +240,13 @@ impl ExecutionPlan for CrossJoinExec {
let reservation =
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool());

let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
reservation,
)
});
let left_stream = merge_stream(self.left.clone(), context.clone())?;

let left_fut = OnceFut::new(load_left_input(
left_stream,
join_metrics.clone(),
reservation,
));

Ok(Box::pin(CrossJoinStream {
schema: self.schema.clone(),
Expand All @@ -264,6 +264,12 @@ impl ExecutionPlan for CrossJoinExec {
self.right.statistics()?,
))
}

fn reset(&self) -> Result<()> {
self.metrics.reset();
self.left.reset()?;
self.right.reset()
}
}

/// [left/right]_col_count are required in case the column statistics are None
Expand Down
Loading

0 comments on commit 18e1c45

Please sign in to comment.