From 45e78412b65c9f0e981555100f46e339c3a89877 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 4 Nov 2023 15:58:50 -0400 Subject: [PATCH] fix: delta scan partition ordering bug (#1789) # Description Sometimes the order of partition columns in our delta schema does not match the order of partition columns in the deltatable metadata. This would cause `DeltaScan` to provide incorrect values for partition columns. This is fixed by having `DeltaScan` use the metadata as the source of truth. # Related Issue(s) - closes #1787 --- .../src/delta_datafusion/mod.rs | 111 ++++++++++++++---- 1 file changed, 91 insertions(+), 20 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 125cf327f7..7fbe362afc 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -586,8 +586,14 @@ impl<'a> DeltaScanBuilder<'a> { // However we may want to do some additional balancing in case we are far off from the above. let mut file_groups: HashMap, Vec> = HashMap::new(); + let table_partition_cols = &self + .snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)? + .partition_columns; + for action in files.iter() { - let mut part = partitioned_file_from_action(action, &schema); + let mut part = partitioned_file_from_action(action, table_partition_cols, &schema); if config.file_column_name.is_some() { part.partition_values @@ -602,13 +608,6 @@ impl<'a> DeltaScanBuilder<'a> { .push(part); } - let table_partition_cols = self - .snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .clone(); - let file_schema = Arc::new(ArrowSchema::new( schema .fields() @@ -923,20 +922,30 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult PartitionedFile { - let partition_values = schema - .fields() + let partition_values = partition_columns .iter() - .filter_map(|f| { - action.partition_values.get(f.name()).map(|val| match val { - Some(value) => to_correct_scalar_value( - &serde_json::Value::String(value.to_string()), - f.data_type(), - ) - .unwrap_or(ScalarValue::Null), - None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null), - }) + .map(|part| { + action + .partition_values + .get(part) + .map(|val| { + schema + .field_with_name(part) + .map(|field| match val { + Some(value) => to_correct_scalar_value( + &serde_json::Value::String(value.to_string()), + field.data_type(), + ) + .unwrap_or(ScalarValue::Null), + None => get_null_of_arrow_type(field.data_type()) + .unwrap_or(ScalarValue::Null), + }) + .unwrap_or(ScalarValue::Null) + }) + .unwrap_or(ScalarValue::Null) }) .collect::>(); @@ -1618,6 +1627,7 @@ pub async fn find_files<'a>( #[cfg(test)] mod tests { + use crate::writer::test_utils::get_delta_schema; use arrow::array::StructArray; use arrow::datatypes::{DataType, Field, Schema}; use chrono::{TimeZone, Utc}; @@ -1797,7 +1807,8 @@ mod tests { Field::new("month", ArrowDataType::Int64, true), ]); - let file = partitioned_file_from_action(&action, &schema); + let part_columns = vec!["year".to_string(), "month".to_string()]; + let file = partitioned_file_from_action(&action, &part_columns, &schema); let ref_file = PartitionedFile { object_meta: object_store::ObjectMeta { location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()), @@ -1929,4 +1940,64 @@ mod tests { ]; assert_batches_sorted_eq!(&expected, &actual); } + + #[tokio::test] + async fn delta_scan_mixed_partition_order() { + // Tests issue (1787) where partition columns were incorrect when they + // have a different order in the metadata and table schema + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("modified", DataType::Utf8, true), + Field::new("id", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + + let table = crate::DeltaOps::new_in_memory() + .create() + .with_columns(get_delta_schema().get_fields().clone()) + .with_partition_columns(["modified", "id"]) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-01", + "2021-02-01", + "2021-02-02", + "2021-02-02", + ])), + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])), + ], + ) + .unwrap(); + // write some data + let table = crate::DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(crate::protocol::SaveMode::Append) + .await + .unwrap(); + + let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap(); + + let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(provider)).unwrap(); + + let df = ctx.sql("select * from test").await.unwrap(); + let actual = df.collect().await.unwrap(); + let expected = vec![ + "+-------+------------+----+", + "| value | modified | id |", + "+-------+------------+----+", + "| 1 | 2021-02-01 | A |", + "| 10 | 2021-02-01 | B |", + "| 100 | 2021-02-02 | D |", + "| 20 | 2021-02-02 | C |", + "+-------+------------+----+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + } }