Skip to content

Commit

Permalink
Export datafusion_physical_optimizer, only use datafusion crate in the examples (#14305)
Browse files Browse the repository at this point in the history

* Only use `datafusion` crate in the examples

* taplo format
  • Loading branch information
alamb authored Jan 27, 2025
1 parent be56bd3 commit 0eebc0c
Show file tree
Hide file tree
Showing 35 changed files with 174 additions and 185 deletions.
10 changes: 2 additions & 8 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,15 @@ path = "examples/external_dependency/query-aws-s3.rs"

[dev-dependencies]
arrow = { workspace = true }
# arrow_schema is required for record_batch! macro :sad:
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
# Note: only use the main `datafusion` crate in the examples
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-optimizer = { workspace = true, default-features = true }
datafusion-proto = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
Expand Down
23 changes: 11 additions & 12 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
Expand All @@ -35,6 +40,8 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
};
Expand All @@ -43,20 +50,12 @@ use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion_physical_optimizer::pruning::PruningPredicate;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -284,7 +283,7 @@ impl IndexTableProvider {
.transpose()?
// if there are no filters, fall back to a literal `true` so that we
// always have a predicate (one that matches everything) to pass to the index
.unwrap_or_else(|| datafusion_physical_expr::expressions::lit(true));
.unwrap_or_else(|| datafusion::physical_expr::expressions::lit(true));

Ok(predicate)
}
Expand Down
26 changes: 12 additions & 14 deletions datafusion-examples/examples/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{Field, Schema};
use arrow::datatypes::{Field, Schema};
use datafusion::physical_expr::NullState;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use datafusion_physical_expr::NullState;
use std::{any::Any, sync::Arc};

use arrow::{
array::{
ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array,
},
datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type},
record_batch::RecordBatch,
use arrow::array::{
ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array,
};
use arrow::datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type};
use arrow::record_batch::RecordBatch;
use datafusion::common::{cast::as_float64_array, ScalarValue};
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
use datafusion_expr::{
use datafusion::logical_expr::{
expr::AggregateFunction,
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
simplify::SimplifyInfo,
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature,
};
use datafusion::prelude::*;

/// This example shows how to use the full AggregateUDFImpl API to implement a user
/// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements
Expand Down Expand Up @@ -308,7 +306,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
}

/// Generate output, as specified by `emit_to`, and update the intermediate state
fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<ArrayRef> {
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let counts = emit_to.take_needed(&mut self.counts);
let prods = emit_to.take_needed(&mut self.prods);
let nulls = self.null_state.build(emit_to);
Expand Down Expand Up @@ -344,7 +342,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
}

// return arrays for counts and prods
fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<Vec<ArrayRef>> {
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let nulls = self.null_state.build(emit_to);
let nulls = Some(nulls);

Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/advanced_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use arrow::array::{
use arrow::compute;
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;
use datafusion::common::{exec_err, internal_err, ScalarValue};
use datafusion::error::Result;
use datafusion::logical_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{exec_err, internal_err, ScalarValue};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};
use datafusion::prelude::*;

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
Expand Down
22 changes: 12 additions & 10 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::any::Any;

use arrow::datatypes::Field;
use arrow::{
array::{ArrayRef, AsArray, Float64Array},
datatypes::Float64Type,
};
use arrow_schema::Field;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::function::{
PartitionEvaluatorArgs, WindowFunctionSimplification, WindowUDFFieldArgs,
};
use datafusion::logical_expr::simplify::SimplifyInfo;
use datafusion::logical_expr::{
Expr, PartitionEvaluator, Signature, WindowFrame, WindowFunctionDefinition,
WindowUDF, WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion::prelude::*;

/// This example shows how to use the full WindowUDFImpl API to implement a user
/// defined window function. As in the `simple_udwf.rs` example, this struct implements
Expand Down Expand Up @@ -189,7 +191,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
order_by: window_function.order_by,
Expand Down
10 changes: 5 additions & 5 deletions datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::logical_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_optimizer::analyzer::AnalyzerRule;
use std::sync::{Arc, Mutex};

/// This example demonstrates how to add your own [`AnalyzerRule`] to
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use std::ops::Deref;
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::common::{internal_err, DataFusionError};
use datafusion::logical_expr::registry::FunctionRegistry;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_common::{internal_err, DataFusionError};
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf;

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
Expand Down
7 changes: 3 additions & 4 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use std::{any::Any, sync::Arc};

use arrow::{
array::{AsArray, RecordBatch, StringArray, UInt8Array},
datatypes::UInt64Type,
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::common::{GetExt, Statistics};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
datasource::{
file_format::{
Expand All @@ -38,8 +39,6 @@ use datafusion::{
physical_plan::ExecutionPlan,
prelude::SessionContext,
};
use datafusion_common::{GetExt, Statistics};
use datafusion_physical_expr::PhysicalExpr;
use object_store::{ObjectMeta, ObjectStore};
use tempfile::tempdir;

Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::common::DataFusionError;
use datafusion::common::ScalarValue;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::prelude::*;
use datafusion_common::config::CsvOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::DataFusionError;
use datafusion_common::ScalarValue;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/date_time_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::assert_batches_eq;
use datafusion::common::assert_contains;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_contains;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/deserialize_to_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use arrow::array::{AsArray, PrimitiveArray};
use arrow::datatypes::{Float64Type, Int32Type};
use datafusion::common::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::assert_batches_eq;
use futures::StreamExt;

/// This example shows how to convert query results into Rust structs by using
Expand Down
24 changes: 12 additions & 12 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ use arrow::array::{BooleanArray, Int32Array, Int8Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::DFSchema;
use datafusion::common::{ScalarValue, ToDFSchema};
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::first_value_udaf;
use datafusion::logical_expr::execution_props::ExecutionProps;
use datafusion::logical_expr::expr::BinaryExpr;
use datafusion::logical_expr::interval_arithmetic::Interval;
use datafusion::logical_expr::simplify::SimplifyContext;
use datafusion::logical_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
use datafusion::optimizer::analyzer::type_coercion::TypeCoercionRewriter;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};
use datafusion_optimizer::analyzer::type_coercion::TypeCoercionRewriter;

/// This example demonstrates the DataFusion [`Expr`] API.
///
Expand Down Expand Up @@ -357,7 +357,7 @@ fn type_coercion_demo() -> Result<()> {
// Evaluation with an expression that has not been type coerced cannot succeed.
let props = ExecutionProps::default();
let physical_expr =
datafusion_physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
datafusion::physical_expr::create_physical_expr(&expr, &df_schema, &props)?;
let e = physical_expr.evaluate(&batch).unwrap_err();
assert!(e
.find_root()
Expand All @@ -373,7 +373,7 @@ fn type_coercion_demo() -> Result<()> {
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
let simplifier = ExprSimplifier::new(context);
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand All @@ -385,7 +385,7 @@ fn type_coercion_demo() -> Result<()> {
.clone()
.rewrite(&mut TypeCoercionRewriter::new(&df_schema))?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand Down Expand Up @@ -413,7 +413,7 @@ fn type_coercion_demo() -> Result<()> {
}
})?
.data;
let physical_expr = datafusion_physical_expr::create_physical_expr(
let physical_expr = datafusion::physical_expr::create_physical_expr(
&coerced_expr,
&df_schema,
&props,
Expand Down
Loading

0 comments on commit 0eebc0c

Please sign in to comment.