From 1ea1e4fc8cc658de87888558a8a606345e2cb749 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 26 Oct 2023 02:23:25 -0400
Subject: [PATCH 1/8] upgrade datafusion with git head

---
 Cargo.toml                           |  30 +-
 python/Cargo.toml                    |   2 +-
 rust/src/delta_datafusion/expr.rs    |   4 +
 rust/src/delta_datafusion/mod.rs     | 519 ++++++++-------------------
 rust/src/operations/mod.rs           |   4 +-
 rust/tests/integration_datafusion.rs |  50 +--
 6 files changed, 192 insertions(+), 417 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d90700bdbe..048748578a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,23 +15,23 @@ debug = "line-tables-only"

 [workspace.dependencies]
 # arrow
-arrow = { version = "47" }
-arrow-array = { version = "47" }
-arrow-buffer = { version = "47" }
-arrow-cast = { version = "47" }
-arrow-ord = { version = "47" }
-arrow-row = { version = "47" }
-arrow-schema = { version = "47" }
-arrow-select = { version = "47" }
-parquet = { version = "47" }
+arrow = { version = "48" }
+arrow-array = { version = "48" }
+arrow-buffer = { version = "48" }
+arrow-cast = { version = "48" }
+arrow-ord = { version = "48" }
+arrow-row = { version = "48" }
+arrow-schema = { version = "48" }
+arrow-select = { version = "48" }
+parquet = { version = "48" }

 # datafusion
-datafusion = { version = "32" }
-datafusion-expr = { version = "32" }
-datafusion-common = { version = "32" }
-datafusion-proto = { version = "32" }
-datafusion-sql = { version = "32" }
-datafusion-physical-expr = { version = "32" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion" }

 # serde
 serde = { version = "1", features = ["derive"] }

diff --git a/python/Cargo.toml b/python/Cargo.toml
index 38c2fcacbe..311aedfd0a 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -42,7 +42,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
 reqwest = { version = "*", features = ["native-tls-vendored"] }

 [dependencies.pyo3]
-version = "0.19"
+version = "0.20"
 features = ["extension-module", "abi3", "abi3-py37"]

 [dependencies.deltalake]

diff --git a/rust/src/delta_datafusion/expr.rs b/rust/src/delta_datafusion/expr.rs
index 815b01831f..0cd428adcb 100644
--- a/rust/src/delta_datafusion/expr.rs
+++ b/rust/src/delta_datafusion/expr.rs
@@ -70,6 +70,10 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
     fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
         self.state.window_functions().get(name).cloned()
     }
+
+    fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+        unimplemented!()
+    }
 }

 /// Parse a string predicate into an `Expr`
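For context on the `expr.rs` change: `get_table_source` is a required method on DataFusion's `ContextProvider` trait in this release (patch 5 below shows the SQL test provider implementing it), and the stub above can get away with `unimplemented!()` presumably because `DeltaContextProvider` is only used to parse predicate expressions, never to resolve tables. A minimal sketch of what a resolving implementation could look like, assuming a hypothetical provider that keeps its sources in a `HashMap` (none of these names come from the patch):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion_common::{DataFusionError, Result as DFResult};
    use datafusion_expr::TableSource;
    use datafusion_sql::TableReference;

    struct MapContextProvider {
        tables: HashMap<String, Arc<dyn TableSource>>,
    }

    impl MapContextProvider {
        // Resolve a table reference by name, or fail planning with a clear error.
        fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
            self.tables
                .get(name.table())
                .cloned()
                .ok_or_else(|| {
                    DataFusionError::Plan(format!("table '{}' not found", name.table()))
                })
        }
    }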
diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs
index 0d2df09e46..62900c9c9a 100644
--- a/rust/src/delta_datafusion/mod.rs
+++ b/rust/src/delta_datafusion/mod.rs
@@ -22,7 +22,6 @@
 use std::any::Any;
 use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
 use std::fmt::{self, Debug};
 use std::sync::Arc;
@@ -56,6 +55,7 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream, Statistics,
 };
 use datafusion_common::scalar::ScalarValue;
+use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
 use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
@@ -70,7 +70,7 @@ use serde::{Deserialize, Serialize};
 use url::Url;

 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::protocol::{self, Add};
+use crate::protocol::{self, Add, ColumnCountStat, ColumnValueStat};
 use crate::storage::ObjectStoreRef;
 use crate::table::builder::ensure_table_uri;
 use crate::table::state::DeltaTableState;
@@ -107,149 +107,133 @@ impl From<DataFusionError> for DeltaTableError {

 impl DeltaTableState {
     /// Return statistics for Datafusion Table
     pub fn datafusion_table_statistics(&self) -> Statistics {
-        let stats = self
-            .files()
-            .iter()
-            .try_fold(
-                Statistics {
-                    num_rows: Some(0),
-                    total_byte_size: Some(0),
-                    column_statistics: Some(vec![
-                        ColumnStatistics {
-                            null_count: Some(0),
-                            max_value: None,
-                            min_value: None,
-                            distinct_count: None
-                        };
-                        self.schema().unwrap().get_fields().len()
-                    ]),
-                    is_exact: true,
-                },
-                |acc, action| {
-                    let new_stats = action
-                        .get_stats()
-                        .unwrap_or_else(|_| Some(protocol::Stats::default()))?;
-                    Some(Statistics {
-                        num_rows: acc
-                            .num_rows
-                            .map(|rows| rows + new_stats.num_records as usize),
-                        total_byte_size: acc
-                            .total_byte_size
-                            .map(|total_size| total_size + action.size as usize),
-                        column_statistics: acc.column_statistics.map(|col_stats| {
-                            self.schema()
-                                .unwrap()
-                                .get_fields()
-                                .iter()
-                                .zip(col_stats)
-                                .map(|(field, stats)| {
-                                    let null_count = new_stats
-                                        .null_count
-                                        .get(field.get_name())
-                                        .and_then(|x| {
-                                            let null_count_acc = stats.null_count?;
-                                            let null_count = x.as_value()? as usize;
-                                            Some(null_count_acc + null_count)
-                                        })
-                                        .or(stats.null_count);
-
-                                    let max_value = new_stats
-                                        .max_values
-                                        .get(field.get_name())
-                                        .and_then(|x| {
-                                            let old_stats = stats.clone();
-                                            let max_value = to_scalar_value(x.as_value()?);
-
-                                            match (max_value, old_stats.max_value) {
-                                                (Some(max_value), Some(old_max_value)) => {
-                                                    if left_larger_than_right(
-                                                        old_max_value.clone(),
-                                                        max_value.clone(),
-                                                    )? {
-                                                        Some(old_max_value)
-                                                    } else {
-                                                        Some(max_value)
-                                                    }
-                                                }
-                                                (Some(max_value), None) => Some(max_value),
-                                                (None, old) => old,
-                                            }
-                                        })
-                                        .or_else(|| stats.max_value.clone());
-
-                                    let min_value = new_stats
-                                        .min_values
-                                        .get(field.get_name())
-                                        .and_then(|x| {
-                                            let old_stats = stats.clone();
-                                            let min_value = to_scalar_value(x.as_value()?);
-
-                                            match (min_value, old_stats.min_value) {
-                                                (Some(min_value), Some(old_min_value)) => {
-                                                    if left_larger_than_right(
-                                                        min_value.clone(),
-                                                        old_min_value.clone(),
-                                                    )? {
-                                                        Some(old_min_value)
-                                                    } else {
-                                                        Some(min_value)
-                                                    }
-                                                }
-                                                (Some(min_value), None) => Some(min_value),
-                                                (None, old) => old,
-                                            }
-                                        })
-                                        .or_else(|| stats.min_value.clone());
-
-                                    ColumnStatistics {
-                                        null_count,
-                                        max_value,
-                                        min_value,
-                                        distinct_count: None, // TODO: distinct
-                                    }
-                                })
-                                .collect()
-                        }),
-                        is_exact: true,
-                    })
-                },
-            )
-            .unwrap_or_default();
-
-        // Convert column max/min scalar values to correct types based on arrow types.
-        Statistics {
-            is_exact: true,
-            num_rows: stats.num_rows,
-            total_byte_size: stats.total_byte_size,
-            column_statistics: stats.column_statistics.map(|col_stats| {
-                let fields = self.schema().unwrap().get_fields();
-                col_stats
-                    .iter()
-                    .zip(fields)
-                    .map(|(col_states, field)| {
-                        let dt = self
-                            .arrow_schema()
-                            .unwrap()
-                            .field_with_name(field.get_name())
-                            .unwrap()
-                            .data_type()
-                            .clone();
-                        ColumnStatistics {
-                            null_count: col_states.null_count,
-                            max_value: col_states
-                                .max_value
-                                .as_ref()
-                                .and_then(|scalar| correct_scalar_value_type(scalar.clone(), &dt)),
-                            min_value: col_states
-                                .min_value
-                                .as_ref()
-                                .and_then(|scalar| correct_scalar_value_type(scalar.clone(), &dt)),
-                            distinct_count: col_states.distinct_count,
+        let schema = self.arrow_schema().unwrap();
+        // Downgrade statistics to absent if file metadata is not present.
+        let mut downgrade = false;
+        let unknown_stats = Statistics::new_unknown(&schema);
+
+        let files = self.files();
+
+        // Initialize statistics
+        let mut table_stats = match files.get(0) {
+            Some(file) => {
+                match file.get_stats() {
+                    Ok(Some(stats)) => {
+                        let mut column_statistics = Vec::with_capacity(schema.fields().size());
+                        let total_byte_size = Precision::Exact(file.size as usize);
+                        let num_rows = Precision::Exact(stats.num_records as usize);
+
+                        for field in schema.fields() {
+                            let null_count = match stats.null_count.get(field.name()) {
+                                Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            let max_value = match stats.max_values.get(field.name()) {
+                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
+                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
+                                ),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            let min_value = match stats.min_values.get(field.name()) {
+                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
+                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
+                                ),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            column_statistics.push(ColumnStatistics {
+                                null_count,
+                                max_value,
+                                min_value,
+                                distinct_count: Precision::Absent,
+                            });
+                        }
+
+                        Statistics {
+                            total_byte_size,
+                            num_rows,
+                            column_statistics,
+                        }
+                    }
+                    Ok(None) => {
+                        downgrade = true;
+                        let mut stats = unknown_stats.clone();
+                        stats.total_byte_size = Precision::Exact(file.size as usize);
+                        stats
+                    }
+                    _ => return unknown_stats,
+                }
+            }
+            None => {
+                // The Table is empty
+                let mut stats = unknown_stats;
+                stats.num_rows = Precision::Exact(0);
+                stats.total_byte_size = Precision::Exact(0);
+                return stats;
+            }
+        };
+
+        // Populate the remaining statistics. If file statistics are not present then relevant statistics are downgraded to absent.
+        for file in &files.as_slice()[1..] {
+            let byte_size = Precision::Exact(file.size as usize);
+            table_stats.total_byte_size = table_stats.total_byte_size.add(&byte_size);
+
+            if !downgrade {
+                match file.get_stats() {
+                    Ok(Some(stats)) => {
+                        let num_records = Precision::Exact(stats.num_records as usize);
+
+                        table_stats.num_rows = table_stats.num_rows.add(&num_records);
+
+                        for (idx, field) in schema.fields().iter().enumerate() {
+                            let column_stats = table_stats.column_statistics.get_mut(idx).unwrap();
+
+                            let null_count = match stats.null_count.get(field.name()) {
+                                Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            let max_value = match stats.max_values.get(field.name()) {
+                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
+                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
+                                ),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            let min_value = match stats.min_values.get(field.name()) {
+                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
+                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
+                                ),
+                                //TODO: determine how to handle nested structures...
+                                _ => Precision::Absent,
+                            };
+
+                            column_stats.null_count = column_stats.null_count.add(&null_count);
+                            column_stats.max_value = column_stats.max_value.max(&max_value);
+                            column_stats.min_value = column_stats.min_value.min(&min_value);
+                        }
-                    })
-                    .collect()
-            }),
+                    }
+                    Ok(None) => {
+                        downgrade = true;
+                    }
+                    Err(_) => return unknown_stats,
+                }
+            }
         }
+
+        if downgrade {
+            table_stats.column_statistics = unknown_stats.column_statistics;
+            table_stats.num_rows = Precision::Absent;
+        }
+
+        table_stats
     }
 }
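The rewrite above leans on the `Precision` wrapper that DataFusion now uses for every statistic in place of bare `Option`s. Its useful property is that exactness degrades automatically as values are combined, which is exactly the folding behaviour the loop relies on. A small self-contained sketch of those semantics, with invented values, assuming the `add`/`max` combinators behave as this patch uses them:

    use datafusion_common::stats::Precision;

    fn main() {
        // Exact + Exact stays exact, so per-file row counts fold losslessly.
        let a = Precision::Exact(4_usize);
        let b = Precision::Exact(8_usize);
        assert_eq!(a.add(&b), Precision::Exact(12_usize));

        // Combining with Absent collapses to Absent, which is why the loop
        // above tracks a `downgrade` flag once any file lacks statistics.
        let c: Precision<usize> = Precision::Absent;
        assert_eq!(a.add(&c), Precision::Absent);

        // max/min combine the same way for column-level bounds.
        assert_eq!(a.max(&b), Precision::Exact(8_usize));
    }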
@@ -620,13 +604,14 @@ impl<'a> DeltaScanBuilder<'a> {

         let mut table_partition_cols = table_partition_cols
             .iter()
-            .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
+            .map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
             .collect::<Result<Vec<_>, ArrowError>>()?;

         if let Some(file_column_name) = &config.file_column_name {
-            table_partition_cols.push((
+            table_partition_cols.push(Field::new(
                 file_column_name.clone(),
                 wrap_partition_type_in_dict(DataType::Utf8),
+                false,
             ));
         }

@@ -855,7 +840,7 @@ impl ExecutionPlan for DeltaScan {
         self.parquet_scan.execute(partition, context)
     }

-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> DataFusionResult<Statistics> {
         self.parquet_scan.statistics()
     }
 }
@@ -955,23 +940,27 @@ pub(crate) fn partitioned_file_from_action(
     }
 }

-fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> {
-    match stat_val {
-        serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
-        serde_json::Value::Number(num) => {
-            if let Some(val) = num.as_i64() {
-                Some(ScalarValue::from(val))
-            } else if let Some(val) = num.as_u64() {
-                Some(ScalarValue::from(val))
-            } else {
-                num.as_f64().map(ScalarValue::from)
-            }
-        }
-        serde_json::Value::String(s) => Some(ScalarValue::from(s.as_str())),
-        serde_json::Value::Array(_) => None,
-        serde_json::Value::Object(_) => None,
-        serde_json::Value::Null => None,
-    }
+fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Option<ScalarValue> {
+    let string = match stat_val {
+        serde_json::Value::String(s) => s.to_owned(),
+        _ => stat_val.to_string(),
+    };
+
+    let time_micro = ScalarValue::try_from_string(
+        string,
+        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+    )
+    .ok()?;
+    let cast_arr = cast_with_options(
+        &time_micro.to_array(),
+        field_dt,
+        &CastOptions {
+            safe: false,
+            ..Default::default()
+        },
+    )
+    .ok()?;
+    ScalarValue::try_from_array(&cast_arr, 0).ok()
 }

 pub(crate) fn to_correct_scalar_value(
@@ -983,146 +972,16 @@
         serde_json::Value::Object(_) => None,
         serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(),
         serde_json::Value::String(string_val) => match field_dt {
-            ArrowDataType::Timestamp(_, _) => {
-                let time_nanos = ScalarValue::try_from_string(
-                    string_val.to_owned(),
-                    &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
-                )
-                .ok()?;
-                let cast_arr = cast_with_options(
-                    &time_nanos.to_array(),
-                    field_dt,
-                    &CastOptions {
-                        safe: false,
-                        ..Default::default()
-                    },
-                )
-                .ok()?;
-                Some(ScalarValue::try_from_array(&cast_arr, 0).ok()?)
-            }
+            ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt),
             _ => Some(ScalarValue::try_from_string(string_val.to_owned(), field_dt).ok()?),
         },
         other => match field_dt {
-            ArrowDataType::Timestamp(_, _) => {
-                let time_nanos = ScalarValue::try_from_string(
-                    other.to_string(),
-                    &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
-                )
-                .ok()?;
-                let cast_arr = cast_with_options(
-                    &time_nanos.to_array(),
-                    field_dt,
-                    &CastOptions {
-                        safe: false,
-                        ..Default::default()
-                    },
-                )
-                .ok()?;
-                Some(ScalarValue::try_from_array(&cast_arr, 0).ok()?)
-            }
+            ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt),
             _ => Some(ScalarValue::try_from_string(other.to_string(), field_dt).ok()?),
         },
     }
 }

-fn correct_scalar_value_type(value: ScalarValue, field_dt: &ArrowDataType) -> Option<ScalarValue> {
-    match field_dt {
-        ArrowDataType::Int64 => {
-            let raw_value = i64::try_from(value).ok()?;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Int32 => {
-            let raw_value = i64::try_from(value).ok()? as i32;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Int16 => {
-            let raw_value = i64::try_from(value).ok()? as i16;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Int8 => {
-            let raw_value = i64::try_from(value).ok()? as i8;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Float32 => {
-            let raw_value = f64::try_from(value).ok()? as f32;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Float64 => {
-            let raw_value = f64::try_from(value).ok()?;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Utf8 => match value {
-            ScalarValue::Utf8(val) => Some(ScalarValue::Utf8(val)),
-            _ => None,
-        },
-        ArrowDataType::LargeUtf8 => match value {
-            ScalarValue::Utf8(val) => Some(ScalarValue::LargeUtf8(val)),
-            _ => None,
-        },
-        ArrowDataType::Boolean => {
-            let raw_value = bool::try_from(value).ok()?;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Decimal128(_, _) => {
-            let raw_value = f64::try_from(value).ok()?;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Decimal256(_, _) => {
-            let raw_value = f64::try_from(value).ok()?;
-            Some(ScalarValue::from(raw_value))
-        }
-        ArrowDataType::Date32 => {
-            let raw_value = i64::try_from(value).ok()? as i32;
-            Some(ScalarValue::Date32(Some(raw_value)))
-        }
-        ArrowDataType::Date64 => {
-            let raw_value = i64::try_from(value).ok()?;
-            Some(ScalarValue::Date64(Some(raw_value)))
-        }
-        ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => {
-            let raw_value = i64::try_from(value).ok()?;
-            Some(ScalarValue::TimestampNanosecond(Some(raw_value), None))
-        }
-        ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
-            let raw_value = i64::try_from(value).ok()?;
-            Some(ScalarValue::TimestampMicrosecond(Some(raw_value), None))
-        }
-        ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
-            let raw_value = i64::try_from(value).ok()?;
-            Some(ScalarValue::TimestampMillisecond(Some(raw_value), None))
-        }
-        _ => {
-            log::error!(
-                "Scalar value of arrow type unimplemented for {:?} and {:?}",
-                value,
-                field_dt
-            );
-            None
-        }
-    }
-}
-
-fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool> {
-    match (&left, &right) {
-        (ScalarValue::Float64(Some(l)), ScalarValue::Float64(Some(r))) => Some(l > r),
-        (ScalarValue::Float32(Some(l)), ScalarValue::Float32(Some(r))) => Some(l > r),
-        (ScalarValue::Int8(Some(l)), ScalarValue::Int8(Some(r))) => Some(l > r),
-        (ScalarValue::Int16(Some(l)), ScalarValue::Int16(Some(r))) => Some(l > r),
-        (ScalarValue::Int32(Some(l)), ScalarValue::Int32(Some(r))) => Some(l > r),
-        (ScalarValue::Int64(Some(l)), ScalarValue::Int64(Some(r))) => Some(l > r),
-        (ScalarValue::Utf8(Some(l)), ScalarValue::Utf8(Some(r))) => Some(l > r),
-        (ScalarValue::Boolean(Some(l)), ScalarValue::Boolean(Some(r))) => Some(l & !r),
-        _ => {
-            log::error!(
-                "Scalar value comparison unimplemented for {:?} and {:?}",
-                left,
-                right
-            );
-            None
-        }
-    }
-}
-
 pub(crate) fn logical_expr_to_physical_expr(
     expr: &Expr,
     schema: &ArrowSchema,
@@ -1705,76 +1564,6 @@ mod tests {
         }
     }

-    #[test]
-    fn test_to_scalar_value() {
-        let reference_pairs = &[
-            (
-                json!("val"),
-                Some(ScalarValue::Utf8(Some(String::from("val")))),
-            ),
-            (json!("2"), Some(ScalarValue::Utf8(Some(String::from("2"))))),
-            (json!(true), Some(ScalarValue::Boolean(Some(true)))),
-            (json!(false), Some(ScalarValue::Boolean(Some(false)))),
-            (json!(2), Some(ScalarValue::Int64(Some(2)))),
-            (json!(-2), Some(ScalarValue::Int64(Some(-2)))),
-            (json!(2.0), Some(ScalarValue::Float64(Some(2.0)))),
-            (json!(["1", "2"]), None),
-            (json!({"key": "val"}), None),
-            (json!(null), None),
-        ];
-        for (stat_val, scalar_val) in reference_pairs {
-            assert_eq!(to_scalar_value(stat_val), *scalar_val)
-        }
-    }
-
-    #[test]
-    fn test_left_larger_than_right() {
-        let correct_reference_pairs = vec![
-            (
-                ScalarValue::Float64(Some(1.0)),
-                ScalarValue::Float64(Some(2.0)),
-            ),
-            (
-                ScalarValue::Float32(Some(1.0)),
-                ScalarValue::Float32(Some(2.0)),
-            ),
-            (ScalarValue::Int8(Some(1)), ScalarValue::Int8(Some(2))),
-            (ScalarValue::Int16(Some(1)), ScalarValue::Int16(Some(2))),
-            (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))),
-            (ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))),
-            (
-                ScalarValue::Boolean(Some(false)),
-                ScalarValue::Boolean(Some(true)),
-            ),
-            (
-                ScalarValue::Utf8(Some(String::from("1"))),
-                ScalarValue::Utf8(Some(String::from("2"))),
-            ),
-        ];
-        for (smaller_val, larger_val) in correct_reference_pairs {
-            assert_eq!(
-                left_larger_than_right(smaller_val.clone(), larger_val.clone()),
-                Some(false)
-            );
-            assert_eq!(left_larger_than_right(larger_val, smaller_val), Some(true));
-        }
-
-        let incorrect_reference_pairs = vec![
-            (
-                ScalarValue::Float64(Some(1.0)),
-                ScalarValue::Float32(Some(2.0)),
-            ),
-            (ScalarValue::Int32(Some(1)), ScalarValue::Float32(Some(2.0))),
-            (
-                ScalarValue::Boolean(Some(true)),
-                ScalarValue::Float32(Some(2.0)),
-            ),
-        ];
-        for (left, right) in incorrect_reference_pairs {
-            assert_eq!(left_larger_than_right(left, right), None);
-        }
-    }
-
     #[test]
     fn test_partitioned_file_from_action() {
         let mut partition_values = std::collections::HashMap::new();

diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index c15bb8052e..737837ed1b 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -211,7 +211,7 @@ mod datafusion_utils {
         metrics::{ExecutionPlanMetricsSet, MetricsSet},
         ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
     };
-    use datafusion_common::DFSchema;
+    use datafusion_common::{DFSchema, Statistics};
     use datafusion_expr::Expr;
     use futures::{Stream, StreamExt};
@@ -337,7 +337,7 @@ mod datafusion_utils {
             }))
         }

-        fn statistics(&self) -> datafusion_common::Statistics {
+        fn statistics(&self) -> DataFusionResult<Statistics> {
             self.parent.statistics()
         }

diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs
index 5aafe52e87..15df5441c4 100644
--- a/rust/tests/integration_datafusion.rs
+++ b/rust/tests/integration_datafusion.rs
@@ -45,6 +45,7 @@ use std::error::Error;
 mod common;

 mod local {
+    use datafusion::common::stats::Precision;
     use deltalake::{writer::JsonWriter, SchemaTypeMap};

     use super::*;
@@ -288,19 +289,22 @@
             .unwrap();
         let statistics = table.state.datafusion_table_statistics();

-        assert_eq!(statistics.num_rows, Some(4),);
+        assert_eq!(statistics.num_rows, Precision::Exact(4 as usize),);

-        assert_eq!(statistics.total_byte_size, Some(440 + 440));
+        assert_eq!(
+            statistics.total_byte_size,
+            Precision::Exact((440 + 440) as usize)
+        );
+        let column_stats = statistics.column_statistics.get(0).unwrap();
+        assert_eq!(column_stats.null_count, Precision::Exact(0));
         assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.null_count)
-                .collect::<Vec<Option<usize>>>(),
-            vec![Some(0)],
+            column_stats.max_value,
+            Precision::Exact(ScalarValue::from(4_i32))
+        );
+        assert_eq!(
+            column_stats.min_value,
+            Precision::Exact(ScalarValue::from(0_i32))
         );

         let ctx = SessionContext::new();
@@ -324,28 +328,6 @@
             Arc::new(Int32Array::from(vec![0])).as_ref(),
         );

-        assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.max_value.as_ref())
-                .collect::<Vec<Option<&ScalarValue>>>(),
-            vec![Some(&ScalarValue::from(4_i32))],
-        );
-
-        assert_eq!(
-            statistics
-                .column_statistics
-                .clone()
-                .unwrap()
-                .iter()
-                .map(|x| x.min_value.as_ref())
-                .collect::<Vec<Option<&ScalarValue>>>(),
-            vec![Some(&ScalarValue::from(0_i32))],
-        );
-
         Ok(())
     }

@@ -786,14 +768,14 @@
         let expected_schema = ArrowSchema::new(vec![
             ArrowField::new("c3", ArrowDataType::Int32, true),
-            ArrowField::new("c1", ArrowDataType::Int32, false),
+            ArrowField::new("c1", ArrowDataType::Int32, true),
             ArrowField::new(
                 "c2",
                 ArrowDataType::Dictionary(
                     Box::new(ArrowDataType::UInt16),
                     Box::new(ArrowDataType::Utf8),
                 ),
-                false,
+                true,
             ),
         ]);
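Worth pausing on the consolidated `parse_timestamp` helper this first patch introduces: a JSON statistic is first read at microsecond resolution (the unit the patch switches to, from the previous nanosecond parsing) and then strictly cast to whatever timestamp type the Arrow schema actually declares. A standalone equivalent, with the same `Option`-based error handling the patch still uses at this point:

    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::{DataType, TimeUnit};
    use datafusion_common::scalar::ScalarValue;

    fn parse_timestamp_stat(
        stat_val: &serde_json::Value,
        field_dt: &DataType,
    ) -> Option<ScalarValue> {
        // Delta checkpoints serialize timestamp stats as strings.
        let string = match stat_val {
            serde_json::Value::String(s) => s.to_owned(),
            other => other.to_string(),
        };
        // Parse at microsecond resolution first...
        let micros = ScalarValue::try_from_string(
            string,
            &DataType::Timestamp(TimeUnit::Microsecond, None),
        )
        .ok()?;
        // ...then cast strictly (safe: false surfaces overflow instead of nulling).
        let cast_arr = cast_with_options(
            &micros.to_array(),
            field_dt,
            &CastOptions {
                safe: false,
                ..Default::default()
            },
        )
        .ok()?;
        ScalarValue::try_from_array(&cast_arr, 0).ok()
    }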
From 33ce46e9a83b53b0c98ae628c4c506385d8790e8 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 26 Oct 2023 18:54:29 -0400
Subject: [PATCH 2/8] Refactor and expand tests for datafusion table stats

---
 rust/src/delta_datafusion/mod.rs         | 212 +++++++++++++----------
 rust/src/operations/transaction/state.rs |  15 +-
 rust/tests/integration_datafusion.rs     | 118 ++++++++++++-
 3 files changed, 238 insertions(+), 107 deletions(-)

diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs
index 62900c9c9a..e8382ce15a 100644
--- a/rust/src/delta_datafusion/mod.rs
+++ b/rust/src/delta_datafusion/mod.rs
@@ -65,6 +65,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
+use log::error;
 use object_store::ObjectMeta;
 use serde::{Deserialize, Serialize};
 use url::Url;
@@ -104,10 +105,34 @@ impl From<DataFusionError> for DeltaTableError {
     }
 }

+fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Precision<ScalarValue> {
+    match value {
+        Some(ColumnValueStat::Value(value)) => to_correct_scalar_value(value, field.data_type())
+            .map(|maybe_scalar| {
+                maybe_scalar
+                    .map(Precision::Exact)
+                    .unwrap_or_default()
+            })
+            .unwrap_or_else(|_| {
+                error!(
+                    "Unable to parse scalar value of {:?} with type {} for column {}",
+                    value,
+                    field.data_type(),
+                    field.name()
+                );
+                Precision::Absent
+            }),
+        _ => Precision::Absent,
+    }
+}
+
 impl DeltaTableState {
-    /// Return statistics for Datafusion Table
-    pub fn datafusion_table_statistics(&self) -> Statistics {
-        let schema = self.arrow_schema().unwrap();
+    /// Provide table level statistics to Datafusion
+    pub fn datafusion_table_statistics(&self) -> DataFusionResult<Statistics> {
+        // Statistics only support primitive types. Any non-primitive column will not have its statistics captured.
+        // If column statistics are missing for any add actions then we simply downgrade to Absent.
+
+        let schema = self.arrow_schema()?;
         // Downgrade statistics to absent if file metadata is not present.
         let mut downgrade = false;
         let unknown_stats = Statistics::new_unknown(&schema);
@@ -116,65 +141,49 @@ impl DeltaTableState {

         // Initialize statistics
         let mut table_stats = match files.get(0) {
-            Some(file) => {
-                match file.get_stats() {
-                    Ok(Some(stats)) => {
-                        let mut column_statistics = Vec::with_capacity(schema.fields().size());
-                        let total_byte_size = Precision::Exact(file.size as usize);
-                        let num_rows = Precision::Exact(stats.num_records as usize);
-
-                        for field in schema.fields() {
-                            let null_count = match stats.null_count.get(field.name()) {
-                                Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
-                                //TODO: determine how to handle nested structures...
-                                _ => Precision::Absent,
-                            };
-
-                            let max_value = match stats.max_values.get(field.name()) {
-                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
-                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
-                                ),
-                                //TODO: determine how to handle nested structures...
-                                _ => Precision::Absent,
-                            };
-
-                            let min_value = match stats.min_values.get(field.name()) {
-                                Some(ColumnValueStat::Value(value)) => Precision::Exact(
-                                    to_correct_scalar_value(value, field.data_type()).unwrap(),
-                                ),
-                                //TODO: determine how to handle nested structures...
-                                _ => Precision::Absent,
-                            };
-
-                            column_statistics.push(ColumnStatistics {
-                                null_count,
-                                max_value,
-                                min_value,
-                                distinct_count: Precision::Absent,
-                            });
-                        }
-
-                        Statistics {
-                            total_byte_size,
-                            num_rows,
-                            column_statistics,
-                        }
-                    }
-                    Ok(None) => {
-                        downgrade = true;
-                        let mut stats = unknown_stats.clone();
-                        stats.total_byte_size = Precision::Exact(file.size as usize);
-                        stats
-                    }
-                    _ => return unknown_stats,
-                }
-            }
+            Some(file) => match file.get_stats() {
+                Ok(Some(stats)) => {
+                    let mut column_statistics = Vec::with_capacity(schema.fields().size());
+                    let total_byte_size = Precision::Exact(file.size as usize);
+                    let num_rows = Precision::Exact(stats.num_records as usize);
+
+                    for field in schema.fields() {
+                        let null_count = match stats.null_count.get(field.name()) {
+                            Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
+                            _ => Precision::Absent,
+                        };
+
+                        let max_value = get_scalar_value(stats.max_values.get(field.name()), field);
+                        let min_value = get_scalar_value(stats.min_values.get(field.name()), field);
+
+                        column_statistics.push(ColumnStatistics {
+                            null_count,
+                            max_value,
+                            min_value,
+                            distinct_count: Precision::Absent,
+                        });
+                    }
+
+                    Statistics {
+                        total_byte_size,
+                        num_rows,
+                        column_statistics,
+                    }
+                }
+                Ok(None) => {
+                    downgrade = true;
+                    let mut stats = unknown_stats.clone();
+                    stats.total_byte_size = Precision::Exact(file.size as usize);
+                    stats
+                }
+                _ => return Ok(unknown_stats),
+            },
             None => {
                 // The Table is empty
                 let mut stats = unknown_stats;
                 stats.num_rows = Precision::Exact(0);
                 stats.total_byte_size = Precision::Exact(0);
-                return stats;
+                return Ok(stats);
             }
         };

@@ -195,25 +204,13 @@
                         let null_count = match stats.null_count.get(field.name()) {
                             Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize),
-                            //TODO: determine how to handle nested structures...
                             _ => Precision::Absent,
                         };
-
-                        let max_value = match stats.max_values.get(field.name()) {
-                            Some(ColumnValueStat::Value(value)) => Precision::Exact(
-                                to_correct_scalar_value(value, field.data_type()).unwrap(),
-                            ),
-                            //TODO: determine how to handle nested structures...
-                            _ => Precision::Absent,
-                        };
-
-                        let min_value = match stats.min_values.get(field.name()) {
-                            Some(ColumnValueStat::Value(value)) => Precision::Exact(
-                                to_correct_scalar_value(value, field.data_type()).unwrap(),
-                            ),
-                            //TODO: determine how to handle nested structures...
-                            _ => Precision::Absent,
-                        };
+                        let max_value =
+                            get_scalar_value(stats.max_values.get(field.name()), field);
+                        let min_value =
+                            get_scalar_value(stats.min_values.get(field.name()), field);

                         column_stats.null_count = column_stats.null_count.add(&null_count);
                         column_stats.max_value = column_stats.max_value.max(&max_value);
                         column_stats.min_value = column_stats.min_value.min(&min_value);
                     }
@@ -223,7 +220,7 @@
                 Ok(None) => {
                     downgrade = true;
                 }
-                Err(_) => return unknown_stats,
+                Err(_) => return Ok(unknown_stats),
             }
         }
     }
@@ -233,7 +230,7 @@
         table_stats.num_rows = Precision::Absent;
     }

-    table_stats
+    Ok(table_stats)
 }
 }

@@ -261,9 +258,12 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
             Some(v) => serde_json::Value::String(v.to_string()),
             None => serde_json::Value::Null,
         };
-        to_correct_scalar_value(&value, &data_type).unwrap_or(
-            get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
-        )
+        to_correct_scalar_value(&value, &data_type)
+            .ok()
+            .flatten()
+            .unwrap_or(
+                get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
+            )
     } else if let Ok(Some(statistics)) = add.get_stats() {
         let values = if get_max {
             statistics.max_values
@@ -273,7 +273,11 @@

         values
             .get(&column.name)
-            .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type))
+            .and_then(|f| {
+                to_correct_scalar_value(f.as_value()?, &data_type)
+                    .ok()
+                    .flatten()
+            })
             .unwrap_or(
                 get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
            )
@@ -615,6 +619,17 @@ impl<'a> DeltaScanBuilder<'a> {
         }

+        let stats = self
+            .snapshot
+            .datafusion_table_statistics()
+            .unwrap_or_else(|e| {
{}", + e + ); + Statistics::new_unknown(&schema) + }); + let scan = ParquetFormat::new() .create_physical_plan( self.state, @@ -622,7 +637,7 @@ impl<'a> DeltaScanBuilder<'a> { object_store_url: self.object_store.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), - statistics: self.snapshot.datafusion_table_statistics(), + statistics: stats, projection: self.projection.cloned(), limit: self.limit, table_partition_cols, @@ -694,7 +709,7 @@ impl TableProvider for DeltaTable { } fn statistics(&self) -> Option { - Some(self.state.datafusion_table_statistics()) + self.state.datafusion_table_statistics().ok() } } @@ -773,7 +788,7 @@ impl TableProvider for DeltaTableProvider { } fn statistics(&self) -> Option { - Some(self.snapshot.datafusion_table_statistics()) + self.snapshot.datafusion_table_statistics().ok() } } @@ -919,6 +934,8 @@ pub(crate) fn partitioned_file_from_action( &serde_json::Value::String(value.to_string()), f.data_type(), ) + .ok() + .flatten() .unwrap_or(ScalarValue::Null), None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null), }) @@ -940,7 +957,10 @@ pub(crate) fn partitioned_file_from_action( } } -fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Option { +fn parse_timestamp( + stat_val: &serde_json::Value, + field_dt: &ArrowDataType, +) -> DataFusionResult { let string = match stat_val { serde_json::Value::String(s) => s.to_owned(), _ => stat_val.to_string(), @@ -949,8 +969,7 @@ fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Op let time_micro = ScalarValue::try_from_string( string, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), - ) - .ok()?; + )?; let cast_arr = cast_with_options( &time_micro.to_array(), field_dt, @@ -958,26 +977,31 @@ fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Op safe: false, ..Default::default() }, - ) - .ok()?; - ScalarValue::try_from_array(&cast_arr, 0).ok() + )?; + ScalarValue::try_from_array(&cast_arr, 0) } pub(crate) fn to_correct_scalar_value( stat_val: &serde_json::Value, field_dt: &ArrowDataType, -) -> Option { +) -> DataFusionResult> { match stat_val { - serde_json::Value::Array(_) => None, - serde_json::Value::Object(_) => None, - serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(), + serde_json::Value::Array(_) => Ok(None), + serde_json::Value::Object(_) => Ok(None), + serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)), serde_json::Value::String(string_val) => match field_dt { - ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), - _ => Some(ScalarValue::try_from_string(string_val.to_owned(), field_dt).ok()?), + ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)), + _ => Ok(Some(ScalarValue::try_from_string( + string_val.to_owned(), + field_dt, + )?)), }, other => match field_dt { - ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), - _ => Some(ScalarValue::try_from_string(other.to_string(), field_dt).ok()?), + ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)), + _ => Ok(Some(ScalarValue::try_from_string( + other.to_string(), + field_dt, + )?)), }, } } @@ -1559,7 +1583,7 @@ mod tests { ]; for (raw, data_type, ref_scalar) in reference_pairs { - let scalar = to_correct_scalar_value(raw, data_type).unwrap(); + let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap(); assert_eq!(*ref_scalar, scalar) } } diff --git 
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index bb9c3ff35e..8e9fa58fa6 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -188,9 +188,12 @@ impl<'a> AddContainer<'a> {
             Some(v) => serde_json::Value::String(v.to_string()),
             None => serde_json::Value::Null,
         };
-        to_correct_scalar_value(&value, data_type).unwrap_or(
-            get_null_of_arrow_type(data_type).expect("Could not determine null type"),
-        )
+        to_correct_scalar_value(&value, data_type)
+            .ok()
+            .flatten()
+            .unwrap_or(
+                get_null_of_arrow_type(data_type).expect("Could not determine null type"),
+            )
     } else if let Ok(Some(statistics)) = add.get_stats() {
         let values = if get_max {
             statistics.max_values
@@ -200,7 +203,11 @@

         values
             .get(&column.name)
-            .and_then(|f| to_correct_scalar_value(f.as_value()?, data_type))
+            .and_then(|f| {
+                to_correct_scalar_value(f.as_value()?, data_type)
+                    .ok()
+                    .flatten()
+            })
             .unwrap_or(
                 get_null_of_arrow_type(data_type).expect("Could not determine null type"),
             )

diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs
index 15df5441c4..dfb23a1421 100644
--- a/rust/tests/integration_datafusion.rs
+++ b/rust/tests/integration_datafusion.rs
@@ -284,10 +284,11 @@
     #[tokio::test]
     async fn test_datafusion_stats() -> Result<()> {
+        // Validate a table that contains statistics for all files
         let table = deltalake::open_table("./tests/data/delta-0.8.0")
             .await
             .unwrap();
-        let statistics = table.state.datafusion_table_statistics();
+        let statistics = table.state.datafusion_table_statistics()?;

         assert_eq!(statistics.num_rows, Precision::Exact(4 as usize),);
@@ -309,25 +310,124 @@
         let ctx = SessionContext::new();
         ctx.register_table("test_table", Arc::new(table))?;
-
-        let batches = ctx
+        let actual = ctx
             .sql("SELECT max(value), min(value) FROM test_table")
             .await?
             .collect()
             .await?;

-        assert_eq!(batches.len(), 1);
-        let batch = &batches[0];
+        let expected = vec![
+            "+-----------------------+-----------------------+",
+            "| MAX(test_table.value) | MIN(test_table.value) |",
+            "+-----------------------+-----------------------+",
+            "| 4                     | 0                     |",
+            "+-----------------------+-----------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+
+        // Validate a table that does not contain column statistics
+        let table = deltalake::open_table("./tests/data/delta-0.2.0")
+            .await
+            .unwrap();
+        let statistics = table.state.datafusion_table_statistics()?;
+
+        assert_eq!(statistics.num_rows, Precision::Absent);
+
         assert_eq!(
-            batch.column(0).as_ref(),
-            Arc::new(Int32Array::from(vec![4])).as_ref(),
+            statistics.total_byte_size,
+            Precision::Exact((400 + 404 + 396) as usize)
         );
+        let column_stats = statistics.column_statistics.get(0).unwrap();
+        assert_eq!(column_stats.null_count, Precision::Absent);
+        assert_eq!(column_stats.max_value, Precision::Absent);
+        assert_eq!(column_stats.min_value, Precision::Absent);

+        ctx.register_table("test_table2", Arc::new(table))?;
+        let actual = ctx
+            .sql("SELECT max(value), min(value) FROM test_table2")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+------------------------+------------------------+",
+            "| MAX(test_table2.value) | MIN(test_table2.value) |",
+            "+------------------------+------------------------+",
+            "| 3                      | 1                      |",
+            "+------------------------+------------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+
+        // Validate a table that contains nested structures.
+
+        // This table is interesting since it goes through schema evolution.
+        // In particular 'new_column' contains statistics for when it
+        // is introduced (10) but the commit following (11) does not contain
+        // statistics for this column.
+        let table = deltalake::open_table("./tests/data/delta-1.2.1-only-struct-stats")
+            .await
+            .unwrap();
+        let schema = table.get_schema().unwrap();
+        let statistics = table.state.datafusion_table_statistics()?;
+        assert_eq!(statistics.num_rows, Precision::Exact(12));
+
+        // `new_column` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("new_column").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Absent);
+        assert_eq!(stats.min_value, Precision::Absent);
+        assert_eq!(stats.max_value, Precision::Absent);
+
+        // `date` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("date").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Exact(0));
+        // 2022-10-24
+        assert_eq!(
+            stats.min_value,
+            Precision::Exact(ScalarValue::Date32(Some(19289)))
+        );
         assert_eq!(
-            batch.column(1).as_ref(),
-            Arc::new(Int32Array::from(vec![0])).as_ref(),
+            stats.max_value,
+            Precision::Exact(ScalarValue::Date32(Some(19289)))
         );

+        // `timestamp` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("timestamp").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Exact(0));
+        // 2022-10-24T22:59:32.846Z
+        assert_eq!(
+            stats.min_value,
+            Precision::Exact(ScalarValue::TimestampMicrosecond(
+                Some(1666652372846000),
+                None
+            ))
+        );
+        // 2022-10-24T22:59:46.083Z
+        assert_eq!(
+            stats.max_value,
+            Precision::Exact(ScalarValue::TimestampMicrosecond(
+                Some(1666652386083000),
+                None
+            ))
+        );
+
+        // `struct_element` statistics
+        let stats = statistics
+            .column_statistics
+            .get(schema.index_of("nested_struct").unwrap())
+            .unwrap();
+        assert_eq!(stats.null_count, Precision::Absent);
+        assert_eq!(stats.min_value, Precision::Absent);
+        assert_eq!(stats.max_value, Precision::Absent);
+
         Ok(())
     }

From eb3b6d184ad71548d3eee1dc8e97181370cc7354 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Fri, 27 Oct 2023 13:32:55 -0400
Subject: [PATCH 3/8] cargo fmt

---
 rust/src/delta_datafusion/mod.rs | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs
index e8382ce15a..0895798cc5 100644
--- a/rust/src/delta_datafusion/mod.rs
+++ b/rust/src/delta_datafusion/mod.rs
@@ -108,11 +108,7 @@ impl From<DataFusionError> for DeltaTableError {
 fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Precision<ScalarValue> {
     match value {
         Some(ColumnValueStat::Value(value)) => to_correct_scalar_value(value, field.data_type())
-            .map(|maybe_scalar| {
-                maybe_scalar
-                    .map(Precision::Exact)
-                    .unwrap_or_default()
-            })
+            .map(|maybe_scalar| maybe_scalar.map(Precision::Exact).unwrap_or_default())
             .unwrap_or_else(|_| {
                 error!(
                     "Unable to parse scalar value of {:?} with type {} for column {}",

From e70fbfb4dc926635d444ae7f190e5640bbaf8b23 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sat, 4 Nov 2023 22:52:18 -0400
Subject: [PATCH 4/8] cargo fmt

---
 crates/deltalake-core/src/delta_datafusion/mod.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs
index 48ac11d782..b8ba77e539 100644
--- a/crates/deltalake-core/src/delta_datafusion/mod.rs
+++ b/crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -936,8 +936,7 @@ pub(crate) fn partitioned_file_from_action(
                     field.data_type(),
                 )
                 .unwrap_or(Some(ScalarValue::Null))
-                .unwrap_or(ScalarValue::Null)
-                ,
+                .unwrap_or(ScalarValue::Null),
                 None => get_null_of_arrow_type(field.data_type())
                     .unwrap_or(ScalarValue::Null),
             })

From f8612e42d7c58531ce4c38d380658aae4ef1cf07 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Mon, 13 Nov 2023 21:11:43 -0500
Subject: [PATCH 5/8] Update arrow

---
 Cargo.toml                          | 31 +++++++++++++++--------------
 crates/deltalake-sql/src/planner.rs |  8 ++++----
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 531f9ff4d1..2d1b13bc9b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,23 +19,24 @@ debug = "line-tables-only"

 [workspace.dependencies]
 # arrow
-arrow = { version = "48" }
-arrow-array = { version = "48" }
-arrow-buffer = { version = "48" }
-arrow-cast = { version = "48" }
-arrow-ord = { version = "48" }
-arrow-row = { version = "48" }
-arrow-schema = { version = "48" }
-arrow-select = { version = "48" }
-parquet = { version = "48" }
+arrow = { version = "48.0.1" }
+arrow-array = { version = "48.0.1" }
+arrow-buffer = { version = "48.0.1" }
+arrow-cast = { version = "48.0.1" }
+arrow-ord = { version = "48.0.1" }
+arrow-row = { version = "48.0.1" }
+arrow-schema = { version = "48.0.1" }
+arrow-select = { version = "48.0.1" }
+parquet = { version = "48.0.1" }

 # datafusion
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git" }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git" }
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git" }
-datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git" }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+

 # serde
 serde = { version = "1", features = ["derive"] }

diff --git a/crates/deltalake-sql/src/planner.rs b/crates/deltalake-sql/src/planner.rs
index 6e2fd7cafa..1e0c818176 100644
--- a/crates/deltalake-sql/src/planner.rs
+++ b/crates/deltalake-sql/src/planner.rs
@@ -125,6 +125,10 @@ mod tests {

     impl ContextProvider for TestSchemaProvider {
         fn get_table_provider(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+            self.get_table_source(name)
+        }
+
+        fn get_table_source(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
             match self.tables.get(name.table()) {
                 Some(table) => Ok(table.clone()),
                 _ => Err(DataFusionError::Plan(format!(
                 ))),
             }
         }
-        fn get_table_source(&self, _name: TableReference) -> DataFusionResult<Arc<dyn TableSource>> {
-            unimplemented!("TODO")
-        }
-
         fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
             None
         }

From 5032040b39848ea8476cf267a4284d5c3f71b6b3 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 16 Nov 2023 21:02:04 -0500
Subject: [PATCH 6/8] datafusion 33

---
 Cargo.toml                          | 12 ++++++------
 crates/deltalake-sql/src/planner.rs |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2d1b13bc9b..a884ff5413 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,12 +30,12 @@ arrow-select = { version = "48.0.1" }
 parquet = { version = "48.0.1" }

 # datafusion
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
-datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev="33.0.0-rc2" }
+datafusion = { version = "33.0.0" }
+datafusion-expr = { version = "33.0.0" }
+datafusion-common = { version = "33.0.0" }
+datafusion-proto = { version = "33.0.0" }
+datafusion-sql = { version = "33.0.0" }
+datafusion-physical-expr = { version = "33.0.0" }

 # serde

diff --git a/crates/deltalake-sql/src/planner.rs b/crates/deltalake-sql/src/planner.rs
index 1e0c818176..bf07825d4b 100644
--- a/crates/deltalake-sql/src/planner.rs
+++ b/crates/deltalake-sql/src/planner.rs
@@ -92,10 +92,10 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::DataFusionError;
+    use datafusion_common::Result as DataFusionResult;
     use datafusion_expr::logical_plan::builder::LogicalTableSource;
     use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
     use datafusion_sql::TableReference;
-    use datafusion_common::Result as DataFusionResult;

     use crate::parser::DeltaParser;

From 010f44a8bda45222c2bf94529cedc0f67da412b4 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 16 Nov 2023 21:23:44 -0500
Subject: [PATCH 7/8] fix integration tests

---
 crates/deltalake-core/tests/integration_datafusion.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs
index 1844ea7ed3..560b0a2319 100644
--- a/crates/deltalake-core/tests/integration_datafusion.rs
+++ b/crates/deltalake-core/tests/integration_datafusion.rs
@@ -283,12 +283,12 @@
     #[tokio::test]
     async fn test_datafusion_stats() -> Result<()> {
         // Validate a table that contains statistics for all files
-        let table = deltalake::open_table("./tests/data/delta-0.8.0")
+        let table = open_table("./tests/data/delta-0.8.0")
             .await
             .unwrap();
         let statistics = table.state.datafusion_table_statistics()?;

-        assert_eq!(statistics.num_rows, Precision::Exact(4 as usize),);
+        assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);

         assert_eq!(
             statistics.total_byte_size,
@@ -324,7 +324,7 @@
         assert_batches_sorted_eq!(&expected, &actual);

         // Validate a table that does not contain column statistics
-        let table = deltalake::open_table("./tests/data/delta-0.2.0")
+        let table = open_table("./tests/data/delta-0.2.0")
             .await
             .unwrap();
         let statistics = table.state.datafusion_table_statistics()?;

@@ -362,7 +362,7 @@
         // In particular 'new_column' contains statistics for when it
         // is introduced (10) but the commit following (11) does not contain
         // statistics for this column.
-        let table = deltalake::open_table("./tests/data/delta-1.2.1-only-struct-stats")
+        let table = open_table("./tests/data/delta-1.2.1-only-struct-stats")
             .await
             .unwrap();
         let schema = table.get_schema().unwrap();

From b2260db681559e351fb75f4515672e545f85bf13 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 16 Nov 2023 21:24:39 -0500
Subject: [PATCH 8/8] cargo fmt

---
 crates/deltalake-core/tests/integration_datafusion.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs
index 560b0a2319..a412ce6417 100644
--- a/crates/deltalake-core/tests/integration_datafusion.rs
+++ b/crates/deltalake-core/tests/integration_datafusion.rs
@@ -283,9 +283,7 @@
     #[tokio::test]
     async fn test_datafusion_stats() -> Result<()> {
         // Validate a table that contains statistics for all files
-        let table = open_table("./tests/data/delta-0.8.0")
-            .await
-            .unwrap();
+        let table = open_table("./tests/data/delta-0.8.0").await.unwrap();
         let statistics = table.state.datafusion_table_statistics()?;

         assert_eq!(statistics.num_rows, Precision::Exact(4_usize),);
@@ -324,9 +322,7 @@
         assert_batches_sorted_eq!(&expected, &actual);

         // Validate a table that does not contain column statistics
-        let table = open_table("./tests/data/delta-0.2.0")
-            .await
-            .unwrap();
+        let table = open_table("./tests/data/delta-0.2.0").await.unwrap();
         let statistics = table.state.datafusion_table_statistics()?;

         assert_eq!(statistics.num_rows, Precision::Absent);
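Taken together, the series restores the end-to-end flow of statistics from Delta file metadata into DataFusion's planner. A hedged sketch of the round trip the tests above exercise, using only calls that appear in this series (the table path, expected row count, and column name come from the repository's own test data):

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use datafusion_common::stats::Precision;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Open a Delta table and inspect the statistics delta-rs derives for it.
        let table = deltalake::open_table("./tests/data/delta-0.8.0").await?;
        let stats = table.state.datafusion_table_statistics()?;
        assert_eq!(stats.num_rows, Precision::Exact(4));

        // Register the table and let DataFusion plan with those statistics.
        let ctx = SessionContext::new();
        ctx.register_table("t", Arc::new(table))?;
        let batches = ctx
            .sql("SELECT max(value), min(value) FROM t")
            .await?
            .collect()
            .await?;
        assert_eq!(batches.len(), 1);
        Ok(())
    }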