try to coerce utf8 to utf8view
goldmedal committed Aug 27, 2024
1 parent 27f7d1e commit c7d925a
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -39,6 +39,7 @@ use crate::physical_plan::{
};

use arrow::compute::sum;
+ use arrow_schema::DataType;
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
@@ -519,6 +520,8 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.key_value_metadata(),
)?;

+ let file_schema = coercion_utf8_to_utf8view(file_schema, table_schema.clone())?;
+
statistics.column_statistics = if has_statistics {
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array =
@@ -566,6 +569,28 @@ pub fn statistics_from_parquet_meta_calc(
Ok(statistics)
}

+ /// Convert utf8 to utf8view if the table schema has utf8view and the file schema has utf8
+ fn coercion_utf8_to_utf8view(
+     file_schema: Schema,
+     table_schema: SchemaRef,
+ ) -> Result<SchemaRef> {
+     let mut fields = Vec::with_capacity(file_schema.fields().len());
+     for field in file_schema.fields() {
+         let field_name = field.name();
+         let table_field = table_schema.field_with_name(field_name)?;
+         if matches!(field.data_type(), DataType::Utf8) {
+             if matches!(table_field.data_type(), DataType::Utf8View) {
+                 fields.push(table_field.clone());
+             } else {
+                 fields.push(Arc::unwrap_or_clone(field.clone()));
+             }
+         } else {
+             fields.push(Arc::unwrap_or_clone(field.clone()));
+         }
+     }
+     Ok(Arc::new(Schema::new(fields)))
+ }

/// Deprecated
/// Use [`statistics_from_parquet_meta_calc`] instead.
/// This method was deprecated because it didn't need to be async so a new method was created
@@ -1238,7 +1263,7 @@ mod tests {
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
- as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
+ as_binary_view_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
@@ -1591,7 +1616,7 @@
let schema = format.infer_schema(&state, &store, &files).await.unwrap();

let null_i64 = ScalarValue::Int64(None);
- let null_utf8 = ScalarValue::Utf8(None);
+ let null_utf8 = ScalarValue::Utf8View(None);

// Fetch statistics for first file
let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
@@ -1603,11 +1628,11 @@
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(
c1_stats.max_value,
- Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
+ Precision::Exact(ScalarValue::Utf8View(Some("bar".to_string())))
);
assert_eq!(
c1_stats.min_value,
- Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
+ Precision::Exact(ScalarValue::Utf8View(Some("Foo".to_string())))
);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
@@ -1731,8 +1756,8 @@
bigint_col: Int64\n\
float_col: Float32\n\
double_col: Float64\n\
- date_string_col: Binary\n\
- string_col: Binary\n\
+ date_string_col: BinaryView\n\
+ string_col: BinaryView\n\
timestamp_col: Timestamp(Nanosecond, None)",
y
);
@@ -1888,7 +1913,7 @@
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());

- let array = as_binary_array(batches[0].column(0))?;
+ let array = as_binary_view_array(batches[0].column(0))?;
let mut values: Vec<&str> = vec![];
for i in 0..batches[0].num_rows() {
values.push(std::str::from_utf8(array.value(i)).unwrap());
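For readers unfamiliar with the schema types involved, here is a minimal standalone sketch of what the coercion_utf8_to_utf8view function above does, assuming only the arrow_schema crate. The coerce_utf8 helper, its signature, and the c1 column name are illustrative and not part of the commit, and unlike the committed function it passes fields missing from the table schema through unchanged instead of returning an error.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Hypothetical helper mirroring the coercion in the diff: any Utf8 field in
// the file schema is replaced by the table's field when the table declares
// the same column as Utf8View; everything else is passed through unchanged.
fn coerce_utf8(file_schema: &Schema, table_schema: &SchemaRef) -> Schema {
    let fields: Vec<Field> = file_schema
        .fields()
        .iter()
        .map(|field| match table_schema.field_with_name(field.name()) {
            Ok(table_field)
                if field.data_type() == &DataType::Utf8
                    && table_field.data_type() == &DataType::Utf8View =>
            {
                table_field.clone()
            }
            // Unlike the committed function, a field missing from the table
            // schema is kept as-is here rather than raising an error.
            _ => field.as_ref().clone(),
        })
        .collect();
    Schema::new(fields)
}

fn main() {
    // A Parquet file written with a plain Utf8 column...
    let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
    // ...read through a table schema that declares the column as Utf8View.
    let table_schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8View, true)]));

    let coerced = coerce_utf8(&file_schema, &table_schema);
    // Statistics computed against the coerced schema now produce Utf8View
    // scalars, matching the updated test expectations in the diff.
    assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
}

Pushing table_field.clone() rather than rebuilding the field from scratch also carries over the table field's nullability and metadata, which appears to be the point of reusing it in the commit.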
