Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export datafusion_physical_optimizer, only use datafusion crate in the examples #14305

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,15 @@ path = "examples/external_dependency/query-aws-s3.rs"

[dev-dependencies]
arrow = { workspace = true }
# arrow_schema is required for record_batch! macro :sad:
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
# Note: only use the main `datafusion` crate in the examples
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-optimizer = { workspace = true, default-features = true }
datafusion-proto = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
Expand Down
23 changes: 11 additions & 12 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
Expand All @@ -35,6 +40,8 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
Expand All @@ -43,20 +50,12 @@ use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion_physical_optimizer::pruning::PruningPredicate;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -284,7 +283,7 @@ impl IndexTableProvider {
.transpose()?
// if there are no filters, fall back to a literal `true` so that we still
// have a predicate (one that always evaluates to true) to pass to the index
.unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
.unwrap_or_else(|| datafusion::physical_expr::expressions::lit(true));

Ok(predicate)
}
Expand Down
26 changes: 12 additions & 14 deletions datafusion-examples/examples/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{Field, Schema};
use arrow::datatypes::{Field, Schema};
use datafusion::physical_expr::NullState;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use datafusion_physical_expr::NullState;
use std::{any::Any, sync::Arc};

use arrow::{
array::{
ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array,
},
datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type},
record_batch::RecordBatch,
use arrow::array::{
ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array,
};
use arrow::datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type};
use arrow::record_batch::RecordBatch;
use datafusion::common::{cast::as_float64_array, ScalarValue};
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
use datafusion_expr::{
use datafusion::logical_expr::{
expr::AggregateFunction,
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
simplify::SimplifyInfo,
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature,
};
use datafusion::prelude::*;

/// This example shows how to use the full AggregateUDFImpl API to implement a user
/// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements
Expand Down Expand Up @@ -308,7 +306,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
}

/// Generate output, as specified by `emit_to`, and update the intermediate state
fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<ArrayRef> {
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let counts = emit_to.take_needed(&mut self.counts);
let prods = emit_to.take_needed(&mut self.prods);
let nulls = self.null_state.build(emit_to);
Expand Down Expand Up @@ -344,7 +342,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
}

// return arrays for counts and prods
fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<Vec<ArrayRef>> {
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let nulls = self.null_state.build(emit_to);
let nulls = Some(nulls);

Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use arrow::array::{
use arrow::compute;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::common::{exec_err, internal_err, ScalarValue};
use datafusion::error::Result;
use datafusion::logical_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{exec_err, internal_err, ScalarValue};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};
use datafusion::prelude::*;

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
Expand Down
22 changes: 12 additions & 10 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::any::Any;

use arrow::datatypes::Field;
use arrow::{
array::{ArrayRef, AsArray, Float64Array},
datatypes::Float64Type,
};
use arrow_schema::Field;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::function::{
PartitionEvaluatorArgs, WindowFunctionSimplification, WindowUDFFieldArgs,
};
use datafusion::logical_expr::simplify::SimplifyInfo;
use datafusion::logical_expr::{
Expr, PartitionEvaluator, Signature, WindowFrame, WindowFunctionDefinition,
WindowUDF, WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion::prelude::*;

/// This example shows how to use the full WindowUDFImpl API to implement a user
/// defined window function. As in the `simple_udwf.rs` example, this struct implements
Expand Down Expand Up @@ -189,7 +191,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
order_by: window_function.order_by,
Expand Down
10 changes: 5 additions & 5 deletions datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::logical_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_optimizer::analyzer::AnalyzerRule;
use std::sync::{Arc, Mutex};

/// This example demonstrates how to add your own [`AnalyzerRule`] to
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use std::ops::Deref;
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::common::{internal_err, DataFusionError};
use datafusion::logical_expr::registry::FunctionRegistry;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_common::{internal_err, DataFusionError};
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf;

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
Expand Down
7 changes: 3 additions & 4 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use std::{any::Any, sync::Arc};

use arrow::{
array::{AsArray, RecordBatch, StringArray, UInt8Array},
datatypes::UInt64Type,
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::common::{GetExt, Statistics};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
datasource::{
file_format::{
Expand All @@ -38,8 +39,6 @@ use datafusion::{
physical_plan::ExecutionPlan,
prelude::SessionContext,
};
use datafusion_common::{GetExt, Statistics};
use datafusion_physical_expr::PhysicalExpr;
use object_store::{ObjectMeta, ObjectStore};
use tempfile::tempdir;

Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::common::DataFusionError;
use datafusion::common::ScalarValue;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::prelude::*;
use datafusion_common::config::CsvOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::DataFusionError;
use datafusion_common::ScalarValue;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/date_time_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::assert_batches_eq;
use datafusion::common::assert_contains;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_contains;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/deserialize_to_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use arrow::array::{AsArray, PrimitiveArray};
use arrow::datatypes::{Float64Type, Int32Type};
use datafusion::common::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_batches_eq;
use futures::StreamExt;

/// This example shows how to convert query results into Rust structs by using
Expand Down
24 changes: 12 additions & 12 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ use arrow::array::{BooleanArray, Int32Array, Int8Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::DFSchema;
use datafusion::common::{ScalarValue, ToDFSchema};
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::first_value_udaf;
use datafusion::logical_expr::execution_props::ExecutionProps;
use datafusion::logical_expr::expr::BinaryExpr;
use datafusion::logical_expr::interval_arithmetic::Interval;
use datafusion::logical_expr::simplify::SimplifyContext;
use datafusion::logical_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
use datafusion::optimizer::analyzer::type_coercion::TypeCoercionRewriter;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;

/// This example demonstrates the DataFusion [`Expr`] API.
///
Expand Down Expand Up @@ -357,7 +357,7 @@ fn type_coercion_demo() -> Result<()> {
// Evaluation with an expression that has not been type coerced cannot succeed.
let props = ExecutionProps::default();
let physical_expr =
datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
datafusion::physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
let e = physical_expr.evaluate(&batch).unwrap_err();
assert!(e
.find_root()
Expand All @@ -373,7 +373,7 @@ fn type_coercion_demo() -> Result<()> {
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
let simplifier = ExprSimplifier::new(context);
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand All @@ -385,7 +385,7 @@ fn type_coercion_demo() -> Result<()> {
.clone()
.rewrite(&mut TypeCoercionRewriter::new(&df_schema))?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand Down Expand Up @@ -413,7 +413,7 @@ fn type_coercion_demo() -> Result<()> {
}
})?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand Down
Loading
Loading