diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 50bebc5b40947..2ed965c332c79 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,9 +80,12 @@ jobs: - name: Check datafusion-common without default features run: cargo check --all-targets --no-default-features -p datafusion-common - - name: Check datafusion-functions + - name: Check datafusion-functions without default features run: cargo check --all-targets --no-default-features -p datafusion-functions + - name: Check datafusion-substrait without default features + run: cargo check --all-targets --no-default-features -p datafusion-substrait + - name: Check workspace in debug mode run: cargo check --all-targets --workspace @@ -582,9 +585,9 @@ jobs: # # To reproduce: # 1. Install the version of Rust that is failing. Example: - # rustup install 1.79.0 + # rustup install 1.80.0 # 2. Run the command that failed with that version. Example: - # cargo +1.79.0 check -p datafusion + # cargo +1.80.0 check -p datafusion # # To resolve, either: # 1. Change your code to use older Rust features, @@ -603,4 +606,4 @@ jobs: run: cargo msrv --output-format json --log-target stdout verify - name: Check datafusion-cli working-directory: datafusion-cli - run: cargo msrv --output-format json --log-target stdout verify \ No newline at end of file + run: cargo msrv --output-format json --log-target stdout verify diff --git a/Cargo.toml b/Cargo.toml index 1ca6cdfdb12d4..651fe77049e08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ homepage = "https://datafusion.apache.org" license = "Apache-2.0" readme = "README.md" repository = "https://github.com/apache/datafusion" -rust-version = "1.79" +rust-version = "1.80" version = "43.0.0" [workspace.dependencies] diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 1fa99185a5a08..6c0f1e993a9d9 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1361,7 +1361,6 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", - "indexmap", "paste", "recursive", "serde_json", @@ -1546,7 +1545,6 @@ dependencies = [ "datafusion-physical-expr-common", "half", "hashbrown 0.14.5", - "indexmap", "itertools", "log", "paste", @@ -1603,7 +1601,6 @@ dependencies = [ "futures", "half", "hashbrown 0.14.5", - "indexmap", "itertools", "log", "once_cell", @@ -1622,7 +1619,6 @@ dependencies = [ "arrow-schema", "datafusion-common", "datafusion-expr", - "indexmap", "log", "recursive", "regex", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 743ec1b4a749b..8e0ae01457de5 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -26,7 +26,7 @@ license = "Apache-2.0" homepage = "https://datafusion.apache.org" repository = "https://github.com/apache/datafusion" # Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.79" +rust-version = "1.80" readme = "README.md" [dependencies] diff --git a/datafusion-cli/Dockerfile b/datafusion-cli/Dockerfile index 79c24f6baf3ef..faf345660dbea 100644 --- a/datafusion-cli/Dockerfile +++ b/datafusion-cli/Dockerfile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -FROM rust:1.79-bookworm AS builder +FROM rust:1.80-bookworm AS builder COPY . 
/usr/src/datafusion COPY ./datafusion /usr/src/datafusion/datafusion diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 3d999766e03fb..de66b60fe4499 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -32,7 +32,7 @@ use aws_credential_types::provider::ProvideCredentials; use object_store::aws::{AmazonS3Builder, AwsCredential}; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::http::HttpBuilder; -use object_store::{CredentialProvider, ObjectStore}; +use object_store::{ClientOptions, CredentialProvider, ObjectStore}; use url::Url; pub async fn get_s3_object_store_builder( @@ -437,6 +437,7 @@ pub(crate) async fn get_object_store( } "http" | "https" => Arc::new( HttpBuilder::new() + .with_client_options(ClientOptions::new().with_allow_http(true)) .with_url(url.origin().ascii_serialization()) .build()?, ), diff --git a/datafusion/common/src/cse.rs b/datafusion/common/src/cse.rs index ab02915858cd2..bb9abd5399081 100644 --- a/datafusion/common/src/cse.rs +++ b/datafusion/common/src/cse.rs @@ -24,8 +24,8 @@ use crate::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, }; +use crate::IndexMap; use crate::Result; -use indexmap::IndexMap; use std::collections::HashMap; use std::hash::{BuildHasher, Hash, Hasher, RandomState}; use std::marker::PhantomData; diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 45620c3cacc8a..b5f7b5681eefc 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -1382,14 +1382,14 @@ mod tests { // Succeeds if both have the same element type, disregards names and nullability assert!(DFSchema::datatype_is_logically_equal( - &DataType::List(Field::new("item", DataType::Int8, true).into()), + &DataType::List(Field::new_list_field(DataType::Int8, true).into()), &DataType::List(Field::new("element", DataType::Int8, false).into()) )); // Fails if element type is different assert!(!DFSchema::datatype_is_logically_equal( - &DataType::List(Field::new("item", DataType::Int8, true).into()), - &DataType::List(Field::new("item", DataType::Int16, true).into()) + &DataType::List(Field::new_list_field(DataType::Int8, true).into()), + &DataType::List(Field::new_list_field(DataType::Int16, true).into()) )); // Test maps @@ -1522,14 +1522,14 @@ mod tests { // Succeeds if both have the same element type, disregards names and nullability assert!(DFSchema::datatype_is_semantically_equal( - &DataType::List(Field::new("item", DataType::Int8, true).into()), + &DataType::List(Field::new_list_field(DataType::Int8, true).into()), &DataType::List(Field::new("element", DataType::Int8, false).into()) )); // Fails if element type is different assert!(!DFSchema::datatype_is_semantically_equal( - &DataType::List(Field::new("item", DataType::Int8, true).into()), - &DataType::List(Field::new("item", DataType::Int16, true).into()) + &DataType::List(Field::new_list_field(DataType::Int8, true).into()), + &DataType::List(Field::new_list_field(DataType::Int16, true).into()) )); // Test maps diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 77e8cd60ede24..ad3b6ab0d1715 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -73,6 +73,7 @@ pub use param_value::ParamValues; pub use scalar::{ScalarType, ScalarValue}; pub use schema_reference::SchemaReference; pub use stats::{ColumnStatistics, Statistics}; +use std::hash::RandomState; 
pub use table_reference::{ResolvedTableReference, TableReference}; pub use unnest::{RecursionUnnestOption, UnnestOptions}; pub use utils::project_schema; @@ -93,6 +94,9 @@ pub use error::{ pub type HashMap<K, V, S = RandomState> = hashbrown::HashMap<K, V, S>; pub type HashSet<T, S = RandomState> = hashbrown::HashSet<T, S>; +pub type IndexMap<K, V, S = RandomState> = indexmap::IndexMap<K, V, S>; +pub type IndexSet<T, S = RandomState> = indexmap::IndexSet<T, S>; + /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is /// not possible. In normal usage of DataFusion the downcast should always succeed. /// diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index edba0b84431fb..bb62ea441438c 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -39,11 +39,7 @@ use crate::cast::{ }; use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_err}; use crate::hash_utils::create_hashes; -use crate::utils::{ - array_into_fixed_size_list_array_with_field_name, array_into_large_list_array, - array_into_large_list_array_with_field_name, array_into_list_array, - array_into_list_array_with_field_name, -}; +use crate::utils::SingleRowListArrayBuilder; use arrow::compute::kernels::numeric::*; use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions}; use arrow::{ @@ -2092,7 +2088,11 @@ impl ScalarValue { } else { Self::iter_to_array(values.iter().cloned()).unwrap() }; - Arc::new(array_into_list_array(values, nullable)) + Arc::new( + SingleRowListArrayBuilder::new(values) + .with_nullable(nullable) + .build_list_array(), + ) } /// Same as [`ScalarValue::new_list`] but with nullable set to true. @@ -2148,7 +2148,11 @@ impl ScalarValue { } else { Self::iter_to_array(values).unwrap() }; - Arc::new(array_into_list_array(values, nullable)) + Arc::new( + SingleRowListArrayBuilder::new(values) + .with_nullable(nullable) + .build_list_array(), + ) } /// Converts `Vec<ScalarValue>` where each element has type corresponding to @@ -2185,7 +2189,7 @@ impl ScalarValue { } else { Self::iter_to_array(values.iter().cloned()).unwrap() }; - Arc::new(array_into_large_list_array(values)) + Arc::new(SingleRowListArrayBuilder::new(values).build_large_list_array()) } /// Converts a scalar value into an array of `size` rows. @@ -2665,38 +2669,27 @@ impl ScalarValue { let list_array = array.as_list::<i32>(); let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. - let arr = Arc::new(array_into_list_array_with_field_name( - nested_array, - field.is_nullable(), - field.name(), - )); - - ScalarValue::List(arr) + SingleRowListArrayBuilder::new(nested_array) + .with_field(field) + .build_list_scalar() } DataType::LargeList(field) => { let list_array = as_large_list_array(array); let nested_array = list_array.value(index); // Produces a single element `LargeListArray` with the value at `index`. - let arr = Arc::new(array_into_large_list_array_with_field_name( - nested_array, - field.name(), - )); - - ScalarValue::LargeList(arr) + SingleRowListArrayBuilder::new(nested_array) + .with_field(field) + .build_large_list_scalar() } // TODO: There is no test for FixedSizeList now, add it later DataType::FixedSizeList(field, _) => { let list_array = as_fixed_size_list_array(array)?; let nested_array = list_array.value(index); - // Produces a single element `ListArray` with the value at `index`. + // Produces a single element `FixedSizeListArray` with the value at `index`.
let list_size = nested_array.len(); - let arr = Arc::new(array_into_fixed_size_list_array_with_field_name( - nested_array, - list_size, - field.name(), - )); - - ScalarValue::FixedSizeList(arr) + SingleRowListArrayBuilder::new(nested_array) + .with_field(field) + .build_fixed_size_list_scalar(list_size) } DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?, DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?, @@ -3895,7 +3888,6 @@ mod tests { }; use crate::assert_batches_eq; - use crate::utils::array_into_list_array_nullable; use arrow::buffer::OffsetBuffer; use arrow::compute::{is_null, kernels}; use arrow::error::ArrowError; @@ -4037,7 +4029,7 @@ mod tests { #[test] fn test_to_array_of_size_for_fsl() { let values = Int32Array::from_iter([Some(1), None, Some(2)]); - let field = Arc::new(Field::new("item", DataType::Int32, true)); + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); let arr = FixedSizeListArray::new(Arc::clone(&field), 3, Arc::new(values), None); let sv = ScalarValue::FixedSizeList(Arc::new(arr)); let actual_arr = sv @@ -4071,14 +4063,15 @@ let result = ScalarValue::new_list_nullable(scalars.as_slice(), &DataType::Utf8); - let expected = array_into_list_array_nullable(Arc::new(StringArray::from(vec![ - "rust", - "arrow", - "data-fusion", - ]))); + let expected = single_row_list_array(vec!["rust", "arrow", "data-fusion"]); assert_eq!(*result, expected); } + fn single_row_list_array(items: Vec<&str>) -> ListArray { + SingleRowListArrayBuilder::new(Arc::new(StringArray::from(items))) + .build_list_array() + } + fn build_list<O: OffsetSizeTrait>( values: Vec<Option<Vec<Option<i64>>>>, ) -> Vec<ScalarValue> { @@ -4093,8 +4086,7 @@ ) } else if O::IS_LARGE { new_null_array( - &DataType::LargeList(Arc::new(Field::new( - "item", + &DataType::LargeList(Arc::new(Field::new_list_field( DataType::Int64, true, ))), 1, ) } else { new_null_array( - &DataType::List(Arc::new(Field::new( - "item", + &DataType::List(Arc::new(Field::new_list_field( DataType::Int64, true, ))), 1, ) } @@ -4122,7 +4113,7 @@ #[test] fn test_iter_to_array_fixed_size_list() { - let field = Arc::new(Field::new("item", DataType::Int32, true)); + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); let f1 = Arc::new(FixedSizeListArray::new( Arc::clone(&field), 3, @@ -4283,12 +4274,8 @@ #[test] fn iter_to_array_string_test() { - let arr1 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![ - "foo", "bar", "baz", - ]))); - let arr2 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![ - "rust", "world", - ]))); + let arr1 = single_row_list_array(vec!["foo", "bar", "baz"]); + let arr2 = single_row_list_array(vec!["rust", "world"]); let scalars = vec![ ScalarValue::List(Arc::new(arr1)), @@ -4961,7 +4948,7 @@ let null_list_scalar = ScalarValue::try_from_array(&list, 1).unwrap(); let data_type = - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))); + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))); assert_eq!(non_null_list_scalar.data_type(), data_type); assert_eq!(null_list_scalar.data_type(), data_type); } #[test] fn scalar_try_from_list_datatypes() { - let inner_field = Arc::new(Field::new("item", DataType::Int32, true)); + let inner_field = Arc::new(Field::new_list_field(DataType::Int32, true)); // Test for List let data_type = &DataType::List(Arc::clone(&inner_field)); @@ -5010,9 +4997,8 @@ #[test] fn 
scalar_try_from_list_of_list() { - let data_type = DataType::List(Arc::new(Field::new( - "item", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + let data_type = DataType::List(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), true, ))); let data_type = &data_type; @@ -5020,9 +5006,11 @@ let expected = ScalarValue::List( new_null_array( - &DataType::List(Arc::new(Field::new( - "item", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + &DataType::List(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field( + DataType::Int32, + true, + ))), true, ))), 1, @@ -5038,13 +5026,12 @@ #[test] fn scalar_try_from_not_equal_list_nested_list() { let list_data_type = - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))); + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))); let data_type = &list_data_type; let list_scalar: ScalarValue = data_type.try_into().unwrap(); - let nested_list_data_type = DataType::List(Arc::new(Field::new( - "item", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + let nested_list_data_type = DataType::List(Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), true, ))); let data_type = &nested_list_data_type; @@ -5678,7 +5665,7 @@ let field_a = Arc::new(Field::new("A", DataType::Utf8, false)); let field_primitive_list = Arc::new(Field::new( "primitive_list", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), false, )); @@ -5745,13 +5732,13 @@ // Define list-of-structs scalars let nl0_array = ScalarValue::iter_to_array(vec![s0, s1.clone()]).unwrap(); - let nl0 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl0_array))); + let nl0 = SingleRowListArrayBuilder::new(nl0_array).build_list_scalar(); let nl1_array = ScalarValue::iter_to_array(vec![s2]).unwrap(); - let nl1 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl1_array))); + let nl1 = SingleRowListArrayBuilder::new(nl1_array).build_list_scalar(); let nl2_array = ScalarValue::iter_to_array(vec![s1]).unwrap(); - let nl2 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl2_array))); + let nl2 = SingleRowListArrayBuilder::new(nl2_array).build_list_scalar(); // iter_to_array for list-of-struct let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap(); @@ -5879,9 +5866,8 @@ fn build_2d_list(data: Vec<Option<i32>>) -> ListArray { let a1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(data)]); ListArray::new( - Arc::new(Field::new( - "item", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + Arc::new(Field::new_list_field( + DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))), true, )), OffsetBuffer::<i32>::from_lengths([1]), @@ -6876,8 +6862,7 @@ assert_eq!(1, arr.len()); assert_eq!( arr.data_type(), - &DataType::List(Arc::new(Field::new( - "item", + &DataType::List(Arc::new(Field::new_list_field( DataType::Timestamp(TimeUnit::Millisecond, Some(s.into())), true, ))) diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index f3bba8e254d98..5e840f8594009 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -321,89 +321,201 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>( count } +/// Creates single element 
[`ListArray`], [`LargeListArray`] and +/// [`FixedSizeListArray`] from other arrays +/// +/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]` +/// +/// # Example +/// ``` +/// # use std::sync::Arc; +/// # use arrow_array::{Array, ListArray}; +/// # use arrow_array::types::Int64Type; +/// # use datafusion_common::utils::SingleRowListArrayBuilder; +/// // Array is [1, 2, 3] +/// let arr = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ +/// Some(vec![Some(1), Some(2), Some(3)]), +/// ]); +/// // Wrap as a list array: [[1, 2, 3]] +/// let list_arr = SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_array(); +/// assert_eq!(list_arr.len(), 1); +/// ``` +#[derive(Debug, Clone)] +pub struct SingleRowListArrayBuilder { + /// array to be wrapped + arr: ArrayRef, + /// Should the resulting array be nullable? Defaults to `true`. + nullable: bool, + /// Specify the field name for the resulting array. Defaults to value used in + /// [`Field::new_list_field`] + field_name: Option<String>, +} + +impl SingleRowListArrayBuilder { + /// Create a new instance of [`SingleRowListArrayBuilder`] + pub fn new(arr: ArrayRef) -> Self { + Self { + arr, + nullable: true, + field_name: None, + } + } + + /// Set the nullable flag + pub fn with_nullable(mut self, nullable: bool) -> Self { + self.nullable = nullable; + self + } + + /// sets the field name for the resulting array + pub fn with_field_name(mut self, field_name: Option<String>) -> Self { + self.field_name = field_name; + self + } + + /// Copies field name and nullable from the specified field + pub fn with_field(self, field: &Field) -> Self { + self.with_field_name(Some(field.name().to_owned())) + .with_nullable(field.is_nullable()) + } + + /// Build a single element [`ListArray`] + pub fn build_list_array(self) -> ListArray { + let (field, arr) = self.into_field_and_arr(); + let offsets = OffsetBuffer::from_lengths([arr.len()]); + ListArray::new(field, offsets, arr, None) + } + + /// Build a single element [`ListArray`] and wrap as [`ScalarValue::List`] + pub fn build_list_scalar(self) -> ScalarValue { + ScalarValue::List(Arc::new(self.build_list_array())) + } + + /// Build a single element [`LargeListArray`] + pub fn build_large_list_array(self) -> LargeListArray { + let (field, arr) = self.into_field_and_arr(); + let offsets = OffsetBuffer::from_lengths([arr.len()]); + LargeListArray::new(field, offsets, arr, None) + } + + /// Build a single element [`LargeListArray`] and wrap as [`ScalarValue::LargeList`] + pub fn build_large_list_scalar(self) -> ScalarValue { + ScalarValue::LargeList(Arc::new(self.build_large_list_array())) + } + + /// Build a single element [`FixedSizeListArray`] + pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray { + let (field, arr) = self.into_field_and_arr(); + FixedSizeListArray::new(field, list_size as i32, arr, None) + } + + /// Build a single element [`FixedSizeListArray`] and wrap as [`ScalarValue::FixedSizeList`] + pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue { + ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size))) + } + + /// Helper function: convert this builder into a tuple of field and array + fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) { + let Self { + arr, + nullable, + field_name, + } = self; + let data_type = arr.data_type().to_owned(); + let field = match field_name { + Some(name) => Field::new(name, data_type, nullable), + None => Field::new_list_field(data_type, nullable), + }; + (Arc::new(field), arr) + } +} +
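A minimal usage sketch (the array contents and the `element` field below are illustrative, not part of the patch): `with_field` copies the name and nullability of an existing list field onto the wrapper, which is how `ScalarValue::try_from_array` uses the builder above.

use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field};
use datafusion_common::utils::SingleRowListArrayBuilder;

fn wrap_with_field() -> datafusion_common::ScalarValue {
    // Wraps [1, 2, 3] as the single-row list [[1, 2, 3]]
    let nested: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    // Hypothetical parent field; its name and nullability carry over to the wrapper
    let field = Field::new("element", DataType::Int32, false);
    SingleRowListArrayBuilder::new(nested)
        .with_field(&field)
        .build_list_scalar()
}

/// Wrap 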
an array into a single element `ListArray`. /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]` /// The field in the list array is nullable. +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_list_array_nullable(arr: ArrayRef) -> ListArray { - array_into_list_array(arr, true) + SingleRowListArrayBuilder::new(arr) + .with_nullable(true) + .build_list_array() } /// Wrap an array into a single element `ListArray`. /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]` +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_list_array(arr: ArrayRef, nullable: bool) -> ListArray { - let offsets = OffsetBuffer::from_lengths([arr.len()]); - ListArray::new( - Arc::new(Field::new_list_field(arr.data_type().to_owned(), nullable)), - offsets, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr) + .with_nullable(nullable) + .build_list_array() } +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_list_array_with_field_name( arr: ArrayRef, nullable: bool, field_name: &str, ) -> ListArray { - let offsets = OffsetBuffer::from_lengths([arr.len()]); - ListArray::new( - Arc::new(Field::new(field_name, arr.data_type().to_owned(), nullable)), - offsets, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr) + .with_nullable(nullable) + .with_field_name(Some(field_name.to_string())) + .build_list_array() } /// Wrap an array into a single element `LargeListArray`. /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]` +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray { - let offsets = OffsetBuffer::from_lengths([arr.len()]); - LargeListArray::new( - Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)), - offsets, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr).build_large_list_array() } +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_large_list_array_with_field_name( arr: ArrayRef, field_name: &str, ) -> LargeListArray { - let offsets = OffsetBuffer::from_lengths([arr.len()]); - LargeListArray::new( - Arc::new(Field::new(field_name, arr.data_type().to_owned(), true)), - offsets, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr) + .with_field_name(Some(field_name.to_string())) + .build_large_list_array() } +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_fixed_size_list_array( arr: ArrayRef, list_size: usize, ) -> FixedSizeListArray { - let list_size = list_size as i32; - FixedSizeListArray::new( - Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)), - list_size, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr).build_fixed_size_list_array(list_size) } +#[deprecated( + since = "44.0.0", + note = "please use `SingleRowListArrayBuilder` instead" +)] pub fn array_into_fixed_size_list_array_with_field_name( arr: ArrayRef, list_size: usize, field_name: &str, ) -> FixedSizeListArray { - let list_size = list_size as i32; - FixedSizeListArray::new( - Arc::new(Field::new(field_name, arr.data_type().to_owned(), true)), - list_size, - arr, - None, - ) + SingleRowListArrayBuilder::new(arr) + .with_field_name(Some(field_name.to_string())) + .build_fixed_size_list_array(list_size) } /// Wrap arrays into a 
single element `ListArray`. diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 6c5a31e3624af..90b8abc622605 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -30,7 +30,7 @@ authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with # "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'" # https://github.com/foresterre/cargo-msrv/issues/590 -rust-version = "1.79" +rust-version = "1.80" [lints] workspace = true diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 1f10cb244e83c..7b1f349e15b5a 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -36,7 +36,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { *actual[0].schema(), Schema::new(vec![Field::new_list( "array_agg(DISTINCT aggregate_test_100.c2)", - Field::new("item", DataType::UInt32, true), + Field::new_list_field(DataType::UInt32, true), true ),]) ); diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 4b2f3b5e46b58..7a6e9841e22c8 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -2098,7 +2098,7 @@ mod tests { ); // list - let inner_field = Arc::new(Field::new("item", DataType::Int64, true)); + let inner_field = Arc::new(Field::new_list_field(DataType::Int64, true)); test_coercion_binary_rule!( DataType::List(Arc::clone(&inner_field)), DataType::List(Arc::clone(&inner_field)), @@ -2155,8 +2155,7 @@ mod tests { ); // Negative test: inner_timestamp_field and inner_field are not compatible because their inner types are not compatible - let inner_timestamp_field = Arc::new(Field::new( - "item", + let inner_timestamp_field = Arc::new(Field::new_list_field( DataType::Timestamp(TimeUnit::Microsecond, None), true, )); diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 438662e0642b0..772a57ac93d3f 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -49,7 +49,6 @@ datafusion-expr-common = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } -indexmap = { workspace = true } paste = "^1.0" recursive = { workspace = true } serde_json = { workspace = true } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 90235e3f84c48..a44219be4385d 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,11 +54,10 @@ use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, - FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema, + FunctionalDependencies, IndexSet, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; -use indexmap::IndexSet; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -2432,7 +2431,7 @@ mod tests { ], false, ); - let string_field = Field::new("item", DataType::Utf8, false); + let string_field = 
Field::new_list_field(DataType::Utf8, false); let strings_field = Field::new_list("item", string_field.clone(), false); let schema = Schema::new(vec![ Field::new("scalar", DataType::UInt32, false), diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index fe725e7d96ded..a1875012eea7f 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -51,10 +51,9 @@ use datafusion_common::tree_node::{ use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, - FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference, + FunctionalDependencies, IndexSet, ParamValues, Result, ScalarValue, TableReference, UnnestOptions, }; -use indexmap::IndexSet; // backwards compatibility use crate::display::PgJsonVisitor; diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 5f52c7ccc20ef..9d15d96939921 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -64,7 +64,12 @@ pub fn data_types_with_scalar_udf( return Ok(current_types.to_vec()); } - try_coerce_types(valid_types, current_types, &signature.type_signature) + try_coerce_types( + func.name(), + valid_types, + current_types, + &signature.type_signature, + ) } /// Performs type coercion for aggregate function arguments. @@ -100,7 +105,12 @@ pub fn data_types_with_aggregate_udf( return Ok(current_types.to_vec()); } - try_coerce_types(valid_types, current_types, &signature.type_signature) + try_coerce_types( + func.name(), + valid_types, + current_types, + &signature.type_signature, + ) } /// Performs type coercion for window function arguments. @@ -133,7 +143,12 @@ pub fn data_types_with_window_udf( return Ok(current_types.to_vec()); } - try_coerce_types(valid_types, current_types, &signature.type_signature) + try_coerce_types( + func.name(), + valid_types, + current_types, + &signature.type_signature, + ) } /// Performs type coercion for function arguments. @@ -144,6 +159,7 @@ pub fn data_types_with_window_udf( /// For more details on coercion in general, please see the /// [`type_coercion`](crate::type_coercion) module. 
pub fn data_types( + function_name: impl AsRef<str>, current_types: &[DataType], signature: &Signature, ) -> Result<Vec<DataType>> { @@ -166,7 +182,12 @@ pub fn data_types( return Ok(current_types.to_vec()); } - try_coerce_types(valid_types, current_types, &signature.type_signature) + try_coerce_types( + function_name, + valid_types, + current_types, + &signature.type_signature, + ) } fn is_well_supported_signature(type_signature: &TypeSignature) -> bool { @@ -187,6 +208,7 @@ fn is_well_supported_signature(type_signature: &TypeSignature) -> bool { } fn try_coerce_types( + function_name: impl AsRef<str>, valid_types: Vec<Vec<DataType>>, current_types: &[DataType], type_signature: &TypeSignature, @@ -218,7 +240,8 @@ fn try_coerce_types( // none possible -> Error plan_err!( - "Coercion from {:?} to the signature {:?} failed.", + "Failed to coerce arguments to satisfy a call to {} function: coercion from {:?} to the signature {:?} failed.", + function_name.as_ref(), current_types, type_signature ) @@ -958,7 +981,7 @@ mod tests { #[test] fn test_fixed_list_wildcard_coerce() -> Result<()> { - let inner = Arc::new(Field::new("item", DataType::Int32, false)); + let inner = Arc::new(Field::new_list_field(DataType::Int32, false)); let current_types = vec![ DataType::FixedSizeList(Arc::clone(&inner), 2), // able to coerce for any size ]; let signature = Signature::exact( vec![DataType::FixedSizeList( Arc::clone(&inner), FIXED_SIZE_LIST_WILDCARD, )], Volatility::Stable, ); - let coerced_data_types = data_types(&current_types, &signature).unwrap(); + let coerced_data_types = data_types("test", &current_types, &signature).unwrap(); assert_eq!(coerced_data_types, current_types); // make sure it can't coerce to a different size let signature = Signature::exact( vec![DataType::FixedSizeList(Arc::clone(&inner), 3)], Volatility::Stable, ); - let coerced_data_types = data_types(&current_types, &signature); + let coerced_data_types = data_types("test", &current_types, &signature); assert!(coerced_data_types.is_err()); // make sure it works with the same type. 
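The function name threaded through `try_coerce_types` now surfaces in planner errors. A small sketch of the effect, mirroring the negative case above ("test" is the same placeholder name the tests pass; the assertion text follows the `plan_err!` format string in this diff):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field};
use datafusion_expr::type_coercion::functions::data_types;
use datafusion_expr::{Signature, Volatility};

fn coercion_error_names_the_function() {
    let inner = Arc::new(Field::new_list_field(DataType::Int32, false));
    let current_types = vec![DataType::FixedSizeList(Arc::clone(&inner), 2)];
    // An exact size-3 signature cannot accept a size-2 FixedSizeList
    let signature =
        Signature::exact(vec![DataType::FixedSizeList(inner, 3)], Volatility::Stable);
    let err = data_types("test", &current_types, &signature).unwrap_err();
    // Message begins: "Failed to coerce arguments to satisfy a call to test function: ..."
    assert!(err.to_string().contains("a call to test function"));
}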
@@ -987,7 +1010,7 @@ vec![DataType::FixedSizeList(Arc::clone(&inner), 2)], Volatility::Stable, ); - let coerced_data_types = data_types(&current_types, &signature).unwrap(); + let coerced_data_types = data_types("test", &current_types, &signature).unwrap(); assert_eq!(coerced_data_types, current_types); Ok(()) } @@ -996,10 +1019,9 @@ #[test] fn test_nested_wildcard_fixed_size_lists() -> Result<()> { let type_into = DataType::FixedSizeList( - Arc::new(Field::new( - "item", + Arc::new(Field::new_list_field( DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new_list_field(DataType::Int32, false)), FIXED_SIZE_LIST_WILDCARD, ), false, ), ); let type_from = DataType::FixedSizeList( - Arc::new(Field::new( - "item", + Arc::new(Field::new_list_field( DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Int8, false)), + Arc::new(Field::new_list_field(DataType::Int8, false)), 4, ), false, @@ -1022,10 +1043,9 @@ assert_eq!( coerced_from(&type_into, &type_from), Some(DataType::FixedSizeList( - Arc::new(Field::new( - "item", + Arc::new(Field::new_list_field( DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Int32, false)), + Arc::new(Field::new_list_field(DataType::Int32, false)), 4, ), false, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 6f7c5d379260e..b99b9e7f4c1c3 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -36,10 +36,9 @@ use datafusion_common::tree_node::{ use datafusion_common::utils::get_at_indices; use datafusion_common::{ internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, - DataFusionError, HashMap, Result, TableReference, + DataFusionError, HashMap, IndexSet, Result, TableReference, }; -use indexmap::IndexSet; use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem}; pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 119747342515d..6393fb31e3206 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -26,7 +26,7 @@ repository = { workspace = true } license = { workspace = true } authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.76" +rust-version = "1.80" [lints] workspace = true diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs index 07fa4efc990e5..e321df61ddc6a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs @@ -19,14 +19,13 @@ use arrow::array::{ArrayRef, OffsetSizeTrait}; use datafusion_common::cast::as_list_array; -use datafusion_common::utils::array_into_list_array_nullable; +use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_common::ScalarValue; use datafusion_expr_common::accumulator::Accumulator; use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType}; use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet; use std::fmt::Debug; use std::mem::size_of_val; -use std::sync::Arc; /// Specialized implementation of /// `COUNT DISTINCT` for [`StringArray`] [`LargeStringArray`], @@ -49,8 +48,7 @@ impl Accumulator for BytesDistinctCountAccumulator { fn state(&mut self) -> 
datafusion_common::Result<Vec<ScalarValue>> { let set = self.0.take(); let arr = set.into_state(); - let list = Arc::new(array_into_list_array_nullable(arr)); - Ok(vec![ScalarValue::List(list)]) + Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) } fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { @@ -109,8 +107,7 @@ impl Accumulator for BytesViewDistinctCountAccumulator { fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { let set = self.0.take(); let arr = set.into_state(); - let list = Arc::new(array_into_list_array_nullable(arr)); - Ok(vec![ScalarValue::List(list)]) + Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) } fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index 405b2c2db7bdd..e8b6588dc0913 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -33,8 +33,8 @@ use arrow::array::PrimitiveArray; use arrow::datatypes::DataType; use datafusion_common::cast::{as_list_array, as_primitive_array}; -use datafusion_common::utils::array_into_list_array_nullable; use datafusion_common::utils::memory::estimate_memory_size; +use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_common::ScalarValue; use datafusion_expr_common::accumulator::Accumulator; @@ -73,8 +73,7 @@ where PrimitiveArray::<T>::from_iter_values(self.values.iter().cloned()) .with_data_type(self.data_type.clone()), ); - let list = Arc::new(array_into_list_array_nullable(arr)); - Ok(vec![ScalarValue::List(list)]) + Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) } fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { @@ -160,8 +159,7 @@ where let arr = Arc::new(PrimitiveArray::<T>::from_iter_values( self.values.iter().map(|v| v.0), )) as ArrayRef; - let list = Arc::new(array_into_list_array_nullable(arr)); - Ok(vec![ScalarValue::List(list)]) + Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()]) } fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { diff --git a/datafusion/functions-aggregate/COMMENTS.md b/datafusion/functions-aggregate/COMMENTS.md index e669e13557115..1cb4cdd7d5a45 100644 --- a/datafusion/functions-aggregate/COMMENTS.md +++ b/datafusion/functions-aggregate/COMMENTS.md @@ -54,7 +54,7 @@ first argument and the definition looks like this: // `input_type` : data type of the first argument let mut fields = vec![Field::new_list( format_state_name(self.name(), "nth_value"), - Field::new("item", args.input_types[0].clone(), true /* nullable of list item */ ), + Field::new_list_field(args.input_types[0].clone(), true /* nullable of list item */ ), false, // nullable of list itself )]; ``` diff --git a/datafusion/functions-aggregate/src/approx_median.rs b/datafusion/functions-aggregate/src/approx_median.rs index 8920c8e5f0c48..f653e94740b31 100644 --- a/datafusion/functions-aggregate/src/approx_median.rs +++ b/datafusion/functions-aggregate/src/approx_median.rs @@ -87,7 +87,7 @@ impl AggregateUDFImpl for ApproxMedian { Field::new(format_state_name(args.name, "min"), Float64, false), Field::new_list( format_state_name(args.name, "centroids"), - Field::new("item", Float64, true), + Field::new_list_field(Float64, true), false, ), ])
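The `Field::new("item", ...)` to `Field::new_list_field(...)` sweep in the hunks that follow is behavior-preserving: `new_list_field` is the arrow-rs shorthand that hard-codes the conventional "item" element name. A quick sketch of the equivalence being relied on:

use arrow_schema::{DataType, Field};

fn list_field_shorthand() {
    // Both construct the standard list element field named "item"
    let explicit = Field::new("item", DataType::Int32, true);
    let shorthand = Field::new_list_field(DataType::Int32, true);
    assert_eq!(explicit, shorthand);
}

diff --git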
a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs index 6edae6344ab15..197c459d26e57 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs @@ -240,7 +240,7 @@ impl AggregateUDFImpl for ApproxPercentileCont { ), Field::new_list( format_state_name(args.name, "centroids"), - Field::new("item", DataType::Float64, true), + Field::new_list_field(DataType::Float64, true), false, ), ]) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 3b9a521ec9721..6344dbcd93dc9 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -22,7 +22,7 @@ use arrow::datatypes::DataType; use arrow_schema::{Field, Fields}; use datafusion_common::cast::as_list_array; -use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx}; +use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{internal_err, Result}; use datafusion_expr::aggregate_doc_sections::DOC_SECTION_GENERAL; @@ -77,8 +77,7 @@ impl AggregateUDFImpl for ArrayAgg { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(DataType::List(Arc::new(Field::new( - "item", + Ok(DataType::List(Arc::new(Field::new_list_field( arg_types[0].clone(), true, )))) @@ -89,7 +88,7 @@ impl AggregateUDFImpl for ArrayAgg { return Ok(vec![Field::new_list( format_state_name(args.name, "distinct_array_agg"), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.input_types[0].clone(), true), + Field::new_list_field(args.input_types[0].clone(), true), true, )]); } @@ -97,7 +96,7 @@ impl AggregateUDFImpl for ArrayAgg { let mut fields = vec![Field::new_list( format_state_name(args.name, "array_agg"), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.input_types[0].clone(), true), + Field::new_list_field(args.input_types[0].clone(), true), true, )]; @@ -108,7 +107,7 @@ impl AggregateUDFImpl for ArrayAgg { let orderings = args.ordering_fields.to_vec(); fields.push(Field::new_list( format_state_name(args.name, "array_agg_orderings"), - Field::new("item", DataType::Struct(Fields::from(orderings)), true), + Field::new_list_field(DataType::Struct(Fields::from(orderings)), true), false, )); @@ -238,9 +237,8 @@ impl Accumulator for ArrayAggAccumulator { } let concated_array = arrow::compute::concat(&element_arrays)?; - let list_array = array_into_list_array_nullable(concated_array); - Ok(ScalarValue::List(Arc::new(list_array))) + Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar()) } fn size(&self) -> usize { @@ -530,9 +528,7 @@ impl OrderSensitiveArrayAggAccumulator { let ordering_array = StructArray::try_new(struct_field, column_wise_ordering_values, None)?; - Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable( - Arc::new(ordering_array), - )))) + Ok(SingleRowListArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar()) } } diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index aeed78737c1d5..6298071a223b2 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -272,7 +272,7 @@ impl AggregateUDFImpl for BitwiseOperation { format!("{} distinct", 
self.name()).as_str(), ), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.return_type.clone(), true), + Field::new_list_field(args.return_type.clone(), true), false, )]) } else { diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index c8f8c8153ce11..a191f7bad477b 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -135,7 +135,7 @@ impl AggregateUDFImpl for Count { Ok(vec![Field::new_list( format_state_name(args.name, "count distinct"), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.input_types[0].clone(), true), + Field::new_list_field(args.input_types[0].clone(), true), false, )]) } else { diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index bcffb19b75593..0fd9d4ca63e4b 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -104,7 +104,7 @@ impl AggregateUDFImpl for Median { fn state_fields(&self, args: StateFieldsArgs) -> Result> { //Intermediate state is a list of the elements we have collected so far - let field = Field::new("item", args.input_types[0].clone(), true); + let field = Field::new_list_field(args.input_types[0].clone(), true); let state_name = if args.is_distinct { "distinct_median" } else { diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index 0c72939633b16..d1a3a00763ec2 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -26,7 +26,7 @@ use std::sync::{Arc, OnceLock}; use arrow::array::{new_empty_array, ArrayRef, AsArray, StructArray}; use arrow_schema::{DataType, Field, Fields}; -use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx}; +use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; use datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr::aggregate_doc_sections::DOC_SECTION_STATISTICAL; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -142,14 +142,14 @@ impl AggregateUDFImpl for NthValueAgg { let mut fields = vec![Field::new_list( format_state_name(self.name(), "nth_value"), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.input_types[0].clone(), true), + Field::new_list_field(args.input_types[0].clone(), true), false, )]; let orderings = args.ordering_fields.to_vec(); if !orderings.is_empty() { fields.push(Field::new_list( format_state_name(self.name(), "nth_value_orderings"), - Field::new("item", DataType::Struct(Fields::from(orderings)), true), + Field::new_list_field(DataType::Struct(Fields::from(orderings)), true), false, )); } @@ -423,9 +423,7 @@ impl NthValueAccumulator { let ordering_array = StructArray::try_new(struct_field, column_wise_ordering_values, None)?; - Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable( - Arc::new(ordering_array), - )))) + Ok(SingleRowListArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar()) } fn evaluate_values(&self) -> ScalarValue { diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs index 447b5d8a57c44..df87a593cf35a 100644 --- a/datafusion/functions-aggregate/src/sum.rs +++ b/datafusion/functions-aggregate/src/sum.rs @@ -183,7 +183,7 @@ impl AggregateUDFImpl for Sum { 
Ok(vec![Field::new_list( format_state_name(args.name, "sum distinct"), // See COMMENTS.md to understand why nullable is set to true - Field::new("item", args.return_type.clone(), true), + Field::new_list_field(args.return_type.clone(), true), false, )]) } else { diff --git a/datafusion/functions-nested/benches/map.rs b/datafusion/functions-nested/benches/map.rs index 0f1d9ed506365..f92bb6cecf9c5 100644 --- a/datafusion/functions-nested/benches/map.rs +++ b/datafusion/functions-nested/benches/map.rs @@ -75,7 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("map_1000", |b| { let mut rng = rand::thread_rng(); - let field = Arc::new(Field::new("item", DataType::Utf8, true)); + let field = Arc::new(Field::new_list_field(DataType::Utf8, true)); let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); let key_list = ListArray::new( field, @@ -83,7 +83,7 @@ fn criterion_benchmark(c: &mut Criterion) { Arc::new(StringArray::from(keys(&mut rng))), None, ); - let field = Arc::new(Field::new("item", DataType::Int32, true)); + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); let value_list = ListArray::new( field, diff --git a/datafusion/functions-nested/src/concat.rs b/datafusion/functions-nested/src/concat.rs index ac9b5b2a16eff..3e8a5877fb336 100644 --- a/datafusion/functions-nested/src/concat.rs +++ b/datafusion/functions-nested/src/concat.rs @@ -429,7 +429,7 @@ fn concat_internal(args: &[ArrayRef]) -> Result { .collect::>(); let list_arr = GenericListArray::::new( - Arc::new(Field::new("item", data_type, true)), + Arc::new(Field::new_list_field(data_type, true)), OffsetBuffer::from_lengths(array_lengths), Arc::new(arrow::compute::concat(elements.as_slice())?), Some(NullBuffer::new(buffer)), @@ -558,7 +558,7 @@ where let data = mutable.freeze(); Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", data_type.to_owned(), true)), + Arc::new(Field::new_list_field(data_type.to_owned(), true)), OffsetBuffer::new(offsets.into()), arrow_array::make_array(data), None, diff --git a/datafusion/functions-nested/src/dimension.rs b/datafusion/functions-nested/src/dimension.rs index 9ca7c87aba559..2d2f90e9c7cb4 100644 --- a/datafusion/functions-nested/src/dimension.rs +++ b/datafusion/functions-nested/src/dimension.rs @@ -73,7 +73,7 @@ impl ScalarUDFImpl for ArrayDims { fn return_type(&self, arg_types: &[DataType]) -> Result { Ok(match arg_types[0] { List(_) | LargeList(_) | FixedSizeList(_, _) => { - List(Arc::new(Field::new("item", UInt64, true))) + List(Arc::new(Field::new_list_field(UInt64, true))) } _ => { return plan_err!("The array_dims function can only accept List/LargeList/FixedSizeList."); diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 13095bc4ba3fc..fc35f00763305 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -622,7 +622,7 @@ where let data = mutable.freeze(); Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", array.value_type(), true)), + Arc::new(Field::new_list_field(array.value_type(), true)), OffsetBuffer::::new(offsets.into()), arrow_array::make_array(data), None, diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs index 825824a82d200..22870dd85f0c6 100644 --- a/datafusion/functions-nested/src/make_array.rs +++ b/datafusion/functions-nested/src/make_array.rs @@ -28,7 
+28,8 @@ use arrow_array::{
 use arrow_buffer::OffsetBuffer;
 use arrow_schema::DataType::{List, Null};
 use arrow_schema::{DataType, Field};
-use datafusion_common::{plan_err, utils::array_into_list_array_nullable, Result};
+use datafusion_common::utils::SingleRowListArrayBuilder;
+use datafusion_common::{plan_err, Result};
 use datafusion_expr::binary::{
     try_type_union_resolution_with_struct, type_union_resolution,
 };
@@ -89,8 +90,7 @@ impl ScalarUDFImpl for MakeArray {
             0 => Ok(empty_array_type()),
             _ => {
                 // At this point, all the type in array should be coerced to the same one
-                Ok(List(Arc::new(Field::new(
-                    "item",
+                Ok(List(Arc::new(Field::new_list_field(
                     arg_types[0].to_owned(),
                     true,
                 ))))
@@ -172,7 +172,7 @@ fn get_make_array_doc() -> &'static Documentation {
 
 // Empty array is a special case that is useful for many other array functions
 pub(super) fn empty_array_type() -> DataType {
-    List(Arc::new(Field::new("item", DataType::Int64, true)))
+    List(Arc::new(Field::new_list_field(DataType::Int64, true)))
 }
 
 /// `make_array_inner` is the implementation of the `make_array` function.
@@ -194,7 +194,9 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
             let length = arrays.iter().map(|a| a.len()).sum();
             // By default Int64
             let array = new_null_array(&DataType::Int64, length);
-            Ok(Arc::new(array_into_list_array_nullable(array)))
+            Ok(Arc::new(
+                SingleRowListArrayBuilder::new(array).build_list_array(),
+            ))
         }
         _ => array_array::<i32>(arrays, data_type),
     }
@@ -285,7 +287,7 @@ fn array_array<O: OffsetSizeTrait>(
     let data = mutable.freeze();
 
     Ok(Arc::new(GenericListArray::<O>::try_new(
-        Arc::new(Field::new("item", data_type, true)),
+        Arc::new(Field::new_list_field(data_type, true)),
         OffsetBuffer::new(offsets.into()),
         arrow_array::make_array(data),
         None,
diff --git a/datafusion/functions-nested/src/map_extract.rs b/datafusion/functions-nested/src/map_extract.rs
index d2ab078cb22dd..24f396e741b2d 100644
--- a/datafusion/functions-nested/src/map_extract.rs
+++ b/datafusion/functions-nested/src/map_extract.rs
@@ -78,8 +78,7 @@ impl ScalarUDFImpl for MapExtract {
         }
         let map_type = &arg_types[0];
         let map_fields = get_map_entry_field(map_type)?;
-        Ok(DataType::List(Arc::new(Field::new(
-            "item",
+        Ok(DataType::List(Arc::new(Field::new_list_field(
             map_fields.last().unwrap().data_type().clone(),
             true,
         ))))
@@ -187,7 +186,7 @@ fn general_map_extract_inner(
     let data = mutable.freeze();
 
     Ok(Arc::new(ListArray::new(
-        Arc::new(Field::new("item", map_array.value_type().clone(), true)),
+        Arc::new(Field::new_list_field(map_array.value_type().clone(), true)),
         OffsetBuffer::<i32>::new(offsets.into()),
         Arc::new(make_array(data)),
         None,
diff --git a/datafusion/functions-nested/src/map_keys.rs b/datafusion/functions-nested/src/map_keys.rs
index 4abdbcad1e825..1d19cb8492f03 100644
--- a/datafusion/functions-nested/src/map_keys.rs
+++ b/datafusion/functions-nested/src/map_keys.rs
@@ -72,8 +72,7 @@ impl ScalarUDFImpl for MapKeysFunc {
         }
         let map_type = &arg_types[0];
         let map_fields = get_map_entry_field(map_type)?;
-        Ok(DataType::List(Arc::new(Field::new(
-            "item",
+        Ok(DataType::List(Arc::new(Field::new_list_field(
             map_fields.first().unwrap().data_type().clone(),
             false,
         ))))
@@ -130,7 +129,7 @@ fn map_keys_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
     };
 
     Ok(Arc::new(ListArray::new(
-        Arc::new(Field::new("item", map_array.key_type().clone(), false)),
+        Arc::new(Field::new_list_field(map_array.key_type().clone(), false)),
         map_array.offsets().clone(),
         Arc::clone(map_array.keys()),
         None,
diff --git a/datafusion/functions-nested/src/map_values.rs b/datafusion/functions-nested/src/map_values.rs
index f1cc36cade636..816ebe74aff0a 100644
--- a/datafusion/functions-nested/src/map_values.rs
+++ b/datafusion/functions-nested/src/map_values.rs
@@ -72,8 +72,7 @@ impl ScalarUDFImpl for MapValuesFunc {
         }
         let map_type = &arg_types[0];
         let map_fields = get_map_entry_field(map_type)?;
-        Ok(DataType::List(Arc::new(Field::new(
-            "item",
+        Ok(DataType::List(Arc::new(Field::new_list_field(
             map_fields.last().unwrap().data_type().clone(),
             true,
         ))))
@@ -131,7 +130,7 @@ fn map_values_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
     };
 
     Ok(Arc::new(ListArray::new(
-        Arc::new(Field::new("item", map_array.value_type().clone(), true)),
+        Arc::new(Field::new_list_field(map_array.value_type().clone(), true)),
         map_array.offsets().clone(),
         Arc::clone(map_array.values()),
         None,
diff --git a/datafusion/functions-nested/src/position.rs b/datafusion/functions-nested/src/position.rs
index 9ed4b4c42d14a..feacc70061926 100644
--- a/datafusion/functions-nested/src/position.rs
+++ b/datafusion/functions-nested/src/position.rs
@@ -252,7 +252,7 @@ impl ScalarUDFImpl for ArrayPositions {
     }
 
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(List(Arc::new(Field::new("item", UInt64, true))))
+        Ok(List(Arc::new(Field::new_list_field(UInt64, true))))
     }
 
     fn invoke_batch(
diff --git a/datafusion/functions-nested/src/range.rs b/datafusion/functions-nested/src/range.rs
index 360f0023dc477..8344c1a261db0 100644
--- a/datafusion/functions-nested/src/range.rs
+++ b/datafusion/functions-nested/src/range.rs
@@ -109,8 +109,7 @@ impl ScalarUDFImpl for Range {
         if arg_types.iter().any(|t| t.is_null()) {
             Ok(Null)
         } else {
-            Ok(List(Arc::new(Field::new(
-                "item",
+            Ok(List(Arc::new(Field::new_list_field(
                 arg_types[0].clone(),
                 true,
             ))))
@@ -249,8 +248,7 @@ impl ScalarUDFImpl for GenSeries {
         if arg_types.iter().any(|t| t.is_null()) {
             Ok(Null)
         } else {
-            Ok(List(Arc::new(Field::new(
-                "item",
+            Ok(List(Arc::new(Field::new_list_field(
                 arg_types[0].clone(),
                 true,
             ))))
@@ -393,7 +391,7 @@ pub(super) fn gen_range_inner(
         };
     }
     let arr = Arc::new(ListArray::try_new(
-        Arc::new(Field::new("item", Int64, true)),
+        Arc::new(Field::new_list_field(Int64, true)),
         OffsetBuffer::new(offsets.into()),
         Arc::new(Int64Array::from(values)),
         Some(NullBuffer::new(valid.finish())),
diff --git a/datafusion/functions-nested/src/remove.rs b/datafusion/functions-nested/src/remove.rs
index df0edc99bc62a..e5521706bece2 100644
--- a/datafusion/functions-nested/src/remove.rs
+++ b/datafusion/functions-nested/src/remove.rs
@@ -433,7 +433,7 @@ fn general_remove<OffsetSize: OffsetSizeTrait>(
     };
 
     Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
-        Arc::new(Field::new("item", data_type, true)),
+        Arc::new(Field::new_list_field(data_type, true)),
         OffsetBuffer::new(offsets.into()),
         values,
         list_array.nulls().cloned(),
diff --git a/datafusion/functions-nested/src/repeat.rs b/datafusion/functions-nested/src/repeat.rs
index f67ab83b1d390..2842b91a781b5 100644
--- a/datafusion/functions-nested/src/repeat.rs
+++ b/datafusion/functions-nested/src/repeat.rs
@@ -72,8 +72,7 @@ impl ScalarUDFImpl for ArrayRepeat {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        Ok(List(Arc::new(Field::new(
-            "item",
+        Ok(List(Arc::new(Field::new_list_field(
             arg_types[0].clone(),
             true,
         ))))
@@ -204,7 +203,7 @@ fn general_repeat<O: OffsetSizeTrait>(
     let values = compute::concat(&new_values)?;
 
     Ok(Arc::new(GenericListArray::<O>::try_new(
-        Arc::new(Field::new("item", data_type.to_owned(), true)),
+        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
         OffsetBuffer::from_lengths(count_vec),
         values,
         None,
@@ -255,7 +254,7 @@ fn general_list_repeat<O: OffsetSizeTrait>(
         let repeated_array = arrow_array::make_array(data);
 
         let list_arr = GenericListArray::<O>::try_new(
-            Arc::new(Field::new("item", value_type.clone(), true)),
+            Arc::new(Field::new_list_field(value_type.clone(), true)),
             OffsetBuffer::<O>::from_lengths(vec![original_data.len(); count]),
             repeated_array,
             None,
@@ -272,7 +271,7 @@ fn general_list_repeat<O: OffsetSizeTrait>(
     let values = compute::concat(&new_values)?;
 
     Ok(Arc::new(ListArray::try_new(
-        Arc::new(Field::new("item", data_type.to_owned(), true)),
+        Arc::new(Field::new_list_field(data_type.to_owned(), true)),
         OffsetBuffer::<i32>::from_lengths(lengths),
         values,
         None,
diff --git a/datafusion/functions-nested/src/replace.rs b/datafusion/functions-nested/src/replace.rs
index 01811b77734da..e971d97dbf2b9 100644
--- a/datafusion/functions-nested/src/replace.rs
+++ b/datafusion/functions-nested/src/replace.rs
@@ -414,7 +414,7 @@ fn general_replace<O: OffsetSizeTrait>(
     let data = mutable.freeze();
 
     Ok(Arc::new(GenericListArray::<O>::try_new(
-        Arc::new(Field::new("item", list_array.value_type(), true)),
+        Arc::new(Field::new_list_field(list_array.value_type(), true)),
         OffsetBuffer::<O>::new(offsets.into()),
         arrow_array::make_array(data),
         Some(NullBuffer::new(valid.finish())),
diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs
index 889eaed4edcc6..642fd9ad54cd3 100644
--- a/datafusion/functions-nested/src/set_ops.rs
+++ b/datafusion/functions-nested/src/set_ops.rs
@@ -270,13 +270,10 @@ impl ScalarUDFImpl for ArrayDistinct {
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         match &arg_types[0] {
-            List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(Field::new(
-                "item",
-                field.data_type().clone(),
-                true,
-            )))),
-            LargeList(field) => Ok(LargeList(Arc::new(Field::new(
-                "item",
+            List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(
+                Field::new_list_field(field.data_type().clone(), true),
+            ))),
+            LargeList(field) => Ok(LargeList(Arc::new(Field::new_list_field(
                 field.data_type().clone(),
                 true,
             )))),
@@ -376,10 +373,10 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
     set_op: SetOp,
 ) -> Result<ArrayRef> {
     if matches!(l.value_type(), Null) {
-        let field = Arc::new(Field::new("item", r.value_type(), true));
+        let field = Arc::new(Field::new_list_field(r.value_type(), true));
         return general_array_distinct::<OffsetSize>(r, &field);
     } else if matches!(r.value_type(), Null) {
-        let field = Arc::new(Field::new("item", l.value_type(), true));
+        let field = Arc::new(Field::new_list_field(l.value_type(), true));
         return general_array_distinct::<OffsetSize>(l, &field);
     }
 
diff --git a/datafusion/functions-nested/src/sort.rs b/datafusion/functions-nested/src/sort.rs
index cdd6842acc0ee..043fedd89bf8a 100644
--- a/datafusion/functions-nested/src/sort.rs
+++ b/datafusion/functions-nested/src/sort.rs
@@ -70,13 +70,10 @@ impl ScalarUDFImpl for ArraySort {
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         match &arg_types[0] {
-            List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(Field::new(
-                "item",
-                field.data_type().clone(),
-                true,
-            )))),
-            LargeList(field) => Ok(LargeList(Arc::new(Field::new(
-                "item",
+            List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(
+                Field::new_list_field(field.data_type().clone(), true),
+            ))),
+            LargeList(field) => Ok(LargeList(Arc::new(Field::new_list_field(
                 field.data_type().clone(),
                 true,
             )))),
@@ -198,7 +195,7 @@ pub fn array_sort_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
         .collect::<Vec<_>>();
 
     let list_arr = ListArray::new(
-        Arc::new(Field::new("item", data_type, true)),
+        Arc::new(Field::new_list_field(data_type, true)),
         OffsetBuffer::from_lengths(array_lengths),
         Arc::new(compute::concat(elements.as_slice())?),
         Some(NullBuffer::new(buffer)),
diff --git a/datafusion/functions-nested/src/string.rs b/datafusion/functions-nested/src/string.rs
index 8c6cb73e97c9b..143a3d06a32a2 100644
--- a/datafusion/functions-nested/src/string.rs
+++ b/datafusion/functions-nested/src/string.rs
@@ -252,7 +252,7 @@ impl ScalarUDFImpl for StringToArray {
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         Ok(match arg_types[0] {
             Utf8 | Utf8View | LargeUtf8 => {
-                List(Arc::new(Field::new("item", arg_types[0].clone(), true)))
+                List(Arc::new(Field::new_list_field(arg_types[0].clone(), true)))
             }
             _ => {
                 return plan_err!(
diff --git a/datafusion/functions-nested/src/utils.rs b/datafusion/functions-nested/src/utils.rs
index b9a75724bcdee..4b7b5ebd8ba1e 100644
--- a/datafusion/functions-nested/src/utils.rs
+++ b/datafusion/functions-nested/src/utils.rs
@@ -114,7 +114,7 @@ pub(crate) fn align_array_dimensions<O: OffsetSizeTrait>(
             let offsets = OffsetBuffer::<O>::from_lengths(array_lengths);
             aligned_array = Arc::new(GenericListArray::<O>::try_new(
-                Arc::new(Field::new("item", data_type, true)),
+                Arc::new(Field::new_list_field(data_type, true)),
                 offsets,
                 aligned_array,
                 None,
@@ -274,27 +274,27 @@ pub(crate) fn get_map_entry_field(data_type: &DataType) -> Result<&Fields> {
 mod tests {
     use super::*;
     use arrow::datatypes::Int64Type;
-    use datafusion_common::utils::array_into_list_array_nullable;
+    use datafusion_common::utils::SingleRowListArrayBuilder;
 
     /// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt`
     #[test]
     fn test_align_array_dimensions() {
-        let array1d_1 =
+        let array1d_1: ArrayRef =
             Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                 Some(vec![Some(1), Some(2), Some(3)]),
                 Some(vec![Some(4), Some(5)]),
             ]));
-        let array1d_2 =
+        let array1d_2: ArrayRef =
             Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                 Some(vec![Some(6), Some(7), Some(8)]),
             ]));
 
-        let array2d_1 = Arc::new(array_into_list_array_nullable(
-            Arc::clone(&array1d_1) as ArrayRef
-        )) as ArrayRef;
-        let array2d_2 = Arc::new(array_into_list_array_nullable(
-            Arc::clone(&array1d_2) as ArrayRef
-        )) as ArrayRef;
+        let array2d_1: ArrayRef = Arc::new(
+            SingleRowListArrayBuilder::new(Arc::clone(&array1d_1)).build_list_array(),
+        );
+        let array2d_2 = Arc::new(
+            SingleRowListArrayBuilder::new(Arc::clone(&array1d_2)).build_list_array(),
+        );
 
         let res = align_array_dimensions::<i32>(vec![
             array1d_1.to_owned(),
@@ -310,10 +311,11 @@ mod tests {
             expected_dim
         );
 
-        let array3d_1 = Arc::new(array_into_list_array_nullable(array2d_1)) as ArrayRef;
-        let array3d_2 = array_into_list_array_nullable(array2d_2.to_owned());
-        let res =
-            align_array_dimensions::<i32>(vec![array1d_1, Arc::new(array3d_2)]).unwrap();
+        let array3d_1: ArrayRef =
+            Arc::new(SingleRowListArrayBuilder::new(array2d_1).build_list_array());
+        let array3d_2: ArrayRef =
+            Arc::new(SingleRowListArrayBuilder::new(array2d_2).build_list_array());
+        let res = align_array_dimensions::<i32>(vec![array1d_1, array3d_2]).unwrap();
 
         let expected = as_list_array(&array3d_1).unwrap();
         let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type());
diff --git a/datafusion/functions/src/regex/regexpmatch.rs b/datafusion/functions/src/regex/regexpmatch.rs
index df4f294bb9507..93178d23de4f0 100644
--- a/datafusion/functions/src/regex/regexpmatch.rs
+++ b/datafusion/functions/src/regex/regexpmatch.rs
@@ -80,7 +80,7 @@ impl ScalarUDFImpl for RegexpMatchFunc {
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         Ok(match &arg_types[0] {
             DataType::Null => DataType::Null,
-            other => DataType::List(Arc::new(Field::new("item", other.clone(), true))),
+            other => DataType::List(Arc::new(Field::new_list_field(other.clone(), true))),
         })
     }
 
     fn invoke_batch(
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 58e390ee8bdba..628c1498f9734 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -1357,7 +1357,7 @@ mod test {
         let err = Projection::try_new(vec![udaf], empty).err().unwrap();
         assert!(
-            err.strip_backtrace().starts_with("Error during planning: Error during planning: Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed")
+            err.strip_backtrace().starts_with("Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to MY_AVG function: coercion from [Utf8] to the signature Uniform(1, [Float64]) failed")
         );
         Ok(())
     }
@@ -1407,7 +1407,7 @@ mod test {
             .err()
             .unwrap()
             .strip_backtrace();
-        assert!(err.starts_with("Error during planning: Error during planning: Coercion from [Utf8] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed."));
+        assert!(err.starts_with("Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to avg function: coercion from [Utf8] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed."));
         Ok(())
     }
 
@@ -1847,7 +1847,7 @@ mod test {
 
     #[test]
     fn tes_case_when_list() -> Result<()> {
-        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
+        let inner_field = Arc::new(Field::new_list_field(DataType::Int64, true));
         let schema = Arc::new(DFSchema::from_unqualified_fields(
             vec![
                 Field::new(
@@ -1869,7 +1869,7 @@ mod test {
     test_case_expression!(
         Some("list"),
         vec![(Box::new(col("large_list")), Box::new(lit("1")))],
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1877,7 +1877,7 @@ mod test {
     test_case_expression!(
         Some("large_list"),
         vec![(Box::new(col("list")), Box::new(lit("1")))],
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1885,7 +1885,7 @@ mod test {
     test_case_expression!(
         Some("list"),
         vec![(Box::new(col("fixed_list")), Box::new(lit("1")))],
-        DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::List(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1893,7 +1893,7 @@ mod test {
     test_case_expression!(
         Some("fixed_list"),
         vec![(Box::new(col("list")), Box::new(lit("1")))],
-        DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::List(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1901,7 +1901,7 @@ mod test {
     test_case_expression!(
         Some("fixed_list"),
        vec![(Box::new(col("large_list")), Box::new(lit("1")))],
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1909,7 +1909,7 @@ mod test {
     test_case_expression!(
         Some("large_list"),
         vec![(Box::new(col("fixed_list")), Box::new(lit("1")))],
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         Utf8,
         schema
     );
@@ -1918,7 +1918,7 @@ mod test {
 
     #[test]
     fn test_then_else_list() -> Result<()> {
-        let inner_field = Arc::new(Field::new("item", DataType::Int64, true));
+        let inner_field = Arc::new(Field::new_list_field(DataType::Int64, true));
         let schema = Arc::new(DFSchema::from_unqualified_fields(
             vec![
                 Field::new("boolean", DataType::Boolean, true),
@@ -1946,7 +1946,7 @@ mod test {
             (Box::new(col("boolean")), Box::new(col("list")))
         ],
         DataType::Boolean,
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
 
@@ -1957,7 +1957,7 @@ mod test {
            (Box::new(col("boolean")), Box::new(col("large_list")))
         ],
         DataType::Boolean,
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
 
@@ -1969,7 +1969,7 @@ mod test {
             (Box::new(col("boolean")), Box::new(col("list")))
         ],
         DataType::Boolean,
-        DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::List(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
 
@@ -1980,7 +1980,7 @@ mod test {
             (Box::new(col("boolean")), Box::new(col("fixed_list")))
         ],
         DataType::Boolean,
-        DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::List(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
 
@@ -1992,7 +1992,7 @@ mod test {
             (Box::new(col("boolean")), Box::new(col("large_list")))
         ],
         DataType::Boolean,
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
 
@@ -2003,7 +2003,7 @@ mod test {
             (Box::new(col("boolean")), Box::new(col("fixed_list")))
         ],
         DataType::Boolean,
-        DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
+        DataType::LargeList(Arc::new(Field::new_list_field(DataType::Int64, true))),
         schema
     );
     Ok(())
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index 554985667fdf9..54acaa335f5d5 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -20,10 +20,9 @@
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::tree_node::Transformed;
-use datafusion_common::Result;
+use datafusion_common::{IndexSet, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort, SortExpr};
-use indexmap::IndexSet;
 use std::hash::{Hash, Hasher};
 /// Optimization rule that eliminate duplicated expr.
 #[derive(Default, Debug)]
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index fe751a5fb583c..955da36cfa014 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -17,7 +17,6 @@
 //! [`PushDownFilter`] applies filters as early as possible
-use indexmap::IndexSet;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
@@ -27,7 +26,7 @@ use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
 use datafusion_common::{
-    internal_err, plan_err, qualified_name, Column, DFSchema, Result,
+    internal_err, plan_err, qualified_name, Column, DFSchema, IndexSet, Result,
 };
 use datafusion_expr::expr_rewriter::replace_col;
 use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, TableScan, Union};
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 60de3f6b36736..bb67dea008a69 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -31,7 +31,9 @@ use datafusion_common::{
     cast::{as_large_list_array, as_list_array},
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
 };
-use datafusion_common::{internal_err, DFSchema, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    internal_err, DFSchema, DataFusionError, IndexSet, Result, ScalarValue,
+};
 use datafusion_expr::simplify::ExprSimplifyResult;
 use datafusion_expr::{
     and, lit, or, BinaryExpr, Case, ColumnarValue, Expr, Like, Operator, Volatility,
@@ -43,7 +45,6 @@ use datafusion_expr::{
     utils::{iter_conjunction, iter_conjunction_owned},
 };
 use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps};
-use indexmap::IndexSet;
 
 use crate::analyzer::type_coercion::TypeCoercionRewriter;
 use crate::simplify_expressions::guarantees::GuaranteeRewriter;
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 522eaea8f67c7..3d7f57aebe232 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -48,7 +48,6 @@ datafusion-functions-aggregate-common = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 half = { workspace = true }
 hashbrown = { workspace = true }
-indexmap = { workspace = true }
 itertools = { workspace = true, features = ["use_std"] }
 log = { workspace = true }
 paste = "^1.0"
diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs
index 16051dae26cae..add6871d1ac44 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -20,12 +20,11 @@ use crate::{
     expressions::Column, physical_exprs_contains, LexOrdering, LexRequirement,
     PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement,
 };
-use indexmap::IndexSet;
 use std::fmt::Display;
 use std::sync::Arc;
 
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::JoinType;
+use datafusion_common::{IndexSet, JoinType};
 use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
 
 /// A structure representing a expression known to be constant in a physical execution plan.
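Note on the pattern above: all of these import changes assume that `datafusion_common` now re-exports the indexmap container types from its crate root, which is what lets each consuming crate drop its direct `indexmap` dependency. A minimal sketch of what such a re-export could look like in `datafusion/common/src/lib.rs` — the type-alias form and the `RandomState` default hasher are illustrative assumptions, not necessarily the exact upstream definition:

    // Hypothetical re-export (sketch): alias the indexmap containers with a
    // default hasher so downstream code can write
    // `use datafusion_common::{IndexMap, IndexSet};`.
    use std::hash::RandomState;

    pub type IndexMap<K, V, S = RandomState> = indexmap::IndexMap<K, V, S>;
    pub type IndexSet<T, S = RandomState> = indexmap::IndexSet<T, S>;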
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index fe866450b2b26..6c049196c4d76 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -36,12 +36,13 @@ use crate::{
 
 use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{internal_err, plan_err, JoinSide, JoinType, Result};
+use datafusion_common::{
+    internal_err, plan_err, IndexMap, IndexSet, JoinSide, JoinType, Result,
+};
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_physical_expr_common::utils::ExprPropertiesNode;
 
-use indexmap::{IndexMap, IndexSet};
 use itertools::Itertools;
 
 /// A `EquivalenceProperties` object stores information known about the output
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 8b130506cdea7..3070eb2b3f3d1 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -28,14 +28,12 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::utils::compare_rows;
-use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{internal_err, DataFusionError, IndexMap, Result, ScalarValue};
 use datafusion_expr::window_state::{
     PartitionBatchState, WindowAggState, WindowFrameContext, WindowFrameStateGroups,
 };
 use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
-use indexmap::IndexMap;
-
 /// Common trait for [window function] implementations
 ///
 /// # Aggregate Window Expressions
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index bb0e21fdfd158..7dd69e69eb392 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -57,7 +57,6 @@ datafusion-physical-expr-common = { workspace = true }
 futures = { workspace = true }
 half = { workspace = true }
 hashbrown = { workspace = true }
-indexmap = { workspace = true }
 itertools = { workspace = true, features = ["use_std"] }
 log = { workspace = true }
 once_cell = "1.18.0"
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index 58bc7bb90a888..e4a7eb049e9eb 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -19,7 +19,7 @@
 use arrow::record_batch::RecordBatch;
 use arrow_array::types::{
-    Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
+    Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType,
     Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
     TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
 };
@@ -170,6 +170,9 @@ pub(crate) fn new_group_values(
             TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
             TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d),
         },
+        DataType::Decimal128(_, _) => {
+            downcast_helper!(Decimal128Type, d);
+        }
         DataType::Utf8 => {
             return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
         }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 89041eb0f04ec..034fb86d0693c 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -31,8 +31,8 @@ use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
 use arrow::compute::cast;
 use arrow::datatypes::{
-    BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type,
-    Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType,
+    BinaryViewType, Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type,
+    Int16Type, Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType,
     Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
     TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
     TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
@@ -1008,6 +1008,14 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
                     )
                 }
             },
+            &DataType::Decimal128(_, _) => {
+                instantiate_primitive! {
+                    v,
+                    nullable,
+                    Decimal128Type,
+                    data_type
+                }
+            }
             &DataType::Utf8 => {
                 let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
                 v.push(Box::new(b) as _)
@@ -1214,6 +1222,7 @@ fn supported_type(data_type: &DataType) -> bool {
             | DataType::UInt64
             | DataType::Float32
             | DataType::Float64
+            | DataType::Decimal128(_, _)
             | DataType::Utf8
             | DataType::LargeUtf8
             | DataType::Binary
@@ -1309,7 +1318,7 @@ mod tests {
        // 6. Only decrease group indices in non-inlined group index view
        // 7. Erase all things

-        let field = Field::new("item", DataType::Int32, true);
+        let field = Field::new_list_field(DataType::Int32, true);
         let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
         let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
index 4686a78f24b00..4ceeb634bad2e 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
@@ -200,10 +200,10 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
         let first_n_nulls = if NULLABLE { self.nulls.take_n(n) } else { None };
 
-        Arc::new(PrimitiveArray::<T>::new(
-            ScalarBuffer::from(first_n),
-            first_n_nulls,
-        ))
+        Arc::new(
+            PrimitiveArray::<T>::new(ScalarBuffer::from(first_n), first_n_nulls)
+                .with_data_type(self.data_type.clone()),
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
index 05214ec10d68b..85cd2e79b9363 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
@@ -208,6 +208,7 @@ where
                 build_primitive(split, null_group)
             }
         };
+        Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 2e97334493dd6..c2d8b093a9234 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1344,7 +1344,7 @@ mod tests {
             Field::new("a", DataType::Int32, true),
             Field::new(
                 "b",
-                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                 true,
             ),
         ]));
@@ -1389,7 +1389,7 @@ mod tests {
 
         assert_eq!(DataType::Int32, *sort_exec.schema().field(0).data_type());
         assert_eq!(
-            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
             *sort_exec.schema().field(1).data_type()
         );
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index dee2d09862d36..f20adb0d2fab9 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -357,12 +357,12 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![
             Field::new(
                 "nested_int",
-                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                 false,
             ),
             Field::new(
                 "nested_int2",
-                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                 false,
             ),
         ]));
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 0615e6738a1fc..9f03385f09d1d 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -991,7 +991,7 @@ mod tests {
         offsets.push(OffsetSize::from_usize(values.len()).unwrap());
         valid.append(true);
 
-        let field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
         GenericListArray::<OffsetSize>::new(
             field,
             OffsetBuffer::new(offsets.into()),
@@ -1017,7 +1017,7 @@ mod tests {
             None,
             None,
         ]));
-        let field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
         let valid = NullBuffer::from(vec![true, false, true, false, true, true]);
         FixedSizeListArray::new(field, 2, values, Some(valid))
     }
@@ -1100,7 +1100,7 @@ mod tests {
         let out_schema = Arc::new(Schema::new(vec![
             Field::new(
                 "col1_unnest_placeholder_depth_1",
-                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                 true,
             ),
             Field::new("col1_unnest_placeholder_depth_2", DataType::Int32, true),
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 398b5eb292d7b..287ef6d767ef0 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -52,7 +52,7 @@ use datafusion_common::utils::{
     evaluate_partition_ranges, get_at_indices, get_row_at_idx,
 };
 use datafusion_common::{
-    arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result,
+    arrow_datafusion_err, exec_err, DataFusionError, HashMap, IndexMap, Result,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
@@ -65,7 +65,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use hashbrown::raw::RawTable;
-use indexmap::IndexMap;
 use log::debug;
 
 /// Window execution plan
diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml
index 6c53e1b1ced0c..0aacc6d9938cf 100644
--- a/datafusion/proto-common/Cargo.toml
+++ b/datafusion/proto-common/Cargo.toml
@@ -26,7 +26,7 @@ homepage = { workspace = true }
 repository = { workspace = true }
 license = { workspace = true }
 authors = { workspace = true }
-rust-version = "1.79"
+rust-version = "1.80"
 
 # Exclude proto files so crates.io consumers don't need protoc
 exclude = ["*.proto"]
diff --git a/datafusion/proto-common/gen/Cargo.toml b/datafusion/proto-common/gen/Cargo.toml
index 6e5783f467a70..29f238903f40a 100644
--- a/datafusion/proto-common/gen/Cargo.toml
+++ b/datafusion/proto-common/gen/Cargo.toml
@@ -20,7 +20,7 @@ name = "gen-common"
 description = "Code generation for proto"
 version = "0.1.0"
 edition = { workspace = true }
-rust-version = "1.79"
+rust-version = "1.80"
 authors = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 9e4b331a01bfa..da28cad93ab68 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -27,7 +27,7 @@ repository = { workspace = true }
 license = { workspace = true }
 authors = { workspace = true }
 # Specify MSRV here as `cargo msrv` doesn't support workspace version
-rust-version = "1.79"
+rust-version = "1.80"
 # Exclude proto files so crates.io consumers don't need protoc
 exclude = ["*.proto"]
diff --git a/datafusion/proto/gen/Cargo.toml b/datafusion/proto/gen/Cargo.toml
index aee8fac4a1209..dc728489523de 100644
--- a/datafusion/proto/gen/Cargo.toml
+++ b/datafusion/proto/gen/Cargo.toml
@@ -20,7 +20,7 @@ name = "gen"
 description = "Code generation for proto"
 version = "0.1.0"
 edition = { workspace = true }
-rust-version = "1.79"
+rust-version = "1.80"
 authors = { workspace = true }
 homepage = { workspace = true }
 repository = { workspace = true }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index cfb6862a0ca3e..8c150b20dd009 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -787,7 +787,7 @@ async fn roundtrip_logical_plan_unnest() -> Result<()> {
         Field::new("a", DataType::Int64, true),
         Field::new(
             "b",
-            DataType::List(Arc::new(Field::new("item", DataType::Int32, false))),
+            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
             true,
         ),
     ]);
@@ -1611,7 +1611,7 @@ fn round_trip_scalar_types() {
     ];
 
     for test_case in should_pass.into_iter() {
-        let field = Field::new("item", test_case, true);
+        let field = Field::new_list_field(test_case, true);
         let proto: protobuf::Field = (&field).try_into().unwrap();
         let roundtrip: Field = (&proto).try_into().unwrap();
         assert_eq!(format!("{field:?}"), format!("{roundtrip:?}"));
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index 01bf920438162..b3a28fc9d5613 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -46,7 +46,6 @@ arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
-indexmap = { workspace = true }
 log = { workspace = true }
 recursive = { workspace = true }
 regex = { workspace = true }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 39b6eb6e81327..002185adafd63 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -25,7 +25,7 @@ use crate::utils::{
 };
 
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{not_impl_err, plan_err, Result};
+use datafusion_common::{not_impl_err, plan_err, IndexMap, Result};
 use datafusion_common::{RecursionUnnestOption, UnnestOptions};
 use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
 use datafusion_expr::expr_rewriter::{
@@ -38,7 +38,6 @@ use datafusion_expr::{
     qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter,
     GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
 };
-use indexmap::IndexMap;
 use sqlparser::ast::{
     Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
     WildcardAdditionalOptions, WindowType,
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 81e47ed939f22..eaae4fe73d8cd 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -276,9 +276,11 @@ impl Unparser<'_> {
     ) -> Result<()> {
         match plan {
             LogicalPlan::TableScan(scan) => {
-                if let Some(unparsed_table_scan) =
-                    Self::unparse_table_scan_pushdown(plan, None)?
-                {
+                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
+                    plan,
+                    None,
+                    select.already_projected(),
+                )? {
                     return self.select_to_sql_recursively(
                         &unparsed_table_scan,
                         query,
@@ -585,6 +587,7 @@ impl Unparser<'_> {
                 let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                     plan,
                     Some(plan_alias.alias.clone()),
+                    select.already_projected(),
                 )?;
                 // if the child plan is a TableScan with pushdown operations, we don't need to
                 // create an additional subquery for it
@@ -714,6 +717,7 @@ impl Unparser<'_> {
     fn unparse_table_scan_pushdown(
         plan: &LogicalPlan,
         alias: Option<TableReference>,
+        already_projected: bool,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::TableScan(table_scan) => {
@@ -743,24 +747,29 @@ impl Unparser<'_> {
                     }
                 }
 
-                if let Some(project_vec) = &table_scan.projection {
-                    let project_columns = project_vec
-                        .iter()
-                        .cloned()
-                        .map(|i| {
-                            let schema = table_scan.source.schema();
-                            let field = schema.field(i);
-                            if alias.is_some() {
-                                Column::new(alias.clone(), field.name().clone())
-                            } else {
-                                Column::new(
-                                    Some(table_scan.table_name.clone()),
-                                    field.name().clone(),
-                                )
-                            }
-                        })
-                        .collect::<Vec<_>>();
-                    builder = builder.project(project_columns)?;
+                // Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
+                // For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
+                // information included in the TableScan node.
+                if !already_projected {
+                    if let Some(project_vec) = &table_scan.projection {
+                        let project_columns = project_vec
+                            .iter()
+                            .cloned()
+                            .map(|i| {
+                                let schema = table_scan.source.schema();
+                                let field = schema.field(i);
+                                if alias.is_some() {
+                                    Column::new(alias.clone(), field.name().clone())
+                                } else {
+                                    Column::new(
+                                        Some(table_scan.table_name.clone()),
+                                        field.name().clone(),
+                                    )
+                                }
+                            })
+                            .collect::<Vec<_>>();
+                        builder = builder.project(project_columns)?;
+                    }
                 }
 
                 let filter_expr: Result<Vec<Expr>> = table_scan
@@ -805,14 +814,17 @@ impl Unparser<'_> {
                 Self::unparse_table_scan_pushdown(
                     &subquery_alias.input,
                     Some(subquery_alias.alias.clone()),
+                    already_projected,
                 )
             }
             // SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
             // The inner table scan could be a scan with pushdown operations.
             LogicalPlan::Projection(projection) => {
-                if let Some(plan) =
-                    Self::unparse_table_scan_pushdown(&projection.input, alias.clone())?
-                {
+                if let Some(plan) = Self::unparse_table_scan_pushdown(
+                    &projection.input,
+                    alias.clone(),
+                    already_projected,
+                )? {
                     let exprs = if alias.is_some() {
                         let mut alias_rewriter =
                             alias.as_ref().map(|alias_name| TableAliasRewriter {
diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs
index c77af79243849..c5aed2929e504 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -20,13 +20,12 @@ use std::{cmp::Ordering, sync::Arc, vec};
 use datafusion_common::{
     internal_err,
     tree_node::{Transformed, TransformedResult, TreeNode},
-    Column, DataFusionError, Result, ScalarValue,
+    Column, DataFusionError, IndexSet, Result, ScalarValue,
 };
 use datafusion_expr::{
     expr, utils::grouping_set_to_exprlist, Aggregate, Expr, LogicalPlan,
     LogicalPlanBuilder, Projection, SortExpr, Unnest, Window,
 };
-use indexmap::IndexSet;
 use sqlparser::ast;
 
 use super::{
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 2b3ee8be3ccd4..c220ab5c10cc7 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -27,7 +27,7 @@ use datafusion_common::tree_node::{
 };
 use datafusion_common::{
     exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, HashMap,
-    Result, ScalarValue,
+    IndexMap, Result, ScalarValue,
 };
 use datafusion_expr::builder::get_struct_unnested_columns;
 use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
@@ -35,7 +35,6 @@ use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
 use datafusion_expr::{
     col, expr_vec_fmt, ColumnUnnestList, Expr, ExprSchemable, LogicalPlan,
 };
-use indexmap::IndexMap;
 use sqlparser::ast::{Ident, Value};
 
 /// Make a best-effort attempt at resolving all columns in the expression tree
@@ -618,13 +617,12 @@ mod tests {
 
     use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
     use arrow_schema::Fields;
-    use datafusion_common::{Column, DFSchema, Result};
+    use datafusion_common::{Column, DFSchema, IndexMap, Result};
     use datafusion_expr::{
         col, lit, unnest, ColumnUnnestList, EmptyRelation, LogicalPlan,
     };
     use datafusion_functions::core::expr_ext::FieldAccessor;
     use datafusion_functions_aggregate::expr_fn::count;
-    use indexmap::IndexMap;
 
     use crate::utils::{resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up};
 
@@ -763,8 +761,7 @@ mod tests {
             ),
             Field::new(
                 "array_col",
-                ArrowDataType::List(Arc::new(Field::new(
-                    "item",
+                ArrowDataType::List(Arc::new(Field::new_list_field(
                     ArrowDataType::Int64,
                     true,
                 ))),
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs
index 8e89323204a31..fcfee29f6ac9d 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -601,7 +601,7 @@ fn test_aggregation_without_projection() -> Result<()> {
 
     assert_eq!(
         actual,
-        r#"SELECT sum(users.age), users."name" FROM (SELECT users."name", users.age FROM users) GROUP BY users."name""#
+        r#"SELECT sum(users.age), users."name" FROM users GROUP BY users."name""#
     );
 
     Ok(())
@@ -926,12 +926,25 @@ fn test_table_scan_pushdown() -> Result<()> {
     let query_from_table_scan_with_projection = LogicalPlanBuilder::from(
         table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
     )
-    .project(vec![wildcard()])?
+    .project(vec![col("id"), col("age")])?
     .build()?;
     let query_from_table_scan_with_projection =
         plan_to_sql(&query_from_table_scan_with_projection)?;
     assert_eq!(
         query_from_table_scan_with_projection.to_string(),
+        "SELECT t1.id, t1.age FROM t1"
+    );
+
+    let query_from_table_scan_with_two_projections = LogicalPlanBuilder::from(
+        table_scan(Some("t1"), &schema, Some(vec![0, 1]))?.build()?,
+    )
+    .project(vec![col("id"), col("age")])?
+    .project(vec![wildcard()])?
+    .build()?;
+    let query_from_table_scan_with_two_projections =
+        plan_to_sql(&query_from_table_scan_with_two_projections)?;
+    assert_eq!(
+        query_from_table_scan_with_two_projections.to_string(),
         "SELECT * FROM (SELECT t1.id, t1.age FROM t1)"
     );
diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs
index 63c296dfbc2f9..337fc2ce4f678 100644
--- a/datafusion/sql/tests/common/mod.rs
+++ b/datafusion/sql/tests/common/mod.rs
@@ -162,12 +162,18 @@ impl ContextProvider for MockContextProvider {
             "array" => Ok(Schema::new(vec![
                 Field::new(
                     "left",
-                    DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                    DataType::List(Arc::new(Field::new_list_field(
+                        DataType::Int64,
+                        true,
+                    ))),
                     false,
                 ),
                 Field::new(
                     "right",
-                    DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                    DataType::List(Arc::new(Field::new_list_field(
+                        DataType::Int64,
+                        true,
+                    ))),
                     false,
                 ),
             ])),
@@ -198,7 +204,10 @@ impl ContextProvider for MockContextProvider {
             "unnest_table" => Ok(Schema::new(vec![
                 Field::new(
                     "array_col",
-                    DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                    DataType::List(Arc::new(Field::new_list_field(
+                        DataType::Int64,
+                        true,
+                    ))),
                     false,
                 ),
                 Field::new(
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 917e037682f24..e76c1466a5476 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -76,26 +76,26 @@ statement error DataFusion error: Schema error: Schema contains duplicate unqual
 SELECT approx_distinct(c9) count_c9, approx_distinct(cast(c9 as varchar)) count_c9_str FROM aggregate_test_100
 
 # csv_query_approx_percentile_cont_with_weight
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Utf8, Int8, Float64\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont_with_weight function: coercion from \[Utf8, Int8, Float64\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont_with_weight(c1, c2, 0.95) FROM aggregate_test_100
 
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int16, Utf8, Float64\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont_with_weight function: coercion from \[Int16, Utf8, Float64\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont_with_weight(c3, c1, 0.95) FROM aggregate_test_100
 
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int16, Int8, Utf8\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont_with_weight function: coercion from \[Int16, Int8, Utf8\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100
 
 # csv_query_approx_percentile_cont_with_histogram_bins
 statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
 SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
 
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont(c3, 0.95, c1) FROM aggregate_test_100
 
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int16, Float64, Float64\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont function: coercion from \[Int16, Float64, Float64\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont(c3, 0.95, 111.1) FROM aggregate_test_100
 
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Float64, Float64, Float64\] to the signature OneOf(.*) failed(.|\n)*
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont function: coercion from \[Float64, Float64, Float64\] to the signature OneOf(.*) failed(.|\n)*
 SELECT approx_percentile_cont(c12, 0.95, 111.1) FROM aggregate_test_100
 
 statement error DataFusion error: This feature is not implemented: Percentile value for 'APPROX_PERCENTILE_CONT' must be a literal
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index b99bbf3dc0780..14d5678e507c0 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -108,11 +108,11 @@ query error
 select avg(c1, c12) from aggregate_test_100;
 
 # AggregateFunction with wrong argument type
-statement error Coercion
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to regr_slope function: coercion from
 select regr_slope(1, '2');
 
 # WindowFunction using AggregateFunction wrong signature
-statement error Coercion
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to regr_slope function: coercion from
 select
 c9,
 regr_slope(c11, '2') over () as min1
@@ -120,7 +120,7 @@ from aggregate_test_100
 order by c9
 
 # WindowFunction wrong signature
-statement error DataFusion error: Error during planning: Error during planning: Coercion from \[Int32, Int64, Int64\] to the signature OneOf\(\[Any\(0\), Any\(1\), Any\(2\)\]\) failed
+statement error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to nth_value function: coercion from \[Int32, Int64, Int64\] to the signature OneOf\(\[Any\(0\), Any\(1\), Any\(2\)\]\) failed
 select
 c9,
 nth_value(c5, 2, 3) over (order by c9) as nv1
diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt
index 499d279515c36..9b8dfc2186be5 100644
--- a/datafusion/sqllogictest/test_files/expr.slt
+++ b/datafusion/sqllogictest/test_files/expr.slt
@@ -560,7 +560,7 @@ select repeat('-1.2', arrow_cast(3, 'Int32'));
 ----
 -1.2-1.2-1.2
 
-query error DataFusion error: Error during planning: Error during planning: Coercion from \[Utf8, Float64\] to the signature
+query error DataFusion error: Error during planning: Error during planning: Failed to coerce arguments to satisfy a call to repeat function: coercion from \[Utf8, Float64\] to the signature
 select repeat('-1.2', 3.2);
 
 query T
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index 4acf519c5de42..df7e21c2da447 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5499,3 +5499,42 @@ SELECT
 GROUP BY ts, text
 ----
 foo 2024-01-01T08:00:00+08:00
+
+# Test multi group by int + Decimal128
+statement ok
+create table source as values
+(1, '123.45'),
+(1, '123.45'),
+(2, '678.90'),
+(2, '1011.12'),
+(3, '1314.15'),
+(3, '1314.15'),
+(2, '1011.12'),
+(null, null),
+(null, '123.45'),
+(null, null),
+(null, '123.45'),
+(2, '678.90'),
+(2, '678.90'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Decimal128(10, 2)') as b from source;
+
+query IRI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 123.45 2
+1 NULL 1
+2 678.9 3
+2 1011.12 2
+3 1314.15 2
+NULL 123.45 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml
index 79e5a3206cad7..32c69727a25b7 100644
--- a/datafusion/substrait/Cargo.toml
+++ b/datafusion/substrait/Cargo.toml
@@ -26,7 +26,7 @@ repository = { workspace = true }
 license = { workspace = true }
 authors = { workspace = true }
 # Specify MSRV here as `cargo msrv` doesn't support workspace version
-rust-version = "1.79"
+rust-version = "1.80"
 
 [lints]
 workspace = true
@@ -36,7 +36,7 @@ arrow-buffer = { workspace = true }
 async-recursion = "1.0"
 async-trait = { workspace = true }
 chrono = { workspace = true }
-datafusion = { workspace = true, default-features = true }
+datafusion = { workspace = true }
 itertools = { workspace = true }
 object_store = { workspace = true }
 pbjson-types = "0.7"
@@ -51,4 +51,6 @@ serde_json = "1.0"
 tokio = { workspace = true }
 
 [features]
+default = ["physical"]
+physical = ["datafusion/parquet"]
 protoc = ["substrait/protoc"]
diff --git a/datafusion/substrait/src/lib.rs b/datafusion/substrait/src/lib.rs
index 1389cac75b99c..f33e86a2d20c0 100644
--- a/datafusion/substrait/src/lib.rs
+++ b/datafusion/substrait/src/lib.rs
@@ -75,6 +75,7 @@
 //! ```
 pub mod extensions;
 pub mod logical_plan;
+#[cfg(feature = "physical")]
 pub mod physical_plan;
 pub mod serializer;
 pub mod variation_const;
diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs
index c432a144127c7..26d71c7fd3e24 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -2430,7 +2430,7 @@ mod test {
             "struct",
             DataType::Struct(Fields::from(vec![Field::new(
                 "inner",
-                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+                DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
                 true,
             )])),
             true,
diff --git a/datafusion/substrait/tests/cases/mod.rs b/datafusion/substrait/tests/cases/mod.rs
index b1f4b95df66f1..777246e4139bf 100644
--- a/datafusion/substrait/tests/cases/mod.rs
+++ b/datafusion/substrait/tests/cases/mod.rs
@@ -20,6 +20,7 @@ mod emit_kind_tests;
 mod function_test;
 mod logical_plans;
 mod roundtrip_logical_plan;
+#[cfg(feature = "physical")]
 mod roundtrip_physical_plan;
 mod serialize;
 mod substrait_validations;
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 0141e6a08647b..f836dea03c61a 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -1453,10 +1453,14 @@ async fn create_all_type_context() -> Result<SessionContext> {
         Field::new("utf8_col", DataType::Utf8, true),
         Field::new("large_utf8_col", DataType::LargeUtf8, true),
         Field::new("view_utf8_col", DataType::Utf8View, true),
-        Field::new_list("list_col", Field::new("item", DataType::Int64, true), true),
+        Field::new_list(
+            "list_col",
+            Field::new_list_field(DataType::Int64, true),
+            true,
+        ),
         Field::new_list(
             "large_list_col",
-            Field::new("item", DataType::Int64, true),
+            Field::new_list_field(DataType::Int64, true),
             true,
         ),
         Field::new("decimal_128_col", DataType::Decimal128(10, 2), true),
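Every `Field::new("item", ...)` → `Field::new_list_field(...)` rewrite in this diff relies on the two constructors being equivalent: arrow's `Field::new_list_field(data_type, nullable)` is shorthand for a list element field with the conventional name "item". A small standalone sketch of that equivalence (illustration only, not part of the patch):

    use std::sync::Arc;
    use arrow_schema::{DataType, Field};

    fn main() {
        // `new_list_field` creates the conventional list element field named
        // "item", so both constructions describe the same list element.
        let explicit = Field::new("item", DataType::Int64, true);
        let shorthand = Field::new_list_field(DataType::Int64, true);
        assert_eq!(explicit, shorthand);

        // Either field can then be used to build a List data type.
        let list_type = DataType::List(Arc::new(shorthand));
        println!("{list_type:?}");
    }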