From 1ea1e4fc8cc658de87888558a8a606345e2cb749 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Oct 2023 02:23:25 -0400 Subject: [PATCH 01/12] upgrade datafusion with git head --- Cargo.toml | 30 +- python/Cargo.toml | 2 +- rust/src/delta_datafusion/expr.rs | 4 + rust/src/delta_datafusion/mod.rs | 519 ++++++++------------------- rust/src/operations/mod.rs | 4 +- rust/tests/integration_datafusion.rs | 50 +-- 6 files changed, 192 insertions(+), 417 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d90700bdbe..048748578a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,23 +15,23 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "47" } -arrow-array = { version = "47" } -arrow-buffer = { version = "47" } -arrow-cast = { version = "47" } -arrow-ord = { version = "47" } -arrow-row = { version = "47" } -arrow-schema = { version = "47" } -arrow-select = { version = "47" } -parquet = { version = "47" } +arrow = { version = "48" } +arrow-array = { version = "48" } +arrow-buffer = { version = "48" } +arrow-cast = { version = "48" } +arrow-ord = { version = "48" } +arrow-row = { version = "48" } +arrow-schema = { version = "48" } +arrow-select = { version = "48" } +parquet = { version = "48" } # datafusion -datafusion = { version = "32" } -datafusion-expr = { version = "32" } -datafusion-common = { version = "32" } -datafusion-proto = { version = "32" } -datafusion-sql = { version = "32" } -datafusion-physical-expr = { version = "32" } +datafusion = { git = "https://github.com/apache/arrow-datafusion" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion" } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion" } +datafusion-sql = { git = "https://github.com/apache/arrow-datafusion" } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion" } # serde serde = { version = "1", features = ["derive"] } diff --git a/python/Cargo.toml b/python/Cargo.toml index 38c2fcacbe..311aedfd0a 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -42,7 +42,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] } reqwest = { version = "*", features = ["native-tls-vendored"] } [dependencies.pyo3] -version = "0.19" +version = "0.20" features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] diff --git a/rust/src/delta_datafusion/expr.rs b/rust/src/delta_datafusion/expr.rs index 815b01831f..0cd428adcb 100644 --- a/rust/src/delta_datafusion/expr.rs +++ b/rust/src/delta_datafusion/expr.rs @@ -70,6 +70,10 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> { fn get_window_meta(&self, name: &str) -> Option> { self.state.window_functions().get(name).cloned() } + + fn get_table_source(&self, _name: TableReference) -> DFResult> { + unimplemented!() + } } /// Parse a string predicate into an `Expr` diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs index 0d2df09e46..62900c9c9a 100644 --- a/rust/src/delta_datafusion/mod.rs +++ b/rust/src/delta_datafusion/mod.rs @@ -22,7 +22,6 @@ use std::any::Any; use std::collections::{HashMap, HashSet}; -use std::convert::TryFrom; use std::fmt::{self, Debug}; use std::sync::Arc; @@ -56,6 +55,7 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, Statistics, }; use datafusion_common::scalar::ScalarValue; +use datafusion_common::stats::Precision; use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; use 
datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema}; use datafusion_expr::expr::{ScalarFunction, ScalarUDF}; @@ -70,7 +70,7 @@ use serde::{Deserialize, Serialize}; use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::protocol::{self, Add}; +use crate::protocol::{self, Add, ColumnCountStat, ColumnValueStat}; use crate::storage::ObjectStoreRef; use crate::table::builder::ensure_table_uri; use crate::table::state::DeltaTableState; @@ -107,149 +107,133 @@ impl From for DeltaTableError { impl DeltaTableState { /// Return statistics for Datafusion Table pub fn datafusion_table_statistics(&self) -> Statistics { - let stats = self - .files() - .iter() - .try_fold( - Statistics { - num_rows: Some(0), - total_byte_size: Some(0), - column_statistics: Some(vec![ - ColumnStatistics { - null_count: Some(0), - max_value: None, - min_value: None, - distinct_count: None - }; - self.schema().unwrap().get_fields().len() - ]), - is_exact: true, - }, - |acc, action| { - let new_stats = action - .get_stats() - .unwrap_or_else(|_| Some(protocol::Stats::default()))?; - Some(Statistics { - num_rows: acc - .num_rows - .map(|rows| rows + new_stats.num_records as usize), - total_byte_size: acc - .total_byte_size - .map(|total_size| total_size + action.size as usize), - column_statistics: acc.column_statistics.map(|col_stats| { - self.schema() - .unwrap() - .get_fields() - .iter() - .zip(col_stats) - .map(|(field, stats)| { - let null_count = new_stats - .null_count - .get(field.get_name()) - .and_then(|x| { - let null_count_acc = stats.null_count?; - let null_count = x.as_value()? as usize; - Some(null_count_acc + null_count) - }) - .or(stats.null_count); - - let max_value = new_stats - .max_values - .get(field.get_name()) - .and_then(|x| { - let old_stats = stats.clone(); - let max_value = to_scalar_value(x.as_value()?); - - match (max_value, old_stats.max_value) { - (Some(max_value), Some(old_max_value)) => { - if left_larger_than_right( - old_max_value.clone(), - max_value.clone(), - )? { - Some(old_max_value) - } else { - Some(max_value) - } - } - (Some(max_value), None) => Some(max_value), - (None, old) => old, - } - }) - .or_else(|| stats.max_value.clone()); - - let min_value = new_stats - .min_values - .get(field.get_name()) - .and_then(|x| { - let old_stats = stats.clone(); - let min_value = to_scalar_value(x.as_value()?); - - match (min_value, old_stats.min_value) { - (Some(min_value), Some(old_min_value)) => { - if left_larger_than_right( - min_value.clone(), - old_min_value.clone(), - )? { - Some(old_min_value) - } else { - Some(min_value) - } - } - (Some(min_value), None) => Some(min_value), - (None, old) => old, - } - }) - .or_else(|| stats.min_value.clone()); - - ColumnStatistics { - null_count, - max_value, - min_value, - distinct_count: None, // TODO: distinct - } - }) - .collect() - }), - is_exact: true, - }) - }, - ) - .unwrap_or_default(); - - // Convert column max/min scalar values to correct types based on arrow types. 
- Statistics { - is_exact: true, - num_rows: stats.num_rows, - total_byte_size: stats.total_byte_size, - column_statistics: stats.column_statistics.map(|col_stats| { - let fields = self.schema().unwrap().get_fields(); - col_stats - .iter() - .zip(fields) - .map(|(col_states, field)| { - let dt = self - .arrow_schema() - .unwrap() - .field_with_name(field.get_name()) - .unwrap() - .data_type() - .clone(); - ColumnStatistics { - null_count: col_states.null_count, - max_value: col_states - .max_value - .as_ref() - .and_then(|scalar| correct_scalar_value_type(scalar.clone(), &dt)), - min_value: col_states - .min_value - .as_ref() - .and_then(|scalar| correct_scalar_value_type(scalar.clone(), &dt)), - distinct_count: col_states.distinct_count, + let schema = self.arrow_schema().unwrap(); + // Downgrade statistics to absent if file metadata is not present. + let mut downgrade = false; + let unknown_stats = Statistics::new_unknown(&schema); + + let files = self.files(); + + // Initalize statistics + let mut table_stats = match files.get(0) { + Some(file) => { + match file.get_stats() { + Ok(Some(stats)) => { + let mut column_statistics = Vec::with_capacity(schema.fields().size()); + let total_byte_size = Precision::Exact(file.size as usize); + let num_rows = Precision::Exact(stats.num_records as usize); + + for field in schema.fields() { + let null_count = match stats.null_count.get(field.name()) { + Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize), + //TODO: determine how to handle nested structures... + _ => Precision::Absent, + }; + + let max_value = match stats.max_values.get(field.name()) { + Some(ColumnValueStat::Value(value)) => Precision::Exact( + to_correct_scalar_value(value, field.data_type()).unwrap(), + ), + //TODO: determine how to handle nested structures... + _ => Precision::Absent, + }; + + let min_value = match stats.min_values.get(field.name()) { + Some(ColumnValueStat::Value(value)) => Precision::Exact( + to_correct_scalar_value(value, field.data_type()).unwrap(), + ), + //TODO: determine how to handle nested structures... + _ => Precision::Absent, + }; + + column_statistics.push(ColumnStatistics { + null_count, + max_value, + min_value, + distinct_count: Precision::Absent, + }); + } + + Statistics { + total_byte_size, + num_rows, + column_statistics, + } + } + Ok(None) => { + downgrade = true; + let mut stats = unknown_stats.clone(); + stats.total_byte_size = Precision::Exact(file.size as usize); + stats + } + _ => return unknown_stats, + } + } + None => { + // The Table is empty + let mut stats = unknown_stats; + stats.num_rows = Precision::Exact(0); + stats.total_byte_size = Precision::Exact(0); + return stats; + } + }; + + // Populate the remaining statistics. If file statistics are not present then relevant statistics are downgraded to absent. + for file in &files.as_slice()[1..] { + let byte_size = Precision::Exact(file.size as usize); + table_stats.total_byte_size = table_stats.total_byte_size.add(&byte_size); + + if !downgrade { + match file.get_stats() { + Ok(Some(stats)) => { + let num_records = Precision::Exact(stats.num_records as usize); + + table_stats.num_rows = table_stats.num_rows.add(&num_records); + + for (idx, field) in schema.fields().iter().enumerate() { + let column_stats = table_stats.column_statistics.get_mut(idx).unwrap(); + + let null_count = match stats.null_count.get(field.name()) { + Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize), + //TODO: determine how to handle nested structures... 
+ _ => Precision::Absent, + }; + + let max_value = match stats.max_values.get(field.name()) { + Some(ColumnValueStat::Value(value)) => Precision::Exact( + to_correct_scalar_value(value, field.data_type()).unwrap(), + ), + //TODO: determine how to handle nested structures... + _ => Precision::Absent, + }; + + let min_value = match stats.min_values.get(field.name()) { + Some(ColumnValueStat::Value(value)) => Precision::Exact( + to_correct_scalar_value(value, field.data_type()).unwrap(), + ), + //TODO: determine how to handle nested structures... + _ => Precision::Absent, + }; + + column_stats.null_count = column_stats.null_count.add(&null_count); + column_stats.max_value = column_stats.max_value.max(&max_value); + column_stats.min_value = column_stats.min_value.min(&min_value); } - }) - .collect() - }), + } + Ok(None) => { + downgrade = true; + } + Err(_) => return unknown_stats, + } + } + } + + if downgrade { + table_stats.column_statistics = unknown_stats.column_statistics; + table_stats.num_rows = Precision::Absent; } + + table_stats } } @@ -620,13 +604,14 @@ impl<'a> DeltaScanBuilder<'a> { let mut table_partition_cols = table_partition_cols .iter() - .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) + .map(|name| schema.field_with_name(name).map(|f| f.to_owned())) .collect::, ArrowError>>()?; if let Some(file_column_name) = &config.file_column_name { - table_partition_cols.push(( + table_partition_cols.push(Field::new( file_column_name.clone(), wrap_partition_type_in_dict(DataType::Utf8), + false, )); } @@ -855,7 +840,7 @@ impl ExecutionPlan for DeltaScan { self.parquet_scan.execute(partition, context) } - fn statistics(&self) -> Statistics { + fn statistics(&self) -> DataFusionResult { self.parquet_scan.statistics() } } @@ -955,23 +940,27 @@ pub(crate) fn partitioned_file_from_action( } } -fn to_scalar_value(stat_val: &serde_json::Value) -> Option { - match stat_val { - serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)), - serde_json::Value::Number(num) => { - if let Some(val) = num.as_i64() { - Some(ScalarValue::from(val)) - } else if let Some(val) = num.as_u64() { - Some(ScalarValue::from(val)) - } else { - num.as_f64().map(ScalarValue::from) - } - } - serde_json::Value::String(s) => Some(ScalarValue::from(s.as_str())), - serde_json::Value::Array(_) => None, - serde_json::Value::Object(_) => None, - serde_json::Value::Null => None, - } +fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Option { + let string = match stat_val { + serde_json::Value::String(s) => s.to_owned(), + _ => stat_val.to_string(), + }; + + let time_micro = ScalarValue::try_from_string( + string, + &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + ) + .ok()?; + let cast_arr = cast_with_options( + &time_micro.to_array(), + field_dt, + &CastOptions { + safe: false, + ..Default::default() + }, + ) + .ok()?; + ScalarValue::try_from_array(&cast_arr, 0).ok() } pub(crate) fn to_correct_scalar_value( @@ -983,146 +972,16 @@ pub(crate) fn to_correct_scalar_value( serde_json::Value::Object(_) => None, serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(), serde_json::Value::String(string_val) => match field_dt { - ArrowDataType::Timestamp(_, _) => { - let time_nanos = ScalarValue::try_from_string( - string_val.to_owned(), - &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - ) - .ok()?; - let cast_arr = cast_with_options( - &time_nanos.to_array(), - field_dt, - &CastOptions { - safe: false, - ..Default::default() - }, - ) - .ok()?; - 
Some(ScalarValue::try_from_array(&cast_arr, 0).ok()?) - } + ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), _ => Some(ScalarValue::try_from_string(string_val.to_owned(), field_dt).ok()?), }, other => match field_dt { - ArrowDataType::Timestamp(_, _) => { - let time_nanos = ScalarValue::try_from_string( - other.to_string(), - &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - ) - .ok()?; - let cast_arr = cast_with_options( - &time_nanos.to_array(), - field_dt, - &CastOptions { - safe: false, - ..Default::default() - }, - ) - .ok()?; - Some(ScalarValue::try_from_array(&cast_arr, 0).ok()?) - } + ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), _ => Some(ScalarValue::try_from_string(other.to_string(), field_dt).ok()?), }, } } -fn correct_scalar_value_type(value: ScalarValue, field_dt: &ArrowDataType) -> Option { - match field_dt { - ArrowDataType::Int64 => { - let raw_value = i64::try_from(value).ok()?; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Int32 => { - let raw_value = i64::try_from(value).ok()? as i32; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Int16 => { - let raw_value = i64::try_from(value).ok()? as i16; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Int8 => { - let raw_value = i64::try_from(value).ok()? as i8; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Float32 => { - let raw_value = f64::try_from(value).ok()? as f32; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Float64 => { - let raw_value = f64::try_from(value).ok()?; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Utf8 => match value { - ScalarValue::Utf8(val) => Some(ScalarValue::Utf8(val)), - _ => None, - }, - ArrowDataType::LargeUtf8 => match value { - ScalarValue::Utf8(val) => Some(ScalarValue::LargeUtf8(val)), - _ => None, - }, - ArrowDataType::Boolean => { - let raw_value = bool::try_from(value).ok()?; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Decimal128(_, _) => { - let raw_value = f64::try_from(value).ok()?; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Decimal256(_, _) => { - let raw_value = f64::try_from(value).ok()?; - Some(ScalarValue::from(raw_value)) - } - ArrowDataType::Date32 => { - let raw_value = i64::try_from(value).ok()? 
as i32; - Some(ScalarValue::Date32(Some(raw_value))) - } - ArrowDataType::Date64 => { - let raw_value = i64::try_from(value).ok()?; - Some(ScalarValue::Date64(Some(raw_value))) - } - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => { - let raw_value = i64::try_from(value).ok()?; - Some(ScalarValue::TimestampNanosecond(Some(raw_value), None)) - } - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { - let raw_value = i64::try_from(value).ok()?; - Some(ScalarValue::TimestampMicrosecond(Some(raw_value), None)) - } - ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => { - let raw_value = i64::try_from(value).ok()?; - Some(ScalarValue::TimestampMillisecond(Some(raw_value), None)) - } - _ => { - log::error!( - "Scalar value of arrow type unimplemented for {:?} and {:?}", - value, - field_dt - ); - None - } - } -} - -fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option { - match (&left, &right) { - (ScalarValue::Float64(Some(l)), ScalarValue::Float64(Some(r))) => Some(l > r), - (ScalarValue::Float32(Some(l)), ScalarValue::Float32(Some(r))) => Some(l > r), - (ScalarValue::Int8(Some(l)), ScalarValue::Int8(Some(r))) => Some(l > r), - (ScalarValue::Int16(Some(l)), ScalarValue::Int16(Some(r))) => Some(l > r), - (ScalarValue::Int32(Some(l)), ScalarValue::Int32(Some(r))) => Some(l > r), - (ScalarValue::Int64(Some(l)), ScalarValue::Int64(Some(r))) => Some(l > r), - (ScalarValue::Utf8(Some(l)), ScalarValue::Utf8(Some(r))) => Some(l > r), - (ScalarValue::Boolean(Some(l)), ScalarValue::Boolean(Some(r))) => Some(l & !r), - _ => { - log::error!( - "Scalar value comparison unimplemented for {:?} and {:?}", - left, - right - ); - None - } - } -} - pub(crate) fn logical_expr_to_physical_expr( expr: &Expr, schema: &ArrowSchema, @@ -1705,76 +1564,6 @@ mod tests { } } - #[test] - fn test_to_scalar_value() { - let reference_pairs = &[ - ( - json!("val"), - Some(ScalarValue::Utf8(Some(String::from("val")))), - ), - (json!("2"), Some(ScalarValue::Utf8(Some(String::from("2"))))), - (json!(true), Some(ScalarValue::Boolean(Some(true)))), - (json!(false), Some(ScalarValue::Boolean(Some(false)))), - (json!(2), Some(ScalarValue::Int64(Some(2)))), - (json!(-2), Some(ScalarValue::Int64(Some(-2)))), - (json!(2.0), Some(ScalarValue::Float64(Some(2.0)))), - (json!(["1", "2"]), None), - (json!({"key": "val"}), None), - (json!(null), None), - ]; - for (stat_val, scalar_val) in reference_pairs { - assert_eq!(to_scalar_value(stat_val), *scalar_val) - } - } - - #[test] - fn test_left_larger_than_right() { - let correct_reference_pairs = vec![ - ( - ScalarValue::Float64(Some(1.0)), - ScalarValue::Float64(Some(2.0)), - ), - ( - ScalarValue::Float32(Some(1.0)), - ScalarValue::Float32(Some(2.0)), - ), - (ScalarValue::Int8(Some(1)), ScalarValue::Int8(Some(2))), - (ScalarValue::Int16(Some(1)), ScalarValue::Int16(Some(2))), - (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))), - (ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))), - ( - ScalarValue::Boolean(Some(false)), - ScalarValue::Boolean(Some(true)), - ), - ( - ScalarValue::Utf8(Some(String::from("1"))), - ScalarValue::Utf8(Some(String::from("2"))), - ), - ]; - for (smaller_val, larger_val) in correct_reference_pairs { - assert_eq!( - left_larger_than_right(smaller_val.clone(), larger_val.clone()), - Some(false) - ); - assert_eq!(left_larger_than_right(larger_val, smaller_val), Some(true)); - } - - let incorrect_reference_pairs = vec![ - ( - ScalarValue::Float64(Some(1.0)), - ScalarValue::Float32(Some(2.0)), - ), - 
(ScalarValue::Int32(Some(1)), ScalarValue::Float32(Some(2.0))), - ( - ScalarValue::Boolean(Some(true)), - ScalarValue::Float32(Some(2.0)), - ), - ]; - for (left, right) in incorrect_reference_pairs { - assert_eq!(left_larger_than_right(left, right), None); - } - } - #[test] fn test_partitioned_file_from_action() { let mut partition_values = std::collections::HashMap::new(); diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index c15bb8052e..737837ed1b 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -211,7 +211,7 @@ mod datafusion_utils { metrics::{ExecutionPlanMetricsSet, MetricsSet}, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; - use datafusion_common::DFSchema; + use datafusion_common::{DFSchema, Statistics}; use datafusion_expr::Expr; use futures::{Stream, StreamExt}; @@ -337,7 +337,7 @@ mod datafusion_utils { })) } - fn statistics(&self) -> datafusion_common::Statistics { + fn statistics(&self) -> DataFusionResult { self.parent.statistics() } diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index 5aafe52e87..15df5441c4 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -45,6 +45,7 @@ use std::error::Error; mod common; mod local { + use datafusion::common::stats::Precision; use deltalake::{writer::JsonWriter, SchemaTypeMap}; use super::*; @@ -288,19 +289,22 @@ mod local { .unwrap(); let statistics = table.state.datafusion_table_statistics(); - assert_eq!(statistics.num_rows, Some(4),); + assert_eq!(statistics.num_rows, Precision::Exact(4 as usize),); - assert_eq!(statistics.total_byte_size, Some(440 + 440)); + assert_eq!( + statistics.total_byte_size, + Precision::Exact((440 + 440) as usize) + ); + let column_stats = statistics.column_statistics.get(0).unwrap(); + assert_eq!(column_stats.null_count, Precision::Exact(0)); assert_eq!( - statistics - .column_statistics - .clone() - .unwrap() - .iter() - .map(|x| x.null_count) - .collect::>>(), - vec![Some(0)], + column_stats.max_value, + Precision::Exact(ScalarValue::from(4_i32)) + ); + assert_eq!( + column_stats.min_value, + Precision::Exact(ScalarValue::from(0_i32)) ); let ctx = SessionContext::new(); @@ -324,28 +328,6 @@ mod local { Arc::new(Int32Array::from(vec![0])).as_ref(), ); - assert_eq!( - statistics - .column_statistics - .clone() - .unwrap() - .iter() - .map(|x| x.max_value.as_ref()) - .collect::>>(), - vec![Some(&ScalarValue::from(4_i32))], - ); - - assert_eq!( - statistics - .column_statistics - .clone() - .unwrap() - .iter() - .map(|x| x.min_value.as_ref()) - .collect::>>(), - vec![Some(&ScalarValue::from(0_i32))], - ); - Ok(()) } @@ -786,14 +768,14 @@ mod local { let expected_schema = ArrowSchema::new(vec![ ArrowField::new("c3", ArrowDataType::Int32, true), - ArrowField::new("c1", ArrowDataType::Int32, false), + ArrowField::new("c1", ArrowDataType::Int32, true), ArrowField::new( "c2", ArrowDataType::Dictionary( Box::new(ArrowDataType::UInt16), Box::new(ArrowDataType::Utf8), ), - false, + true, ), ]); From 33ce46e9a83b53b0c98ae628c4c506385d8790e8 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Oct 2023 18:54:29 -0400 Subject: [PATCH 02/12] Refactor and expand tests for datafusion table stats --- rust/src/delta_datafusion/mod.rs | 212 +++++++++++++---------- rust/src/operations/transaction/state.rs | 15 +- rust/tests/integration_datafusion.rs | 118 ++++++++++++- 3 files changed, 238 insertions(+), 107 deletions(-) diff --git a/rust/src/delta_datafusion/mod.rs 
b/rust/src/delta_datafusion/mod.rs index 62900c9c9a..e8382ce15a 100644 --- a/rust/src/delta_datafusion/mod.rs +++ b/rust/src/delta_datafusion/mod.rs @@ -65,6 +65,7 @@ use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use log::error; use object_store::ObjectMeta; use serde::{Deserialize, Serialize}; use url::Url; @@ -104,10 +105,34 @@ impl From for DeltaTableError { } } +fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc) -> Precision { + match value { + Some(ColumnValueStat::Value(value)) => to_correct_scalar_value(value, field.data_type()) + .map(|maybe_scalar| { + maybe_scalar + .map(Precision::Exact) + .unwrap_or_default() + }) + .unwrap_or_else(|_| { + error!( + "Unable to parse scalar value of {:?} with type {} for column {}", + value, + field.data_type(), + field.name() + ); + Precision::Absent + }), + _ => Precision::Absent, + } +} + impl DeltaTableState { - /// Return statistics for Datafusion Table - pub fn datafusion_table_statistics(&self) -> Statistics { - let schema = self.arrow_schema().unwrap(); + /// Provide table level statistics to Datafusion + pub fn datafusion_table_statistics(&self) -> DataFusionResult { + // Statistics only support primitive types. Any non primitive column will not have their statistics captured + // If column statistics are missing for any add actions then we simply downgrade to Absent. + + let schema = self.arrow_schema()?; // Downgrade statistics to absent if file metadata is not present. let mut downgrade = false; let unknown_stats = Statistics::new_unknown(&schema); @@ -116,65 +141,49 @@ impl DeltaTableState { // Initalize statistics let mut table_stats = match files.get(0) { - Some(file) => { - match file.get_stats() { - Ok(Some(stats)) => { - let mut column_statistics = Vec::with_capacity(schema.fields().size()); - let total_byte_size = Precision::Exact(file.size as usize); - let num_rows = Precision::Exact(stats.num_records as usize); - - for field in schema.fields() { - let null_count = match stats.null_count.get(field.name()) { - Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize), - //TODO: determine how to handle nested structures... - _ => Precision::Absent, - }; - - let max_value = match stats.max_values.get(field.name()) { - Some(ColumnValueStat::Value(value)) => Precision::Exact( - to_correct_scalar_value(value, field.data_type()).unwrap(), - ), - //TODO: determine how to handle nested structures... - _ => Precision::Absent, - }; - - let min_value = match stats.min_values.get(field.name()) { - Some(ColumnValueStat::Value(value)) => Precision::Exact( - to_correct_scalar_value(value, field.data_type()).unwrap(), - ), - //TODO: determine how to handle nested structures... 
- _ => Precision::Absent, - }; - - column_statistics.push(ColumnStatistics { - null_count, - max_value, - min_value, - distinct_count: Precision::Absent, - }); - } - - Statistics { - total_byte_size, - num_rows, - column_statistics, - } + Some(file) => match file.get_stats() { + Ok(Some(stats)) => { + let mut column_statistics = Vec::with_capacity(schema.fields().size()); + let total_byte_size = Precision::Exact(file.size as usize); + let num_rows = Precision::Exact(stats.num_records as usize); + + for field in schema.fields() { + let null_count = match stats.null_count.get(field.name()) { + Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize), + _ => Precision::Absent, + }; + + let max_value = get_scalar_value(stats.max_values.get(field.name()), field); + let min_value = get_scalar_value(stats.min_values.get(field.name()), field); + + column_statistics.push(ColumnStatistics { + null_count, + max_value, + min_value, + distinct_count: Precision::Absent, + }); } - Ok(None) => { - downgrade = true; - let mut stats = unknown_stats.clone(); - stats.total_byte_size = Precision::Exact(file.size as usize); - stats + + Statistics { + total_byte_size, + num_rows, + column_statistics, } - _ => return unknown_stats, } - } + Ok(None) => { + downgrade = true; + let mut stats = unknown_stats.clone(); + stats.total_byte_size = Precision::Exact(file.size as usize); + stats + } + _ => return Ok(unknown_stats), + }, None => { // The Table is empty let mut stats = unknown_stats; stats.num_rows = Precision::Exact(0); stats.total_byte_size = Precision::Exact(0); - return stats; + return Ok(stats); } }; @@ -195,25 +204,13 @@ impl DeltaTableState { let null_count = match stats.null_count.get(field.name()) { Some(ColumnCountStat::Value(x)) => Precision::Exact(*x as usize), - //TODO: determine how to handle nested structures... - _ => Precision::Absent, - }; - - let max_value = match stats.max_values.get(field.name()) { - Some(ColumnValueStat::Value(value)) => Precision::Exact( - to_correct_scalar_value(value, field.data_type()).unwrap(), - ), - //TODO: determine how to handle nested structures... _ => Precision::Absent, }; - let min_value = match stats.min_values.get(field.name()) { - Some(ColumnValueStat::Value(value)) => Precision::Exact( - to_correct_scalar_value(value, field.data_type()).unwrap(), - ), - //TODO: determine how to handle nested structures... 
- _ => Precision::Absent, - }; + let max_value = + get_scalar_value(stats.max_values.get(field.name()), field); + let min_value = + get_scalar_value(stats.min_values.get(field.name()), field); column_stats.null_count = column_stats.null_count.add(&null_count); column_stats.max_value = column_stats.max_value.max(&max_value); @@ -223,7 +220,7 @@ impl DeltaTableState { Ok(None) => { downgrade = true; } - Err(_) => return unknown_stats, + Err(_) => return Ok(unknown_stats), } } } @@ -233,7 +230,7 @@ impl DeltaTableState { table_stats.num_rows = Precision::Absent; } - table_stats + Ok(table_stats) } } @@ -261,9 +258,12 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option Some(v) => serde_json::Value::String(v.to_string()), None => serde_json::Value::Null, }; - to_correct_scalar_value(&value, &data_type).unwrap_or( - get_null_of_arrow_type(&data_type).expect("Could not determine null type"), - ) + to_correct_scalar_value(&value, &data_type) + .ok() + .flatten() + .unwrap_or( + get_null_of_arrow_type(&data_type).expect("Could not determine null type"), + ) } else if let Ok(Some(statistics)) = add.get_stats() { let values = if get_max { statistics.max_values @@ -273,7 +273,11 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option values .get(&column.name) - .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type)) + .and_then(|f| { + to_correct_scalar_value(f.as_value()?, &data_type) + .ok() + .flatten() + }) .unwrap_or( get_null_of_arrow_type(&data_type).expect("Could not determine null type"), ) @@ -615,6 +619,17 @@ impl<'a> DeltaScanBuilder<'a> { )); } + let stats = self + .snapshot + .datafusion_table_statistics() + .unwrap_or_else(|e| { + error!( + "Error while computing table statistics. Using unknown statistics. 
{}", + e + ); + Statistics::new_unknown(&schema) + }); + let scan = ParquetFormat::new() .create_physical_plan( self.state, @@ -622,7 +637,7 @@ impl<'a> DeltaScanBuilder<'a> { object_store_url: self.object_store.object_store_url(), file_schema, file_groups: file_groups.into_values().collect(), - statistics: self.snapshot.datafusion_table_statistics(), + statistics: stats, projection: self.projection.cloned(), limit: self.limit, table_partition_cols, @@ -694,7 +709,7 @@ impl TableProvider for DeltaTable { } fn statistics(&self) -> Option { - Some(self.state.datafusion_table_statistics()) + self.state.datafusion_table_statistics().ok() } } @@ -773,7 +788,7 @@ impl TableProvider for DeltaTableProvider { } fn statistics(&self) -> Option { - Some(self.snapshot.datafusion_table_statistics()) + self.snapshot.datafusion_table_statistics().ok() } } @@ -919,6 +934,8 @@ pub(crate) fn partitioned_file_from_action( &serde_json::Value::String(value.to_string()), f.data_type(), ) + .ok() + .flatten() .unwrap_or(ScalarValue::Null), None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null), }) @@ -940,7 +957,10 @@ pub(crate) fn partitioned_file_from_action( } } -fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Option { +fn parse_timestamp( + stat_val: &serde_json::Value, + field_dt: &ArrowDataType, +) -> DataFusionResult { let string = match stat_val { serde_json::Value::String(s) => s.to_owned(), _ => stat_val.to_string(), @@ -949,8 +969,7 @@ fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Op let time_micro = ScalarValue::try_from_string( string, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), - ) - .ok()?; + )?; let cast_arr = cast_with_options( &time_micro.to_array(), field_dt, @@ -958,26 +977,31 @@ fn parse_timestamp(stat_val: &serde_json::Value, field_dt: &ArrowDataType) -> Op safe: false, ..Default::default() }, - ) - .ok()?; - ScalarValue::try_from_array(&cast_arr, 0).ok() + )?; + ScalarValue::try_from_array(&cast_arr, 0) } pub(crate) fn to_correct_scalar_value( stat_val: &serde_json::Value, field_dt: &ArrowDataType, -) -> Option { +) -> DataFusionResult> { match stat_val { - serde_json::Value::Array(_) => None, - serde_json::Value::Object(_) => None, - serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(), + serde_json::Value::Array(_) => Ok(None), + serde_json::Value::Object(_) => Ok(None), + serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)), serde_json::Value::String(string_val) => match field_dt { - ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), - _ => Some(ScalarValue::try_from_string(string_val.to_owned(), field_dt).ok()?), + ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)), + _ => Ok(Some(ScalarValue::try_from_string( + string_val.to_owned(), + field_dt, + )?)), }, other => match field_dt { - ArrowDataType::Timestamp(_, _) => parse_timestamp(stat_val, field_dt), - _ => Some(ScalarValue::try_from_string(other.to_string(), field_dt).ok()?), + ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)), + _ => Ok(Some(ScalarValue::try_from_string( + other.to_string(), + field_dt, + )?)), }, } } @@ -1559,7 +1583,7 @@ mod tests { ]; for (raw, data_type, ref_scalar) in reference_pairs { - let scalar = to_correct_scalar_value(raw, data_type).unwrap(); + let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap(); assert_eq!(*ref_scalar, scalar) } } diff --git 
a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index bb9c3ff35e..8e9fa58fa6 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -188,9 +188,12 @@ impl<'a> AddContainer<'a> { Some(v) => serde_json::Value::String(v.to_string()), None => serde_json::Value::Null, }; - to_correct_scalar_value(&value, data_type).unwrap_or( - get_null_of_arrow_type(data_type).expect("Could not determine null type"), - ) + to_correct_scalar_value(&value, data_type) + .ok() + .flatten() + .unwrap_or( + get_null_of_arrow_type(data_type).expect("Could not determine null type"), + ) } else if let Ok(Some(statistics)) = add.get_stats() { let values = if get_max { statistics.max_values @@ -200,7 +203,11 @@ impl<'a> AddContainer<'a> { values .get(&column.name) - .and_then(|f| to_correct_scalar_value(f.as_value()?, data_type)) + .and_then(|f| { + to_correct_scalar_value(f.as_value()?, data_type) + .ok() + .flatten() + }) .unwrap_or( get_null_of_arrow_type(data_type).expect("Could not determine null type"), ) diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index 15df5441c4..dfb23a1421 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -284,10 +284,11 @@ mod local { #[tokio::test] async fn test_datafusion_stats() -> Result<()> { + // Validate a table that contains statistics for all files let table = deltalake::open_table("./tests/data/delta-0.8.0") .await .unwrap(); - let statistics = table.state.datafusion_table_statistics(); + let statistics = table.state.datafusion_table_statistics()?; assert_eq!(statistics.num_rows, Precision::Exact(4 as usize),); @@ -309,25 +310,124 @@ mod local { let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(table))?; - - let batches = ctx + let actual = ctx .sql("SELECT max(value), min(value) FROM test_table") .await? .collect() .await?; - assert_eq!(batches.len(), 1); - let batch = &batches[0]; + let expected = vec![ + "+-----------------------+-----------------------+", + "| MAX(test_table.value) | MIN(test_table.value) |", + "+-----------------------+-----------------------+", + "| 4 | 0 |", + "+-----------------------+-----------------------+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + + // Validate a table that does not contain column statistics + let table = deltalake::open_table("./tests/data/delta-0.2.0") + .await + .unwrap(); + let statistics = table.state.datafusion_table_statistics()?; + + assert_eq!(statistics.num_rows, Precision::Absent); + + assert_eq!( + statistics.total_byte_size, + Precision::Exact((400 + 404 + 396) as usize) + ); + let column_stats = statistics.column_statistics.get(0).unwrap(); + assert_eq!(column_stats.null_count, Precision::Absent); + assert_eq!(column_stats.max_value, Precision::Absent); + assert_eq!(column_stats.min_value, Precision::Absent); + ctx.register_table("test_table2", Arc::new(table))?; + let actual = ctx + .sql("SELECT max(value), min(value) FROM test_table2") + .await?
+ .collect() + .await?; + + let expected = vec![ + "+------------------------+------------------------+", + "| MAX(test_table2.value) | MIN(test_table2.value) |", + "+------------------------+------------------------+", + "| 3 | 1 |", + "+------------------------+------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + + // Validate a table that contains nested structures. + + // This table is interesting since it goes through schema evolution. + // In particular 'new_column' contains statistics for when it + // is introduced (10) but the commit following (11) does not contain + // statistics for this column. + let table = deltalake::open_table("./tests/data/delta-1.2.1-only-struct-stats") + .await + .unwrap(); + let schema = table.get_schema().unwrap(); + let statistics = table.state.datafusion_table_statistics()?; + assert_eq!(statistics.num_rows, Precision::Exact(12)); + + // `new_column` statistics + let stats = statistics + .column_statistics + .get(schema.index_of("new_column").unwrap()) + .unwrap(); + assert_eq!(stats.null_count, Precision::Absent); + assert_eq!(stats.min_value, Precision::Absent); + assert_eq!(stats.max_value, Precision::Absent); + + // `date` statistics + let stats = statistics + .column_statistics + .get(schema.index_of("date").unwrap()) + .unwrap(); + assert_eq!(stats.null_count, Precision::Exact(0)); + // 2022-10-24 + assert_eq!( + stats.min_value, + Precision::Exact(ScalarValue::Date32(Some(19289))) + ); assert_eq!( - batch.column(1).as_ref(), - Arc::new(Int32Array::from(vec![0])).as_ref(), + stats.max_value, + Precision::Exact(ScalarValue::Date32(Some(19289))) ); + // `timestamp` statistics + let stats = statistics + .column_statistics + .get(schema.index_of("timestamp").unwrap()) + .unwrap(); + assert_eq!(stats.null_count, Precision::Exact(0)); + // 2022-10-24T22:59:32.846Z + assert_eq!( + stats.min_value, + Precision::Exact(ScalarValue::TimestampMicrosecond( + Some(1666652372846000), + None + )) + ); + // 2022-10-24T22:59:46.083Z + assert_eq!( + stats.max_value, + Precision::Exact(ScalarValue::TimestampMicrosecond( + Some(1666652386083000), + None + )) + ); + + // `struct_element` statistics + let stats = statistics + .column_statistics + .get(schema.index_of("nested_struct").unwrap()) + .unwrap(); + assert_eq!(stats.null_count, Precision::Absent); + assert_eq!(stats.min_value, Precision::Absent); + assert_eq!(stats.max_value, Precision::Absent); + Ok(()) } From eb3b6d184ad71548d3eee1dc8e97181370cc7354 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Fri, 27 Oct 2023 13:32:55 -0400 Subject: [PATCH 03/12] cargo fmt --- rust/src/delta_datafusion/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs index e8382ce15a..0895798cc5 100644 --- a/rust/src/delta_datafusion/mod.rs +++ b/rust/src/delta_datafusion/mod.rs @@ -108,11 +108,7 @@ impl From for DeltaTableError { fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc) -> Precision { match value { Some(ColumnValueStat::Value(value)) => to_correct_scalar_value(value, field.data_type()) - .map(|maybe_scalar| { - maybe_scalar - .map(Precision::Exact) - .unwrap_or_default() - }) + .map(|maybe_scalar| maybe_scalar.map(Precision::Exact).unwrap_or_default()) .unwrap_or_else(|_| { error!( "Unable to parse scalar value of {:?} with type {} for column {}", From e70fbfb4dc926635d444ae7f190e5640bbaf8b23 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 4 Nov 2023 22:52:18 -0400 Subject: 
[PATCH 04/12] cargo fmt --- crates/deltalake-core/src/delta_datafusion/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 48ac11d782..b8ba77e539 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -936,8 +936,7 @@ pub(crate) fn partitioned_file_from_action( field.data_type(), ) .unwrap_or(Some(ScalarValue::Null)) - .unwrap_or(ScalarValue::Null) - , + .unwrap_or(ScalarValue::Null), None => get_null_of_arrow_type(field.data_type()) .unwrap_or(ScalarValue::Null), }) From b7db2ecdfb78bfbdf164ec80886f5470081a827b Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 11 Nov 2023 23:07:30 -0500 Subject: [PATCH 05/12] merge benchmarks --- crates/benchmarks/Cargo.toml | 48 ++ crates/benchmarks/src/bin/merge.rs | 705 +++++++++++++++++++++++++++++ crates/benchmarks/src/util/lib.rs | 12 + 3 files changed, 765 insertions(+) create mode 100644 crates/benchmarks/Cargo.toml create mode 100644 crates/benchmarks/src/bin/merge.rs create mode 100644 crates/benchmarks/src/util/lib.rs diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml new file mode 100644 index 0000000000..334a80f407 --- /dev/null +++ b/crates/benchmarks/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "delta-benchmarks" +version = "0.0.1" +authors = ["Qingping Hou "] +homepage = "https://github.com/delta-io/delta.rs" +license = "Apache-2.0" +keywords = ["deltalake", "delta", "datalake"] +description = "Delta-rs Benchmarks" +edition = "2021" + +[dependencies] +clap = { version = "4", features = [ "derive" ] } +chrono = { version = "0.4.31", default-features = false, features = ["clock"] } +tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] } +env_logger = "0" +jemallocator = "0.5.4" +jemalloc-ctl = "0.5.4" + +# arrow +arrow = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } +arrow-ord = { workspace = true } +arrow-row = { workspace = true } +arrow-schema = { workspace = true, features = ["serde"] } +arrow-select = { workspace = true } +parquet = { workspace = true, features = [ + "async", + "object_store", +] } + +# serde +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } + +# datafusion +datafusion = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-common = { workspace = true } +datafusion-proto = { workspace = true } +datafusion-sql = { workspace = true } +datafusion-physical-expr = { workspace = true } + +[dependencies.deltalake-core] +path = "../deltalake-core" +version = "0" +features = ["datafusion"] diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs new file mode 100644 index 0000000000..19224503b4 --- /dev/null +++ b/crates/benchmarks/src/bin/merge.rs @@ -0,0 +1,705 @@ +use std::{ + sync::{mpsc::TryRecvError, Arc}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use arrow::datatypes::Schema as ArrowSchema; +use arrow_array::{RecordBatch, StringArray, UInt32Array}; +use chrono::Duration; +use clap::{command, Args, Parser, Subcommand}; +use datafusion::{datasource::MemTable, prelude::DataFrame}; +use datafusion_common::DataFusionError; +use datafusion_expr::{cast, col, lit, random}; +use deltalake_core::protocol::SaveMode; +use deltalake_core::{ + arrow::{ + self, + datatypes::{DataType, Field}, + }, + 
datafusion::prelude::{CsvReadOptions, SessionContext}, + delta_datafusion::{DeltaScanConfig, DeltaTableProvider}, + operations::merge::{MergeBuilder, MergeMetrics}, + DeltaOps, DeltaTable, DeltaTableBuilder, DeltaTableError, ObjectStore, Path, +}; +use serde_json::json; +use tokio::time::Instant; + +//#[global_allocator] +//static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +/* Convert web_returns dataset from TPC DS's datagen utility into a Delta table + This table will be partitioned on `wr_returned_date_sk` +*/ +pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -> Result<(), ()> { + let ctx = SessionContext::new(); + + let schema = ArrowSchema::new(vec![ + Field::new("wr_returned_date_sk", DataType::Int64, true), + Field::new("wr_returned_time_sk", DataType::Int64, true), + Field::new("wr_item_sk", DataType::Int64, false), + Field::new("wr_refunded_customer_sk", DataType::Int64, true), + Field::new("wr_refunded_cdemo_sk", DataType::Int64, true), + Field::new("wr_refunded_hdemo_sk", DataType::Int64, true), + Field::new("wr_refunded_addr_sk", DataType::Int64, true), + Field::new("wr_returning_customer_sk", DataType::Int64, true), + Field::new("wr_returning_cdemo_sk", DataType::Int64, true), + Field::new("wr_returning_hdemo_sk", DataType::Int64, true), + Field::new("wr_returning_addr_sk", DataType::Int64, true), + Field::new("wr_web_page_sk", DataType::Int64, true), + Field::new("wr_reason_sk", DataType::Int64, true), + Field::new("wr_order_number", DataType::Int64, false), + Field::new("wr_return_quantity", DataType::Int32, true), + Field::new("wr_return_amt", DataType::Decimal128(7, 2), true), + Field::new("wr_return_tax", DataType::Decimal128(7, 2), true), + Field::new("wr_return_amt_inc_tax", DataType::Decimal128(7, 2), true), + Field::new("wr_fee", DataType::Decimal128(7, 2), true), + Field::new("wr_return_ship_cost", DataType::Decimal128(7, 2), true), + Field::new("wr_refunded_cash", DataType::Decimal128(7, 2), true), + Field::new("wr_reversed_charge", DataType::Decimal128(7, 2), true), + Field::new("wr_account_credit", DataType::Decimal128(7, 2), true), + Field::new("wr_net_loss", DataType::Decimal128(7, 2), true), + ]); + + let table = ctx + .read_csv( + input_path, + CsvReadOptions { + has_header: false, + delimiter: b'|', + file_extension: ".dat", + schema: Some(&schema), + ..Default::default() + }, + ) + .await + .unwrap(); + + DeltaOps::try_from_uri(table_path) + .await + .unwrap() + .write(table.collect().await.unwrap()) + .with_partition_columns(vec!["wr_returned_date_sk"]) + .await + .unwrap(); + + Ok(()) +} + +fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result { + DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("wr_returned_date_sk", "source.wr_returned_date_sk") + .update("wr_returned_time_sk", "source.wr_returned_time_sk") + .update("wr_item_sk", "source.wr_item_sk") + .update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .update("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + .update("wr_returning_hdemo_sk", 
"source.wr_returning_hdemo_sk") + .update("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .update("wr_web_page_sk", "source.wr_web_page_sk") + .update("wr_reason_sk", "source.wr_reason_sk") + .update("wr_order_number", "source.wr_order_number") + .update("wr_return_quantity", "source.wr_return_quantity") + .update("wr_return_amt", "source.wr_return_amt") + .update("wr_return_tax", "source.wr_return_tax") + .update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") + .update("wr_fee", "source.wr_fee") + .update("wr_return_ship_cost", "source.wr_return_ship_cost") + .update("wr_refunded_cash", "source.wr_refunded_cash") + .update("wr_reversed_charge", "source.wr_reversed_charge") + .update("wr_account_credit", "source.wr_account_credit") + .update("wr_net_loss", "source.wr_net_loss") + })? + .when_not_matched_insert(|insert| { + insert + .set("wr_returned_date_sk", "source.wr_returned_date_sk") + .set("wr_returned_time_sk", "source.wr_returned_time_sk") + .set("wr_item_sk", "source.wr_item_sk") + .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") + .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .set("wr_web_page_sk", "source.wr_web_page_sk") + .set("wr_reason_sk", "source.wr_reason_sk") + .set("wr_order_number", "source.wr_order_number") + .set("wr_return_quantity", "source.wr_return_quantity") + .set("wr_return_amt", "source.wr_return_amt") + .set("wr_return_tax", "source.wr_return_tax") + .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") + .set("wr_fee", "source.wr_fee") + .set("wr_return_ship_cost", "source.wr_return_ship_cost") + .set("wr_refunded_cash", "source.wr_refunded_cash") + .set("wr_reversed_charge", "source.wr_reversed_charge") + .set("wr_account_credit", "source.wr_account_credit") + .set("wr_net_loss", "source.wr_net_loss") + }) +} + +fn merge_insert(source: DataFrame, table: DeltaTable) -> Result { + DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_not_matched_insert(|insert| { + insert + .set("wr_returned_date_sk", "source.wr_returned_date_sk") + .set("wr_returned_time_sk", "source.wr_returned_time_sk") + .set("wr_item_sk", "source.wr_item_sk") + .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") + .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .set("wr_web_page_sk", "source.wr_web_page_sk") + .set("wr_reason_sk", "source.wr_reason_sk") + .set("wr_order_number", "source.wr_order_number") + .set("wr_return_quantity", "source.wr_return_quantity") + .set("wr_return_amt", "source.wr_return_amt") + .set("wr_return_tax", "source.wr_return_tax") + .set("wr_return_amt_inc_tax", 
"source.wr_return_amt_inc_tax") + .set("wr_fee", "source.wr_fee") + .set("wr_return_ship_cost", "source.wr_return_ship_cost") + .set("wr_refunded_cash", "source.wr_refunded_cash") + .set("wr_reversed_charge", "source.wr_reversed_charge") + .set("wr_account_credit", "source.wr_account_credit") + .set("wr_net_loss", "source.wr_net_loss") + }) +} + +fn merge_delete(source: DataFrame, table: DeltaTable) -> Result { + DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_matched_delete(|delete| { + delete + }) +} + +async fn benchmark_merge_tpcds( + path: String, + parameters: MergePerfParams, + merge: fn(DataFrame, DeltaTable) -> Result, +) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> { + let table = DeltaTableBuilder::from_uri(path).load().await?; + let file_count = table.state.files().len(); + + let provider = DeltaTableProvider::try_new( + table.state.clone(), + table.log_store(), + DeltaScanConfig { + file_column_name: Some("file_path".to_string()), + }, + ) + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::new(provider))?; + + let files = ctx + .sql("select file_path as file from t1 group by file") + .await? + .with_column("r", random())? + .filter(col("r").lt_eq(lit(parameters.sample_files)))?; + + let file_sample = files.collect_partitioned().await?; + let schema = file_sample.get(0).unwrap().get(0).unwrap().schema(); + let mem_table = Arc::new(MemTable::try_new(schema, file_sample)?); + ctx.register_table("file_sample", mem_table)?; + let file_sample_count = ctx.table("file_sample").await?.count().await?; + + let row_sample = ctx.table("t1").await?.join( + ctx.table("file_sample").await?, + datafusion_common::JoinType::Inner, + &["file_path"], + &["file"], + None, + )?; + + let matched = row_sample + .clone() + .filter(random().lt_eq(lit(parameters.sample_matched_rows)))?; + + let rand = cast(random() * lit(u32::MAX), DataType::Int64); + let not_matched = row_sample + .filter(random().lt_eq(lit(parameters.sample_not_matched_rows)))? + .with_column("wr_item_sk", rand.clone())? + .with_column("wr_order_number", rand)?; + + let source = matched.union(not_matched)?; + + let start = Instant::now(); + let (table, metrics) = merge(source, table)?.await?; + let end = Instant::now(); + + let duration = end.duration_since(start); + + println!("Total File count: {}", file_count); + println!("File sample count: {}", file_sample_count); + println!("{:?}", metrics); + println!("Seconds: {}", duration.as_secs_f32()); + + // Clean up and restore to original state. + let (table, _) = DeltaOps(table).restore().with_version_to_restore(0).await?; + let (table, _) = DeltaOps(table) + .vacuum() + .with_retention_period(Duration::seconds(0)) + .with_enforce_retention_duration(false) + .await?; + table + .object_store() + .delete(&Path::parse("_delta_log/00000000000000000001.json")?) + .await?; + table + .object_store() + .delete(&Path::parse("_delta_log/00000000000000000002.json")?) 
+ .await?; + + Ok((duration, metrics)) +} + +#[derive(Subcommand, Debug)] +enum Command { + Convert(Convert), + Bench(BenchArg), + Standard(Standard), + Compare(Compare), + Show(Show), +} + +#[derive(Debug, Args)] +struct Convert { + tpcds_path: String, + delta_path: String, +} + +#[derive(Debug, Args)] +struct Standard { + delta_path: String, + samples: Option, + output_path: Option, + group_id: Option, +} + +#[derive(Debug, Args)] +struct Compare { + before_path: String, + before_group_id: String, + after_path: String, + after_group_id: String, +} + +#[derive(Debug, Args)] +struct Show { + path: String, +} + +#[derive(Debug, Args)] +struct BenchArg { + table_path: String, + #[command(subcommand)] + name: MergeBench, +} + +struct Bench { + name: String, + op: fn(DataFrame, DeltaTable) -> Result, + params: MergePerfParams, +} + +impl Bench { + fn new( + name: S, + op: fn(DataFrame, DeltaTable) -> Result, + params: MergePerfParams, + ) -> Self { + Bench { + name: name.to_string(), + op, + params, + } + } +} + +#[derive(Debug, Args, Clone)] +struct MergePerfParams { + pub sample_files: f32, + pub sample_matched_rows: f32, + pub sample_not_matched_rows: f32, +} + +#[derive(Debug, Clone, Subcommand)] +enum MergeBench { + Upsert(MergePerfParams), + Delete(MergePerfParams), + Insert(MergePerfParams), +} + +#[derive(Parser, Debug)] +#[command(about)] +struct MergePrefArgs { + #[command(subcommand)] + command: Command, +} + +struct MemoryStats { + allocated: Vec, + resident: Vec, +} + +#[derive(Debug, Default)] +struct Stats { + // Percentiles + min: usize, + p25: usize, + p50: usize, + p75: usize, + max: usize, + //Other + mean: f64, +} + +impl Stats { + pub fn from(mut series: Vec) -> Stats { + let mut stats = Stats::default(); + let items = series.len(); + if items == 0 { + return stats; + } + + series.sort(); + + stats.min = series[0]; + stats.p25 = series[items / 4]; + stats.p50 = series[items / 2]; + stats.p75 = series[(items * 3) / 4]; + stats.max = series[items - 1]; + + let sum = series.iter().fold(0.0, |acc, x| acc + (*x as f64)); + stats.mean = sum / (items as f64); + + stats + } +} + +impl MemoryStats { + pub fn new() -> Self { + MemoryStats { + allocated: Vec::new(), + resident: Vec::new(), + } + } + + pub fn update(&mut self) { + let epoch = jemalloc_ctl::epoch::mib().unwrap(); + epoch.advance().unwrap(); + let allocated_mib = jemalloc_ctl::stats::allocated::mib().unwrap(); + let resident_mib = jemalloc_ctl::stats::resident::mib().unwrap(); + + self.allocated.push(allocated_mib.read().unwrap()); + self.resident.push(resident_mib.read().unwrap()); + } +} + +#[tokio::main] +async fn main() { + match MergePrefArgs::parse().command { + Command::Convert(Convert { + tpcds_path, + delta_path, + }) => { + convert_tpcds_web_returns(tpcds_path, delta_path) + .await + .unwrap(); + } + Command::Bench(BenchArg { table_path, name }) => { + /* + let (tx, rx) = std::sync::mpsc::channel(); + let handle = std::thread::spawn(move || { + let mut stats = MemoryStats::new(); + while let Err(TryRecvError::Empty) = rx.try_recv() { + stats.update(); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + + stats.update(); + stats + }); + */ + + let (merge_op, params): ( + fn(DataFrame, DeltaTable) -> Result, + MergePerfParams, + ) = match name { + MergeBench::Upsert(params) => (merge_upsert, params), + MergeBench::Delete(params) => (merge_delete, params), + MergeBench::Insert(params) => (merge_insert, params), + }; + + benchmark_merge_tpcds(table_path, params, merge_op) + .await + .unwrap(); + + /* + 
tx.send(Some(())).unwrap(); + let stats = handle.join().unwrap(); + let a = Stats::from(stats.allocated); + let r = Stats::from(stats.resident); + + println!("{:?}", a); + println!("{:?}", r); + */ + } + Command::Standard(Standard { + delta_path, + samples, + output_path, + group_id, + }) => { + let benches = vec![Bench::new( + "delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05", + merge_delete, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.05, + sample_not_matched_rows: 0.0, + }, + ), + Bench::new( + "multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05", + merge_insert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.00, + sample_not_matched_rows: 0.05, + }, + ), + Bench::new( + "multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50", + merge_insert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.00, + sample_not_matched_rows: 0.50, + }, + ), + Bench::new( + "multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0", + merge_insert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.00, + sample_not_matched_rows: 1.0, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.01, + sample_not_matched_rows: 0.1, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.00, + sample_not_matched_rows: 0.1, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.1, + sample_not_matched_rows: 0.0, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.1, + sample_not_matched_rows: 0.01, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.5, + sample_not_matched_rows: 0.001, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 0.99, + sample_not_matched_rows: 0.001, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001", + merge_upsert, + MergePerfParams { + sample_files: 0.05, + sample_matched_rows: 1.0, + sample_not_matched_rows: 0.001, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001", + merge_upsert, + MergePerfParams { + sample_files: 0.5, + sample_matched_rows: 0.001, + sample_not_matched_rows: 0.001, + }, + ), + Bench::new( + "upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001", + merge_upsert, + MergePerfParams { + sample_files: 1.0, + sample_matched_rows: 0.001, + sample_not_matched_rows: 0.001, + }, + ) + ]; + + let num_samples = samples.unwrap_or(1); + let group_id = group_id.unwrap_or( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + .to_string(), + ); + let output = output_path.unwrap_or("data/benchmarks".into()); + + let mut group_ids = vec![]; + let 
mut name = vec![]; + let mut samples = vec![]; + let mut duration_ms = vec![]; + let mut data = vec![]; + + for bench in benches { + for sample in 0..num_samples { + println!("Test: {} Sample: {}", bench.name, sample); + let res = + benchmark_merge_tpcds(delta_path.clone(), bench.params.clone(), bench.op) + .await + .unwrap(); + + group_ids.push(group_id.clone()); + name.push(bench.name.clone()); + samples.push(sample); + duration_ms.push(res.0.as_millis() as u32); + data.push(json!(res.1).to_string()); + } + } + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("group_id", DataType::Utf8, false), + Field::new("name", DataType::Utf8, false), + Field::new("sample", DataType::UInt32, false), + Field::new("duration_ms", DataType::UInt32, false), + Field::new("data", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(group_ids)), + Arc::new(StringArray::from(name)), + Arc::new(UInt32Array::from(samples)), + Arc::new(UInt32Array::from(duration_ms)), + Arc::new(StringArray::from(data)), + ], + ) + .unwrap(); + + DeltaOps::try_from_uri(output) + .await + .unwrap() + .write(vec![batch]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + } + Command::Compare(Compare { + before_path, + before_group_id, + after_path, + after_group_id, + }) => { + let before_table = DeltaTableBuilder::from_uri(before_path) + .load() + .await + .unwrap(); + let after_table = DeltaTableBuilder::from_uri(after_path) + .load() + .await + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("before", Arc::new(before_table)) + .unwrap(); + ctx.register_table("after", Arc::new(after_table)).unwrap(); + + + let before_stats = ctx.sql(&format!(" + select name as before_name, + avg(cast(duration_ms as float)) as before_duration_avg + from before where group_id = {} + group by name + ", before_group_id)).await.unwrap(); + + let after_stats = ctx.sql(&format!(" + select name as after_name, + avg(cast(duration_ms as float)) as after_duration_avg + from after where group_id = {} + group by name + ", after_group_id)).await.unwrap(); + + before_stats.join(after_stats, datafusion_common::JoinType::Inner, &["before_name"], &["after_name"], None).unwrap() + .select(vec![col("before_name").alias("name"), col("before_duration_avg"), col("after_duration_avg"), (col("after_duration_avg") / (col("before_duration_avg")))]).unwrap() + .show().await.unwrap(); + + } + Command::Show(Show { path }) => { + let stats = DeltaTableBuilder::from_uri(path).load().await.unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("stats", Arc::new(stats)).unwrap(); + + ctx.sql("select * from stats") + .await + .unwrap() + .show() + .await + .unwrap(); + } + } +} diff --git a/crates/benchmarks/src/util/lib.rs b/crates/benchmarks/src/util/lib.rs new file mode 100644 index 0000000000..1a27500605 --- /dev/null +++ b/crates/benchmarks/src/util/lib.rs @@ -0,0 +1,12 @@ + + +struct TestConfig { + data: Box +} + +struct Runner { + setup: u32, + iteration_setup: u32, + iteration_cleanup: u32, + cleanup: u32 +} \ No newline at end of file From aa5c328e9bd5ff3209ecc49d3c1519be1fc2fdc9 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 11 Nov 2023 23:08:10 -0500 Subject: [PATCH 06/12] remove dead code --- crates/benchmarks/src/util/lib.rs | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 crates/benchmarks/src/util/lib.rs diff --git a/crates/benchmarks/src/util/lib.rs b/crates/benchmarks/src/util/lib.rs deleted file mode 100644 index 
1a27500605..0000000000
--- a/crates/benchmarks/src/util/lib.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-
-
-struct TestConfig {
-    data: Box
-}
-
-struct Runner {
-    setup: u32,
-    iteration_setup: u32,
-    iteration_cleanup: u32,
-    cleanup: u32
-}
\ No newline at end of file

From f572d9a82b5331103b7c6ceff45c6e60f8411b62 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Mon, 13 Nov 2023 22:37:27 -0500
Subject: [PATCH 07/12] add readme

---
 crates/benchmarks/README.md | 55 +++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)
 create mode 100644 crates/benchmarks/README.md

diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md
new file mode 100644
index 0000000000..187ed5215e
--- /dev/null
+++ b/crates/benchmarks/README.md
@@ -0,0 +1,55 @@
+# Merge
+The merge benchmarks are similar to the ones used by [Delta Spark](https://github.com/delta-io/delta/pull/1835).
+
+
+## Dataset
+
+Databricks maintains a public S3 bucket of the TPC-DS dataset at various scale factors; requesters must pay to download this dataset. Below is an example of how to list the 1 GB scale factor:
+
+```
+aws s3api list-objects --bucket devrel-delta-datasets --request-payer requester --prefix tpcds-2.13/tpcds_sf1_parquet/web_returns/
+```
+
+You can generate the TPC-DS dataset yourself by downloading and compiling [the generator](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+You may need to update the CFLAGS to include `-fcommon` to compile on newer versions of GCC.
+
+## Commands
+These commands can be executed from the root of the benchmark crate. Some commands depend on the TPC-DS dataset already existing.
+
+### Convert
+Converts a TPC-DS web_returns csv into a Delta table.
+Assumes the dataset is pipe delimited and records do not have a trailing delimiter.
+
+```
+cargo run --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
+```
+
+### Standard
+Execute the standard merge bench suite.
+Results can be saved to a delta table for further analysis.
+This table has the following schema:
+
+group_id: Used to group all tests that executed as a part of this call. Default value is the timestamp of execution
+name: The benchmark name that was executed
+sample: The iteration number for a given benchmark name
+duration_ms: How long the benchmark took in ms
+data: Free field to pack any additional data
+
+```
+cargo run --bin merge -- standard data/web_returns 1 data/merge_results
+```
+
+### Compare
+Compare the results of two different runs.
+Provide the Delta table paths and the `group_id` of each run to obtain the speedup for each test case
+
+```
+cargo run --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
+```
+
+### Show
+Show all benchmark results from a delta table
+
+```
+cargo run --bin merge -- show data/benchmark
+```

From f689db0d31da8763d473a424a18d54edf30b8866 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sat, 18 Nov 2023 23:40:59 -0500
Subject: [PATCH 08/12] remove memory profile code

---
 crates/benchmarks/src/bin/merge.rs | 132 ++++++++---------------------
 1 file changed, 36 insertions(+), 96 deletions(-)

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index 19224503b4..8be7e0fc05 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -1,5 +1,5 @@
 use std::{
-    sync::{mpsc::TryRecvError, Arc},
+    sync::Arc,
     time::{SystemTime, UNIX_EPOCH},
 };
 
@@ -24,9 +24,6 @@ use deltalake_core::{
 use serde_json::json;
 use tokio::time::Instant;
 
-//#[global_allocator]
-//static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
-
 /* Convert web_returns dataset from TPC DS's datagen utility into a Delta table
    This table will be partitioned on `wr_returned_date_sk` */
@@ -356,65 +353,6 @@ struct MergePrefArgs {
     command: Command,
 }
 
-struct MemoryStats {
-    allocated: Vec,
-    resident: Vec,
-}
-
-#[derive(Debug, Default)]
-struct Stats {
-    // Percentiles
-    min: usize,
-    p25: usize,
-    p50: usize,
-    p75: usize,
-    max: usize,
-    //Other
-    mean: f64,
-}
-
-impl Stats {
-    pub fn from(mut series: Vec) -> Stats {
-        let mut stats = Stats::default();
-        let items = series.len();
-        if items == 0 {
-            return stats;
-        }
-
-        series.sort();
-
-        stats.min = series[0];
-        stats.p25 = series[items / 4];
-        stats.p50 = series[items / 2];
-        stats.p75 = series[(items * 3) / 4];
-        stats.max = series[items - 1];
-
-        let sum = series.iter().fold(0.0, |acc, x| acc + (*x as f64));
-        stats.mean = sum / (items as f64);
-
-        stats
-    }
-}
-
-impl MemoryStats {
-    pub fn new() -> Self {
-        MemoryStats {
-            allocated: Vec::new(),
-            resident: Vec::new(),
-        }
-    }
-
-    pub fn update(&mut self) {
-        let epoch = jemalloc_ctl::epoch::mib().unwrap();
-        epoch.advance().unwrap();
-        let allocated_mib = jemalloc_ctl::stats::allocated::mib().unwrap();
-        let resident_mib = jemalloc_ctl::stats::resident::mib().unwrap();
-
-        self.allocated.push(allocated_mib.read().unwrap());
-        self.resident.push(resident_mib.read().unwrap());
-    }
-}
-
 #[tokio::main]
 async fn main() {
     match MergePrefArgs::parse().command {
@@ -427,20 +365,6 @@ async fn main() {
                 .unwrap();
         }
         Command::Bench(BenchArg { table_path, name }) => {
-            /*
-            let (tx, rx) = std::sync::mpsc::channel();
-            let handle = std::thread::spawn(move || {
-                let mut stats = MemoryStats::new();
-                while let Err(TryRecvError::Empty) = rx.try_recv() {
-                    stats.update();
-                    std::thread::sleep(std::time::Duration::from_secs(1));
-                }
-
-                stats.update();
-                stats
-            });
-            */
-
             let (merge_op, params): (
                 fn(DataFrame, DeltaTable) -> Result,
                 MergePerfParams,
@@ -453,16 +377,6 @@ async fn main() {
             benchmark_merge_tpcds(table_path, params, merge_op)
                 .await
                 .unwrap();
-
-            /*
-            tx.send(Some(())).unwrap();
-            let stats = handle.join().unwrap();
-            let a = Stats::from(stats.allocated);
-            let r = Stats::from(stats.resident);
-
-            println!("{:?}", a);
-            println!("{:?}", r);
-            */
         }
         Command::Standard(Standard {
             delta_path,
@@ -669,25 +583,51 @@ async fn main() {
                 .unwrap();
             ctx.register_table("after", Arc::new(after_table)).unwrap();
-
-            let before_stats = 
ctx.sql(&format!(" + let before_stats = ctx + .sql(&format!( + " select name as before_name, avg(cast(duration_ms as float)) as before_duration_avg from before where group_id = {} group by name - ", before_group_id)).await.unwrap(); + ", + before_group_id + )) + .await + .unwrap(); - let after_stats = ctx.sql(&format!(" + let after_stats = ctx + .sql(&format!( + " select name as after_name, avg(cast(duration_ms as float)) as after_duration_avg from after where group_id = {} group by name - ", after_group_id)).await.unwrap(); - - before_stats.join(after_stats, datafusion_common::JoinType::Inner, &["before_name"], &["after_name"], None).unwrap() - .select(vec![col("before_name").alias("name"), col("before_duration_avg"), col("after_duration_avg"), (col("after_duration_avg") / (col("before_duration_avg")))]).unwrap() - .show().await.unwrap(); + ", + after_group_id + )) + .await + .unwrap(); + before_stats + .join( + after_stats, + datafusion_common::JoinType::Inner, + &["before_name"], + &["after_name"], + None, + ) + .unwrap() + .select(vec![ + col("before_name").alias("name"), + col("before_duration_avg"), + col("after_duration_avg"), + (col("after_duration_avg") / (col("before_duration_avg"))), + ]) + .unwrap() + .show() + .await + .unwrap(); } Command::Show(Show { path }) => { let stats = DeltaTableBuilder::from_uri(path).load().await.unwrap(); From 45971e8ea2b1dec220bcf8716ba7ce9b53256660 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 18 Nov 2023 23:43:47 -0500 Subject: [PATCH 09/12] restore files to main --- crates/deltalake-core/tests/integration_datafusion.rs | 1 - crates/deltalake-sql/src/planner.rs | 5 ----- 2 files changed, 6 deletions(-) diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index d400faf3a6..a412ce6417 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -46,7 +46,6 @@ use std::error::Error; mod common; mod local { - use datafusion::common::stats::Precision; use datafusion::common::stats::Precision; use deltalake_core::writer::JsonWriter; diff --git a/crates/deltalake-sql/src/planner.rs b/crates/deltalake-sql/src/planner.rs index e1f78eb8fc..bf07825d4b 100644 --- a/crates/deltalake-sql/src/planner.rs +++ b/crates/deltalake-sql/src/planner.rs @@ -96,7 +96,6 @@ mod tests { use datafusion_expr::logical_plan::builder::LogicalTableSource; use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource}; use datafusion_sql::TableReference; - use datafusion_common::Result as DataFusionResult; use crate::parser::DeltaParser; @@ -139,10 +138,6 @@ mod tests { } } - fn get_table_source(&self, _name: TableReference) -> DataFusionResult> { - unimplemented!("TODO") - } - fn get_function_meta(&self, _name: &str) -> Option> { None } From 3562c48367596341d06ebf0da1a750a52483e083 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 18 Nov 2023 23:49:18 -0500 Subject: [PATCH 10/12] remove jemalloc dep --- crates/benchmarks/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 334a80f407..8e2e94c62b 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -13,8 +13,6 @@ clap = { version = "4", features = [ "derive" ] } chrono = { version = "0.4.31", default-features = false, features = ["clock"] } tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] } env_logger = "0" -jemallocator = "0.5.4" -jemalloc-ctl = "0.5.4" # arrow 
arrow = { workspace = true }

From 912a6d7eb166eabe1649f3b4f228822cfadd06a3 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sun, 19 Nov 2023 12:35:23 -0500
Subject: [PATCH 11/12] use release & sort results

---
 crates/benchmarks/README.md        | 8 ++++----
 crates/benchmarks/src/bin/merge.rs | 4 +++-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md
index 187ed5215e..c5d6b0b920 100644
--- a/crates/benchmarks/README.md
+++ b/crates/benchmarks/README.md
@@ -21,7 +21,7 @@ Converts a TPC-DS web_returns csv into a Delta table.
 Assumes the dataset is pipe delimited and records do not have a trailing delimiter.
 
 ```
-cargo run --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
+ cargo run --release --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
 ```
 
 ### Standard
@@ -36,7 +36,7 @@ duration_ms: How long the benchmark took in ms
 data: Free field to pack any additional data
 
 ```
-cargo run --bin merge -- standard data/web_returns 1 data/merge_results
+ cargo run --release --bin merge -- standard data/web_returns 1 data/merge_results
 ```
 
 ### Compare
@@ -44,12 +44,12 @@ Compare the results of two different runs.
 Provide the Delta table paths and the `group_id` of each run to obtain the speedup for each test case
 
 ```
-cargo run --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
+ cargo run --release --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
 ```
 
 ### Show
 Show all benchmark results from a delta table
 
 ```
-cargo run --bin merge -- show data/benchmark
+ cargo run --release --bin merge -- show data/benchmark
 ```
diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index 8be7e0fc05..5afa3e6f35 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -622,9 +622,11 @@ async fn main() {
                     col("before_name").alias("name"),
                     col("before_duration_avg"),
                     col("after_duration_avg"),
-                    (col("after_duration_avg") / (col("before_duration_avg"))),
+                    (col("before_duration_avg") / (col("after_duration_avg"))),
                 ])
                 .unwrap()
+                .sort(vec![col("name").sort(true, true)])
+                .unwrap()
                 .show()
                 .await
                 .unwrap();

From 55b1d1cdf0e74f504e036c679815feaa4725a3ea Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sun, 19 Nov 2023 18:47:46 -0500
Subject: [PATCH 12/12] update author

---
 crates/benchmarks/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml
index 8e2e94c62b..76bcc8a312 100644
--- a/crates/benchmarks/Cargo.toml
+++ b/crates/benchmarks/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "delta-benchmarks"
 version = "0.0.1"
-authors = ["Qingping Hou "]
+authors = ["David Blajda "]
 homepage = "https://github.com/delta-io/delta.rs"
 license = "Apache-2.0"
 keywords = ["deltalake", "delta", "datalake"]
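
A note for anyone extending the standard suite introduced in these patches: each case in the `benches` vector built in `crates/benchmarks/src/bin/merge.rs` is a `Bench::new` entry pairing a descriptive name with a merge operation and its sampling fractions. The sketch below shows the shape of one additional entry; the benchmark name and fraction values are invented for illustration, while `Bench`, `MergePerfParams`, and `merge_upsert` are the items already defined in that file.

```rust
// Sketch only: one extra case for the `benches` vector (hypothetical values).
// Following the existing convention, the name encodes the sampling fractions
// so the rows written to the results table stay self-describing.
Bench::new(
    "upsert_fileMatchedFraction_0.25_rowMatchedFraction_0.10_rowNotMatchedFraction_0.10",
    merge_upsert,
    MergePerfParams {
        sample_files: 0.25,
        sample_matched_rows: 0.10,
        sample_not_matched_rows: 0.10,
    },
),
```

Appended to the `benches` vector, such an entry is executed the same number of times as the existing cases (the `samples` argument, defaulting to 1) and its durations are written to the same results table, so the `compare` and `show` subcommands pick it up without further changes.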