diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index d8aaad801e5c..edaad87c5f4e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -66,6 +66,7 @@ datafusion-expr = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-optimizer = { workspace = true, default-features = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
+datafusion-physical-optimizer = { workspace = true, default-features = true }
 datafusion-proto = { workspace = true }
 datafusion-sql = { workspace = true }
 env_logger = { workspace = true }
diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index 28a3a2f1de09..3f7879d2e5d5 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -15,10 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
 use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use arrow_schema::SchemaRef;
-use async_trait::async_trait;
-use bytes::Bytes;
 use datafusion::catalog::Session;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::{
@@ -38,7 +44,6 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
 use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
 use datafusion::parquet::schema::types::ColumnPath;
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
@@ -48,16 +53,13 @@ use datafusion_common::{
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
 use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
+use datafusion_physical_optimizer::pruning::PruningPredicate;
+
+use async_trait::async_trait;
+use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use object_store::ObjectStore;
-use std::any::Any;
-use std::collections::{HashMap, HashSet};
-use std::fs::File;
-use std::ops::Range;
-use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
 use tempfile::TempDir;
 use url::Url;

diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index d6e17764442d..aab5fa0f9d33 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -15,6 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::any::Any;
+use std::collections::HashSet;
+use std::fmt::Display;
+use std::fs::{self, DirEntry, File};
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
+
 use arrow::array::{
     Array, ArrayRef, AsArray, BooleanArray, Int32Array, RecordBatch, StringArray,
     UInt64Array,
@@ -22,7 +33,6 @@ use arrow::array::{
 use arrow::datatypes::Int32Type;
 use arrow::util::pretty::pretty_format_batches;
 use arrow_schema::SchemaRef;
-use async_trait::async_trait;
 use datafusion::catalog::Session;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -32,7 +42,6 @@ use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use datafusion::parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
 };
-use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use datafusion_common::{
@@ -40,16 +49,9 @@ use datafusion_common::{
 };
 use datafusion_expr::{utils::conjunction, TableProviderFilterPushDown, TableType};
 use datafusion_physical_expr::PhysicalExpr;
-use std::any::Any;
-use std::collections::HashSet;
-use std::fmt::Display;
-use std::fs::{self, DirEntry, File};
-use std::ops::Range;
-use std::path::{Path, PathBuf};
-use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
-};
+use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
+use async_trait::async_trait;
 use tempfile::TempDir;
 use url::Url;

diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs
index c090cd2bcca9..88f573056442 100644
--- a/datafusion-examples/examples/pruning.rs
+++ b/datafusion-examples/examples/pruning.rs
@@ -15,15 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::collections::HashSet;
+use std::sync::Arc;
+
 use arrow::array::{ArrayRef, BooleanArray, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::{DFSchema, ScalarValue};
 use datafusion::execution::context::ExecutionProps;
 use datafusion::physical_expr::create_physical_expr;
-use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion::prelude::*;
-use std::collections::HashSet;
-use std::sync::Arc;
+use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

 /// This example shows how to use DataFusion's `PruningPredicate` to prove
 /// filter expressions can never be true based on statistics such as min/max
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 085f44191b8a..25bbe86e8b5c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -17,6 +17,15 @@

 //! [`ParquetExec`] Execution plan for reading Parquet files

+mod access_plan;
+mod metrics;
+mod opener;
+mod page_filter;
+mod reader;
+mod row_filter;
+mod row_group_filter;
+mod writer;
+
 use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -27,11 +36,13 @@ use crate::datasource::physical_plan::{
     parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
     FileScanConfig,
 };
+use crate::datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+};
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
     error::Result,
     execution::context::TaskContext,
-    physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
         DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
@@ -39,33 +50,21 @@
     },
 };
+pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::Constraints;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_optimizer::pruning::PruningPredicate;
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
-
-use itertools::Itertools;
-use log::debug;
-
-mod access_plan;
-mod metrics;
-mod opener;
-mod page_filter;
-mod reader;
-mod row_filter;
-mod row_group_filter;
-mod writer;
-
-use crate::datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
-pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;

+use itertools::Itertools;
+use log::debug;
+
 /// Execution plan for reading one or more Parquet files.
 ///
 /// ```text
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 883f296f3b95..a1f8f0172ce4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -17,6 +17,8 @@

 //! [`ParquetOpener`] for opening Parquet files

+use std::sync::Arc;
+
 use crate::datasource::file_format::{
     coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
 };
@@ -29,17 +31,18 @@ use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
 };
 use crate::datasource::schema_adapter::SchemaAdapterFactory;
-use crate::physical_optimizer::pruning::PruningPredicate;
+
 use arrow_schema::{ArrowError, SchemaRef};
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_optimizer::pruning::PruningPredicate;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-use std::sync::Arc;

 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index 4d0a8451a0d4..dcc4b0bc8150 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -17,14 +17,19 @@

 //! Contains code to filter entire pages

+use std::collections::HashSet;
+use std::sync::Arc;
+
 use super::metrics::ParquetFileMetrics;
 use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
-use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
 use arrow::array::BooleanArray;
 use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use arrow_schema::Schema;
 use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
+use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
 use log::{debug, trace};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
@@ -34,8 +39,6 @@ use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     file::metadata::{ParquetMetaData, RowGroupMetaData},
 };
-use std::collections::HashSet;
-use std::sync::Arc;

 /// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
 ///
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs
index 39d86fe857f7..b008157a8324 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use super::{ParquetAccessPlan, ParquetFileMetrics};
 use crate::datasource::listing::FileRange;
-use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
 use arrow::{array::ArrayRef, datatypes::Schema};
 use arrow_array::BooleanArray;
 use datafusion_common::{Column, Result, ScalarValue};
+use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::parquet_column;
 use parquet::basic::Type;
@@ -30,10 +36,6 @@ use parquet::{
     bloom_filter::Sbbf,
     file::metadata::RowGroupMetaData,
 };
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
-use super::{ParquetAccessPlan, ParquetFileMetrics};

 /// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
 ///
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index c5874deb6ed5..54d505e1b4b9 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -17,6 +17,12 @@

 //! [`SessionState`]: information required to run queries in a session

+use std::any::Any;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::sync::Arc;
+
 use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
 use crate::catalog_common::information_schema::{
     InformationSchemaProvider, INFORMATION_SCHEMA,
@@ -27,11 +33,9 @@ use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
 use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
 use crate::execution::SessionStateDefaults;
-use crate::physical_optimizer::optimizer::PhysicalOptimizer;
 use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+
 use arrow_schema::{DataType, SchemaRef};
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
 use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
@@ -61,20 +65,19 @@ use datafusion_optimizer::{
 };
 use datafusion_physical_expr::create_physical_expr;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_sql::parser::{DFParser, Statement};
 use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
 use itertools::Itertools;
 use log::{debug, info};
 use object_store::ObjectStore;
 use sqlparser::ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias};
 use sqlparser::dialect::dialect_from_str;
-use std::any::Any;
-use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet};
-use std::fmt::Debug;
-use std::sync::Arc;
 use url::Url;
 use uuid::Uuid;
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index e9501bd37a8a..416b69c3afdd 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -207,7 +207,7 @@
 //! [`QueryPlanner`]: execution::context::QueryPlanner
 //! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
 //! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
-//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::PhysicalOptimizerRule
+//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
 //!
 //! ## Query Planning and Execution Overview
 //!
@@ -349,7 +349,7 @@
 //! filtering can never be `true` using additional statistical information.
 //!
 //! [cp_solver]: crate::physical_expr::intervals::cp_solver
-//! [`PruningPredicate`]: crate::physical_optimizer::pruning::PruningPredicate
+//! [`PruningPredicate`]: datafusion_physical_optimizer::pruning::PruningPredicate
 //! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
 //!
 //! ## Execution
@@ -659,7 +659,7 @@
 //! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
 //! [`ExecutionPlan`]: physical_plan::ExecutionPlan
 //! [`PhysicalPlanner`]: physical_planner::PhysicalPlanner
-//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
+//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
 //! [`Schema`]: arrow::datatypes::Schema
 //! [`PhysicalExpr`]: physical_plan::PhysicalExpr
 //! [`RecordBatch`]: arrow::record_batch::RecordBatch
@@ -677,7 +677,6 @@ pub mod dataframe;
 pub mod datasource;
 pub mod error;
 pub mod execution;
-pub mod physical_optimizer;
 pub mod physical_planner;
 pub mod prelude;
 pub mod scalar;
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
deleted file mode 100644
index e6aa15a4c09d..000000000000
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Optimizer that rewrites [`ExecutionPlan`]s.
-//!
-//! These rules take advantage of physical plan properties, such as
-//! "Repartition" or "Sortedness"
-//!
-//! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
-
-pub mod optimizer;
-pub mod projection_pushdown;
-
-pub use datafusion_physical_optimizer::*;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
deleted file mode 100644
index 7a6f991121ef..000000000000
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Physical optimizer traits
-
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use std::sync::Arc;
-
-use super::projection_pushdown::ProjectionPushdown;
-use super::update_aggr_exprs::OptimizeAggregateOrder;
-use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
-use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
-use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
-use crate::physical_optimizer::enforce_sorting::EnforceSorting;
-use crate::physical_optimizer::join_selection::JoinSelection;
-use crate::physical_optimizer::limit_pushdown::LimitPushdown;
-use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
-use crate::physical_optimizer::output_requirements::OutputRequirements;
-use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
-use crate::physical_optimizer::topk_aggregation::TopKAggregation;
-
-/// A rule-based physical optimizer.
-#[derive(Clone, Debug)]
-pub struct PhysicalOptimizer {
-    /// All rules to apply
-    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
-}
-
-impl Default for PhysicalOptimizer {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl PhysicalOptimizer {
-    /// Create a new optimizer using the recommended list of rules
-    pub fn new() -> Self {
-        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-            // If there is an output requirement of the query, make sure that
-            // this information is not lost across different rules during optimization.
-            Arc::new(OutputRequirements::new_add_mode()),
-            Arc::new(AggregateStatistics::new()),
-            // Statistics-based join selection will change the Auto mode to a real join implementation,
-            // like collect left, or hash join, or future sort merge join, which will influence the
-            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
-            // repartitioning and local sorting steps to meet distribution and ordering requirements.
-            // Therefore, it should run before EnforceDistribution and EnforceSorting.
-            Arc::new(JoinSelection::new()),
-            // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
-            // as that rule may inject other operations in between the different AggregateExecs.
-            // Applying the rule early means only directly-connected AggregateExecs must be examined.
-            Arc::new(LimitedDistinctAggregation::new()),
-            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
-            // requirements. Please make sure that the whole plan tree is determined before this rule.
-            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
-            // least one of the operators in the plan benefits from increased parallelism.
-            Arc::new(EnforceDistribution::new()),
-            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
-            Arc::new(CombinePartialFinalAggregate::new()),
-            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
-            // ordering. Please make sure that the whole plan tree is determined before this rule.
-            // Note that one should always run this rule after running the EnforceDistribution rule
-            // as the latter may break local sorting requirements.
-            Arc::new(EnforceSorting::new()),
-            // Run once after the local sorting requirement is changed
-            Arc::new(OptimizeAggregateOrder::new()),
-            // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
-            Arc::new(ProjectionPushdown::new()),
-            // The CoalesceBatches rule will not influence the distribution and ordering of the
-            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
-            Arc::new(CoalesceBatches::new()),
-            // Remove the ancillary output requirement operator since we are done with the planning
-            // phase.
-            Arc::new(OutputRequirements::new_remove_mode()),
-            // The aggregation limiter will try to find situations where the accumulator count
-            // is not tied to the cardinality, i.e. when the output of the aggregation is passed
-            // into an `order by max(x) limit y`. In this case it will copy the limit value down
-            // to the aggregation, allowing it to use only y number of accumulators.
-            Arc::new(TopKAggregation::new()),
-            // The ProjectionPushdown rule tries to push projections towards
-            // the sources in the execution plan. As a result of this process,
-            // a projection can disappear if it reaches the source providers, and
-            // sequential projections can merge into one. Even if these two cases
-            // are not present, the load of executors such as join or union will be
-            // reduced by narrowing their input tables.
-            Arc::new(ProjectionPushdown::new()),
-            // The LimitPushdown rule tries to push limits down as far as possible,
-            // replacing operators with fetching variants, or adding limits
-            // past operators that support limit pushdown.
-            Arc::new(LimitPushdown::new()),
-            // The SanityCheckPlan rule checks whether the order and
-            // distribution requirements of each node in the plan
-            // is satisfied. It will also reject non-runnable query
-            // plans that use pipeline-breaking operators on infinite
-            // input(s). The rule generates a diagnostic error
-            // message for invalid plans. It makes no changes to the
-            // given query plan; i.e. it only acts as a final
-            // gatekeeping rule.
-            Arc::new(SanityCheckPlan::new()),
-        ];
-
-        Self::with_rules(rules)
-    }
-
-    /// Create a new optimizer with the given rules
-    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
-        Self { rules }
-    }
-}
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
deleted file mode 100644
index 46fddbd3c936..000000000000
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ /dev/null
@@ -1,1454 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This file implements the `ProjectionPushdown` physical optimization rule.
-//! The function [`remove_unnecessary_projections`] tries to push down all
-//! projections one by one if the operator below is amenable to this. If a
-//! projection reaches a source, it can even disappear from the plan entirely.
-
-use std::sync::Arc;
-
-use crate::error::Result;
-use crate::physical_plan::ExecutionPlan;
-
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{TransformedResult, TreeNode};
-
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use datafusion_physical_plan::projection::remove_unnecessary_projections;
-
-/// This rule inspects `ProjectionExec`s in the given physical plan and tries to
-/// remove or swap with its child.
-#[derive(Default, Debug)]
-pub struct ProjectionPushdown {}
-
-impl ProjectionPushdown {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl PhysicalOptimizerRule for ProjectionPushdown {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_down(remove_unnecessary_projections).data()
-    }
-
-    fn name(&self) -> &str {
-        "ProjectionPushdown"
-    }
-
-    fn schema_check(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::any::Any;
-
-    use super::*;
-    use crate::datasource::file_format::file_compression_type::FileCompressionType;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::physical_plan::CsvExec;
-    use crate::datasource::physical_plan::FileScanConfig;
-    use crate::physical_plan::get_plan_string;
-    use crate::physical_plan::joins::{
-        HashJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
-    };
-    use crate::physical_plan::memory::MemoryExec;
-    use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::sorts::sort::SortExec;
-    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-
-    use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
-    use datafusion_common::{JoinSide, JoinType, ScalarValue};
-    use datafusion_execution::object_store::ObjectStoreUrl;
-    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-    use datafusion_expr::{
-        ColumnarValue, Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
-    };
-    use datafusion_physical_expr::expressions::{
-        binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
-    };
-    use datafusion_physical_expr::ScalarFunctionExpr;
-    use datafusion_physical_expr::{
-        Distribution, Partitioning, PhysicalExpr, PhysicalSortExpr,
-        PhysicalSortRequirement,
-    };
-    use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
-    use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
-    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
-    use datafusion_physical_plan::filter::FilterExec;
-    use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
-    use datafusion_physical_plan::joins::PartitionMode;
-    use datafusion_physical_plan::projection::{update_expr,
ProjectionExec}; - use datafusion_physical_plan::streaming::PartitionStream; - use datafusion_physical_plan::streaming::StreamingTableExec; - use datafusion_physical_plan::union::UnionExec; - - use itertools::Itertools; - - /// Mocked UDF - #[derive(Debug)] - struct DummyUDF { - signature: Signature, - } - - impl DummyUDF { - fn new() -> Self { - Self { - signature: Signature::variadic_any(Volatility::Immutable), - } - } - } - - impl ScalarUDFImpl for DummyUDF { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "dummy_udf" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(DataType::Int32) - } - - fn invoke_batch( - &self, - _args: &[ColumnarValue], - _number_rows: usize, - ) -> Result { - unimplemented!("DummyUDF::invoke") - } - } - - #[test] - fn test_update_matching_exprs() -> Result<()> { - let exprs: Vec> = vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 3)), - Operator::Divide, - Arc::new(Column::new("e", 5)), - )), - Arc::new(CastExpr::new( - Arc::new(Column::new("a", 3)), - DataType::Float32, - None, - )), - Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))), - Arc::new(ScalarFunctionExpr::new( - "scalar_expr", - Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), - vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 1)), - Operator::Divide, - Arc::new(Column::new("c", 0)), - )), - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 0)), - Operator::Divide, - Arc::new(Column::new("b", 1)), - )), - ], - DataType::Int32, - )), - Arc::new(CaseExpr::try_new( - Some(Arc::new(Column::new("d", 2))), - vec![ - ( - Arc::new(Column::new("a", 3)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("d", 2)), - Operator::Plus, - Arc::new(Column::new("e", 5)), - )) as Arc, - ), - ( - Arc::new(Column::new("a", 3)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("e", 5)), - Operator::Plus, - Arc::new(Column::new("d", 2)), - )) as Arc, - ), - ], - Some(Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 3)), - Operator::Modulo, - Arc::new(Column::new("e", 5)), - ))), - )?), - ]; - let child: Vec<(Arc, String)> = vec![ - (Arc::new(Column::new("c", 2)), "c".to_owned()), - (Arc::new(Column::new("b", 1)), "b".to_owned()), - (Arc::new(Column::new("d", 3)), "d".to_owned()), - (Arc::new(Column::new("a", 0)), "a".to_owned()), - (Arc::new(Column::new("f", 5)), "f".to_owned()), - (Arc::new(Column::new("e", 4)), "e".to_owned()), - ]; - - let expected_exprs: Vec> = vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Divide, - Arc::new(Column::new("e", 4)), - )), - Arc::new(CastExpr::new( - Arc::new(Column::new("a", 0)), - DataType::Float32, - None, - )), - Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))), - Arc::new(ScalarFunctionExpr::new( - "scalar_expr", - Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), - vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 1)), - Operator::Divide, - Arc::new(Column::new("c", 2)), - )), - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Divide, - Arc::new(Column::new("b", 1)), - )), - ], - DataType::Int32, - )), - Arc::new(CaseExpr::try_new( - Some(Arc::new(Column::new("d", 3))), - vec![ - ( - Arc::new(Column::new("a", 0)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("d", 3)), - Operator::Plus, - Arc::new(Column::new("e", 4)), - )) as Arc, - ), - ( - Arc::new(Column::new("a", 0)) as Arc, - Arc::new(BinaryExpr::new( - 
Arc::new(Column::new("e", 4)), - Operator::Plus, - Arc::new(Column::new("d", 3)), - )) as Arc, - ), - ], - Some(Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Modulo, - Arc::new(Column::new("e", 4)), - ))), - )?), - ]; - - for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { - assert!(update_expr(&expr, &child, true)? - .unwrap() - .eq(&expected_expr)); - } - - Ok(()) - } - - #[test] - fn test_update_projected_exprs() -> Result<()> { - let exprs: Vec> = vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 3)), - Operator::Divide, - Arc::new(Column::new("e", 5)), - )), - Arc::new(CastExpr::new( - Arc::new(Column::new("a", 3)), - DataType::Float32, - None, - )), - Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))), - Arc::new(ScalarFunctionExpr::new( - "scalar_expr", - Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), - vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 1)), - Operator::Divide, - Arc::new(Column::new("c", 0)), - )), - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 0)), - Operator::Divide, - Arc::new(Column::new("b", 1)), - )), - ], - DataType::Int32, - )), - Arc::new(CaseExpr::try_new( - Some(Arc::new(Column::new("d", 2))), - vec![ - ( - Arc::new(Column::new("a", 3)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("d", 2)), - Operator::Plus, - Arc::new(Column::new("e", 5)), - )) as Arc, - ), - ( - Arc::new(Column::new("a", 3)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("e", 5)), - Operator::Plus, - Arc::new(Column::new("d", 2)), - )) as Arc, - ), - ], - Some(Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 3)), - Operator::Modulo, - Arc::new(Column::new("e", 5)), - ))), - )?), - ]; - let projected_exprs: Vec<(Arc, String)> = vec![ - (Arc::new(Column::new("a", 3)), "a".to_owned()), - (Arc::new(Column::new("b", 1)), "b_new".to_owned()), - (Arc::new(Column::new("c", 0)), "c".to_owned()), - (Arc::new(Column::new("d", 2)), "d_new".to_owned()), - (Arc::new(Column::new("e", 5)), "e".to_owned()), - (Arc::new(Column::new("f", 4)), "f_new".to_owned()), - ]; - - let expected_exprs: Vec> = vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Divide, - Arc::new(Column::new("e", 4)), - )), - Arc::new(CastExpr::new( - Arc::new(Column::new("a", 0)), - DataType::Float32, - None, - )), - Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))), - Arc::new(ScalarFunctionExpr::new( - "scalar_expr", - Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), - vec![ - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b_new", 1)), - Operator::Divide, - Arc::new(Column::new("c", 2)), - )), - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Divide, - Arc::new(Column::new("b_new", 1)), - )), - ], - DataType::Int32, - )), - Arc::new(CaseExpr::try_new( - Some(Arc::new(Column::new("d_new", 3))), - vec![ - ( - Arc::new(Column::new("a", 0)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("d_new", 3)), - Operator::Plus, - Arc::new(Column::new("e", 4)), - )) as Arc, - ), - ( - Arc::new(Column::new("a", 0)) as Arc, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("e", 4)), - Operator::Plus, - Arc::new(Column::new("d_new", 3)), - )) as Arc, - ), - ], - Some(Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Modulo, - Arc::new(Column::new("e", 4)), - ))), - )?), - ]; - - for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { - assert!(update_expr(&expr, &projected_exprs, false)? 
- .unwrap() - .eq(&expected_expr)); - } - - Ok(()) - } - - fn create_simple_csv_exec() -> Arc { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - ])); - Arc::new( - CsvExec::builder( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![0, 1, 2, 3, 4])), - ) - .with_has_header(false) - .with_delimeter(0) - .with_quote(0) - .with_escape(None) - .with_comment(None) - .with_newlines_in_values(false) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED) - .build(), - ) - } - - fn create_projecting_csv_exec() -> Arc { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - ])); - Arc::new( - CsvExec::builder( - FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_projection(Some(vec![3, 2, 1])), - ) - .with_has_header(false) - .with_delimeter(0) - .with_quote(0) - .with_escape(None) - .with_comment(None) - .with_newlines_in_values(false) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED) - .build(), - ) - } - - fn create_projecting_memory_exec() -> Arc { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - ])); - - Arc::new(MemoryExec::try_new(&[], schema, Some(vec![2, 0, 3, 4])).unwrap()) - } - - #[test] - fn test_csv_after_projection() -> Result<()> { - let csv = create_projecting_csv_exec(); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("b", 2)), "b".to_string()), - (Arc::new(Column::new("d", 0)), "d".to_string()), - ], - csv.clone(), - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@2 as b, d@0 as d]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[d, c, b], has_header=false", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "CsvExec: file_groups={1 group: [[x]]}, projection=[b, d], has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_memory_after_projection() -> Result<()> { - let memory = create_projecting_memory_exec(); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("d", 2)), "d".to_string()), - (Arc::new(Column::new("e", 3)), "e".to_string()), - (Arc::new(Column::new("a", 1)), "a".to_string()), - ], - memory.clone(), - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[d@2 as d, e@3 as e, a@1 as a]", - " MemoryExec: partitions=0, partition_sizes=[]", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = ["MemoryExec: partitions=0, partition_sizes=[]"]; - assert_eq!(get_plan_string(&after_optimize), expected); - assert_eq!( - 
after_optimize - .clone() - .as_any() - .downcast_ref::() - .unwrap() - .projection() - .clone() - .unwrap(), - vec![3, 4, 0] - ); - - Ok(()) - } - - #[test] - fn test_streaming_table_after_projection() -> Result<()> { - #[derive(Debug)] - struct DummyStreamPartition { - schema: SchemaRef, - } - impl PartitionStream for DummyStreamPartition { - fn schema(&self) -> &SchemaRef { - &self.schema - } - fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { - unreachable!() - } - } - - let streaming_table = StreamingTableExec::try_new( - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - ])), - vec![Arc::new(DummyStreamPartition { - schema: Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - ])), - }) as _], - Some(&vec![0_usize, 2, 4, 3]), - vec![ - LexOrdering::new(vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("e", 2)), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("a", 0)), - options: SortOptions::default(), - }, - ]), - LexOrdering::new(vec![PhysicalSortExpr { - expr: Arc::new(Column::new("d", 3)), - options: SortOptions::default(), - }]), - ] - .into_iter(), - true, - None, - )?; - let projection = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("d", 3)), "d".to_string()), - (Arc::new(Column::new("e", 2)), "e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - ], - Arc::new(streaming_table) as _, - )?) 
as _; - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let result = after_optimize - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!( - result.partition_schema(), - &Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - ])) - ); - assert_eq!( - result.projection().clone().unwrap().to_vec(), - vec![3_usize, 4, 0] - ); - assert_eq!( - result.projected_schema(), - &Schema::new(vec![ - Field::new("d", DataType::Int32, true), - Field::new("e", DataType::Int32, true), - Field::new("a", DataType::Int32, true), - ]) - ); - assert_eq!( - result.projected_output_ordering().into_iter().collect_vec(), - vec![ - LexOrdering::new(vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("e", 1)), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: Arc::new(Column::new("a", 2)), - options: SortOptions::default(), - }, - ]), - LexOrdering::new(vec![PhysicalSortExpr { - expr: Arc::new(Column::new("d", 0)), - options: SortOptions::default(), - }]), - ] - ); - assert!(result.is_infinite()); - - Ok(()) - } - - #[test] - fn test_projection_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let child_projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("e", 4)), "new_e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "new_b".to_string()), - ], - csv.clone(), - )?); - let top_projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("new_b", 3)), "new_b".to_string()), - ( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 0)), - Operator::Plus, - Arc::new(Column::new("new_e", 1)), - )), - "binary".to_string(), - ), - (Arc::new(Column::new("new_b", 3)), "newest_b".to_string()), - ], - child_projection.clone(), - )?); - - let initial = get_plan_string(&top_projection); - let expected_initial = [ - "ProjectionExec: expr=[new_b@3 as new_b, c@0 + new_e@1 as binary, new_b@3 as newest_b]", - " ProjectionExec: expr=[c@2 as c, e@4 as new_e, a@0 as a, b@1 as new_b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(top_projection, &ConfigOptions::new())?; - - let expected = [ - "ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_output_req_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let sort_req: Arc = Arc::new(OutputRequirementExec::new( - csv.clone(), - Some(LexRequirement::new(vec![ - PhysicalSortRequirement { - expr: Arc::new(Column::new("b", 1)), - options: Some(SortOptions::default()), - }, - PhysicalSortRequirement { - expr: Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Plus, - Arc::new(Column::new("a", 0)), - )), - options: Some(SortOptions::default()), - }, - ])), - Distribution::HashPartitioned(vec![ - Arc::new(Column::new("a", 0)), - Arc::new(Column::new("b", 1)), - ]), - )); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - 
(Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - ], - sort_req.clone(), - )?); - - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " OutputRequirementExec", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected: [&str; 3] = [ - "OutputRequirementExec", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - - assert_eq!(get_plan_string(&after_optimize), expected); - let expected_reqs = LexRequirement::new(vec![ - PhysicalSortRequirement { - expr: Arc::new(Column::new("b", 2)), - options: Some(SortOptions::default()), - }, - PhysicalSortRequirement { - expr: Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 0)), - Operator::Plus, - Arc::new(Column::new("new_a", 1)), - )), - options: Some(SortOptions::default()), - }, - ]); - assert_eq!( - after_optimize - .as_any() - .downcast_ref::() - .unwrap() - .required_input_ordering()[0] - .clone() - .unwrap(), - expected_reqs - ); - let expected_distribution: Vec> = vec![ - Arc::new(Column::new("new_a", 1)), - Arc::new(Column::new("b", 2)), - ]; - if let Distribution::HashPartitioned(vec) = after_optimize - .as_any() - .downcast_ref::() - .unwrap() - .required_input_distribution()[0] - .clone() - { - assert!(vec - .iter() - .zip(expected_distribution) - .all(|(actual, expected)| actual.eq(&expected))); - } else { - panic!("Expected HashPartitioned distribution!"); - }; - - Ok(()) - } - - #[test] - fn test_coalesce_partitions_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let coalesce_partitions: Arc = - Arc::new(CoalescePartitionsExec::new(csv)); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("a", 0)), "a_new".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), - ], - coalesce_partitions, - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", - " CoalescePartitionsExec", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "CoalescePartitionsExec", - " ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_filter_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let predicate = Arc::new(BinaryExpr::new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 1)), - Operator::Minus, - Arc::new(Column::new("a", 0)), - )), - Operator::Gt, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("d", 3)), - Operator::Minus, - Arc::new(Column::new("a", 0)), - )), - )); - let filter: Arc = - Arc::new(FilterExec::try_new(predicate, csv)?); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("a", 0)), 
"a_new".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), - ], - filter.clone(), - )?); - - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", - " FilterExec: b@1 - a@0 > d@3 - a@0", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "FilterExec: b@1 - a_new@0 > d@2 - a_new@0", - " ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_join_after_projection() -> Result<()> { - let left_csv = create_simple_csv_exec(); - let right_csv = create_simple_csv_exec(); - - let join: Arc = Arc::new(SymmetricHashJoinExec::try_new( - left_csv, - right_csv, - vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], - // b_left-(1+a_right)<=a_right+c_left - Some(JoinFilter::new( - Arc::new(BinaryExpr::new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b_left_inter", 0)), - Operator::Minus, - Arc::new(BinaryExpr::new( - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), - Operator::Plus, - Arc::new(Column::new("a_right_inter", 1)), - )), - )), - Operator::LtEq, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a_right_inter", 1)), - Operator::Plus, - Arc::new(Column::new("c_left_inter", 2)), - )), - )), - vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 0, - side: JoinSide::Right, - }, - ColumnIndex { - index: 2, - side: JoinSide::Left, - }, - ], - Arc::new(Schema::new(vec![ - Field::new("b_left_inter", DataType::Int32, true), - Field::new("a_right_inter", DataType::Int32, true), - Field::new("c_left_inter", DataType::Int32, true), - ])), - )), - &JoinType::Inner, - true, - None, - None, - StreamJoinPartitionMode::SinglePartition, - )?); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), - (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), - (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), - (Arc::new(Column::new("a", 5)), "a_from_right".to_string()), - (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), - ], - join, - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]", - " 
CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - let expected_filter_col_ind = vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 0, - side: JoinSide::Right, - }, - ColumnIndex { - index: 0, - side: JoinSide::Left, - }, - ]; - - assert_eq!( - expected_filter_col_ind, - after_optimize - .as_any() - .downcast_ref::() - .unwrap() - .filter() - .unwrap() - .column_indices() - ); - - Ok(()) - } - - #[test] - fn test_join_after_required_projection() -> Result<()> { - let left_csv = create_simple_csv_exec(); - let right_csv = create_simple_csv_exec(); - - let join: Arc = Arc::new(SymmetricHashJoinExec::try_new( - left_csv, - right_csv, - vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], - // b_left-(1+a_right)<=a_right+c_left - Some(JoinFilter::new( - Arc::new(BinaryExpr::new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b_left_inter", 0)), - Operator::Minus, - Arc::new(BinaryExpr::new( - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), - Operator::Plus, - Arc::new(Column::new("a_right_inter", 1)), - )), - )), - Operator::LtEq, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a_right_inter", 1)), - Operator::Plus, - Arc::new(Column::new("c_left_inter", 2)), - )), - )), - vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 0, - side: JoinSide::Right, - }, - ColumnIndex { - index: 2, - side: JoinSide::Left, - }, - ], - Arc::new(Schema::new(vec![ - Field::new("b_left_inter", DataType::Int32, true), - Field::new("a_right_inter", DataType::Int32, true), - Field::new("c_left_inter", DataType::Int32, true), - ])), - )), - &JoinType::Inner, - true, - None, - None, - StreamJoinPartitionMode::SinglePartition, - )?); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("a", 5)), "a".to_string()), - (Arc::new(Column::new("b", 6)), "b".to_string()), - (Arc::new(Column::new("c", 7)), "c".to_string()), - (Arc::new(Column::new("d", 8)), "d".to_string()), - (Arc::new(Column::new("e", 9)), "e".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("d", 3)), "d".to_string()), - (Arc::new(Column::new("e", 4)), "e".to_string()), - ], - join, - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + 
a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - Ok(()) - } - - #[test] - fn test_nested_loop_join_after_projection() -> Result<()> { - let left_csv = create_simple_csv_exec(); - let right_csv = create_simple_csv_exec(); - - let col_left_a = col("a", &left_csv.schema())?; - let col_right_b = col("b", &right_csv.schema())?; - let col_left_c = col("c", &left_csv.schema())?; - // left_a < right_b - let filter_expr = - binary(col_left_a, Operator::Lt, col_right_b, &Schema::empty())?; - let filter_column_indices = vec![ - ColumnIndex { - index: 0, - side: JoinSide::Left, - }, - ColumnIndex { - index: 1, - side: JoinSide::Right, - }, - ColumnIndex { - index: 2, - side: JoinSide::Right, - }, - ]; - let filter_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - ]); - - let join: Arc = Arc::new(NestedLoopJoinExec::try_new( - left_csv, - right_csv, - Some(JoinFilter::new( - filter_expr, - filter_column_indices, - Arc::new(filter_schema), - )), - &JoinType::Inner, - None, - )?); - - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![(col_left_c, "c".to_string())], - Arc::clone(&join), - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c]", - " NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - Ok(()) - } - - #[test] - fn test_hash_join_after_projection() -> Result<()> { - // sql like - // SELECT t1.c as c_from_left, t1.b as b_from_left, t1.a as a_from_left, t2.c as c_from_right FROM t1 JOIN t2 ON t1.b = t2.c WHERE t1.b - (1 + t2.a) <= t2.a + t1.c - let left_csv = create_simple_csv_exec(); - let right_csv = create_simple_csv_exec(); - - let join: Arc = Arc::new(HashJoinExec::try_new( - left_csv, - right_csv, - vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], - // b_left-(1+a_right)<=a_right+c_left - Some(JoinFilter::new( - Arc::new(BinaryExpr::new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("b_left_inter", 0)), - Operator::Minus, - Arc::new(BinaryExpr::new( - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), - Operator::Plus, - Arc::new(Column::new("a_right_inter", 1)), - )), - )), - Operator::LtEq, - Arc::new(BinaryExpr::new( - Arc::new(Column::new("a_right_inter", 1)), - Operator::Plus, - Arc::new(Column::new("c_left_inter", 2)), - )), - )), - vec![ - ColumnIndex { - index: 1, - side: JoinSide::Left, - }, - ColumnIndex { - index: 0, - side: JoinSide::Right, - }, - ColumnIndex { - index: 2, - side: JoinSide::Left, - }, - ], - Arc::new(Schema::new(vec![ - Field::new("b_left_inter", 
DataType::Int32, true), - Field::new("a_right_inter", DataType::Int32, true), - Field::new("c_left_inter", DataType::Int32, true), - ])), - )), - &JoinType::Inner, - None, - PartitionMode::Auto, - true, - )?); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), - (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), - (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), - (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), - ], - join.clone(), - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@7 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - // HashJoinExec only returns result after projection. Because there are some alias columns in the projection, the ProjectionExec is not removed. - let expected = ["ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@3 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"]; - assert_eq!(get_plan_string(&after_optimize), expected); - - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("c", 7)), "c".to_string()), - ], - join.clone(), - )?); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - // Comparing to the previous result, this projection don't have alias columns either change the order of output fields. So the ProjectionExec is removed. 
- let expected = ["HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_repartition_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let repartition: Arc = Arc::new(RepartitionExec::try_new( - csv, - Partitioning::Hash( - vec![ - Arc::new(Column::new("a", 0)), - Arc::new(Column::new("b", 1)), - Arc::new(Column::new("d", 3)), - ], - 6, - ), - )?); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("b", 1)), "b_new".to_string()), - (Arc::new(Column::new("a", 0)), "a".to_string()), - (Arc::new(Column::new("d", 3)), "d_new".to_string()), - ], - repartition, - )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", - " RepartitionExec: partitioning=Hash([a@0, b@1, d@3], 6), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1", - " ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - assert_eq!( - after_optimize - .as_any() - .downcast_ref::() - .unwrap() - .partitioning() - .clone(), - Partitioning::Hash( - vec![ - Arc::new(Column::new("a", 1)), - Arc::new(Column::new("b_new", 0)), - Arc::new(Column::new("d_new", 2)), - ], - 6, - ), - ); - - Ok(()) - } - - #[test] - fn test_sort_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let sort_req: Arc = Arc::new(SortExec::new( - LexOrdering::new(vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("b", 1)), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Plus, - Arc::new(Column::new("a", 0)), - )), - options: SortOptions::default(), - }, - ]), - csv.clone(), - )); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - ], - sort_req.clone(), - )?); - - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " SortExec: expr=[b@1 ASC, c@2 + a@0 ASC], preserve_partitioning=[false]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "SortExec: expr=[b@2 ASC, c@0 + new_a@1 ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - 
assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_sort_preserving_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let sort_req: Arc = Arc::new(SortPreservingMergeExec::new( - LexOrdering::new(vec![ - PhysicalSortExpr { - expr: Arc::new(Column::new("b", 1)), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Plus, - Arc::new(Column::new("a", 0)), - )), - options: SortOptions::default(), - }, - ]), - csv.clone(), - )); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - ], - sort_req.clone(), - )?); - - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " SortPreservingMergeExec: [b@1 ASC, c@2 + a@0 ASC]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "SortPreservingMergeExec: [b@2 ASC, c@0 + new_a@1 ASC]", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } - - #[test] - fn test_union_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let union: Arc = - Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - (Arc::new(Column::new("c", 2)), "c".to_string()), - (Arc::new(Column::new("a", 0)), "new_a".to_string()), - (Arc::new(Column::new("b", 1)), "b".to_string()), - ], - union.clone(), - )?); - - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " UnionExec", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let expected = [ - "UnionExec", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) - } -} diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index b6f2f8e9ac4a..77e4b491da6d 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -16,45 +16,45 @@ // under the License. //! 
This module contains tests for limiting memory at runtime in DataFusion + +use std::any::Any; +use std::num::NonZeroUsize; +use std::sync::{Arc, LazyLock}; + #[cfg(feature = "extended_tests")] mod memory_limit_validation; - use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::{ArrayRef, DictionaryArray}; use arrow_schema::SortOptions; -use async_trait::async_trait; use datafusion::assert_batches_eq; -use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_plan::memory::MemoryExec; -use datafusion::physical_plan::streaming::PartitionStream; -use datafusion_execution::memory_pool::{ - GreedyMemoryPool, MemoryPool, TrackConsumersPool, -}; -use datafusion_expr::{Expr, TableType}; -use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion_physical_plan::spill::get_record_batch_memory_size; -use futures::StreamExt; -use std::any::Any; -use std::num::NonZeroUsize; -use std::sync::{Arc, LazyLock}; -use tokio::fs::File; - use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::physical_optimizer::join_selection::JoinSelection; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; -use datafusion_common::{assert_contains, Result}; - use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_catalog::Session; +use datafusion_common::{assert_contains, Result}; +use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryPool, TrackConsumersPool, +}; use datafusion_execution::TaskContext; +use datafusion_expr::{Expr, TableType}; +use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::join_selection::JoinSelection; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::spill::get_record_batch_memory_size; use test_utils::AccessLogGenerator; +use async_trait::async_trait; +use futures::StreamExt; +use tokio::fs::File; + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 74388618015b..f0588e45cc6a 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -25,7 +25,6 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{parquet_exec, trim_plan_display}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; use datafusion_common::config::ConfigOptions; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; @@ -33,6 +32,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use datafusion_physical_expr::expressions::{col, lit}; use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate; use 
datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 1cf8ce6007d0..7d5d07715eeb 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -24,6 +24,7 @@ mod enforce_sorting; mod join_selection; mod limit_pushdown; mod limited_distinct_aggregation; +mod projection_pushdown; mod replace_with_order_preserving_variants; mod sanity_checker; mod test_utils; diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs new file mode 100644 index 000000000000..fc576e929591 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -0,0 +1,1403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions}; +use datafusion::datasource::file_format::file_compression_type::FileCompressionType; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_common::{JoinSide, JoinType, ScalarValue}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::{ + ColumnarValue, Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_physical_expr::expressions::{ + binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr, +}; +use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::{ + Distribution, Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, +}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; +use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use datafusion_physical_plan::joins::{ + HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, + SymmetricHashJoinExec, +}; +use datafusion_physical_plan::memory::MemoryExec; +use datafusion_physical_plan::projection::{update_expr, ProjectionExec}; +use 
datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::streaming::PartitionStream; +use datafusion_physical_plan::streaming::StreamingTableExec; +use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; + +use itertools::Itertools; + +/// Mocked UDF +#[derive(Debug)] +struct DummyUDF { + signature: Signature, +} + +impl DummyUDF { + fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for DummyUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "dummy_udf" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Int32) + } + + fn invoke_batch( + &self, + _args: &[ColumnarValue], + _number_rows: usize, + ) -> Result<ColumnarValue> { + unimplemented!("DummyUDF::invoke") + } +} + +#[test] +fn test_update_matching_exprs() -> Result<()> { + let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 3)), + Operator::Divide, + Arc::new(Column::new("e", 5)), + )), + Arc::new(CastExpr::new( + Arc::new(Column::new("a", 3)), + DataType::Float32, + None, + )), + Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))), + Arc::new(ScalarFunctionExpr::new( + "scalar_expr", + Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), + vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Divide, + Arc::new(Column::new("c", 0)), + )), + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)), + Operator::Divide, + Arc::new(Column::new("b", 1)), + )), + ], + DataType::Int32, + )), + Arc::new(CaseExpr::try_new( + Some(Arc::new(Column::new("d", 2))), + vec![ + ( + Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("d", 2)), + Operator::Plus, + Arc::new(Column::new("e", 5)), + )) as Arc<dyn PhysicalExpr>, + ), + ( + Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("e", 5)), + Operator::Plus, + Arc::new(Column::new("d", 2)), + )) as Arc<dyn PhysicalExpr>, + ), + ], + Some(Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 3)), + Operator::Modulo, + Arc::new(Column::new("e", 5)), + ))), + )?), + ]; + let child: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![ + (Arc::new(Column::new("c", 2)), "c".to_owned()), + (Arc::new(Column::new("b", 1)), "b".to_owned()), + (Arc::new(Column::new("d", 3)), "d".to_owned()), + (Arc::new(Column::new("a", 0)), "a".to_owned()), + (Arc::new(Column::new("f", 5)), "f".to_owned()), + (Arc::new(Column::new("e", 4)), "e".to_owned()), + ]; + + let expected_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Divide, + Arc::new(Column::new("e", 4)), + )), + Arc::new(CastExpr::new( + Arc::new(Column::new("a", 0)), + DataType::Float32, + None, + )), + Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))), + Arc::new(ScalarFunctionExpr::new( + "scalar_expr", + Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), + vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Divide, + Arc::new(Column::new("c", 2)), + )), + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Divide, + Arc::new(Column::new("b", 1)), + )), + ], + DataType::Int32, + )), + Arc::new(CaseExpr::try_new( + Some(Arc::new(Column::new("d", 3))), + vec![ + ( + 
Arc::new(Column::new("a", 0)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("d", 3)), + Operator::Plus, + Arc::new(Column::new("e", 4)), + )) as Arc, + ), + ( + Arc::new(Column::new("a", 0)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("e", 4)), + Operator::Plus, + Arc::new(Column::new("d", 3)), + )) as Arc, + ), + ], + Some(Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Modulo, + Arc::new(Column::new("e", 4)), + ))), + )?), + ]; + + for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { + assert!(update_expr(&expr, &child, true)? + .unwrap() + .eq(&expected_expr)); + } + + Ok(()) +} + +#[test] +fn test_update_projected_exprs() -> Result<()> { + let exprs: Vec> = vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 3)), + Operator::Divide, + Arc::new(Column::new("e", 5)), + )), + Arc::new(CastExpr::new( + Arc::new(Column::new("a", 3)), + DataType::Float32, + None, + )), + Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))), + Arc::new(ScalarFunctionExpr::new( + "scalar_expr", + Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), + vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Divide, + Arc::new(Column::new("c", 0)), + )), + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)), + Operator::Divide, + Arc::new(Column::new("b", 1)), + )), + ], + DataType::Int32, + )), + Arc::new(CaseExpr::try_new( + Some(Arc::new(Column::new("d", 2))), + vec![ + ( + Arc::new(Column::new("a", 3)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("d", 2)), + Operator::Plus, + Arc::new(Column::new("e", 5)), + )) as Arc, + ), + ( + Arc::new(Column::new("a", 3)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("e", 5)), + Operator::Plus, + Arc::new(Column::new("d", 2)), + )) as Arc, + ), + ], + Some(Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 3)), + Operator::Modulo, + Arc::new(Column::new("e", 5)), + ))), + )?), + ]; + let projected_exprs: Vec<(Arc, String)> = vec![ + (Arc::new(Column::new("a", 3)), "a".to_owned()), + (Arc::new(Column::new("b", 1)), "b_new".to_owned()), + (Arc::new(Column::new("c", 0)), "c".to_owned()), + (Arc::new(Column::new("d", 2)), "d_new".to_owned()), + (Arc::new(Column::new("e", 5)), "e".to_owned()), + (Arc::new(Column::new("f", 4)), "f_new".to_owned()), + ]; + + let expected_exprs: Vec> = vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Divide, + Arc::new(Column::new("e", 4)), + )), + Arc::new(CastExpr::new( + Arc::new(Column::new("a", 0)), + DataType::Float32, + None, + )), + Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))), + Arc::new(ScalarFunctionExpr::new( + "scalar_expr", + Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())), + vec![ + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b_new", 1)), + Operator::Divide, + Arc::new(Column::new("c", 2)), + )), + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Divide, + Arc::new(Column::new("b_new", 1)), + )), + ], + DataType::Int32, + )), + Arc::new(CaseExpr::try_new( + Some(Arc::new(Column::new("d_new", 3))), + vec![ + ( + Arc::new(Column::new("a", 0)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("d_new", 3)), + Operator::Plus, + Arc::new(Column::new("e", 4)), + )) as Arc, + ), + ( + Arc::new(Column::new("a", 0)) as Arc, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("e", 4)), + Operator::Plus, + Arc::new(Column::new("d_new", 3)), + )) as Arc, + ), + ], + Some(Arc::new(BinaryExpr::new( + 
Arc::new(Column::new("a", 0)), + Operator::Modulo, + Arc::new(Column::new("e", 4)), + ))), + )?), + ]; + + for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) { + assert!(update_expr(&expr, &projected_exprs, false)? + .unwrap() + .eq(&expected_expr)); + } + + Ok(()) +} + +fn create_simple_csv_exec() -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])); + Arc::new( + CsvExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![0, 1, 2, 3, 4])), + ) + .with_has_header(false) + .with_delimeter(0) + .with_quote(0) + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) +} + +fn create_projecting_csv_exec() -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + ])); + Arc::new( + CsvExec::builder( + FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_projection(Some(vec![3, 2, 1])), + ) + .with_has_header(false) + .with_delimeter(0) + .with_quote(0) + .with_escape(None) + .with_comment(None) + .with_newlines_in_values(false) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(), + ) +} + +fn create_projecting_memory_exec() -> Arc { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])); + + Arc::new(MemoryExec::try_new(&[], schema, Some(vec![2, 0, 3, 4])).unwrap()) +} + +#[test] +fn test_csv_after_projection() -> Result<()> { + let csv = create_projecting_csv_exec(); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("b", 2)), "b".to_string()), + (Arc::new(Column::new("d", 0)), "d".to_string()), + ], + csv.clone(), + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[b@2 as b, d@0 as d]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[d, c, b], has_header=false", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = + ["CsvExec: file_groups={1 group: [[x]]}, projection=[b, d], has_header=false"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_memory_after_projection() -> Result<()> { + let memory = create_projecting_memory_exec(); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("d", 2)), "d".to_string()), + (Arc::new(Column::new("e", 3)), "e".to_string()), + (Arc::new(Column::new("a", 1)), "a".to_string()), + ], + memory.clone(), + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[d@2 as d, e@3 as e, a@1 as a]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + 
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = ["MemoryExec: partitions=0, partition_sizes=[]"]; + assert_eq!(get_plan_string(&after_optimize), expected); + assert_eq!( + after_optimize + .clone() + .as_any() + .downcast_ref::<MemoryExec>() + .unwrap() + .projection() + .clone() + .unwrap(), + vec![3, 4, 0] + ); + + Ok(()) +} + +#[test] +fn test_streaming_table_after_projection() -> Result<()> { + #[derive(Debug)] + struct DummyStreamPartition { + schema: SchemaRef, + } + impl PartitionStream for DummyStreamPartition { + fn schema(&self) -> &SchemaRef { + &self.schema + } + fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream { + unreachable!() + } + } + + let streaming_table = StreamingTableExec::try_new( + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])), + vec![Arc::new(DummyStreamPartition { + schema: Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])), + }) as _], + Some(&vec![0_usize, 2, 4, 3]), + vec![ + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("e", 2)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + ]), + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("d", 3)), + options: SortOptions::default(), + }]), + ] + .into_iter(), + true, + None, + )?; + let projection = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("d", 3)), "d".to_string()), + (Arc::new(Column::new("e", 2)), "e".to_string()), + (Arc::new(Column::new("a", 0)), "a".to_string()), + ], + Arc::new(streaming_table) as _, + )?) 
as _; + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let result = after_optimize + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + result.partition_schema(), + &Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + ])) + ); + assert_eq!( + result.projection().clone().unwrap().to_vec(), + vec![3_usize, 4, 0] + ); + assert_eq!( + result.projected_schema(), + &Schema::new(vec![ + Field::new("d", DataType::Int32, true), + Field::new("e", DataType::Int32, true), + Field::new("a", DataType::Int32, true), + ]) + ); + assert_eq!( + result.projected_output_ordering().into_iter().collect_vec(), + vec![ + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("e", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 2)), + options: SortOptions::default(), + }, + ]), + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("d", 0)), + options: SortOptions::default(), + }]), + ] + ); + assert!(result.is_infinite()); + + Ok(()) +} + +#[test] +fn test_projection_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let child_projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("e", 4)), "new_e".to_string()), + (Arc::new(Column::new("a", 0)), "a".to_string()), + (Arc::new(Column::new("b", 1)), "new_b".to_string()), + ], + csv.clone(), + )?); + let top_projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("new_b", 3)), "new_b".to_string()), + ( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)), + Operator::Plus, + Arc::new(Column::new("new_e", 1)), + )), + "binary".to_string(), + ), + (Arc::new(Column::new("new_b", 3)), "newest_b".to_string()), + ], + child_projection.clone(), + )?); + + let initial = get_plan_string(&top_projection); + let expected_initial = [ + "ProjectionExec: expr=[new_b@3 as new_b, c@0 + new_e@1 as binary, new_b@3 as newest_b]", + " ProjectionExec: expr=[c@2 as c, e@4 as new_e, a@0 as a, b@1 as new_b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(top_projection, &ConfigOptions::new())?; + + let expected = [ + "ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_output_req_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let sort_req: Arc = Arc::new(OutputRequirementExec::new( + csv.clone(), + Some(LexRequirement::new(vec![ + PhysicalSortRequirement { + expr: Arc::new(Column::new("b", 1)), + options: Some(SortOptions::default()), + }, + PhysicalSortRequirement { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Plus, + Arc::new(Column::new("a", 0)), + )), + options: Some(SortOptions::default()), + }, + ])), + Distribution::HashPartitioned(vec![ + Arc::new(Column::new("a", 0)), + Arc::new(Column::new("b", 1)), + ]), + )); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + 
(Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("a", 0)), "new_a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + ], + sort_req.clone(), + )?); + + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " OutputRequirementExec", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected: [&str; 3] = [ + "OutputRequirementExec", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + + assert_eq!(get_plan_string(&after_optimize), expected); + let expected_reqs = LexRequirement::new(vec![ + PhysicalSortRequirement { + expr: Arc::new(Column::new("b", 2)), + options: Some(SortOptions::default()), + }, + PhysicalSortRequirement { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)), + Operator::Plus, + Arc::new(Column::new("new_a", 1)), + )), + options: Some(SortOptions::default()), + }, + ]); + assert_eq!( + after_optimize + .as_any() + .downcast_ref::() + .unwrap() + .required_input_ordering()[0] + .clone() + .unwrap(), + expected_reqs + ); + let expected_distribution: Vec> = vec![ + Arc::new(Column::new("new_a", 1)), + Arc::new(Column::new("b", 2)), + ]; + if let Distribution::HashPartitioned(vec) = after_optimize + .as_any() + .downcast_ref::() + .unwrap() + .required_input_distribution()[0] + .clone() + { + assert!(vec + .iter() + .zip(expected_distribution) + .all(|(actual, expected)| actual.eq(&expected))); + } else { + panic!("Expected HashPartitioned distribution!"); + }; + + Ok(()) +} + +#[test] +fn test_coalesce_partitions_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let coalesce_partitions: Arc = + Arc::new(CoalescePartitionsExec::new(csv)); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("b", 1)), "b".to_string()), + (Arc::new(Column::new("a", 0)), "a_new".to_string()), + (Arc::new(Column::new("d", 3)), "d".to_string()), + ], + coalesce_partitions, + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", + " CoalescePartitionsExec", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "CoalescePartitionsExec", + " ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_filter_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let predicate = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Minus, + Arc::new(Column::new("a", 0)), + )), + Operator::Gt, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("d", 3)), + Operator::Minus, + Arc::new(Column::new("a", 0)), + )), + )); + let filter: Arc = Arc::new(FilterExec::try_new(predicate, csv)?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("a", 0)), 
"a_new".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + (Arc::new(Column::new("d", 3)), "d".to_string()), + ], + filter.clone(), + )?); + + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", + " FilterExec: b@1 - a@0 > d@3 - a@0", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "FilterExec: b@1 - a_new@0 > d@2 - a_new@0", + " ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_join_after_projection() -> Result<()> { + let left_csv = create_simple_csv_exec(); + let right_csv = create_simple_csv_exec(); + + let join: Arc = Arc::new(SymmetricHashJoinExec::try_new( + left_csv, + right_csv, + vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], + // b_left-(1+a_right)<=a_right+c_left + Some(JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b_left_inter", 0)), + Operator::Minus, + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + Operator::Plus, + Arc::new(Column::new("a_right_inter", 1)), + )), + )), + Operator::LtEq, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a_right_inter", 1)), + Operator::Plus, + Arc::new(Column::new("c_left_inter", 2)), + )), + )), + vec![ + ColumnIndex { + index: 1, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ColumnIndex { + index: 2, + side: JoinSide::Left, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("b_left_inter", DataType::Int32, true), + Field::new("a_right_inter", DataType::Int32, true), + Field::new("c_left_inter", DataType::Int32, true), + ])), + )), + &JoinType::Inner, + true, + None, + None, + StreamJoinPartitionMode::SinglePartition, + )?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), + (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), + (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), + (Arc::new(Column::new("a", 5)), "a_from_right".to_string()), + (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), + ], + join, + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]", + " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", + " ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]", + " CsvExec: 
file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + let expected_filter_col_ind = vec![ + ColumnIndex { + index: 1, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ]; + + assert_eq!( + expected_filter_col_ind, + after_optimize + .as_any() + .downcast_ref::() + .unwrap() + .filter() + .unwrap() + .column_indices() + ); + + Ok(()) +} + +#[test] +fn test_join_after_required_projection() -> Result<()> { + let left_csv = create_simple_csv_exec(); + let right_csv = create_simple_csv_exec(); + + let join: Arc = Arc::new(SymmetricHashJoinExec::try_new( + left_csv, + right_csv, + vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], + // b_left-(1+a_right)<=a_right+c_left + Some(JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b_left_inter", 0)), + Operator::Minus, + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + Operator::Plus, + Arc::new(Column::new("a_right_inter", 1)), + )), + )), + Operator::LtEq, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a_right_inter", 1)), + Operator::Plus, + Arc::new(Column::new("c_left_inter", 2)), + )), + )), + vec![ + ColumnIndex { + index: 1, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ColumnIndex { + index: 2, + side: JoinSide::Left, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("b_left_inter", DataType::Int32, true), + Field::new("a_right_inter", DataType::Int32, true), + Field::new("c_left_inter", DataType::Int32, true), + ])), + )), + &JoinType::Inner, + true, + None, + None, + StreamJoinPartitionMode::SinglePartition, + )?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("a", 5)), "a".to_string()), + (Arc::new(Column::new("b", 6)), "b".to_string()), + (Arc::new(Column::new("c", 7)), "c".to_string()), + (Arc::new(Column::new("d", 8)), "d".to_string()), + (Arc::new(Column::new("e", 9)), "e".to_string()), + (Arc::new(Column::new("a", 0)), "a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("d", 3)), "d".to_string()), + (Arc::new(Column::new("e", 4)), "e".to_string()), + ], + join, + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", + " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", + " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 
<= a_right_inter@1 + c_left_inter@2", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + Ok(()) +} + +#[test] +fn test_nested_loop_join_after_projection() -> Result<()> { + let left_csv = create_simple_csv_exec(); + let right_csv = create_simple_csv_exec(); + + let col_left_a = col("a", &left_csv.schema())?; + let col_right_b = col("b", &right_csv.schema())?; + let col_left_c = col("c", &left_csv.schema())?; + // left_a < right_b + let filter_expr = binary(col_left_a, Operator::Lt, col_right_b, &Schema::empty())?; + let filter_column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 1, + side: JoinSide::Right, + }, + ColumnIndex { + index: 2, + side: JoinSide::Right, + }, + ]; + let filter_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ]); + + let join: Arc = Arc::new(NestedLoopJoinExec::try_new( + left_csv, + right_csv, + Some(JoinFilter::new( + filter_expr, + filter_column_indices, + Arc::new(filter_schema), + )), + &JoinType::Inner, + None, + )?); + + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![(col_left_c, "c".to_string())], + Arc::clone(&join), + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c]", + " NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + let expected = [ + "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + Ok(()) +} + +#[test] +fn test_hash_join_after_projection() -> Result<()> { + // sql like + // SELECT t1.c as c_from_left, t1.b as b_from_left, t1.a as a_from_left, t2.c as c_from_right FROM t1 JOIN t2 ON t1.b = t2.c WHERE t1.b - (1 + t2.a) <= t2.a + t1.c + let left_csv = create_simple_csv_exec(); + let right_csv = create_simple_csv_exec(); + + let join: Arc = Arc::new(HashJoinExec::try_new( + left_csv, + right_csv, + vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))], + // b_left-(1+a_right)<=a_right+c_left + Some(JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b_left_inter", 0)), + Operator::Minus, + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + Operator::Plus, + Arc::new(Column::new("a_right_inter", 1)), + )), + )), + Operator::LtEq, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a_right_inter", 1)), + Operator::Plus, + Arc::new(Column::new("c_left_inter", 2)), + )), + )), + vec![ + ColumnIndex { + index: 1, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ColumnIndex { + index: 2, + side: JoinSide::Left, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("b_left_inter", DataType::Int32, true), + 
Field::new("a_right_inter", DataType::Int32, true), + Field::new("c_left_inter", DataType::Int32, true), + ])), + )), + &JoinType::Inner, + None, + PartitionMode::Auto, + true, + )?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), + (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), + (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), + (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), + ], + join.clone(), + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@7 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + // HashJoinExec only returns result after projection. Because there are some alias columns in the projection, the ProjectionExec is not removed. + let expected = ["ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@3 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("a", 0)), "a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("c", 7)), "c".to_string()), + ], + join.clone(), + )?); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + // Comparing to the previous result, this projection don't have alias columns either change the order of output fields. So the ProjectionExec is removed. 
+ let expected = ["HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_repartition_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let repartition: Arc = Arc::new(RepartitionExec::try_new( + csv, + Partitioning::Hash( + vec![ + Arc::new(Column::new("a", 0)), + Arc::new(Column::new("b", 1)), + Arc::new(Column::new("d", 3)), + ], + 6, + ), + )?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("b", 1)), "b_new".to_string()), + (Arc::new(Column::new("a", 0)), "a".to_string()), + (Arc::new(Column::new("d", 3)), "d_new".to_string()), + ], + repartition, + )?); + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", + " RepartitionExec: partitioning=Hash([a@0, b@1, d@3], 6), input_partitions=1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1", + " ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + assert_eq!( + after_optimize + .as_any() + .downcast_ref::() + .unwrap() + .partitioning() + .clone(), + Partitioning::Hash( + vec![ + Arc::new(Column::new("a", 1)), + Arc::new(Column::new("b_new", 0)), + Arc::new(Column::new("d_new", 2)), + ], + 6, + ), + ); + + Ok(()) +} + +#[test] +fn test_sort_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let sort_req: Arc = Arc::new(SortExec::new( + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Plus, + Arc::new(Column::new("a", 0)), + )), + options: SortOptions::default(), + }, + ]), + csv.clone(), + )); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("a", 0)), "new_a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + ], + sort_req.clone(), + )?); + + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " SortExec: expr=[b@1 ASC, c@2 + a@0 ASC], preserve_partitioning=[false]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "SortExec: expr=[b@2 ASC, c@0 + new_a@1 ASC], preserve_partitioning=[false]", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + 
assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_sort_preserving_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let sort_req: Arc = Arc::new(SortPreservingMergeExec::new( + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Plus, + Arc::new(Column::new("a", 0)), + )), + options: SortOptions::default(), + }, + ]), + csv.clone(), + )); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("a", 0)), "new_a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + ], + sort_req.clone(), + )?); + + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " SortPreservingMergeExec: [b@1 ASC, c@2 + a@0 ASC]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "SortPreservingMergeExec: [b@2 ASC, c@0 + new_a@1 ASC]", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} + +#[test] +fn test_union_after_projection() -> Result<()> { + let csv = create_simple_csv_exec(); + let union: Arc = + Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); + let projection: Arc = Arc::new(ProjectionExec::try_new( + vec![ + (Arc::new(Column::new("c", 2)), "c".to_string()), + (Arc::new(Column::new("a", 0)), "new_a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + ], + union.clone(), + )?); + + let initial = get_plan_string(&projection); + let expected_initial = [ + "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " UnionExec", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(initial, expected_initial); + + let after_optimize = + ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + + let expected = [ + "UnionExec", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", + " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false" + ]; + assert_eq!(get_plan_string(&after_optimize), expected); + + Ok(()) +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index e52e46a16375..c2beab032049 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -26,8 +26,9 @@ pub mod enforce_sorting; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; -mod optimizer; +pub mod optimizer; pub mod 
output_requirements; +pub mod projection_pushdown; pub mod pruning; pub mod sanity_checker; pub mod topk_aggregation; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 609890e2d43f..88f11f53491e 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -17,11 +17,26 @@ //! Physical optimizer traits +use std::fmt::Debug; +use std::sync::Arc; + +use crate::aggregate_statistics::AggregateStatistics; +use crate::coalesce_batches::CoalesceBatches; +use crate::combine_partial_final_agg::CombinePartialFinalAggregate; +use crate::enforce_distribution::EnforceDistribution; +use crate::enforce_sorting::EnforceSorting; +use crate::join_selection::JoinSelection; +use crate::limit_pushdown::LimitPushdown; +use crate::limited_distinct_aggregation::LimitedDistinctAggregation; +use crate::output_requirements::OutputRequirements; +use crate::projection_pushdown::ProjectionPushdown; +use crate::sanity_checker::SanityCheckPlan; +use crate::topk_aggregation::TopKAggregation; +use crate::update_aggr_exprs::OptimizeAggregateOrder; + use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_physical_plan::ExecutionPlan; -use std::fmt::Debug; -use std::sync::Arc; /// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which /// computes the same results, but in a potentially more efficient way. @@ -47,3 +62,92 @@ pub trait PhysicalOptimizerRule: Debug { /// and should disable the schema check. fn schema_check(&self) -> bool; } + +/// A rule-based physical optimizer. +#[derive(Clone, Debug)] +pub struct PhysicalOptimizer { + /// All rules to apply + pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>, +} + +impl Default for PhysicalOptimizer { + fn default() -> Self { + Self::new() + } +} + +impl PhysicalOptimizer { + /// Create a new optimizer using the recommended list of rules + pub fn new() -> Self { + let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![ + // If there is an output requirement of the query, make sure that + // this information is not lost across different rules during optimization. + Arc::new(OutputRequirements::new_add_mode()), + Arc::new(AggregateStatistics::new()), + // Statistics-based join selection will change the Auto mode to a real join implementation, + // like collect left, or hash join, or future sort merge join, which will influence the + // EnforceDistribution and EnforceSorting rules as they decide whether to add additional + // repartitioning and local sorting steps to meet distribution and ordering requirements. + // Therefore, it should run before EnforceDistribution and EnforceSorting. + Arc::new(JoinSelection::new()), + // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule, + // as that rule may inject other operations in between the different AggregateExecs. + // Applying the rule early means only directly-connected AggregateExecs must be examined. + Arc::new(LimitedDistinctAggregation::new()), + // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution + // requirements. Please make sure that the whole plan tree is determined before this rule. + // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at + // least one of the operators in the plan benefits from increased parallelism. 
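+ // (For example, a hash join may require both of its inputs to be hash-partitioned + // on the join keys; this rule inserts the repartitioning needed to guarantee that.) 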
+
+/// A rule-based physical optimizer.
+#[derive(Clone, Debug)]
+pub struct PhysicalOptimizer {
+    /// All rules to apply
+    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+}
+
+impl Default for PhysicalOptimizer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PhysicalOptimizer {
+    /// Create a new optimizer using the recommended list of rules
+    pub fn new() -> Self {
+        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+            // If there is an output requirement for the query, make sure that
+            // this information is not lost across different rules during optimization.
+            Arc::new(OutputRequirements::new_add_mode()),
+            Arc::new(AggregateStatistics::new()),
+            // Statistics-based join selection will change the Auto mode to a real join
+            // implementation, like collect left, hash join, or a future sort merge join.
+            // This influences the EnforceDistribution and EnforceSorting rules, which
+            // decide whether to add additional repartitioning and local sorting steps
+            // to meet distribution and ordering requirements. Therefore, it should run
+            // before EnforceDistribution and EnforceSorting.
+            Arc::new(JoinSelection::new()),
+            // The LimitedDistinctAggregation rule should be applied before the
+            // EnforceDistribution rule, as that rule may inject other operations
+            // between the different AggregateExecs. Applying the rule early means
+            // only directly-connected AggregateExecs must be examined.
+            Arc::new(LimitedDistinctAggregation::new()),
+            // The EnforceDistribution rule adds the essential repartitioning needed to
+            // satisfy distribution requirements. Please make sure that the whole plan
+            // tree is determined before this rule. This rule increases parallelism if
+            // doing so is beneficial to the physical plan; i.e. at least one of the
+            // operators in the plan benefits from increased parallelism.
+            Arc::new(EnforceDistribution::new()),
+            // The CombinePartialFinalAggregate rule should be applied after the
+            // EnforceDistribution rule.
+            Arc::new(CombinePartialFinalAggregate::new()),
+            // The EnforceSorting rule adds the essential local sorting needed to
+            // satisfy the required ordering. Please make sure that the whole plan
+            // tree is determined before this rule. Note that one should always run
+            // this rule after the EnforceDistribution rule, as the latter may break
+            // local sorting requirements.
+            Arc::new(EnforceSorting::new()),
+            // Run once after the local sorting requirement is changed
+            Arc::new(OptimizeAggregateOrder::new()),
+            // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be
+            // blocked by CoalesceBatches, so this instance runs before CoalesceBatches.
+            // This may be improved in the future.
+            Arc::new(ProjectionPushdown::new()),
+            // The CoalesceBatches rule will not influence the distribution and
+            // ordering of the whole plan tree. Therefore, to avoid influencing other
+            // rules, it should run last.
+            Arc::new(CoalesceBatches::new()),
+            // Remove the ancillary output requirement operator since we are done with
+            // the planning phase.
+            Arc::new(OutputRequirements::new_remove_mode()),
+            // The aggregation limiter will try to find situations where the
+            // accumulator count is not tied to the cardinality, i.e. when the output
+            // of the aggregation is passed into an `order by max(x) limit y`. In this
+            // case it will copy the limit value down to the aggregation, allowing it
+            // to use only y accumulators.
+            Arc::new(TopKAggregation::new()),
+            // The ProjectionPushdown rule tries to push projections towards the
+            // sources in the execution plan. As a result of this process, a
+            // projection can disappear if it reaches the source providers, and
+            // sequential projections can merge into one. Even if these two cases are
+            // not present, the load of executors such as join or union will be
+            // reduced by narrowing their input tables.
+            Arc::new(ProjectionPushdown::new()),
+            // The LimitPushdown rule tries to push limits down as far as possible,
+            // replacing operators with fetching variants, or adding limits past
+            // operators that support limit pushdown.
+            Arc::new(LimitPushdown::new()),
+            // The SanityCheckPlan rule checks whether the order and distribution
+            // requirements of each node in the plan are satisfied. It will also
+            // reject non-runnable query plans that use pipeline-breaking operators
+            // on infinite input(s). The rule generates a diagnostic error message
+            // for invalid plans. It makes no changes to the given query plan; i.e.
+            // it only acts as a final gatekeeping rule.
+            Arc::new(SanityCheckPlan::new()),
+        ];
+
+        Self::with_rules(rules)
+    }
+
+    /// Create a new optimizer with the given rules
+    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
+        Self { rules }
+    }
+}
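Aside (not part of this diff): since `rules` is public and `with_rules` accepts any rule list, a caller can drive the pass by hand, e.g. to run only a subset of the recommended rules. A minimal sketch; the helper names `optimize_all` and `pushdown_only` are hypothetical:

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion_physical_plan::ExecutionPlan;

// Thread a plan through every rule of an optimizer, in order.
fn optimize_all(
    optimizer: &PhysicalOptimizer,
    mut plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    for rule in &optimizer.rules {
        plan = rule.optimize(plan, config)?;
    }
    Ok(plan)
}

fn pushdown_only(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // `with_rules` accepts any subset or custom ordering of rules.
    let optimizer =
        PhysicalOptimizer::with_rules(vec![Arc::new(ProjectionPushdown::new())]);
    optimize_all(&optimizer, plan, &ConfigOptions::new())
}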
diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs
new file mode 100644
index 000000000000..34affcbd4a19
--- /dev/null
+++ b/datafusion/physical-optimizer/src/projection_pushdown.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the `ProjectionPushdown` physical optimization rule.
+//! The function [`remove_unnecessary_projections`] tries to push down all
+//! projections one by one if the operator below is amenable to this. If a
+//! projection reaches a source, it can even disappear from the plan entirely.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_physical_plan::projection::remove_unnecessary_projections;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// This rule inspects `ProjectionExec`s in the given physical plan and tries
+/// to remove them or swap them with their children.
+#[derive(Default, Debug)]
+pub struct ProjectionPushdown {}
+
+impl ProjectionPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for ProjectionPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(remove_unnecessary_projections).data()
+    }
+
+    fn name(&self) -> &str {
+        "ProjectionPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
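Aside (not part of this diff): applying the new rule on its own mirrors what the tests above do. A small sketch; `push_down_projections` is a hypothetical helper:

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::{displayable, ExecutionPlan};

// Run a single ProjectionPushdown pass and print the rewritten plan.
fn push_down_projections(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let optimized = ProjectionPushdown::new().optimize(plan, &ConfigOptions::new())?;
    println!("{}", displayable(optimized.as_ref()).indent(true));
    Ok(optimized)
}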