diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 125688a38b00..1b21783bb1ce 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -39,6 +39,7 @@ use crate::physical_plan::{
 };
 
 use arrow::compute::sum;
+use arrow_schema::DataType;
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
@@ -519,6 +520,8 @@ pub fn statistics_from_parquet_meta_calc(
         file_metadata.key_value_metadata(),
     )?;
 
+    let file_schema = coercion_utf8_to_utf8view(file_schema, table_schema.clone())?;
+
     statistics.column_statistics = if has_statistics {
         let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
         let mut null_counts_array =
@@ -566,6 +569,28 @@ pub fn statistics_from_parquet_meta_calc(
     Ok(statistics)
 }
 
+/// Convert utf8 to utf8view if the table schema has utf8view and the file schema has utf8
+fn coercion_utf8_to_utf8view(
+    file_schema: Schema,
+    table_schema: SchemaRef,
+) -> Result<SchemaRef> {
+    let mut fields = Vec::with_capacity(file_schema.fields().len());
+    for field in file_schema.fields() {
+        let field_name = field.name();
+        let table_field = table_schema.field_with_name(field_name)?;
+        if matches!(field.data_type(), DataType::Utf8) {
+            if matches!(table_field.data_type(), DataType::Utf8View) {
+                fields.push(table_field.clone());
+            } else {
+                fields.push(Arc::unwrap_or_clone(field.clone()));
+            }
+        } else {
+            fields.push(Arc::unwrap_or_clone(field.clone()));
+        }
+    }
+    Ok(Arc::new(Schema::new(fields)))
+}
+
 /// Deprecated
 /// Use [`statistics_from_parquet_meta_calc`] instead.
 /// This method was deprecated because it didn't need to be async so a new method was created
@@ -1238,7 +1263,7 @@ mod tests {
     use arrow_schema::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_common::cast::{
-        as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
+        as_binary_view_array, as_boolean_array, as_float32_array, as_float64_array,
         as_int32_array, as_timestamp_nanosecond_array,
     };
     use datafusion_common::config::ParquetOptions;
@@ -1591,7 +1616,7 @@ mod tests {
         let schema = format.infer_schema(&state, &store, &files).await.unwrap();
 
         let null_i64 = ScalarValue::Int64(None);
-        let null_utf8 = ScalarValue::Utf8(None);
+        let null_utf8 = ScalarValue::Utf8View(None);
 
         // Fetch statistics for first file
         let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
@@ -1603,11 +1628,11 @@ mod tests {
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
         assert_eq!(
             c1_stats.max_value,
-            Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
+            Precision::Exact(ScalarValue::Utf8View(Some("bar".to_string())))
         );
         assert_eq!(
             c1_stats.min_value,
-            Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
+            Precision::Exact(ScalarValue::Utf8View(Some("Foo".to_string())))
        );
         // column c2: missing from the file so the table treats all 3 rows as null
         let c2_stats = &stats.column_statistics[1];
@@ -1731,8 +1756,8 @@ mod tests {
              bigint_col: Int64\n\
              float_col: Float32\n\
              double_col: Float64\n\
-             date_string_col: Binary\n\
-             string_col: Binary\n\
+             date_string_col: BinaryView\n\
+             string_col: BinaryView\n\
              timestamp_col: Timestamp(Nanosecond, None)",
             y
         );
@@ -1888,7 +1913,7 @@ mod tests {
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(8, batches[0].num_rows());
 
-        let array = as_binary_array(batches[0].column(0))?;
+        let array = as_binary_view_array(batches[0].column(0))?;
         let mut values: Vec<&str> = vec![];
         for i in 0..batches[0].num_rows() {
             values.push(std::str::from_utf8(array.value(i)).unwrap());
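For context, here is a minimal standalone sketch of the Utf8 -> Utf8View schema coercion that the new coercion_utf8_to_utf8view helper performs. It is written against arrow_schema only; the coerce_utf8_fields name and the example schemas are hypothetical and are not part of this patch.

// Illustrative only; mirrors the idea of coercion_utf8_to_utf8view above,
// not the exact DataFusion code path. Schemas below are made up.
use std::sync::Arc;

use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};

// For every Utf8 field in the file schema whose counterpart in the table
// schema is Utf8View, take the table field; otherwise keep the file field.
fn coerce_utf8_fields(
    file_schema: &Schema,
    table_schema: &SchemaRef,
) -> Result<SchemaRef, ArrowError> {
    let mut fields = Vec::with_capacity(file_schema.fields().len());
    for field in file_schema.fields() {
        let table_field = table_schema.field_with_name(field.name())?;
        if field.data_type() == &DataType::Utf8
            && table_field.data_type() == &DataType::Utf8View
        {
            fields.push(table_field.clone());
        } else {
            fields.push(field.as_ref().clone());
        }
    }
    Ok(Arc::new(Schema::new(fields)))
}

fn main() -> Result<(), ArrowError> {
    // Parquet file written with plain Utf8 strings.
    let file_schema = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c3", DataType::Int64, true),
    ]);
    // Table schema declares the same column as Utf8View.
    let table_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8View, true),
        Field::new("c3", DataType::Int64, true),
    ]));

    let coerced = coerce_utf8_fields(&file_schema, &table_schema)?;
    assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
    assert_eq!(coerced.field(1).data_type(), &DataType::Int64);
    Ok(())
}

With this coercion in place, the min/max accumulators are built against Utf8View fields, which is why the test expectations above change from ScalarValue::Utf8 to ScalarValue::Utf8View.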