Complete moving PhysicalOptimizer into datafusion-physical-optimizer #14300

Merged — merged 7 commits on Jan 26, 2025. Showing changes from 6 commits.
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -66,6 +66,7 @@ datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-optimizer = { workspace = true, default-features = true }
Contributor commented:
This just follows the existing patterns so I think it is ok, but I think in general the examples should only be using the DataFusion crate (to ensure all relevant structures are exposed)

I will make a follow-on PR.
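For illustration only, here is a minimal sketch of the shape that follow-on PR might aim for: the example depending solely on the umbrella `datafusion` crate. The `datafusion::physical_optimizer` path used below is an assumption (this PR removes the in-crate module, so a re-export would have to be added separately); treat it as hypothetical.

```rust
// Hypothetical follow-on shape: the example pulls everything from the
// umbrella `datafusion` crate instead of `datafusion-physical-optimizer`.
// The `physical_optimizer` re-export is assumed here, not added by this PR.
use datafusion::physical_optimizer::pruning::PruningPredicate; // assumed re-export
use datafusion::prelude::*;

fn main() {
    // Only the import paths (and the Cargo.toml dependency list) change;
    // the example logic itself would stay as it is today.
    let _ctx = SessionContext::new();
    let _ = std::any::type_name::<PruningPredicate>();
}
```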

datafusion-proto = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
22 changes: 12 additions & 10 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -15,10 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
@@ -38,7 +44,6 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
@@ -48,16 +53,13 @@ use datafusion_common::{
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion_physical_optimizer::pruning::PruningPredicate;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

26 changes: 14 additions & 12 deletions datafusion-examples/examples/parquet_index.rs
@@ -15,14 +15,24 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::HashSet;
use std::fmt::Display;
use std::fs::{self, DirEntry, File};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

use arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int32Array, RecordBatch, StringArray,
UInt64Array,
};
use arrow::datatypes::Int32Type;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -32,24 +42,16 @@ use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
};
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{utils::conjunction, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::PhysicalExpr;
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Display;
use std::fs::{self, DirEntry, File};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use async_trait::async_trait;
use tempfile::TempDir;
use url::Url;

7 changes: 4 additions & 3 deletions datafusion-examples/examples/pruning.rs
@@ -15,15 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

/// This example shows how to use DataFusion's `PruningPredicate` to prove
/// filter expressions can never be true based on statistics such as min/max
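The pruning.rs example's doc comment is cut off in this view; as a reference for the import change above, here is a condensed, hedged sketch of the pattern the example demonstrates, written against the new `datafusion_physical_optimizer` path. The column name, min/max values, and the hand-rolled two-file statistics are invented for illustration and are not taken from the real example.

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{Column, DFSchema, ScalarValue};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::prelude::*;
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

/// Hypothetical min/max statistics for column "x" in two files (containers).
struct FileStats;

impl PruningStatistics for FileStats {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        (column.name == "x").then(|| Arc::new(Int32Array::from(vec![0, 100])) as ArrayRef)
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        (column.name == "x").then(|| Arc::new(Int32Array::from(vec![9, 109])) as ArrayRef)
    }
    fn num_containers(&self) -> usize {
        2
    }
    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // no null statistics available in this sketch
    }
    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }
    fn contained(&self, _c: &Column, _v: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        None
    }
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    // A filter that can never be true for the first file, where x is 0..=9.
    let expr = col("x").gt(lit(50));
    let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
    let physical_expr = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;
    let predicate = PruningPredicate::try_new(physical_expr, schema)?;
    // `false` means the corresponding container can be skipped entirely.
    let keep = predicate.prune(&FileStats)?;
    assert_eq!(keep, vec![false, true]);
    Ok(())
}
```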
35 changes: 17 additions & 18 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -17,6 +17,15 @@

//! [`ParquetExec`] Execution plan for reading Parquet files

mod access_plan;
mod metrics;
mod opener;
mod page_filter;
mod reader;
mod row_filter;
mod row_group_filter;
mod writer;

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
@@ -27,45 +36,35 @@ use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
FileScanConfig,
};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
error::Result,
execution::context::TaskContext,
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};

pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
use arrow::datatypes::SchemaRef;
use datafusion_common::Constraints;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_optimizer::pruning::PruningPredicate;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use itertools::Itertools;
use log::debug;

mod access_plan;
mod metrics;
mod opener;
mod page_filter;
mod reader;
mod row_filter;
mod row_group_filter;
mod writer;

use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;

use itertools::Itertools;
use log::debug;

/// Execution plan for reading one or more Parquet files.
///
/// ```text
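As a side note on the doc comment above (truncated here), `ParquetExec` is normally constructed indirectly when a Parquet-backed table is scanned. A small hedged sketch of that typical usage, with a hypothetical file name:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Planning this scan yields a ParquetExec leaf in the physical plan;
    // "data.parquet" is a placeholder path used only for illustration.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}
```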
@@ -17,6 +17,8 @@

//! [`ParquetOpener`] for opening Parquet files

use std::sync::Arc;

use crate::datasource::file_format::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
};
@@ -29,17 +31,18 @@ use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
};
use crate::datasource::schema_adapter::SchemaAdapterFactory;
use crate::physical_optimizer::pruning::PruningPredicate;

use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::pruning::PruningPredicate;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;

/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
@@ -17,14 +17,19 @@

//! Contains code to filter entire pages

use std::collections::HashSet;
use std::sync::Arc;

use super::metrics::ParquetFileMetrics;
use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use arrow::array::BooleanArray;
use arrow::{array::ArrayRef, datatypes::SchemaRef};
use arrow_schema::Schema;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
@@ -34,8 +39,6 @@ use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
file::metadata::{ParquetMetaData, RowGroupMetaData},
};
use std::collections::HashSet;
use std::sync::Arc;

/// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
///
@@ -15,11 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::{ParquetAccessPlan, ParquetFileMetrics};
use crate::datasource::listing::FileRange;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use datafusion_common::{Column, Result, ScalarValue};
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::parquet_column;
use parquet::basic::Type;
@@ -30,10 +36,6 @@
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::{ParquetAccessPlan, ParquetFileMetrics};

/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
///
19 changes: 11 additions & 8 deletions datafusion/core/src/execution/session_state.rs
@@ -17,6 +17,12 @@

//! [`SessionState`]: information required to run queries in a session

use std::any::Any;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
use crate::catalog_common::information_schema::{
InformationSchemaProvider, INFORMATION_SCHEMA,
@@ -27,11 +33,9 @@ use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
use crate::physical_optimizer::optimizer::PhysicalOptimizer;
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
@@ -61,20 +65,19 @@ use datafusion_optimizer::{
};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_sql::parser::{DFParser, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::Itertools;
use log::{debug, info};
use object_store::ObjectStore;
use sqlparser::ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias};
use sqlparser::dialect::dialect_from_str;
use std::any::Any;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;

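Beyond the diff itself, a hedged sketch of what the move means for downstream code: both `PhysicalOptimizer` (the default rule set) and the `PhysicalOptimizerRule` trait now come from the standalone `datafusion-physical-optimizer` crate, as `session_state.rs` imports them above. The no-op rule below is purely illustrative.

```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion_physical_optimizer::PhysicalOptimizerRule;

/// A do-nothing rule, shown only to illustrate the trait's new location.
#[derive(Debug)]
struct NoopRule;

impl PhysicalOptimizerRule for NoopRule {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(plan) // pass the plan through unchanged
    }

    fn name(&self) -> &str {
        "noop_rule"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

fn main() {
    // The default rule set is still available via `PhysicalOptimizer::new()`.
    let optimizer = PhysicalOptimizer::new();
    for rule in &optimizer.rules {
        println!("default physical optimizer rule: {}", rule.name());
    }
    let _custom: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(NoopRule);
}
```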
7 changes: 3 additions & 4 deletions datafusion/core/src/lib.rs
@@ -207,7 +207,7 @@
//! [`QueryPlanner`]: execution::context::QueryPlanner
//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::PhysicalOptimizerRule
//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
//!
//! ## Query Planning and Execution Overview
//!
@@ -349,7 +349,7 @@
//! filtering can never be `true` using additional statistical information.
//!
//! [cp_solver]: crate::physical_expr::intervals::cp_solver
//! [`PruningPredicate`]: crate::physical_optimizer::pruning::PruningPredicate
//! [`PruningPredicate`]: datafusion_physical_optimizer::pruning::PruningPredicate
//! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
//!
//! ## Execution
@@ -659,7 +659,7 @@
//! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
//! [`ExecutionPlan`]: physical_plan::ExecutionPlan
//! [`PhysicalPlanner`]: physical_planner::PhysicalPlanner
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
//! [`Schema`]: arrow::datatypes::Schema
//! [`PhysicalExpr`]: physical_plan::PhysicalExpr
//! [`RecordBatch`]: arrow::record_batch::RecordBatch
@@ -677,7 +677,6 @@ pub mod dataframe;
pub mod datasource;
pub mod error;
pub mod execution;
pub mod physical_optimizer;
Contributor commented:
🚀

pub mod physical_planner;
pub mod prelude;
pub mod scalar;