diff --git a/Cargo.toml b/Cargo.toml index a3f521a725c9..1581c115f505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ async-trait = "0.1.73" bigdecimal = "0.4.7" bytes = "1.4" chrono = { version = "0.4.38", default-features = false } -ctor = "0.2.0" +ctor = "0.2.9" dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "44.0.0", default-features = false } datafusion-catalog = { path = "datafusion/catalog", version = "44.0.0" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index ec4a58fab346..8754612bfb11 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -66,7 +66,7 @@ url = "2.5.4" [dev-dependencies] assert_cmd = "2.0" -ctor = "0.2.0" +ctor = "0.2.9" predicates = "3.0" rstest = "0.22" diff --git a/datafusion/common/src/pyarrow.rs b/datafusion/common/src/pyarrow.rs index bdcf831c7884..29869c8da561 100644 --- a/datafusion/common/src/pyarrow.rs +++ b/datafusion/common/src/pyarrow.rs @@ -138,6 +138,9 @@ mod tests { fn test_py_scalar() { init_python(); + // TODO: remove this attribute when bumping pyo3 to v0.23.0 + // See: + #[allow(unexpected_cfgs)] Python::with_gil(|py| { let scalar_float = ScalarValue::Float64(Some(12.34)); let py_float = scalar_float.into_py(py).call_method0(py, "as_py").unwrap(); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 7470597ef72c..06b94f804268 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1148,8 +1148,8 @@ impl ListingTable { /// This method first checks if the statistics for the given file are already cached. /// If they are, it returns the cached statistics. /// If they are not, it infers the statistics from the file and stores them in the cache. 
- async fn do_collect_statistics<'a>( - &'a self, + async fn do_collect_statistics( + &self, ctx: &SessionState, store: &Arc, part_file: &PartitionedFile, diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index f20475df150b..e5da49ad7b8b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1047,7 +1047,7 @@ impl SessionContext { Ok(table) } - async fn find_and_deregister<'a>( + async fn find_and_deregister( &self, table_ref: impl Into, table_type: TableType, @@ -1481,10 +1481,7 @@ impl SessionContext { /// provided reference. /// /// [`register_table`]: SessionContext::register_table - pub async fn table<'a>( - &self, - table_ref: impl Into, - ) -> Result { + pub async fn table(&self, table_ref: impl Into) -> Result { let table_ref: TableReference = table_ref.into(); let provider = self.table_provider(table_ref.clone()).await?; let plan = LogicalPlanBuilder::scan( @@ -1511,7 +1508,7 @@ impl SessionContext { } /// Return a [`TableProvider`] for the specified table. 
- pub async fn table_provider<'a>( + pub async fn table_provider( &self, table_ref: impl Into, ) -> Result> { diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 96b2454fa330..9f5afc7abc2e 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -285,9 +285,7 @@ pub(crate) fn replace_with_order_preserving_variants( mod tests { use super::*; - use crate::datasource::file_format::file_compression_type::FileCompressionType; - use crate::datasource::listing::PartitionedFile; - use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; + use crate::execution::TaskContext; use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; @@ -296,18 +294,24 @@ mod tests { use crate::physical_plan::{ displayable, get_plan_string, ExecutionPlan, Partitioning, }; - use crate::prelude::SessionConfig; + use crate::prelude::{SessionConfig, SessionContext}; use crate::test::TestStreamPartition; + use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::Result; - use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{self, col, Column}; use datafusion_physical_expr::PhysicalSortExpr; + use datafusion_physical_plan::collect; + use datafusion_physical_plan::memory::MemoryExec; use datafusion_physical_plan::streaming::StreamingTableExec; + use object_store::memory::InMemory; + use object_store::ObjectStore; + use url::Url; use 
rstest::rstest; @@ -328,20 +332,24 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. macro_rules! assert_optimized_in_all_boundedness_situations { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => { + ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => { if $SOURCE_UNBOUNDED { assert_optimized_prefer_sort_on_off!( $EXPECTED_UNBOUNDED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } else { assert_optimized_prefer_sort_on_off!( $EXPECTED_BOUNDED_PLAN_LINES, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED ); } }; @@ -359,19 +367,24 @@ mod tests { /// the flag `prefer_existing_sort` is `true`. /// * `$PLAN`: The plan to optimize. macro_rules! 
assert_optimized_prefer_sort_on_off { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN.clone(), - false - ); - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - true - ); + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { + if $PREFER_EXISTING_SORT { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } else { + assert_optimized!( + $EXPECTED_PLAN_LINES, + $EXPECTED_OPTIMIZED_PLAN_LINES, + $PLAN, + $PREFER_EXISTING_SORT, + $SOURCE_UNBOUNDED + ); + } }; } @@ -385,7 +398,7 @@ mod tests { /// * `$PLAN`: The plan to optimize. /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. macro_rules! 
assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => { + ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { let physical_plan = $PLAN; let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -412,6 +425,19 @@ mod tests { expected_optimized_lines, actual, "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" ); + + if !$SOURCE_UNBOUNDED { + let ctx = SessionContext::new(); + let object_store = InMemory::new(); + object_store.put(&object_store::path::Path::from("file_path"), bytes::Bytes::from("").into()).await?; + ctx.register_object_store(&Url::parse("test://").unwrap(), Arc::new(object_store)); + let task_ctx = Arc::new(TaskContext::from(&ctx)); + let res = collect(optimized_physical_plan, task_ctx).await; + assert!( + res.is_ok(), + "Some errors occurred while executing the optimized physical plan: {:?}", res.unwrap_err() + ); + } }; } @@ -420,13 +446,14 @@ mod tests { // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -447,7 +474,7 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -464,13 +491,13 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -479,7 +506,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -488,13 +516,14 @@ mod tests { #[tokio::test] async fn test_with_inter_children_change_only( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; let source = if 
source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -538,7 +567,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; // Expected unbounded result (same for with and without flag) @@ -564,7 +593,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC]", @@ -574,7 +603,7 @@ mod tests { " SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -583,7 +612,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -592,13 +622,14 @@ mod tests { #[tokio::test] async fn 
test_replace_multiple_input_repartition_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); @@ -623,7 +654,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -642,14 +673,14 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( 
expected_input_unbounded, @@ -658,7 +689,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -667,13 +699,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -701,7 +734,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -722,7 +755,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -730,7 +763,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " 
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -739,7 +772,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -748,13 +782,14 @@ mod tests { #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps_2( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); @@ -786,7 +821,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -809,7 +844,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC 
NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -818,7 +853,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -827,7 +862,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -836,13 +872,14 @@ mod tests { #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -867,7 +904,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", 
]; // Expected unbounded result (same for with and without flag) @@ -887,7 +924,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -898,7 +935,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -907,13 +945,14 @@ mod tests { #[tokio::test] async fn test_with_multiple_replacable_repartitions( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -944,7 +983,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -967,7 +1006,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -976,7 +1015,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -985,7 +1024,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -994,13 +1034,14 @@ mod tests { #[tokio::test] async fn test_not_replace_with_different_orderings( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1028,7 +1069,7 @@ mod tests { " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1046,7 +1087,7 @@ mod tests { " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1057,7 +1098,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1066,13 +1108,14 @@ mod tests { #[tokio::test] async fn test_with_lost_ordering( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1093,7 +1136,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with 
and without flag) @@ -1110,13 +1153,13 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1125,7 +1168,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1134,13 +1178,14 @@ mod tests { #[tokio::test] async fn test_with_lost_and_kept_ordering( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; let source = if source_unbounded { stream_exec_ordered(&schema, sort_exprs) } else { - csv_exec_sorted(&schema, sort_exprs) + memory_exec_sorted(&schema, sort_exprs) }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1184,7 +1229,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1211,7 +1256,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = [ "SortPreservingMergeExec: [c@1 ASC]", @@ -1222,7 +1267,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; assert_optimized_in_all_boundedness_situations!( expected_input_unbounded, @@ -1231,7 +1276,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1240,6 +1286,7 @@ mod tests { #[tokio::test] async fn test_with_multiple_child_trees( #[values(false, true)] source_unbounded: bool, + #[values(false, true)] prefer_existing_sort: bool, ) -> Result<()> { let schema = create_test_schema()?; @@ -1247,7 +1294,7 @@ mod tests { let left_source = if source_unbounded { stream_exec_ordered(&schema, left_sort_exprs) } else { - csv_exec_sorted(&schema, left_sort_exprs) + memory_exec_sorted(&schema, left_sort_exprs) }; let left_repartition_rr = 
repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); @@ -1258,7 +1305,7 @@ mod tests { let right_source = if source_unbounded { stream_exec_ordered(&schema, right_sort_exprs) } else { - csv_exec_sorted(&schema, right_sort_exprs) + memory_exec_sorted(&schema, right_sort_exprs) }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); @@ -1299,11 +1346,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; // Expected unbounded result (same for with and without flag) @@ -1330,11 +1377,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", ]; let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; @@ -1345,7 +1392,8 @@ mod tests { expected_optimized_bounded, expected_optimized_bounded_sort_preserve, physical_plan, - source_unbounded + source_unbounded, + prefer_existing_sort ); Ok(()) } @@ -1492,33 +1540,36 @@ mod tests { ) } - // creates a csv exec source for the test purposes - // projection and has_header parameters are given static due to testing needs - fn csv_exec_sorted( + // creates a memory exec source for the test purposes + // projection parameter is given static due to testing needs + fn memory_exec_sorted( schema: &SchemaRef, sort_exprs: impl IntoIterator, ) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - let projection: Vec = vec![0, 2, 3]; + pub fn make_partition(schema: &SchemaRef, sz: i32) -> RecordBatch { + let values = (0..sz).collect::>(); + let arr = Arc::new(Int32Array::from(values)); + let arr = arr as ArrayRef; - Arc::new( - CsvExec::builder( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema.clone(), - ) - .with_file(PartitionedFile::new("file_path".to_string(), 100)) - .with_projection(Some(projection)) - .with_output_ordering(vec![sort_exprs]), + RecordBatch::try_new( + schema.clone(), + vec![arr.clone(), arr.clone(), arr.clone(), arr], ) - .with_has_header(true) - .with_delimeter(0) - .with_quote(b'"') - .with_escape(None) - .with_comment(None) - .with_newlines_in_values(false) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED) - .build(), - ) + .unwrap() + } + + let rows = 5; + let partitions = 1; + let sort_exprs = sort_exprs.into_iter().collect(); + Arc::new({ + let data: Vec> = (0..partitions) + .map(|_| 
vec![make_partition(schema, rows)]) + .collect(); + let projection: Vec = vec![0, 2, 3]; + MemoryExec::try_new(&data, schema.clone(), Some(projection)) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap() + }) } } diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index c5016a18d443..a57fd80c48e1 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -71,7 +71,8 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; pub use datafusion_expr_common::signature::{ - ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, + ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility, + TIMEZONE_WILDCARD, }; pub use datafusion_expr_common::type_coercion::binary; pub use expr::{ diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 96bb5c4b2d8f..5294cc526d38 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -438,18 +438,11 @@ fn get_valid_types( } fn function_length_check(length: usize, expected_length: usize) -> Result<()> { - if length < 1 { - return plan_err!( - "The signature expected at least one argument but received {expected_length}" - ); - } - if length != expected_length { return plan_err!( - "The signature expected {length} arguments but received {expected_length}" + "The signature expected {expected_length} arguments but received {length}" ); } - Ok(()) } @@ -939,6 +932,7 @@ mod tests { use super::*; use arrow::datatypes::Field; + use datafusion_common::assert_contains; #[test] fn test_string_conversion() { @@ -1027,6 +1021,29 @@ mod tests { Ok(()) } + #[test] + fn test_get_valid_types_length_check() -> Result<()> { + let signature = TypeSignature::Numeric(1); + + let err = 
get_valid_types(&signature, &[]).unwrap_err(); + assert_contains!( + err.to_string(), + "The signature expected 1 arguments but received 0" + ); + + let err = get_valid_types( + &signature, + &[DataType::Int32, DataType::Int32, DataType::Int32], + ) + .unwrap_err(); + assert_contains!( + err.to_string(), + "The signature expected 1 arguments but received 3" + ); + + Ok(()) + } + #[test] fn test_fixed_list_wildcard_coerce() -> Result<()> { let inner = Arc::new(Field::new_list_field(DataType::Int32, false)); diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs index d15e76718b02..e5d866940c05 100644 --- a/datafusion/functions-window/src/nth_value.rs +++ b/datafusion/functions-window/src/nth_value.rs @@ -360,9 +360,10 @@ impl PartitionEvaluator for NthValueEvaluator { }) .unwrap_or_default(); if valid_indices.is_empty() { - return ScalarValue::try_from(arr.data_type()); + None + } else { + Some(valid_indices) } - Some(valid_indices) } else { None }; diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index fd986c4be41c..c8025fb2d895 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -204,6 +204,11 @@ harness = false name = "strpos" required-features = ["unicode_expressions"] +[[bench]] +harness = false +name = "reverse" +required-features = ["unicode_expressions"] + [[bench]] harness = false name = "trunc" diff --git a/datafusion/functions/benches/reverse.rs b/datafusion/functions/benches/reverse.rs new file mode 100644 index 000000000000..c7c1ef8a8220 --- /dev/null +++ b/datafusion/functions/benches/reverse.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::OffsetSizeTrait; +use arrow::util::bench_util::{ + create_string_array_with_len, create_string_view_array_with_len, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_expr::ColumnarValue; +use datafusion_functions::unicode; +use std::sync::Arc; + +fn create_args( + size: usize, + str_len: usize, + force_view_types: bool, +) -> Vec { + if force_view_types { + let string_array = + Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); + + vec![ColumnarValue::Array(string_array)] + } else { + let string_array = + Arc::new(create_string_array_with_len::(size, 0.1, str_len)); + + vec![ColumnarValue::Array(string_array)] + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let reverse = unicode::reverse(); + for size in [1024, 4096] { + let str_len = 8; + + let args = create_args::(size, str_len, true); + c.bench_function( + format!("reverse_string_view [size={}, str_len={}]", size, str_len).as_str(), + |b| { + b.iter(|| { + // TODO use invoke_with_args (second argument is the batch row count) + black_box(reverse.invoke_batch(&args, size)) + }) + }, + ); + + let str_len = 32; + + let args = create_args::(size, str_len, true); + c.bench_function( + format!("reverse_string_view [size={}, str_len={}]", size, str_len).as_str(), + |b| { + b.iter(|| { + // TODO use invoke_with_args (second argument is the batch row count) + black_box(reverse.invoke_batch(&args,
size)) + }) + }, + ); + + let args = create_args::(size, str_len, false); + c.bench_function( + format!("reverse_string [size={}, str_len={}]", size, str_len).as_str(), + |b| { + b.iter(|| { + // TODO use invoke_with_args (second argument is the batch row count) + black_box(reverse.invoke_batch(&args, size)) + }) + }, + ); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/unicode/reverse.rs b/datafusion/functions/src/unicode/reverse.rs index 5ad347ed96c0..f07deda70e52 100644 --- a/datafusion/functions/src/unicode/reverse.rs +++ b/datafusion/functions/src/unicode/reverse.rs @@ -20,8 +20,7 @@ use std::sync::Arc; use crate::utils::{make_scalar_function, utf8_to_str_type}; use arrow::array::{ - Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, - OffsetSizeTrait, + Array, ArrayRef, AsArray, GenericStringBuilder, OffsetSizeTrait, StringArrayType, }; use arrow::datatypes::DataType; use datafusion_common::{exec_err, Result}; @@ -105,8 +104,7 @@ impl ScalarUDFImpl for ReverseFunc { } } -/// Reverses the order of the characters in the string. -/// reverse('abcde') = 'edcba' +/// Reverses the order of the characters in the string `reverse('abcde') = 'edcba'`.
/// The implementation uses UTF-8 code points as characters pub fn reverse(args: &[ArrayRef]) -> Result { if args[0].data_type() == &Utf8View { @@ -116,14 +114,23 @@ pub fn reverse(args: &[ArrayRef]) -> Result { } } -fn reverse_impl<'a, T: OffsetSizeTrait, V: ArrayAccessor>( +fn reverse_impl<'a, T: OffsetSizeTrait, V: StringArrayType<'a>>( string_array: V, ) -> Result { - let result = ArrayIter::new(string_array) - .map(|string| string.map(|string: &str| string.chars().rev().collect::())) - .collect::>(); + let mut builder = GenericStringBuilder::::with_capacity(string_array.len(), 1024); + + let mut reversed = String::new(); + for string in string_array.iter() { + if let Some(s) = string { + reversed.extend(s.chars().rev()); + builder.append_value(&reversed); + reversed.clear(); + } else { + builder.append_null(); + } + } - Ok(Arc::new(result) as ArrayRef) + Ok(Arc::new(builder.finish()) as ArrayRef) } #[cfg(test)] diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index c2e892d63da0..e90f9c32ee87 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -159,9 +159,7 @@ pub trait DynEq { impl DynEq for T { fn dyn_eq(&self, other: &dyn Any) -> bool { - other - .downcast_ref::() - .map_or(false, |other| other == self) + other.downcast_ref::() == Some(self) } } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 0d7501610662..63397e69c09d 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -291,9 +291,9 @@ impl PhysicalSortRequirement { /// Returns whether this requirement is equal or more specific than `other`. 
pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool { self.expr.eq(&other.expr) - && other.options.map_or(true, |other_opts| { - self.options.map_or(false, |opts| opts == other_opts) - }) + && other + .options + .map_or(true, |other_opts| self.options == Some(other_opts)) } #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")] @@ -409,6 +409,22 @@ impl LexOrdering { .map(PhysicalSortExpr::from) .collect() } + + /// Collapse a `LexOrdering` into a new duplicate-free `LexOrdering` based on expression. + /// + /// This function filters duplicate entries that have the same physical + /// expression inside, ignoring [`SortOptions`]. For example: + /// + /// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. + pub fn collapse(self) -> Self { + let mut output = LexOrdering::default(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + output + } } impl From> for LexOrdering { @@ -540,6 +556,21 @@ impl LexRequirement { .collect(), ) } + + /// Constructs a duplicate-free `LexRequirement` by filtering out + /// duplicate entries that have the same physical expression inside. + /// + /// For example, `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a + /// Some(ASC)]`. + pub fn collapse(self) -> Self { + let mut output = Vec::::new(); + for item in self { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + LexRequirement::new(output) + } } impl From for LexRequirement { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index cb11409479a8..5c749a1a5a6e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use super::{add_offset_to_expr, ProjectionMapping}; use crate::{ expressions::Column, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, @@ -526,12 +526,13 @@ impl EquivalenceGroup { &self, sort_reqs: &LexRequirement, ) -> LexRequirement { - collapse_lex_req(LexRequirement::new( + LexRequirement::new( sort_reqs .iter() .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) .collect(), - )) + ) + .collapse() } /// Projects `expr` according to the given projection mapping. diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index b50633d777f7..a5b85064e625 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::expressions::Column; -use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; +use crate::{LexRequirement, PhysicalExpr}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -41,14 +41,9 @@ pub use properties::{ /// It will also filter out entries that are ordered if the next entry is; /// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to /// `vec![a Some(ASC)]`. +#[deprecated(since = "45.0.0", note = "Use LexRequirement::collapse")] pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - LexRequirement::new(output) + input.collapse() } /// Adds the `offset` value to `Column` indices inside `expr`. 
This function is @@ -80,7 +75,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::{plan_datafusion_err, Result}; - use datafusion_physical_expr_common::sort_expr::LexOrdering; + use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, PhysicalSortRequirement, + }; pub fn output_schema( mapping: &ProjectionMapping, diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 5dfa1b08f366..ae502d4d5f67 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -159,8 +159,13 @@ impl OrderingEquivalenceClass { /// Returns the concatenation of all the orderings. This enables merge /// operations to preserve all equivalent orderings simultaneously. pub fn output_ordering(&self) -> Option { - let output_ordering = self.orderings.iter().flatten().cloned().collect(); - let output_ordering = collapse_lex_ordering(output_ordering); + let output_ordering = self + .orderings + .iter() + .flatten() + .cloned() + .collect::() + .collapse(); (!output_ordering.is_empty()).then_some(output_ordering) } @@ -221,19 +226,6 @@ impl IntoIterator for OrderingEquivalenceClass { } } -/// This function constructs a duplicate-free `LexOrdering` by filtering out -/// duplicate entries that have same physical expression inside. For example, -/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. -pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { - let mut output = LexOrdering::default(); - for item in input.iter() { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item.clone()); - } - } - output -} - /// Trims `orderings[idx]` if some suffix of it overlaps with a prefix of /// `orderings[pre_idx]`. Returns `true` if there is any overlap, `false` otherwise. 
fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> bool { diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 4f440416c457..2c7335649b28 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -22,11 +22,9 @@ use std::slice::Iter; use std::sync::Arc; use std::{fmt, mem}; -use super::ordering::collapse_lex_ordering; use crate::equivalence::class::{const_exprs_contains, AcrossPartitions}; use crate::equivalence::{ - collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, - ProjectionMapping, + EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; use crate::expressions::{with_new_schema, CastExpr, Column, Literal}; use crate::{ @@ -505,15 +503,12 @@ impl EquivalenceProperties { ); let constants_normalized = self.eq_group.normalize_exprs(constant_exprs); // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) + normalized_sort_reqs + .iter() + .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr)) + .cloned() + .collect::() + .collapse() } /// Checks whether the given ordering is satisfied by any of the existing @@ -915,7 +910,7 @@ impl EquivalenceProperties { // Simplify each ordering by removing redundant sections: orderings .chain(projected_orderings) - .map(collapse_lex_ordering) + .map(|lex_ordering| lex_ordering.collapse()) .collect() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4812fa41347d..52fd6f90e595 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,10 +44,9 @@ use datafusion_execution::TaskContext; 
use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::Column, - physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, - PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, expressions::Column, physical_exprs_contains, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -473,7 +472,7 @@ impl AggregateExec { &mode, )?; new_requirement.inner.extend(req); - new_requirement = collapse_lex_req(new_requirement); + new_requirement = new_requirement.collapse(); // If our aggregation has grouping sets then our base grouping exprs will // be expanded based on the flags in `group_by.groups` where for each diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 36c4b9f18da9..510cbc248b63 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -32,7 +32,6 @@ use datafusion_expr::{ PartitionEvaluator, ReversedUDWF, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, @@ -469,8 +468,8 @@ pub fn get_window_mode( { let req = LexRequirement::new( [partition_by_reqs.inner.clone(), order_by_reqs.inner].concat(), - ); - let req = collapse_lex_req(req); + ) + .collapse(); if partition_by_eqs.ordering_satisfy_requirement(&req) { // Window can be run with existing ordering let mode = if indices.len() == partitionby_exprs.len() { diff --git a/datafusion/sql/src/unparser/dialect.rs b/datafusion/sql/src/unparser/dialect.rs index 
3a44d7f0ec48..5c318a96ef6c 100644 --- a/datafusion/sql/src/unparser/dialect.rs +++ b/datafusion/sql/src/unparser/dialect.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use arrow_schema::TimeUnit; use datafusion_common::Result; @@ -29,6 +29,9 @@ use sqlparser::{ use super::{utils::character_length_to_sql, utils::date_part_to_sql, Unparser}; +pub type ScalarFnToSqlHandler = + Box Result> + Send + Sync>; + /// `Dialect` to use for Unparsing /// /// The default dialect tries to avoid quoting identifiers unless necessary (e.g. `a` instead of `"a"`) @@ -150,6 +153,18 @@ pub trait Dialect: Send + Sync { Ok(None) } + /// Extends the dialect's default rules for unparsing scalar functions. + /// This is useful for supporting application-specific UDFs or custom engine extensions. + fn with_custom_scalar_overrides( + self, + _handlers: Vec<(&str, ScalarFnToSqlHandler)>, + ) -> Self + where + Self: Sized, + { + unimplemented!("Custom scalar overrides are not supported by this dialect yet"); + } + /// Allow to unparse a qualified column with a full qualified name /// (e.g. 
catalog_name.schema_name.table_name.column_name) /// Otherwise, the column will be unparsed with only the table name and column name @@ -305,7 +320,19 @@ impl PostgreSqlDialect { } } -pub struct DuckDBDialect {} +#[derive(Default)] +pub struct DuckDBDialect { + custom_scalar_fn_overrides: HashMap, +} + +impl DuckDBDialect { + #[must_use] + pub fn new() -> Self { + Self { + custom_scalar_fn_overrides: HashMap::new(), + } + } +} impl Dialect for DuckDBDialect { fn identifier_quote_style(&self, _: &str) -> Option { @@ -320,12 +347,27 @@ impl Dialect for DuckDBDialect { BinaryOperator::DuckIntegerDivide } + fn with_custom_scalar_overrides( + mut self, + handlers: Vec<(&str, ScalarFnToSqlHandler)>, + ) -> Self { + for (func_name, handler) in handlers { + self.custom_scalar_fn_overrides + .insert(func_name.to_string(), handler); + } + self + } + fn scalar_function_to_sql_overrides( &self, unparser: &Unparser, func_name: &str, args: &[Expr], ) -> Result> { + if let Some(handler) = self.custom_scalar_fn_overrides.get(func_name) { + return handler(unparser, args); + } + if func_name == "character_length" { return character_length_to_sql( unparser, diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index d012d3437720..7a110fd0785c 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -1636,7 +1636,7 @@ mod tests { use crate::unparser::dialect::{ CharacterLengthStyle, CustomDialect, CustomDialectBuilder, DateFieldExtractStyle, - Dialect, PostgreSqlDialect, + Dialect, DuckDBDialect, PostgreSqlDialect, ScalarFnToSqlHandler, }; use super::*; @@ -2722,4 +2722,28 @@ mod tests { Ok(()) } + + #[test] + fn test_custom_scalar_overrides_duckdb() -> Result<()> { + let duckdb_default = DuckDBDialect::new(); + let duckdb_extended = DuckDBDialect::new().with_custom_scalar_overrides(vec![( + "dummy_udf", + Box::new(|unparser: &Unparser, args: &[Expr]| { + unparser.scalar_function_to_sql("smart_udf", args).map(Some) 
+ }) as ScalarFnToSqlHandler, + )]); + + for (dialect, expected) in [ + (duckdb_default, r#"dummy_udf("a", "b")"#), + (duckdb_extended, r#"smart_udf("a", "b")"#), + ] { + let unparser = Unparser::new(&dialect); + let expr = + ScalarUDF::new_from_impl(DummyUDF::new()).call(vec![col("a"), col("b")]); + let actual = format!("{}", unparser.expr_to_sql(&expr)?); + assert_eq!(actual, expected); + } + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 3c6f0f6deba1..2c82df969f1f 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1767,7 +1767,7 @@ logical_plan 01)Projection: count(*) AS global_count 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] 03)----SubqueryAlias: a -04)------Projection: +04)------Projection: 05)--------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]] 06)----------Projection: aggregate_test_100.c1 07)------------Filter: aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434") @@ -4625,6 +4625,38 @@ NULL 1 statement ok DROP TABLE t; + +# Test for ignore nulls in nth_VALUE without null values +statement ok +CREATE TABLE t AS VALUES (3, 3), (4, 4), (5, 5), (6, 6); + +query I +SELECT column1 FROM t ORDER BY column2; +---- +3 +4 +5 +6 + +query I +SELECT nth_VALUE(column1, 1) OVER(ORDER BY column2) FROM t; +---- +3 +3 +3 +3 + +query I +SELECT nth_VALUE(column1, 1) IGNORE NULLS OVER(ORDER BY column2) FROM t; +---- +3 +3 +3 +3 + +statement ok +DROP TABLE t; + # Test for ignore nulls with ORDER BY in nth_VALUE statement ok CREATE TABLE t AS VALUES (3, 3), (4, 4), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6); @@ -5055,7 +5087,7 @@ select b, row_number() over (order by a) from (select TRUE as a, 1 as b); # test window functions on boolean columns statement count 0 -create table t1 (id int, bool_col boolean) as values +create table t1 (id int, bool_col boolean) as values (1, true), (2, 
false), (3, true), @@ -5110,7 +5142,7 @@ select ntile(2) over (order by bool_col) from t1; 2 query IIIRRI -select +select row_number() over (order by bool_col) as row_num, rank() over (order by bool_col) as rank, dense_rank() over (order by bool_col) as dense_rank, diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 60e35cc966d2..a60b70a963e9 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -54,3 +54,10 @@ tokio = { workspace = true } default = ["physical"] physical = ["datafusion/parquet"] protoc = ["substrait/protoc"] + +[package.metadata.docs.rs] +# Use default features ("physical") for docs, plus "protoc". "protoc" is needed +# to get a consistent version of the protobuf compiler in the docs build; +# without that, an outdated protobuf compiler may fail to compile the protobuf +# files as it did in versions 42.0.0 through 44.0.0. +all-features = true diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 8520b8d02da5..aae66e6b9a97 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -62,9 +62,9 @@ datafusion-sql = { workspace = true } getrandom = { version = "0.2.8", features = ["js"] } parquet = { workspace = true } -wasm-bindgen = "0.2.87" -wasm-bindgen-futures = "0.4.40" +wasm-bindgen = "0.2.99" +wasm-bindgen-futures = "0.4.49" [dev-dependencies] tokio = { workspace = true } -wasm-bindgen-test = "0.3.44" +wasm-bindgen-test = "0.3.49"