diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index d174154e38..2f68fdc398 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -193,7 +193,7 @@ async fn benchmark_merge_tpcds( merge: fn(DataFrame, DeltaTable) -> Result, ) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> { let table = DeltaTableBuilder::from_uri(path).load().await?; - let file_count = table.snapshot()?.files()?.len(); + let file_count = table.snapshot()?.file_actions()?.len(); let provider = DeltaTableProvider::try_new( table.snapshot()?.clone(), diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 911c845943..85d6a4df34 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -334,7 +334,7 @@ impl<'a> DeltaScanBuilder<'a> { PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?; let files_to_prune = pruning_predicate.prune(self.snapshot)?; self.snapshot - .files()? + .file_actions()? .iter() .zip(files_to_prune.into_iter()) .filter_map( @@ -348,7 +348,7 @@ impl<'a> DeltaScanBuilder<'a> { ) .collect() } else { - self.snapshot.files()? + self.snapshot.file_actions()? } } }; @@ -1186,7 +1186,7 @@ pub(crate) async fn find_files_scan<'a>( expression: Expr, ) -> DeltaResult> { let candidate_map: HashMap = snapshot - .files()? + .file_actions()? .iter() .map(|add| (add.path.clone(), add.to_owned())) .collect(); @@ -1246,7 +1246,7 @@ pub(crate) async fn scan_memory_table( snapshot: &DeltaTableState, predicate: &Expr, ) -> DeltaResult> { - let actions = snapshot.files()?; + let actions = snapshot.file_actions()?; let batch = snapshot.add_actions_table(true)?; let mut arrays = Vec::new(); @@ -1334,7 +1334,7 @@ pub async fn find_files<'a>( } } None => Ok(FindFiles { - candidates: snapshot.files()?, + candidates: snapshot.file_actions()?, partition_scan: true, }), } diff --git a/crates/deltalake-core/src/kernel/arrow/mod.rs b/crates/deltalake-core/src/kernel/arrow/mod.rs index 3cf01700f6..a8bac82fe1 100644 --- a/crates/deltalake-core/src/kernel/arrow/mod.rs +++ b/crates/deltalake-core/src/kernel/arrow/mod.rs @@ -203,7 +203,7 @@ impl TryFrom<&ArrowDataType> for DataType { ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), + ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Byte)), ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), diff --git a/crates/deltalake-core/src/kernel/client/expressions.rs b/crates/deltalake-core/src/kernel/client/expressions.rs index 8a52c97f59..740b46f8d2 100644 --- a/crates/deltalake-core/src/kernel/client/expressions.rs +++ b/crates/deltalake-core/src/kernel/client/expressions.rs @@ -12,7 +12,7 @@ use arrow_array::{ StructArray, TimestampMicrosecondArray, }; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; -use arrow_schema::{ArrowError, Schema as ArrowSchema}; +use arrow_schema::{ArrowError, Field as ArrowField, Schema as ArrowSchema}; use arrow_select::nullif::nullif; use crate::kernel::error::{DeltaResult, Error}; @@ -78,6 
+78,21 @@ impl Scalar { DataType::Map { .. } => unimplemented!(), DataType::Struct { .. } => unimplemented!(), }, + Struct(values, fields) => { + let mut columns = Vec::with_capacity(values.len()); + for val in values { + columns.push(val.to_array(num_rows)?); + } + Arc::new(StructArray::try_new( + fields + .iter() + .map(TryInto::::try_into) + .collect::, _>>()? + .into(), + columns, + None, + )?) + } }; Ok(arr) } diff --git a/crates/deltalake-core/src/kernel/expressions/mod.rs b/crates/deltalake-core/src/kernel/expressions/mod.rs index e043c0f28c..795e3a1b6d 100644 --- a/crates/deltalake-core/src/kernel/expressions/mod.rs +++ b/crates/deltalake-core/src/kernel/expressions/mod.rs @@ -5,7 +5,7 @@ use std::fmt::{Display, Formatter}; use itertools::Itertools; -use self::scalars::Scalar; +pub use self::scalars::*; pub mod scalars; diff --git a/crates/deltalake-core/src/kernel/expressions/scalars.rs b/crates/deltalake-core/src/kernel/expressions/scalars.rs index 1445e9dee5..94673cf377 100644 --- a/crates/deltalake-core/src/kernel/expressions/scalars.rs +++ b/crates/deltalake-core/src/kernel/expressions/scalars.rs @@ -3,10 +3,14 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; +use arrow_array::Array; +use arrow_schema::TimeUnit; use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc}; +use object_store::path::Path; use crate::kernel::schema::{DataType, PrimitiveType}; -use crate::kernel::Error; +use crate::kernel::{Error, StructField}; +use crate::NULL_PARTITION_VALUE_DATA_PATH; /// A single value, which can be null. Used for representing literal values /// in [Expressions][crate::expressions::Expression]. @@ -38,6 +42,8 @@ pub enum Scalar { Decimal(i128, u8, i8), /// Null value with a given data type. Null(DataType), + /// Struct value + Struct(Vec, Vec), } impl Scalar { @@ -57,9 +63,15 @@ impl Scalar { Self::Binary(_) => DataType::Primitive(PrimitiveType::Binary), Self::Decimal(_, precision, scale) => DataType::decimal(*precision, *scale), Self::Null(data_type) => data_type.clone(), + Self::Struct(_, fields) => DataType::struct_type(fields.clone()), } } + /// Returns true if this scalar is null. + pub fn is_null(&self) -> bool { + matches!(self, Self::Null(_)) + } + /// Serializes this scalar as a string. pub fn serialize(&self) -> String { match self { @@ -78,7 +90,7 @@ impl Scalar { } } Self::Timestamp(ts) => { - let ts = Utc.timestamp_millis_opt(*ts).single().unwrap(); + let ts = Utc.timestamp_micros(*ts).single().unwrap(); ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string() } Self::Date(days) => { @@ -109,8 +121,178 @@ impl Scalar { s } }, + Self::Binary(val) => create_escaped_binary_string(val.as_slice()), Self::Null(_) => "null".to_string(), - _ => todo!(), + Self::Struct(_, _) => todo!("serializing struct values is not yet supported"), + } + } + + /// Serializes this scalar as a string for use in hive partition file names. + pub fn serialize_encoded(&self) -> String { + if self.is_null() { + return NULL_PARTITION_VALUE_DATA_PATH.to_string(); + } + Path::from(self.serialize()).to_string() + } + + /// Create a [`Scalar`] form a row in an arrow array. 
+ pub fn from_array(arr: &dyn Array, index: usize) -> Option { + use arrow_array::*; + use arrow_schema::DataType::*; + + if arr.len() <= index { + return None; + } + if arr.is_null(index) { + return Some(Self::Null(arr.data_type().try_into().ok()?)); + } + + match arr.data_type() { + Utf8 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::String(v.value(index).to_string())), + LargeUtf8 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::String(v.value(index).to_string())), + Boolean => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Boolean(v.value(index))), + Binary => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Binary(v.value(index).to_vec())), + LargeBinary => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Binary(v.value(index).to_vec())), + FixedSizeBinary(_) => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Binary(v.value(index).to_vec())), + Int8 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Byte(v.value(index))), + Int16 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Short(v.value(index))), + Int32 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Integer(v.value(index))), + Int64 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Long(v.value(index))), + UInt8 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Byte(v.value(index) as i8)), + UInt16 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Short(v.value(index) as i16)), + UInt32 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Integer(v.value(index) as i32)), + UInt64 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Long(v.value(index) as i64)), + Float32 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Float(v.value(index))), + Float64 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Double(v.value(index))), + Decimal128(precision, scale) => { + arr.as_any().downcast_ref::().map(|v| { + let value = v.value(index); + Self::Decimal(value, *precision, *scale) + }) + } + Date32 => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Date(v.value(index))), + // TODO handle timezones when implementing timestamp ntz feature. 
+ Timestamp(TimeUnit::Microsecond, None) => arr + .as_any() + .downcast_ref::() + .map(|v| Self::Timestamp(v.value(index))), + Struct(fields) => { + let struct_fields = fields + .iter() + .flat_map(|f| TryFrom::try_from(f.as_ref())) + .collect::>(); + let values = arr + .as_any() + .downcast_ref::() + .and_then(|struct_arr| { + struct_fields + .iter() + .map(|f: &StructField| { + struct_arr + .column_by_name(f.name()) + .and_then(|c| Self::from_array(c.as_ref(), index)) + }) + .collect::>>() + })?; + if struct_fields.len() != values.len() { + return None; + } + Some(Self::Struct(values, struct_fields)) + } + Float16 + | Decimal256(_, _) + | List(_) + | LargeList(_) + | FixedSizeList(_, _) + | Map(_, _) + | Date64 + | Timestamp(_, _) + | Time32(_) + | Time64(_) + | Duration(_) + | Interval(_) + | Dictionary(_, _) + | RunEndEncoded(_, _) + | Union(_, _) + | Null => None, + } + } +} + +impl PartialOrd for Scalar { + fn partial_cmp(&self, other: &Self) -> Option { + use Scalar::*; + match (self, other) { + (Null(_), Null(_)) => Some(Ordering::Equal), + (Integer(a), Integer(b)) => a.partial_cmp(b), + (Long(a), Long(b)) => a.partial_cmp(b), + (Short(a), Short(b)) => a.partial_cmp(b), + (Byte(a), Byte(b)) => a.partial_cmp(b), + (Float(a), Float(b)) => a.partial_cmp(b), + (Double(a), Double(b)) => a.partial_cmp(b), + (String(a), String(b)) => a.partial_cmp(b), + (Boolean(a), Boolean(b)) => a.partial_cmp(b), + (Timestamp(a), Timestamp(b)) => a.partial_cmp(b), + (Date(a), Date(b)) => a.partial_cmp(b), + (Binary(a), Binary(b)) => a.partial_cmp(b), + (Decimal(a, _, _), Decimal(b, _, _)) => a.partial_cmp(b), + (Struct(a, _), Struct(b, _)) => a.partial_cmp(b), + // TODO should we make an assumption about the ordering of nulls? + // rigth now this is only used for internal purposes. 
+ (Null(_), _) => Some(Ordering::Less), + (_, Null(_)) => Some(Ordering::Greater), + _ => None, } } } @@ -153,6 +335,16 @@ impl Display for Scalar { } }, Self::Null(_) => write!(f, "null"), + Self::Struct(values, fields) => { + write!(f, "{{")?; + for (i, (value, field)) in values.iter().zip(fields.iter()).enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{}: {}", field.name, value)?; + } + write!(f, "}}") + } } } } @@ -202,7 +394,7 @@ impl PrimitiveType { static ref UNIX_EPOCH: DateTime = DateTime::from_timestamp(0, 0).unwrap(); } - if raw.is_empty() { + if raw.is_empty() || raw == NULL_PARTITION_VALUE_DATA_PATH { return Ok(Scalar::Null(self.data_type())); } @@ -242,7 +434,11 @@ impl PrimitiveType { .ok_or(self.parse_error(raw))?; Ok(Scalar::Timestamp(micros)) } - _ => todo!(), + Binary => { + let bytes = parse_escaped_binary_string(raw).map_err(|_| self.parse_error(raw))?; + Ok(Scalar::Binary(bytes)) + } + _ => todo!("parsing {:?} is not yet supported", self), } } @@ -262,10 +458,66 @@ impl PrimitiveType { } } +fn create_escaped_binary_string(data: &[u8]) -> String { + let mut escaped_string = String::new(); + for &byte in data { + // Convert each byte to its two-digit hexadecimal representation + let hex_representation = format!("{:04X}", byte); + // Append the hexadecimal representation with an escape sequence + escaped_string.push_str("\\u"); + escaped_string.push_str(&hex_representation); + } + escaped_string +} + +fn parse_escaped_binary_string(escaped_string: &str) -> Result, &'static str> { + let mut parsed_bytes = Vec::new(); + let mut chars = escaped_string.chars(); + + while let Some(ch) = chars.next() { + if ch == '\\' { + // Check for the escape sequence "\\u" indicating a hexadecimal value + if chars.next() == Some('u') { + // Read two hexadecimal digits and convert to u8 + if let (Some(digit1), Some(digit2), Some(digit3), Some(digit4)) = + (chars.next(), chars.next(), chars.next(), chars.next()) + { + if let Ok(byte) = + u8::from_str_radix(&format!("{}{}{}{}", digit1, digit2, digit3, digit4), 16) + { + parsed_bytes.push(byte); + } else { + return Err("Error parsing hexadecimal value"); + } + } else { + return Err("Incomplete escape sequence"); + } + } else { + // Unrecognized escape sequence + return Err("Unrecognized escape sequence"); + } + } else { + // Regular character, convert to u8 and push into the result vector + parsed_bytes.push(ch as u8); + } + } + + Ok(parsed_bytes) +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_binary_roundtrip() { + let scalar = Scalar::Binary(vec![0, 1, 2, 3, 4, 5]); + let parsed = PrimitiveType::Binary + .parse_scalar(&scalar.serialize()) + .unwrap(); + assert_eq!(scalar, parsed); + } + #[test] fn test_decimal_display() { let s = Scalar::Decimal(123456789, 9, 2); diff --git a/crates/deltalake-core/src/kernel/schema.rs b/crates/deltalake-core/src/kernel/schema.rs index 2a427469d7..c7c7f3cdb4 100644 --- a/crates/deltalake-core/src/kernel/schema.rs +++ b/crates/deltalake-core/src/kernel/schema.rs @@ -601,6 +601,10 @@ impl DataType { pub fn decimal(precision: u8, scale: i8) -> Self { DataType::Primitive(PrimitiveType::Decimal(precision, scale)) } + + pub fn struct_type(fields: Vec) -> Self { + DataType::Struct(Box::new(StructType::new(fields))) + } } impl Display for DataType { diff --git a/crates/deltalake-core/src/kernel/snapshot/json.rs b/crates/deltalake-core/src/kernel/snapshot/json.rs index 59c7169ac7..53fe8c8fdc 100644 --- a/crates/deltalake-core/src/kernel/snapshot/json.rs +++ 
b/crates/deltalake-core/src/kernel/snapshot/json.rs @@ -32,6 +32,9 @@ fn insert_nulls( Ok(()) } +/// Parse an array of JSON strings into a record batch. +/// +/// Null values in the input array are preserved in the output record batch. pub(super) fn parse_json( json_strings: &StringArray, output_schema: ArrowSchemaRef, @@ -50,7 +53,7 @@ if json_strings.is_null(it) { if value_count > 0 { let slice = json_strings.slice(value_start, value_count); - let batch = read_from_json(&mut decoder, get_reader(slice.value_data())) + let batch = decode_reader(&mut decoder, get_reader(slice.value_data())) .collect::, _>>()?; batches.extend(batch); value_count = 0; @@ -74,7 +77,7 @@ if value_count > 0 { let slice = json_strings.slice(value_start, value_count); - let batch = read_from_json(&mut decoder, get_reader(slice.value_data())) + let batch = decode_reader(&mut decoder, get_reader(slice.value_data())) .collect::, _>>()?; batches.extend(batch); } @@ -82,6 +85,7 @@ Ok(concat_batches(&output_schema, &batches)?) } +/// Decode a stream of bytes into a stream of record batches. pub(super) fn decode_stream> + Unpin>( mut decoder: Decoder, mut input: S, @@ -111,7 +115,8 @@ }) } -pub(super) fn read_from_json<'a, R: BufRead + 'a>( +/// Decode data provided by a reader into an iterator of record batches. +pub(super) fn decode_reader<'a, R: BufRead + 'a>( decoder: &'a mut Decoder, mut reader: R, ) -> impl Iterator> + '_ { diff --git a/crates/deltalake-core/src/kernel/snapshot/log_data.rs index 82d7dffb85..8ecceeddcc 100644 --- a/crates/deltalake-core/src/kernel/snapshot/log_data.rs +++ b/crates/deltalake-core/src/kernel/snapshot/log_data.rs @@ -1,25 +1,223 @@ -use std::collections::HashMap; +use std::borrow::Cow; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use arrow_array::{Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray}; +use chrono::{NaiveDateTime, TimeZone, Utc}; +use object_store::path::Path; +use object_store::ObjectMeta; +use percent_encoding::percent_decode_str; use super::extract::extract_and_cast; -use crate::kernel::scalars::Scalar; -use crate::kernel::{DataType, StructField, StructType}; -use crate::DeltaTableError; -use crate::{kernel::Metadata, DeltaResult}; +use crate::kernel::{DataType, Metadata, Scalar, StructField, StructType}; +use crate::{DeltaResult, DeltaTableError}; +const COL_NUM_RECORDS: &str = "numRecords"; +const COL_MIN_VALUES: &str = "minValues"; +const COL_MAX_VALUES: &str = "maxValues"; +const COL_NULL_COUNT: &str = "nullCount"; + +pub(crate) type PartitionFields<'a> = Arc>; +pub(crate) type PartitionValues<'a> = BTreeMap<&'a str, Scalar>; + +pub(crate) trait PartitionsExt { + fn hive_partition_path(&self) -> String; +} + +impl PartitionsExt for BTreeMap<&str, Scalar> { + fn hive_partition_path(&self) -> String { + let mut fields = self + .iter() + .map(|(k, v)| { + let encoded = v.serialize_encoded(); + format!("{k}={encoded}") + }) + .collect::>(); + fields.reverse(); + fields.join("/") + } +} + +impl PartitionsExt for BTreeMap { + fn hive_partition_path(&self) -> String { + let mut fields = self + .iter() + .map(|(k, v)| { + let encoded = v.serialize_encoded(); + format!("{k}={encoded}") + }) + .collect::>(); + fields.reverse(); + fields.join("/") + } +} + +impl PartitionsExt for Arc { + fn hive_partition_path(&self) -> String { + self.as_ref().hive_partition_path() + }
+} + +/// A view into the log data for a single logical file. #[derive(Debug, PartialEq)] pub struct FileStats<'a> { - path: &'a str, - size: i64, - partition_values: HashMap<&'a str, Option>, - stats: StructArray, + path: &'a StringArray, + size: &'a Int64Array, + modification_time: &'a Int64Array, + partition_values: &'a MapArray, + partition_fields: PartitionFields<'a>, + stats: &'a StructArray, + index: usize, +} + +impl FileStats<'_> { + /// Path to the file's storage location. + pub fn path(&self) -> Cow<'_, str> { + percent_decode_str(self.path.value(self.index)) + .decode_utf8() + .unwrap() + } + + /// An object store [`Path`] to the file. + /// + /// This tries to parse the file string and if that fails, it will return the string as is. + // TODO assert consistent handling of the paths encoding when reading log data so this logic can be removed. + pub fn object_store_path(&self) -> Path { + let path = self.path(); + // Try to preserve percent encoding if possible + match Path::parse(path.as_ref()) { + Ok(path) => path, + Err(_) => Path::from(path.as_ref()), + } + } + + /// File size stored on disk. + pub fn size(&self) -> i64 { + self.size.value(self.index) + } + + /// Last modification time of the file. + pub fn modification_time(&self) -> i64 { + self.modification_time.value(self.index) + } + + /// Datetime of the last modification time of the file. + pub fn modification_datetime(&self) -> DeltaResult> { + Ok(Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_millis(self.modification_time()).ok_or( + DeltaTableError::from(crate::protocol::ProtocolError::InvalidField(format!( + "invalid modification_time: {:?}", + self.modification_time() + ))), + )?, + )) + } + + /// The partition values for this logical file. + // TODO make this fallible + pub fn partition_values(&self) -> DeltaResult> { + if self.partition_fields.is_empty() { + return Ok(BTreeMap::new()); + } + let map_value = self.partition_values.value(self.index); + let keys = map_value + .column(0) + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic("()".into()))?; + let values = map_value + .column(1) + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic("()".into()))?; + + let values = keys + .iter() + .zip(values.iter()) + .map(|(k, v)| { + let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap(); + let field_type = match field.data_type() { + DataType::Primitive(p) => Ok(p), + _ => Err(DeltaTableError::Generic( + "nested partitioning values are not supported".to_string(), + )), + }?; + Ok(( + *key, + v.map(|vv| field_type.parse_scalar(vv)) + .transpose()? + .unwrap_or(Scalar::Null(field.data_type().clone())), + )) + }) + .collect::>>()?; + + // NOTE: we recreate the map as a BTreeMap to ensure the order of the keys is consistently + // the same as the order of partition fields. + self.partition_fields + .iter() + .map(|(k, f)| { + let val = values + .get(*k) + .cloned() + .unwrap_or(Scalar::Null(f.data_type.clone())); + Ok((*k, val)) + }) + .collect::>>() + } + + /// The number of records stored in the data file. + pub fn num_records(&self) -> Option { + self.stats + .column_by_name(COL_NUM_RECORDS) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|a| a.value(self.index) as usize) + } + + /// Struct containing all available null counts for the columns in this file.
+ pub fn null_counts(&self) -> Option { + self.stats + .column_by_name(COL_NULL_COUNT) + .and_then(|c| Scalar::from_array(c.as_ref(), self.index)) + } + + /// Struct containing all available min values for the columns in this file. + pub fn min_values(&self) -> Option { + self.stats + .column_by_name(COL_MIN_VALUES) + .and_then(|c| Scalar::from_array(c.as_ref(), self.index)) + } + + /// Struct containing all available max values for the columns in this file. + pub fn max_values(&self) -> Option { + self.stats + .column_by_name(COL_MAX_VALUES) + .and_then(|c| Scalar::from_array(c.as_ref(), self.index)) + } +} + +impl<'a> TryFrom<&FileStats<'a>> for ObjectMeta { + type Error = DeltaTableError; + + fn try_from(file_stats: &FileStats<'a>) -> Result { + let location = file_stats.object_store_path(); + let size = file_stats.size() as usize; + let last_modified = file_stats.modification_datetime()?; + Ok(ObjectMeta { + location, + size, + last_modified, + version: None, + e_tag: None, + }) + } } +/// Helper for processing data from the materialized Delta log. pub struct FileStatsAccessor<'a> { - partition_fields: HashMap<&'a str, &'a StructField>, + partition_fields: PartitionFields<'a>, paths: &'a StringArray, sizes: &'a Int64Array, + modification_times: &'a Int64Array, stats: &'a StructArray, partition_values: &'a MapArray, length: usize, @@ -34,17 +232,21 @@ impl<'a> FileStatsAccessor<'a> { ) -> DeltaResult { let paths = extract_and_cast::(data, "add.path")?; let sizes = extract_and_cast::(data, "add.size")?; + let modification_times = extract_and_cast::(data, "add.modificationTime")?; let stats = extract_and_cast::(data, "add.stats_parsed")?; let partition_values = extract_and_cast::(data, "add.partitionValues")?; - let partition_fields = metadata - .partition_columns - .iter() - .map(|c| Ok::<_, DeltaTableError>((c.as_str(), schema.field_with_name(c.as_str())?))) - .collect::, _>>()?; + let partition_fields = Arc::new( + metadata + .partition_columns + .iter() + .map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?))) + .collect::>>()?, + ); Ok(Self { partition_fields, paths, sizes, + modification_times, stats, partition_values, length: data.num_rows(), @@ -52,70 +254,50 @@ }) } - fn get_partition_values(&self, index: usize) -> DeltaResult>> { - let map_value = self.partition_values.value(index); - let keys = map_value - .column(0) - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic("unexpected key type".into()))?; - let values = map_value - .column(1) - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic("unexpected value type".into()))?; - keys.iter() - .zip(values.iter()) - .map(|(k, v)| { - let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap(); - let field_type = match field.data_type() { - DataType::Primitive(p) => p, - _ => todo!(), - }; - Ok((*key, v.and_then(|vv| field_type.parse_scalar(vv).ok()))) - }) - .collect::, _>>() - } - pub(crate) fn get(&self, index: usize) -> DeltaResult> { - let path = self.paths.value(index); - let size = self.sizes.value(index); - let stats = self.stats.slice(index, 1); - let partition_values = self.get_partition_values(index)?; + if index >= self.length { + return Err(DeltaTableError::Generic(format!( + "index out of bounds: {} >= {}", + index, self.length + ))); + } Ok(FileStats { - path, - size, - partition_values, - stats, + path: self.paths, + size: self.sizes, + modification_time: self.modification_times, + partition_values: self.partition_values, + 
partition_fields: self.partition_fields.clone(), + stats: self.stats, + index, }) } } impl<'a> Iterator for FileStatsAccessor<'a> { - type Item = DeltaResult>; + type Item = FileStats<'a>; fn next(&mut self) -> Option { if self.pointer >= self.length { return None; } - - let file_stats = self.get(self.pointer); - if file_stats.is_err() { - return Some(Err(file_stats.unwrap_err())); - } - + // Safety: we know that the pointer is within bounds + let file_stats = self.get(self.pointer).unwrap(); self.pointer += 1; Some(file_stats) } } -pub struct FileStatsHandler<'a> { +/// Provides semanitc access to the log data. +/// +/// This is a helper struct that provides access to the log data in a more semantic way +/// to avid the necessiity of knowing the exact layout of the underlying log data. +pub struct LogDataHandler<'a> { data: &'a Vec, metadata: &'a Metadata, schema: &'a StructType, } -impl<'a> FileStatsHandler<'a> { +impl<'a> LogDataHandler<'a> { pub(crate) fn new( data: &'a Vec, metadata: &'a Metadata, @@ -129,8 +311,8 @@ impl<'a> FileStatsHandler<'a> { } } -impl<'a> IntoIterator for FileStatsHandler<'a> { - type Item = DeltaResult>; +impl<'a> IntoIterator for LogDataHandler<'a> { + type Item = FileStats<'a>; type IntoIter = Box + 'a>; fn into_iter(self) -> Self::IntoIter { @@ -243,7 +425,7 @@ mod datafusion { } fn num_records(&self) -> Precision { - self.collect_count("numRecords") + self.collect_count(COL_NUM_RECORDS) } fn total_size_files(&self) -> Precision { @@ -256,10 +438,11 @@ mod datafusion { } fn column_stats(&self, name: impl AsRef) -> DeltaResult { - let null_count_col = format!("nullCount.{}", name.as_ref()); + let null_count_col = format!("{COL_NULL_COUNT}.{}", name.as_ref()); let null_count = self.collect_count(&null_count_col); - let min_value = self.column_bounds("minValues", name.as_ref(), &AggregateFunction::Min); + let min_value = + self.column_bounds(COL_MIN_VALUES, name.as_ref(), &AggregateFunction::Min); let min_value = match &min_value { Precision::Exact(value) if value.is_null() => Precision::Absent, // TODO this is a hack, we should not be casting here but rather when we read the checkpoint data. 
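// [Editor's note] Illustrative usage sketch — not part of this patch. It shows one way the
// `LogDataHandler` / `FileStats` API introduced in log_data.rs above might be consumed from
// within the crate. The function name, import paths, and printed output are assumptions for
// illustration only.
use crate::kernel::snapshot::{EagerSnapshot, PartitionsExt};
use crate::DeltaResult;

fn inspect_log_data(snapshot: &EagerSnapshot) -> DeltaResult<()> {
    // Iterate the materialized log data without converting rows into `Add` actions.
    for file in snapshot.log_data() {
        // `path()` percent-decodes the stored path; `size()` is the on-disk size in bytes.
        println!("{} ({} bytes)", file.path(), file.size());
        // Partition values come back as a BTreeMap keyed in partition-column order,
        // so `hive_partition_path()` yields a stable directory layout.
        let partitions = file.partition_values()?;
        println!("  partition: {}", partitions.hive_partition_path());
    }
    Ok(())
}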
@@ -270,7 +453,8 @@ mod datafusion { _ => min_value, }; - let max_value = self.column_bounds("maxValues", name.as_ref(), &AggregateFunction::Max); + let max_value = + self.column_bounds(COL_MAX_VALUES, name.as_ref(), &AggregateFunction::Max); let max_value = match &max_value { Precision::Exact(value) if value.is_null() => Precision::Absent, Precision::Exact(ScalarValue::TimestampNanosecond(a, b)) => Precision::Exact( @@ -303,7 +487,7 @@ mod datafusion { } } - impl FileStatsHandler<'_> { + impl LogDataHandler<'_> { fn num_records(&self) -> Precision { self.data .iter() @@ -362,7 +546,6 @@ mod datafusion { #[cfg(all(test, feature = "datafusion"))] mod tests { - use arrow_array::Array; #[tokio::test] async fn read_delta_1_2_1_struct_stats_table() { @@ -371,27 +554,42 @@ mod tests { let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap(); let json_action = table_from_json_stats - .snapshot() - .unwrap() - .snapshot - .file_stats_iter() - .find(|f| matches!(f, Ok(f) if f.path.ends_with("part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet"))).unwrap().unwrap(); + .snapshot() + .unwrap() + .snapshot + .file_stats() + .find(|f| { + f.path().ends_with( + "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet", + ) + }) + .unwrap(); let struct_action = table_from_struct_stats - .snapshot() - .unwrap() - .snapshot - .file_stats_iter() - .find(|f| matches!(f, Ok(f) if f.path.ends_with("part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet"))).unwrap().unwrap(); - - assert_eq!(json_action.path, struct_action.path); - assert_eq!(json_action.partition_values, struct_action.partition_values); - assert_eq!(json_action.stats.len(), 1); - assert!(json_action - .stats - .column(0) - .eq(struct_action.stats.column(0))); - assert_eq!(json_action.stats.len(), struct_action.stats.len()); + .snapshot() + .unwrap() + .snapshot + .file_stats() + .find(|f| { + f.path().ends_with( + "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet", + ) + }) + .unwrap(); + + assert_eq!(json_action.path(), struct_action.path()); + assert_eq!( + json_action.partition_values().unwrap(), + struct_action.partition_values().unwrap() + ); + // assert_eq!( + // json_action.max_values().unwrap(), + // struct_action.max_values().unwrap() + // ); + // assert_eq!( + // json_action.min_values().unwrap(), + // struct_action.min_values().unwrap() + // ); } #[tokio::test] @@ -403,7 +601,7 @@ mod tests { .snapshot() .unwrap() .snapshot - .file_stats(); + .log_data(); let col_stats = file_stats.statistics(); println!("{:?}", col_stats); diff --git a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs index 60b2d2ee4d..bc2f3c0418 100644 --- a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs +++ b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs @@ -92,15 +92,6 @@ fn get_decoder(schema: ArrowSchemaRef, config: &DeltaTableConfig) -> DeltaResult .build_decoder()?) 
} -#[derive(Debug, Clone, Serialize, Deserialize)] -struct FileInfo { - location: String, - size: usize, - last_modified: chrono::DateTime, - e_tag: Option, - version: Option, -} - #[derive(Debug, Clone, PartialEq)] pub(super) struct LogSegment { pub(super) version: i64, @@ -377,7 +368,7 @@ impl LogSegment { self.commit_files.push_front(meta); let reader = json::get_reader(&bytes); let batches = - json::read_from_json(&mut decoder, reader).collect::, _>>()?; + json::decode_reader(&mut decoder, reader).collect::, _>>()?; commit_data.push(batches); } diff --git a/crates/deltalake-core/src/kernel/snapshot/mod.rs b/crates/deltalake-core/src/kernel/snapshot/mod.rs index 504f84329b..2f24247bf5 100644 --- a/crates/deltalake-core/src/kernel/snapshot/mod.rs +++ b/crates/deltalake-core/src/kernel/snapshot/mod.rs @@ -1,6 +1,15 @@ //! Delta table snapshots //! //! A snapshot represents the state of a Delta Table at a given version. +//! +//! There are two types of snapshots: +//! +//! - [`Snapshot`] is a snapshot where most data is loaded on demand and only the +//! bare minimum - [`Protocol`] and [`Metadata`] - is cached in memory. +//! - [`EagerSnapshot`] is a snapshot where much more log data is eagerly loaded into memory. +//! +//! The sub modules provide structures and methods that aid in generating +//! and consuming snapshots. use std::io::{BufRead, BufReader, Cursor}; use std::sync::Arc; @@ -12,9 +21,8 @@ use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectStore; -use self::log_data::{FileStats, FileStatsHandler}; use self::log_segment::{CommitData, LogSegment, PathExt}; -use self::parse::{extract_adds, extract_removes}; +use self::parse::{read_adds, read_removes}; use self::replay::{LogReplayScanner, ReplayStream}; use super::{Action, Add, CommitInfo, Metadata, Protocol, Remove}; use crate::kernel::StructType; @@ -29,6 +37,8 @@ pub(crate) mod parse; mod replay; mod serde; +pub use log_data::*; + /// A snapshot of a Delta table #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Snapshot { @@ -244,12 +254,9 @@ impl Snapshot { let reader = BufReader::new(Cursor::new(commit_log_bytes)); for line in reader.lines() { let action: Action = serde_json::from_str(line?.as_str())?; - match action { - Action::CommitInfo(commit_info) => { - return Ok::<_, DeltaTableError>(Some(commit_info)) - } - _ => (), - }; + if let Action::CommitInfo(commit_info) = action { + return Ok::<_, DeltaTableError>(Some(commit_info)); + } } Ok(None) } @@ -274,7 +281,7 @@ impl Snapshot { Ok(log_stream .chain(checkpoint_stream) .map(|batch| match batch { - Ok(batch) => extract_removes(&batch), + Ok(batch) => read_removes(&batch), Err(e) => Err(e), }) .boxed()) @@ -285,6 +292,8 @@ impl Snapshot { #[derive(Debug, Clone, PartialEq)] pub struct EagerSnapshot { snapshot: Snapshot, + // NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because + // we do not yet enforce a consistent schema across all batches we read from the log. files: Vec, } @@ -398,6 +407,11 @@ impl EagerSnapshot { self.snapshot.table_config() } + /// Get a [`LogDataHandler`] for the snapshot to inspect the currently loaded state of the log. 
+ pub fn log_data(&self) -> LogDataHandler<'_> { + LogDataHandler::new(&self.files, self.metadata(), self.schema()) + } + /// Get the number of files in the snapshot pub fn files_count(&self) -> usize { self.files.iter().map(|f| f.num_rows()).sum() @@ -405,17 +419,12 @@ impl EagerSnapshot { /// Get the files in the snapshot pub fn file_actions(&self) -> DeltaResult + '_> { - Ok(self.files.iter().flat_map(|b| extract_adds(b)).flatten()) - } - - /// Get a file action iterator for the given version - pub fn file_stats(&self) -> FileStatsHandler<'_> { - FileStatsHandler::new(&self.files, self.metadata(), self.schema()) + Ok(self.files.iter().flat_map(|b| read_adds(b)).flatten()) } /// Get a file action iterator for the given version - pub fn file_stats_iter(&self) -> impl Iterator>> { - self.file_stats().into_iter() + pub fn file_stats(&self) -> impl Iterator> { + self.log_data().into_iter() } /// Advance the snapshot based on the given commit actions @@ -485,7 +494,7 @@ mod datafusion { impl EagerSnapshot { /// Provide table level statistics to Datafusion pub fn datafusion_table_statistics(&self) -> Option { - self.file_stats().statistics() + self.log_data().statistics() } } } diff --git a/crates/deltalake-core/src/kernel/snapshot/parse.rs b/crates/deltalake-core/src/kernel/snapshot/parse.rs index bce8823b8d..4f0a09c40b 100644 --- a/crates/deltalake-core/src/kernel/snapshot/parse.rs +++ b/crates/deltalake-core/src/kernel/snapshot/parse.rs @@ -64,7 +64,7 @@ pub(super) fn read_protocol(batch: &dyn ProvidesColumnByName) -> DeltaResult DeltaResult> { +pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult> { let mut result = Vec::new(); if let Some(arr) = ex::extract_and_cast_opt::(array, "add") { @@ -134,7 +134,7 @@ pub(super) fn extract_adds(array: &dyn ProvidesColumnByName) -> DeltaResult DeltaResult> { +pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult> { let mut result = Vec::new(); if let Some(arr) = ex::extract_and_cast_opt::(array, "remove") { diff --git a/crates/deltalake-core/src/kernel/snapshot/replay.rs b/crates/deltalake-core/src/kernel/snapshot/replay.rs index 581ff0684d..3b2066af85 100644 --- a/crates/deltalake-core/src/kernel/snapshot/replay.rs +++ b/crates/deltalake-core/src/kernel/snapshot/replay.rs @@ -289,15 +289,13 @@ impl LogReplayScanner { } } } else { - for a in add_actions.into_iter() { - if let Some(a) = a { - let file_id = seen_key(&a); - if !self.seen.contains(&file_id) { - is_log_batch.then(|| self.seen.insert(file_id)); - keep.push(true); - } else { - keep.push(false); - } + for a in add_actions.into_iter().flatten() { + let file_id = seen_key(&a); + if !self.seen.contains(&file_id) { + is_log_batch.then(|| self.seen.insert(file_id)); + keep.push(true); + } else { + keep.push(false); } } }; diff --git a/crates/deltalake-core/src/kernel/snapshot/serde.rs b/crates/deltalake-core/src/kernel/snapshot/serde.rs index b69f8e7251..5162c4a1fe 100644 --- a/crates/deltalake-core/src/kernel/snapshot/serde.rs +++ b/crates/deltalake-core/src/kernel/snapshot/serde.rs @@ -1,6 +1,6 @@ use arrow_ipc::reader::FileReader; use arrow_ipc::writer::FileWriter; -use chrono::{DateTime, Utc}; +use chrono::{TimeZone, Utc}; use object_store::ObjectMeta; use serde::de::{self, Deserializer, SeqAccess, Visitor}; use serde::{ser::SerializeSeq, Deserialize, Serialize}; @@ -13,7 +13,7 @@ use super::EagerSnapshot; struct FileInfo { path: String, size: usize, - last_modified: DateTime, + last_modified: i64, e_tag: Option, version: Option, } @@ -29,7 +29,7 @@ 
impl Serialize for LogSegment { .map(|f| FileInfo { path: f.location.to_string(), size: f.size, - last_modified: f.last_modified, + last_modified: f.last_modified.timestamp_nanos_opt().unwrap(), e_tag: f.e_tag.clone(), version: f.version.clone(), }) @@ -40,7 +40,7 @@ impl Serialize for LogSegment { .map(|f| FileInfo { path: f.location.to_string(), size: f.size, - last_modified: f.last_modified, + last_modified: f.last_modified.timestamp_nanos_opt().unwrap(), e_tag: f.e_tag.clone(), version: f.version.clone(), }) @@ -82,12 +82,16 @@ impl<'de> Visitor<'de> for LogSegmentVisitor { version, commit_files: commit_files .into_iter() - .map(|f| ObjectMeta { - location: f.path.into(), - size: f.size, - last_modified: f.last_modified, - version: f.version, - e_tag: f.e_tag, + .map(|f| { + let seconds = f.last_modified / 1_000_000_000; + let nano_seconds = (f.last_modified % 1_000_000_000) as u32; + ObjectMeta { + location: f.path.into(), + size: f.size, + last_modified: Utc.timestamp_opt(seconds, nano_seconds).single().unwrap(), + version: f.version, + e_tag: f.e_tag, + } }) .collect(), checkpoint_files: checkpoint_files @@ -95,7 +99,9 @@ impl<'de> Visitor<'de> for LogSegmentVisitor { .map(|f| ObjectMeta { location: f.path.into(), size: f.size, - last_modified: f.last_modified, + last_modified: Utc.from_utc_datetime( + &chrono::NaiveDateTime::from_timestamp_millis(f.last_modified).unwrap(), + ), version: None, e_tag: None, }) diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index 9da976d056..dad58fd1b9 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -393,40 +393,40 @@ mod tests { value: crate::PartitionValue::NotEqual("2".to_string()), }]; assert_eq!( - table.get_files_by_partitions(&filters).unwrap(), - vec![ - Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"), - Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet") - ] - ); + table.get_files_by_partitions(&filters).unwrap(), + vec![ + Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"), + Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet") + ] + ); let filters = vec![crate::PartitionFilter { key: "month".to_string(), value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]), }]; assert_eq!( - table.get_files_by_partitions(&filters).unwrap(), - vec![ - Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), - Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), - Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet") - ] - ); + table.get_files_by_partitions(&filters).unwrap(), + vec![ + 
Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), + Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet"), + Path::from("year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet") + ] + ); let filters = vec![crate::PartitionFilter { key: "month".to_string(), value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]), }]; assert_eq!( - table.get_files_by_partitions(&filters).unwrap(), - vec![ - Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), - Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet") - ] - ); + table.get_files_by_partitions(&filters).unwrap(), + vec![ + Path::from("year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet"), + Path::from("year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet") + ] + ); } #[tokio::test] diff --git a/crates/deltalake-core/src/operations/convert_to_delta.rs b/crates/deltalake-core/src/operations/convert_to_delta.rs index aae22095ea..afde6724d8 100644 --- a/crates/deltalake-core/src/operations/convert_to_delta.rs +++ b/crates/deltalake-core/src/operations/convert_to_delta.rs @@ -2,14 +2,13 @@ // https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala use crate::{ - kernel::{Action, Add, Schema, StructField}, + kernel::{Action, Add, DataType, Schema, StructField}, logstore::{LogStore, LogStoreRef}, operations::create::CreateBuilder, protocol::SaveMode, table::builder::ensure_table_uri, table::config::DeltaConfigKey, - DeltaResult, DeltaTable, DeltaTableError, DeltaTablePartition, ObjectStoreError, - NULL_PARTITION_VALUE_DATA_PATH, + DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH, }; use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError}; use futures::{ @@ -28,7 +27,7 @@ use std::{ str::{FromStr, Utf8Error}, sync::Arc, }; -use tracing::{debug, info}; +use tracing::debug; /// Error converting a Parquet table to a Delta table #[derive(Debug, thiserror::Error)] @@ -245,7 +244,7 @@ impl ConvertToDeltaBuilder { if log_store.is_delta_table_location().await? { return Err(Error::DeltaTableAlready); } - info!( + debug!( "Converting Parquet table in log store location: {:?}", log_store.root_uri() ); @@ -263,6 +262,7 @@ impl ConvertToDeltaBuilder { future::ready(Ok(())) }) .await?; + if files.is_empty() { return Err(Error::ParquetFileNotFound); } @@ -273,44 +273,49 @@ impl ConvertToDeltaBuilder { // A HashSet of all unique partition columns in a Parquet table let mut partition_columns = HashSet::new(); // A vector of StructField of all unique partition columns in a Parquet table - let mut partition_schema_fields = Vec::new(); + let mut partition_schema_fields = HashMap::new(); + for file in files { // A HashMap from partition column to value for this parquet file only let mut partition_values = HashMap::new(); - let mut iter = file.location.as_ref().split('/').peekable(); + let location = file.location.clone().to_string(); + let mut iter = location.split('/').peekable(); let mut subpath = iter.next(); + // Get partitions from subpaths. 
Skip the last subpath while iter.peek().is_some() { - if let Some(subpath) = subpath { - // Return an error if the partition is not hive-partitioning - let partition = DeltaTablePartition::try_from( - percent_decode_str(subpath).decode_utf8()?.as_ref(), - )?; - debug!( - "Found partition {partition:#?} in parquet file {:#?}", - file.location - ); - let (key, val) = (partition.key, partition.value); - partition_values.insert( - key.clone(), - if val == NULL_PARTITION_VALUE_DATA_PATH { - None - } else { - Some(val) - }, - ); - if partition_columns.insert(key.clone()) { - if let Some(schema) = self.partition_schema.take(key.as_str()) { - partition_schema_fields.push(schema); - } else { - // Return an error if the schema of a partition column is not provided by user - return Err(Error::MissingPartitionSchema); - } + let curr_path = subpath.unwrap(); + let (key, value) = curr_path + .split_once('=') + .ok_or(Error::MissingPartitionSchema)?; + + if partition_columns.insert(key.to_string()) { + if let Some(schema) = self.partition_schema.take(key) { + partition_schema_fields.insert(key.to_string(), schema); + } else { + // Return an error if the schema of a partition column is not provided by user + return Err(Error::MissingPartitionSchema); } + } + + // Safety: we just checked that the key is present in the map + let field = partition_schema_fields.get(key).unwrap(); + let scalar = if value == NULL_PARTITION_VALUE_DATA_PATH { + Ok(crate::kernel::Scalar::Null(field.data_type().clone())) } else { - // This error shouldn't happen. The while condition ensures that subpath is not none - panic!("Subpath iterator index overflows"); + let decoded = percent_decode_str(value).decode_utf8()?; + match field.data_type() { + DataType::Primitive(p) => p.parse_scalar(decoded.as_ref()), + _ => Err(crate::kernel::Error::Generic(format!( + "Expected primitive type, found: {:?}", + field.data_type() + ))), + } } + .map_err(|_| Error::MissingPartitionSchema)?; + + partition_values.insert(key.to_string(), scalar); + subpath = iter.next(); } @@ -319,7 +324,19 @@ .decode_utf8()? .to_string(), size: i64::try_from(file.size)?, - partition_values, + partition_values: partition_values + .into_iter() + .map(|(k, v)| { + ( + k, + if v.is_null() { + None + } else { + Some(v.serialize()) + }, + ) + }) + .collect(), modification_time: file.last_modified.timestamp_millis(), data_change: true, ..Default::default() @@ -333,6 +350,7 @@ .schema() .as_ref() .clone(); + // Arrow schema of Parquet files may have conflicting metadata // Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap arrow_schema.metadata = HashMap::new(); @@ -349,8 +367,12 @@ let mut schema_fields = Schema::try_from(&ArrowSchema::try_merge(arrow_schemas)?)?
.fields() .clone(); - schema_fields.append(&mut partition_schema_fields); - debug!("Schema fields for the parquet table: {schema_fields:#?}"); + schema_fields.append( + &mut partition_schema_fields + .values() + .cloned() + .collect::>(), + ); // Generate CreateBuilder with corresponding add actions, schemas and operation meta let mut builder = CreateBuilder::new() @@ -369,6 +391,7 @@ impl ConvertToDeltaBuilder { if let Some(metadata) = self.metadata { builder = builder.with_metadata(metadata); } + Ok(builder) } } @@ -395,7 +418,10 @@ impl std::future::IntoFuture for ConvertToDeltaBuilder { mod tests { use super::*; use crate::{ - kernel::schema::{DataType, PrimitiveType}, + kernel::{ + schema::{DataType, PrimitiveType}, + Scalar, + }, open_table, storage::StorageOptions, Path, @@ -490,7 +516,7 @@ mod tests { expected_version: i64, expected_paths: Vec, expected_schema: Vec, - expected_partition_values: &[(String, Option)], + expected_partition_values: &[(String, Scalar)], ) { assert_eq!( table.version(), @@ -519,13 +545,17 @@ mod tests { let mut partition_values = table .snapshot() .unwrap() - .files() - .unwrap() + .log_data() .into_iter() - .map(|add| add.partition_values) - .flat_map(|map| map.clone()) + .flat_map(|add| { + add.partition_values() + .unwrap() + .iter() + .map(|(k, v)| (k.to_string(), v.clone())) + .collect::>() + }) .collect::>(); - partition_values.sort(); + partition_values.sort_by_key(|(k, v)| (k.clone(), v.serialize())); assert_eq!(partition_values, expected_partition_values); } @@ -538,12 +568,13 @@ mod tests { .get_active_add_actions_by_partitions(&[]) .expect("Failed to get Add actions") .next() - .expect("Iterator index overflows"); + .expect("Iterator index overflows") + .unwrap(); assert_eq!( - action.path, + action.path(), "part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet" ); - assert!(action.data_change); + assert_delta_table( table, path, @@ -552,7 +583,7 @@ mod tests { "part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet", )], vec![ - schema_field("date", PrimitiveType::Date, true), + StructField::new("date", DataType::DATE, true), schema_field("dayOfYear", PrimitiveType::Integer, true), ], &[], @@ -574,12 +605,12 @@ mod tests { Path::from("k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet") ], vec![ - schema_field("k", PrimitiveType::String, true), - schema_field("v", PrimitiveType::Long, true), + StructField::new("k", DataType::STRING, true), + StructField::new("v", DataType::LONG, true), ], &[ - ("k".to_string(), None), - ("k".to_string(), Some("A".to_string())), + ("k".to_string(), Scalar::String("A".to_string())), + ("k".to_string(), Scalar::Null(DataType::STRING)), ], ); @@ -609,8 +640,8 @@ mod tests { schema_field("y", PrimitiveType::Long, true), ], &[ - ("x".to_string(), Some("A/A".to_string())), - ("x".to_string(), Some("B B".to_string())), + ("x".to_string(), Scalar::String("A/A".to_string())), + ("x".to_string(), Scalar::String("B B".to_string())), ], ); @@ -656,24 +687,24 @@ mod tests { schema_field("year", PrimitiveType::String, true), ], &[ - ("day".to_string(), Some("1".to_string())), - ("day".to_string(), Some("20".to_string())), - ("day".to_string(), Some("3".to_string())), - ("day".to_string(), Some("4".to_string())), - ("day".to_string(), Some("5".to_string())), - ("day".to_string(), Some("5".to_string())), - ("month".to_string(), Some("1".to_string())), - ("month".to_string(), Some("12".to_string())), - ("month".to_string(), Some("12".to_string())), - 
("month".to_string(), Some("2".to_string())), - ("month".to_string(), Some("2".to_string())), - ("month".to_string(), Some("4".to_string())), - ("year".to_string(), Some("2020".to_string())), - ("year".to_string(), Some("2020".to_string())), - ("year".to_string(), Some("2020".to_string())), - ("year".to_string(), Some("2021".to_string())), - ("year".to_string(), Some("2021".to_string())), - ("year".to_string(), Some("2021".to_string())), + ("day".to_string(), Scalar::String("1".to_string())), + ("day".to_string(), Scalar::String("20".to_string())), + ("day".to_string(), Scalar::String("3".to_string())), + ("day".to_string(), Scalar::String("4".to_string())), + ("day".to_string(), Scalar::String("5".to_string())), + ("day".to_string(), Scalar::String("5".to_string())), + ("month".to_string(), Scalar::String("1".to_string())), + ("month".to_string(), Scalar::String("12".to_string())), + ("month".to_string(), Scalar::String("12".to_string())), + ("month".to_string(), Scalar::String("2".to_string())), + ("month".to_string(), Scalar::String("2".to_string())), + ("month".to_string(), Scalar::String("4".to_string())), + ("year".to_string(), Scalar::String("2020".to_string())), + ("year".to_string(), Scalar::String("2020".to_string())), + ("year".to_string(), Scalar::String("2020".to_string())), + ("year".to_string(), Scalar::String("2021".to_string())), + ("year".to_string(), Scalar::String("2021".to_string())), + ("year".to_string(), Scalar::String("2021".to_string())), ], ); } @@ -764,12 +795,12 @@ mod tests { schema_field("c3", PrimitiveType::Integer, true), ], &[ - ("c1".to_string(), Some("4".to_string())), - ("c1".to_string(), Some("5".to_string())), - ("c1".to_string(), Some("6".to_string())), - ("c2".to_string(), Some("a".to_string())), - ("c2".to_string(), Some("b".to_string())), - ("c2".to_string(), Some("c".to_string())), + ("c1".to_string(), Scalar::Integer(4)), + ("c1".to_string(), Scalar::Integer(5)), + ("c1".to_string(), Scalar::Integer(6)), + ("c2".to_string(), Scalar::String("a".to_string())), + ("c2".to_string(), Scalar::String("b".to_string())), + ("c2".to_string(), Scalar::String("c".to_string())), ], ); @@ -801,10 +832,10 @@ mod tests { schema_field("z", PrimitiveType::String, true), ], &[ - ("x".to_string(), Some("10".to_string())), - ("x".to_string(), Some("9".to_string())), - ("y".to_string(), Some("10.0".to_string())), - ("y".to_string(), Some("9.9".to_string())), + ("x".to_string(), Scalar::Long(10)), + ("x".to_string(), Scalar::Long(9)), + ("y".to_string(), Scalar::Double(10.0)), + ("y".to_string(), Scalar::Double(9.9)), ], ); } diff --git a/crates/deltalake-core/src/operations/filesystem_check.rs b/crates/deltalake-core/src/operations/filesystem_check.rs index 0c9249fd41..923f0aea54 100644 --- a/crates/deltalake-core/src/operations/filesystem_check.rs +++ b/crates/deltalake-core/src/operations/filesystem_check.rs @@ -101,10 +101,10 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap = - HashMap::with_capacity(self.snapshot.files()?.len()); + HashMap::with_capacity(self.snapshot.file_actions()?.len()); let log_store = self.log_store.clone(); - for active in self.snapshot.files()? { + for active in self.snapshot.file_actions()? { if is_absolute_path(&active.path)? 
{ return Err(DeltaTableError::Generic( "Filesystem check does not support absolute paths".to_string(), diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 88144d4416..70403c4091 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -1334,7 +1334,7 @@ async fn execute( { let lock = survivors.lock().unwrap(); - for action in snapshot.files()? { + for action in snapshot.file_actions()? { if lock.contains(&action.path) { metrics.num_target_files_removed += 1; actions.push(Action::Remove(Remove { diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index b753e34af9..c2c95f65e9 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -20,7 +20,7 @@ //! let (table, metrics) = OptimizeBuilder::new(table.object_store(), table.state).await?; //! ```` -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -41,7 +41,7 @@ use tracing::debug; use super::transaction::{commit, PROTOCOL}; use super::writer::{PartitionWriter, PartitionWriterConfig}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Remove}; +use crate::kernel::{Action, PartitionsExt, Remove, Scalar}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::storage::ObjectStoreRef; @@ -308,7 +308,7 @@ impl From for DeltaOperation { fn create_remove( path: &str, - partitions: &HashMap>, + partitions: &BTreeMap, size: i64, ) -> Result { // NOTE unwrap is safe since UNIX_EPOCH will always be earlier then now. @@ -320,7 +320,21 @@ fn create_remove( deletion_timestamp: Some(deletion_time), data_change: false, extended_file_metadata: None, - partition_values: Some(partitions.to_owned()), + partition_values: Some( + partitions + .iter() + .map(|(k, v)| { + ( + k.clone(), + if v.is_null() { + None + } else { + Some(v.serialize()) + }, + ) + }) + .collect(), + ), size: Some(size), deletion_vector: None, tags: None, @@ -339,9 +353,12 @@ enum OptimizeOperations { /// /// Bins are determined by the bin-packing algorithm to reach an optimal size. /// Files that are large enough already are skipped. Bins of size 1 are dropped. - Compact(HashMap>), + Compact(HashMap, Vec)>), /// Plan to Z-order each partition - ZOrder(Vec, HashMap), + ZOrder( + Vec, + HashMap, MergeBin)>, + ), // TODO: Sort } @@ -370,8 +387,6 @@ pub struct MergeTaskParameters { input_parameters: OptimizeInput, /// Schema of written files file_schema: ArrowSchemaRef, - /// Column names the table is partitioned by. - partition_columns: Vec, /// Properties passed to parquet writer writer_properties: WriterProperties, } @@ -386,7 +401,7 @@ impl MergePlan { /// collected during the operation. 
async fn rewrite_files( task_parameters: Arc, - partition: PartitionTuples, + partition_values: BTreeMap, files: MergeBin, object_store: ObjectStoreRef, read_stream: F, @@ -394,9 +409,8 @@ impl MergePlan { where F: Future> + Send + 'static, { - debug!("Rewriting files in partition: {:?}", partition); + debug!("Rewriting files in partition: {:?}", partition_values); // First, initialize metrics - let partition_values = partition.to_hashmap(); let mut partial_actions = files .iter() .map(|file_meta| { @@ -430,7 +444,6 @@ impl MergePlan { let writer_config = PartitionWriterConfig::try_new( task_parameters.file_schema.clone(), partition_values.clone(), - task_parameters.partition_columns.clone(), Some(task_parameters.writer_properties.clone()), Some(task_parameters.input_parameters.target_size as usize), None, @@ -463,7 +476,10 @@ impl MergePlan { }); partial_actions.extend(add_actions); - debug!("Finished rewriting files in partition: {:?}", partition); + debug!( + "Finished rewriting files in partition: {:?}", + partition_values + ); Ok((partial_actions, partial_metrics)) } @@ -596,7 +612,7 @@ impl MergePlan { let stream = match operations { OptimizeOperations::Compact(bins) => futures::stream::iter(bins) - .flat_map(|(partition, bins)| { + .flat_map(|(_, (partition, bins))| { futures::stream::iter(bins).map(move |bin| (partition.clone(), bin)) }) .map(|(partition, files)| { @@ -653,7 +669,7 @@ impl MergePlan { let task_parameters = self.task_parameters.clone(); let log_store = log_store.clone(); futures::stream::iter(bins) - .map(move |(partition, files)| { + .map(move |(_, (partition, files))| { let batch_stream = Self::read_zorder(files.clone(), exec_context.clone()); let rewrite_result = tokio::task::spawn(Self::rewrite_files( task_parameters.clone(), @@ -748,27 +764,6 @@ impl MergePlan { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct PartitionTuples(Vec<(String, Option)>); - -impl PartitionTuples { - fn from_hashmap( - partition_columns: &[String], - partition_values: &HashMap>, - ) -> Self { - let mut tuples = Vec::new(); - for column in partition_columns { - let value = partition_values.get(column).cloned().flatten(); - tuples.push((column.clone(), value)); - } - Self(tuples) - } - - fn to_hashmap(&self) -> HashMap> { - self.0.iter().cloned().collect() - } -} - /// Build a Plan on which files to merge together. See [OptimizeBuilder] pub fn create_merge_plan( optimize_type: OptimizeType, @@ -778,13 +773,10 @@ pub fn create_merge_plan( writer_properties: WriterProperties, ) -> Result { let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size()); - let partitions_keys = &snapshot.metadata().partition_columns; let (operations, metrics) = match optimize_type { - OptimizeType::Compact => { - build_compaction_plan(snapshot, partitions_keys, filters, target_size)? - } + OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?, OptimizeType::ZOrder(zorder_columns) => { build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)? 
} @@ -800,7 +792,6 @@ pub fn create_merge_plan( task_parameters: Arc::new(MergeTaskParameters { input_parameters, file_schema, - partition_columns: partitions_keys.clone(), writer_properties, }), read_table_version: snapshot.version(), @@ -853,14 +844,15 @@ impl IntoIterator for MergeBin { fn build_compaction_plan( snapshot: &DeltaTableState, - partition_keys: &[String], filters: &[PartitionFilter], target_size: i64, ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> { let mut metrics = Metrics::default(); - let mut partition_files: HashMap> = HashMap::new(); + let mut partition_files: HashMap, Vec)> = + HashMap::new(); for add in snapshot.get_active_add_actions_by_partitions(filters)? { + let add = add?; metrics.total_considered_files += 1; let object_meta = ObjectMeta::try_from(&add)?; if (object_meta.size as i64) > target_size { @@ -868,18 +860,20 @@ fn build_compaction_plan( continue; } - let part = PartitionTuples::from_hashmap(partition_keys, &add.partition_values); - - partition_files.entry(part).or_default().push(object_meta); + partition_files + .entry(add.partition_values()?.hive_partition_path()) + .or_default() + .1 + .push(object_meta); } - for file in partition_files.values_mut() { + for (_, file) in partition_files.values_mut() { // Sort files by size: largest to smallest file.sort_by(|a, b| b.size.cmp(&a.size)); } - let mut operations: HashMap> = HashMap::new(); - for (part, files) in partition_files { + let mut operations: HashMap, Vec)> = HashMap::new(); + for (part, (partition, files)) in partition_files { let mut merge_bins = vec![MergeBin::new()]; 'files: for file in files { @@ -896,11 +890,11 @@ fn build_compaction_plan( merge_bins.push(new_bin); } - operations.insert(part, merge_bins); + operations.insert(part, (partition, merge_bins)); } // Prune merge bins with only 1 file, since they have no effect - for (_, bins) in operations.iter_mut() { + for (_, (_, bins)) in operations.iter_mut() { bins.retain(|bin| { if bin.len() == 1 { metrics.total_files_skipped += 1; @@ -910,7 +904,7 @@ fn build_compaction_plan( } }) } - operations.retain(|_, files| !files.is_empty()); + operations.retain(|_, (_, files)| !files.is_empty()); metrics.partitions_optimized = operations.len() as u64; @@ -956,15 +950,21 @@ fn build_zorder_plan( // For now, just be naive and optimize all files in each selected partition. let mut metrics = Metrics::default(); - let mut partition_files: HashMap = HashMap::new(); + let mut partition_files: HashMap, MergeBin)> = HashMap::new(); for add in snapshot.get_active_add_actions_by_partitions(filters)? { + let add = add?; + let partition_values = add + .partition_values()? 
+ .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect::>(); metrics.total_considered_files += 1; let object_meta = ObjectMeta::try_from(&add)?; - let part = PartitionTuples::from_hashmap(partition_keys, &add.partition_values); partition_files - .entry(part) - .or_insert_with(MergeBin::new) + .entry(partition_values.hive_partition_path()) + .or_insert_with(|| (partition_values, MergeBin::new())) + .1 .add(object_meta); } diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index d64c1ea4f7..2718ee34fb 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -171,8 +171,8 @@ async fn execute( snapshot.version(), ))); } - let state_to_restore_files = table.snapshot()?.files()?; - let latest_state_files = snapshot.files()?; + let state_to_restore_files = table.snapshot()?.file_actions()?; + let latest_state_files = snapshot.file_actions()?; let state_to_restore_files_set = HashSet::::from_iter(state_to_restore_files.iter().cloned()); let latest_state_files_set = HashSet::::from_iter(latest_state_files.iter().cloned()); diff --git a/crates/deltalake-core/src/operations/transaction/state.rs b/crates/deltalake-core/src/operations/transaction/state.rs index 199ac79912..6a48ea729f 100644 --- a/crates/deltalake-core/src/operations/transaction/state.rs +++ b/crates/deltalake-core/src/operations/transaction/state.rs @@ -81,7 +81,7 @@ impl DeltaTableState { let expr = logical_expr_to_physical_expr(&predicate, self.arrow_schema()?.as_ref()); let pruning_predicate = PruningPredicate::try_new(expr, self.arrow_schema()?)?; Ok(Either::Left( - self.files()? + self.file_actions()? .into_iter() .zip(pruning_predicate.prune(self)?) .filter_map( @@ -95,7 +95,7 @@ impl DeltaTableState { ), )) } else { - Ok(Either::Right(self.files()?.into_iter())) + Ok(Either::Right(self.file_actions()?.into_iter())) } } @@ -117,7 +117,11 @@ impl DeltaTableState { &self, object_store: Arc, ) -> DeltaResult { - if let Some(add) = self.files()?.iter().max_by_key(|obj| obj.modification_time) { + if let Some(add) = self + .file_actions()? + .iter() + .max_by_key(|obj| obj.modification_time) + { let file_meta = add.try_into()?; let file_reader = ParquetObjectReader::new(object_store, file_meta); let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader) @@ -298,7 +302,7 @@ impl PruningStatistics for DeltaTableState { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { - let files = self.files().ok()?; + let files = self.file_actions().ok()?; let partition_columns = &self.metadata().partition_columns; let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.min_values(column) @@ -307,7 +311,7 @@ impl PruningStatistics for DeltaTableState { /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. fn max_values(&self, column: &Column) -> Option { - let files = self.files().ok()?; + let files = self.file_actions().ok()?; let partition_columns = &self.metadata().partition_columns; let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.max_values(column) @@ -316,7 +320,7 @@ impl PruningStatistics for DeltaTableState { /// return the number of containers (e.g. 
row groups) being /// pruned with these statistics fn num_containers(&self) -> usize { - self.files().unwrap().len() + self.file_actions().unwrap().len() } /// return the number of null values for the named column as an @@ -324,7 +328,7 @@ impl PruningStatistics for DeltaTableState { /// /// Note: the returned array must contain `num_containers()` rows. fn null_counts(&self, column: &Column) -> Option { - let files = self.files().ok()?; + let files = self.file_actions().ok()?; let partition_columns = &self.metadata().partition_columns; let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); container.null_counts(column) diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index b0f8a3193a..68827cbd12 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -206,7 +206,7 @@ impl VacuumBuilder { self.log_store.object_store().clone(), ) .await?; - let valid_files = self.snapshot.file_paths_iter()?.collect::>(); + let valid_files = self.snapshot.file_paths_iter().collect::>(); let mut files_to_delete = vec![]; let mut file_sizes = vec![]; diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 7ef271d43d..739ad18b42 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -43,13 +43,12 @@ use super::writer::{DeltaWriter, WriterConfig}; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, Remove, StructType}; +use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType}; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; -use crate::writer::utils::PartitionPath; use crate::DeltaTable; #[derive(thiserror::Error, Debug)] @@ -440,12 +439,7 @@ impl std::future::IntoFuture for WriteBuilder { &batch, )?; for part in divided { - let key = PartitionPath::from_hashmap( - &partition_columns, - &part.partition_values, - ) - .map_err(DeltaTableError::from)? - .into(); + let key = part.partition_values.hive_partition_path(); match partitions.get_mut(&key) { Some(part_batches) => { part_batches.push(part.record_batch); @@ -538,7 +532,7 @@ impl std::future::IntoFuture for WriteBuilder { } _ => { let remove_actions = snapshot - .files()? + .file_actions()? .iter() .map(to_remove_action) .collect::>(); diff --git a/crates/deltalake-core/src/operations/writer.rs b/crates/deltalake-core/src/operations/writer.rs index 8b31f9c252..d1249f1766 100644 --- a/crates/deltalake-core/src/operations/writer.rs +++ b/crates/deltalake-core/src/operations/writer.rs @@ -1,6 +1,6 @@ //! 
Abstractions and implementations for writing data to delta tables -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; @@ -14,13 +14,13 @@ use tracing::debug; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::Add; +use crate::kernel::{Add, PartitionsExt, Scalar}; use crate::storage::ObjectStoreRef; use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; use crate::writer::stats::create_add; use crate::writer::utils::{ arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, - PartitionPath, ShareableBuffer, + ShareableBuffer, }; // TODO databricks often suggests a file size of 100mb, should we set this default? @@ -40,11 +40,6 @@ enum WriteError { source: Box, }, - #[error("Error creating file name from partition info: {source}")] - FileName { - source: Box, - }, - #[error("Error handling Arrow data: {source}")] Arrow { #[from] @@ -160,12 +155,9 @@ impl DeltaWriter { pub async fn write_partition( &mut self, record_batch: RecordBatch, - partition_values: &HashMap>, + partition_values: &BTreeMap, ) -> DeltaResult<()> { - let partition_key = - PartitionPath::from_hashmap(&self.config.partition_columns, partition_values)? - .as_ref() - .into(); + let partition_key = Path::parse(partition_values.hive_partition_path())?; let record_batch = record_batch_without_partitions(&record_batch, &self.config.partition_columns)?; @@ -178,7 +170,6 @@ impl DeltaWriter { let config = PartitionWriterConfig::try_new( self.config.file_schema(), partition_values.clone(), - self.config.partition_columns.clone(), Some(self.config.writer_properties.clone()), Some(self.config.target_file_size), Some(self.config.write_batch_size), @@ -226,7 +217,7 @@ pub(crate) struct PartitionWriterConfig { /// Prefix applied to all paths prefix: Path, /// Values for all partition columns - partition_values: HashMap>, + partition_values: BTreeMap, /// Properties passed to underlying parquet writer writer_properties: WriterProperties, /// Size above which we will write a buffered parquet file to disk. 
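The writer hunks around here drop the `PartitionPath` helper and derive the object-store prefix directly from the typed partition values. A minimal sketch of that step, again assuming `PartitionsExt::hive_partition_path()` is available on `BTreeMap<String, Scalar>` and that `DeltaTableError` converts from the `object_store` path error, as the `?` usage in this diff implies:

    use std::collections::BTreeMap;
    use deltalake_core::kernel::{PartitionsExt, Scalar};
    use deltalake_core::DeltaResult;
    use object_store::path::Path;

    fn partition_prefix(values: &BTreeMap<String, Scalar>) -> DeltaResult<Path> {
        // e.g. {"year": Integer(2021), "country": Null(..)} is expected to render as
        // "year=2021/country=__HIVE_DEFAULT_PARTITION__" (see Scalar::serialize_encoded).
        Ok(Path::parse(values.hive_partition_path())?)
    }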
@@ -239,17 +230,13 @@ pub(crate) struct PartitionWriterConfig { impl PartitionWriterConfig { pub fn try_new( file_schema: ArrowSchemaRef, - partition_values: HashMap>, - partition_columns: Vec, + partition_values: BTreeMap, writer_properties: Option, target_file_size: Option, write_batch_size: Option, ) -> DeltaResult { - let part_path = PartitionPath::from_hashmap(&partition_columns, &partition_values) - .map_err(|err| WriteError::FileName { - source: Box::new(err), - })?; - let prefix = Path::parse(part_path.as_ref())?; + let part_path = partition_values.hive_partition_path(); + let prefix = Path::parse(part_path)?; let writer_properties = writer_properties.unwrap_or_else(|| { WriterProperties::builder() .set_created_by(format!("delta-rs version {}", crate_version())) @@ -467,8 +454,7 @@ mod tests { ) -> PartitionWriter { let config = PartitionWriterConfig::try_new( batch.schema(), - HashMap::new(), - Vec::new(), + BTreeMap::new(), writer_properties, target_file_size, None, diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index 00d4062125..dd11c93e79 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -274,7 +274,7 @@ fn parquet_bytes_from_state( remove.extended_file_metadata = Some(false); } } - let files = state.files().unwrap(); + let files = state.file_actions().unwrap(); // protocol let jsons = std::iter::once(Action::Protocol(Protocol { min_reader_version: state.protocol().min_reader_version, diff --git a/crates/deltalake-core/src/schema/partitions.rs b/crates/deltalake-core/src/schema/partitions.rs index 4e8830596c..a52b82bd9d 100644 --- a/crates/deltalake-core/src/schema/partitions.rs +++ b/crates/deltalake-core/src/schema/partitions.rs @@ -1,13 +1,11 @@ //! Delta Table partition handling logic. - +//! 
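The partitions.rs hunk that follows replaces the per-type string parsing in compare_typed_value with a single typed comparison: the filter's raw string is parsed into a Scalar of the column's primitive type and the two scalars are compared directly. A standalone sketch of that logic, assuming the kernel re-exports used elsewhere in this diff:

    use std::cmp::Ordering;
    use deltalake_core::kernel::{DataType, PrimitiveType, Scalar};

    fn compare_partition_value(
        partition_value: &Scalar,
        filter_value: &str,
        data_type: &DataType,
    ) -> Option<Ordering> {
        match data_type {
            DataType::Primitive(primitive) => {
                // Parse the filter string into the same primitive type, then compare scalars.
                let other = primitive.parse_scalar(filter_value).ok()?;
                partition_value.partial_cmp(&other)
            }
            // Complex types are not valid partition columns, so no ordering is defined.
            _ => None,
        }
    }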
+use std::cmp::Ordering; +use std::collections::HashMap; use std::convert::TryFrom; -use chrono::{NaiveDateTime, ParseResult}; - use crate::errors::DeltaTableError; -use crate::kernel::{DataType, PrimitiveType}; -use std::cmp::Ordering; -use std::collections::HashMap; +use crate::kernel::{DataType, PrimitiveType, Scalar}; /// A special value used in Hive to represent the null partition in partitioned tables pub const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; @@ -42,47 +40,18 @@ pub struct PartitionFilter { pub value: PartitionValue, } -fn parse_timestamp(timestamp_str: &str) -> ParseResult { - // Timestamp format as per https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization - let format = "%Y-%m-%d %H:%M:%S%.f"; - - NaiveDateTime::parse_from_str(timestamp_str, format) -} - fn compare_typed_value( - partition_value: &str, + partition_value: &Scalar, filter_value: &str, data_type: &DataType, ) -> Option { match data_type { - DataType::Primitive(primitive_type) => match primitive_type { - PrimitiveType::Long - | PrimitiveType::Integer - | PrimitiveType::Short - | PrimitiveType::Byte => match filter_value.parse::() { - Ok(parsed_filter_value) => { - let parsed_partition_value = partition_value.parse::().unwrap(); - parsed_partition_value.partial_cmp(&parsed_filter_value) - } - _ => None, - }, - PrimitiveType::Float | PrimitiveType::Double => match filter_value.parse::() { - Ok(parsed_filter_value) => { - let parsed_partition_value = partition_value.parse::().unwrap(); - parsed_partition_value.partial_cmp(&parsed_filter_value) - } - _ => None, - }, - PrimitiveType::Timestamp => match parse_timestamp(filter_value) { - Ok(parsed_filter_value) => { - let parsed_partition_value = parse_timestamp(partition_value).unwrap(); - parsed_partition_value.partial_cmp(&parsed_filter_value) - } - _ => None, - }, - _ => partition_value.partial_cmp(filter_value), - }, - _ => partition_value.partial_cmp(filter_value), + DataType::Primitive(primitive_type) => { + let other = primitive_type.parse_scalar(filter_value).ok()?; + partition_value.partial_cmp(&other) + } + // NOTE: complex types are not supported as partition columns + _ => None, } } @@ -93,6 +62,9 @@ impl PartitionFilter { if self.key != partition.key { return false; } + if self.value == PartitionValue::Equal("".to_string()) { + return partition.value.is_null(); + } match &self.value { PartitionValue::Equal(value) => { @@ -101,7 +73,7 @@ impl PartitionFilter { .map(|x| x.is_eq()) .unwrap_or(false) } else { - value == &partition.value + partition.value.serialize() == *value } } PartitionValue::NotEqual(value) => { @@ -110,7 +82,7 @@ impl PartitionFilter { .map(|x| !x.is_eq()) .unwrap_or(false) } else { - value != &partition.value + !(partition.value.serialize() == *value) } } PartitionValue::GreaterThan(value) => { @@ -133,8 +105,8 @@ impl PartitionFilter { .map(|x| x.is_le()) .unwrap_or(false) } - PartitionValue::In(value) => value.contains(&partition.value), - PartitionValue::NotIn(value) => !value.contains(&partition.value), + PartitionValue::In(value) => value.contains(&partition.value.serialize()), + PartitionValue::NotIn(value) => !value.contains(&partition.value.serialize()), } } @@ -215,82 +187,23 @@ impl TryFrom<(&str, &str, &[&str])> for PartitionFilter { } /// A Struct DeltaTablePartition used to represent a partition of a DeltaTable. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub struct DeltaTablePartition { /// The key of the DeltaTable partition. 
pub key: String, /// The value of the DeltaTable partition. - pub value: String, + pub value: Scalar, } -/// Create a DeltaTable partition from a HivePartition string. -/// -/// A HivePartition string is represented by a "key=value" format. -/// -/// ```rust -/// use deltalake_core::DeltaTablePartition; -/// -/// let hive_part = "ds=2023-01-01"; -/// let partition = DeltaTablePartition::try_from(hive_part).unwrap(); -/// assert_eq!("ds", partition.key); -/// assert_eq!("2023-01-01", partition.value); -/// ``` -impl TryFrom<&str> for DeltaTablePartition { - type Error = DeltaTableError; - - /// Try to create a DeltaTable partition from a HivePartition string. - /// Returns a DeltaTableError if the string is not in the form of a HivePartition. - fn try_from(partition: &str) -> Result { - let partition_splitted: Vec<&str> = partition.split('=').collect(); - match partition_splitted { - partition_splitted if partition_splitted.len() == 2 => Ok(DeltaTablePartition { - key: partition_splitted[0].to_owned(), - value: partition_splitted[1].to_owned(), - }), - _ => Err(DeltaTableError::PartitionError { - partition: partition.to_string(), - }), - } - } -} +impl Eq for DeltaTablePartition {} impl DeltaTablePartition { - /// Try to create a DeltaTable partition from a partition value kv pair. - /// - /// ```rust - /// use deltalake_core::DeltaTablePartition; - /// - /// let value = ("ds", &Some("2023-01-01".to_string())); - /// let null_default = "1979-01-01"; - /// let partition = DeltaTablePartition::from_partition_value(value, null_default); - /// - /// assert_eq!("ds", partition.key); - /// assert_eq!("2023-01-01", partition.value); - /// ``` - pub fn from_partition_value( - partition_value: (&str, &Option), - default_for_null: &str, - ) -> Self { + /// Create a DeltaTable partition from a Tuple of (key, value). + pub fn from_partition_value(partition_value: (&str, &Scalar)) -> Self { let (k, v) = partition_value; - let v = match v { - Some(s) => s, - None => default_for_null, - }; DeltaTablePartition { key: k.to_owned(), value: v.to_owned(), } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn tryfrom_invalid() { - let buf = "this-is-not-a-partition"; - let partition = DeltaTablePartition::try_from(buf); - assert!(partition.is_err()); - } -} diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index c10e0bc262..662bd32419 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -15,13 +15,13 @@ use tracing::debug; use self::builder::DeltaTableConfig; use self::state::DeltaTableState; -use crate::errors::DeltaTableError; -use crate::kernel::{Action, Add, CommitInfo, DataCheck, DataType, Metadata, Protocol, StructType}; -use crate::logstore::LogStoreRef; -use crate::logstore::{self, LogStoreConfig}; +use crate::kernel::{ + Action, CommitInfo, DataCheck, DataType, FileStats, Metadata, Protocol, StructType, +}; +use crate::logstore::{self, LogStoreConfig, LogStoreRef}; use crate::partitions::PartitionFilter; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; -use crate::DeltaResult; +use crate::{DeltaResult, DeltaTableError}; pub mod builder; pub mod config; @@ -412,7 +412,7 @@ impl DeltaTable { pub fn get_active_add_actions_by_partitions<'a>( &'a self, filters: &'a [PartitionFilter], - ) -> Result + '_, DeltaTableError> { + ) -> Result>> + '_, DeltaTableError> { self.state .as_ref() .ok_or(DeltaTableError::NoMetadata)? 
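The hunk above changes get_active_add_actions_by_partitions so that each yielded entry is itself a Result (partition-value parsing can now fail per file); the generic parameters are elided in this rendering, so the exact item type is inferred here from the call sites below. A sketch of the new call-site pattern, with paths assumed from the re-exports used in this diff:

    use deltalake_core::partitions::PartitionFilter;
    use deltalake_core::{DeltaResult, DeltaTable};
    use object_store::path::Path;

    fn matching_paths(table: &DeltaTable, filters: &[PartitionFilter]) -> DeltaResult<Vec<Path>> {
        table
            .get_active_add_actions_by_partitions(filters)?
            // Each item is now a Result, so propagate per-file errors before mapping to a path.
            .map(|add| Ok(add?.object_store_path()))
            .collect()
    }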
@@ -425,15 +425,12 @@ impl DeltaTable { &self, filters: &[PartitionFilter], ) -> Result, DeltaTableError> { + println!("get_files_by_partitions ----------->"); Ok(self .get_active_add_actions_by_partitions(filters)? - .map(|add| { - // Try to preserve percent encoding if possible - match Path::parse(&add.path) { - Ok(path) => path, - Err(_) => Path::from(add.path.as_ref()), - } - }) + .collect::, _>>()? + .into_iter() + .map(|add| add.object_store_path()) .collect()) } @@ -452,10 +449,11 @@ impl DeltaTable { /// Returns an iterator of file names present in the loaded state #[inline] pub fn get_files_iter(&self) -> DeltaResult + '_> { - self.state + Ok(self + .state .as_ref() .ok_or(DeltaTableError::NoMetadata)? - .file_paths_iter() + .file_paths_iter()) } /// Returns a URIs for all active files present in the current table version. @@ -464,7 +462,7 @@ impl DeltaTable { .state .as_ref() .ok_or(DeltaTableError::NoMetadata)? - .file_paths_iter()? + .file_paths_iter() .map(|path| self.log_store.to_uri(&path))) } diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index f0994cf235..e6be024715 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -9,16 +9,14 @@ use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; use super::config::TableConfig; -use super::get_partition_col_data_types; -use crate::errors::DeltaTableError; -use crate::kernel::EagerSnapshot; -use crate::kernel::{Action, Add, DataType, Remove, StructType}; -use crate::kernel::{Metadata, Protocol}; +use super::{get_partition_col_data_types, DeltaTableConfig}; +use crate::kernel::{ + Action, Add, DataType, EagerSnapshot, FileStats, LogDataHandler, Metadata, Protocol, Remove, + StructType, +}; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::DeltaOperation; -use crate::DeltaResult; - -use super::DeltaTableConfig; +use crate::{DeltaResult, DeltaTableError}; /// State snapshot currently held by the Delta Table instance. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -91,6 +89,11 @@ impl DeltaTableState { }) } + /// Returns a semantic accessor to the currently loaded log data. + pub fn log_data(&self) -> LogDataHandler<'_> { + self.snapshot.log_data() + } + /// Full list of tombstones (remove actions) representing files removed from table state). pub async fn all_tombstones( &self, @@ -125,7 +128,7 @@ impl DeltaTableState { /// Full list of add actions representing all parquet files that are part of the current /// delta table state. - pub fn files(&self) -> DeltaResult> { + pub fn file_actions(&self) -> DeltaResult> { Ok(self.snapshot.file_actions()?.collect()) } @@ -136,14 +139,10 @@ impl DeltaTableState { /// Returns an iterator of file names present in the loaded state #[inline] - pub fn file_paths_iter(&self) -> DeltaResult + '_> { - Ok(self - .snapshot - .file_actions()? - .map(|add| match Path::parse(&add.path) { - Ok(path) => path, - Err(_) => Path::from(add.path.as_ref()), - })) + pub fn file_paths_iter(&self) -> impl Iterator + '_ { + self.log_data() + .into_iter() + .map(|add| add.object_store_path()) } /// HashMap containing the last txn version stored for every app id writing txn @@ -180,7 +179,7 @@ impl DeltaTableState { /// function will update the tracked version if the version on `new_state` is larger then the /// currently set version however it is up to the caller to update the `version` field according /// to the version the merged state represents. 
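The state.rs hunks above rename files() to file_actions() and make file_paths_iter() infallible by routing it through the new log_data() accessor. A small sketch of the resulting surface, with module paths assumed from the crate layout shown in this diff:

    use deltalake_core::table::state::DeltaTableState;
    use deltalake_core::DeltaResult;

    fn summarize(state: &DeltaTableState) -> DeltaResult<(usize, usize)> {
        // file_actions() (formerly files()) still returns a Vec of Add actions and can fail.
        let add_actions = state.file_actions()?;
        // file_paths_iter() no longer returns a Result; it iterates paths via log_data().
        let path_count = state.file_paths_iter().count();
        Ok((add_actions.len(), path_count))
    }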
- pub fn merge( + pub(crate) fn merge( &mut self, actions: Vec, operation: &DeltaOperation, @@ -208,7 +207,7 @@ impl DeltaTableState { pub fn get_active_add_actions_by_partitions<'a>( &'a self, filters: &'a [PartitionFilter], - ) -> Result + '_, DeltaTableError> { + ) -> Result>> + '_, DeltaTableError> { let current_metadata = self.metadata(); let nonpartitioned_columns: Vec = filters @@ -228,16 +227,27 @@ impl DeltaTableState { .into_iter() .collect(); - let actions = self.files()?.into_iter().filter(move |add| { - let partitions = add - .partition_values + Ok(self.log_data().into_iter().filter_map(move |add| { + let partitions = add.partition_values(); + if partitions.is_err() { + return Some(Err(DeltaTableError::Generic( + "Failed to parse partition values".to_string(), + ))); + } + let partitions = partitions + .unwrap() .iter() - .map(|p| DeltaTablePartition::from_partition_value((p.0, p.1), "")) - .collect::>(); - filters + .map(|(k, v)| DeltaTablePartition::from_partition_value((*k, v))) + .collect::>(); + let is_valid = filters .iter() - .all(|filter| filter.match_partitions(&partitions, &partition_col_data_types)) - }); - Ok(actions) + .all(|filter| filter.match_partitions(&partitions, &partition_col_data_types)); + + if is_valid { + Some(Ok(add)) + } else { + None + } + })) } } diff --git a/crates/deltalake-core/src/table/state_arrow.rs b/crates/deltalake-core/src/table/state_arrow.rs index af14550732..143ab23d1c 100644 --- a/crates/deltalake-core/src/table/state_arrow.rs +++ b/crates/deltalake-core/src/table/state_arrow.rs @@ -54,7 +54,7 @@ impl DeltaTableState { &self, flatten: bool, ) -> Result { - let files = self.files()?; + let files = self.file_actions()?; let mut paths = arrow::array::StringBuilder::with_capacity( files.len(), files.iter().map(|add| add.path.len()).sum(), @@ -397,7 +397,7 @@ impl DeltaTableState { flatten: bool, ) -> Result { let stats: Vec> = self - .files()? + .file_actions()? .iter() .map(|f| { f.get_stats() diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 29e08519a7..5732d4ec49 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -1,5 +1,5 @@ //! 
Main writer API to write json messages to delta table -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::sync::Arc; @@ -19,11 +19,11 @@ use uuid::Uuid; use super::stats::create_add; use super::utils::{ arrow_schema_without_partitions, next_data_path, record_batch_from_message, - record_batch_without_partitions, stringified_partition_value, + record_batch_without_partitions, }; -use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError}; +use super::{DeltaWriter, DeltaWriterError}; use crate::errors::DeltaTableError; -use crate::kernel::{Add, StructType}; +use crate::kernel::{Add, PartitionsExt, Scalar, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -45,7 +45,7 @@ pub(crate) struct DataArrowWriter { writer_properties: WriterProperties, buffer: ShareableBuffer, arrow_writer: ArrowWriter, - partition_values: HashMap>, + partition_values: BTreeMap, buffered_record_batch_count: usize, } @@ -153,7 +153,7 @@ impl DataArrowWriter { writer_properties.clone(), )?; - let partition_values = HashMap::new(); + let partition_values = BTreeMap::new(); let buffered_record_batch_count = 0; Ok(Self { @@ -340,8 +340,7 @@ impl DeltaWriter> for JsonWriter { for (_, writer) in writers { let metadata = writer.arrow_writer.close()?; - let prefix = - PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?; + let prefix = writer.partition_values.hive_partition_path(); let prefix = Path::parse(prefix)?; let uuid = Uuid::new_v4(); @@ -398,18 +397,17 @@ fn quarantine_failed_parquet_rows( fn extract_partition_values( partition_cols: &[String], record_batch: &RecordBatch, -) -> Result>, DeltaWriterError> { - let mut partition_values = HashMap::new(); +) -> Result, DeltaWriterError> { + let mut partition_values = BTreeMap::new(); for col_name in partition_cols.iter() { let arrow_schema = record_batch.schema(); - let i = arrow_schema.index_of(col_name)?; let col = record_batch.column(i); + let value = Scalar::from_array(col.as_ref(), 0) + .ok_or(DeltaWriterError::MissingPartitionColumn(col_name.clone()))?; - let partition_string = stringified_partition_value(col)?; - - partition_values.insert(col_name.clone(), partition_string); + partition_values.insert(col_name.clone(), value); } Ok(partition_values) @@ -427,6 +425,7 @@ mod tests { use crate::arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, }; + use crate::kernel::DataType; use crate::writer::test_utils::get_delta_schema; use crate::writer::DeltaWriter; use crate::writer::JsonWriter; @@ -499,15 +498,15 @@ mod tests { &record_batch ) .unwrap(), - HashMap::from([ - (String::from("col1"), Some(String::from("1"))), - (String::from("col2"), Some(String::from("2"))), - (String::from("col3"), None), + BTreeMap::from([ + (String::from("col1"), Scalar::Integer(1)), + (String::from("col2"), Scalar::Integer(2)), + (String::from("col3"), Scalar::Null(DataType::INTEGER)), ]) ); assert_eq!( extract_partition_values(&[String::from("col1")], &record_batch).unwrap(), - HashMap::from([(String::from("col1"), Some(String::from("1"))),]) + BTreeMap::from([(String::from("col1"), Scalar::Integer(1)),]) ); assert!(extract_partition_values(&[String::from("col4")], &record_batch).is_err()) } diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index 6c8eb871c4..48525a3335 100644 --- 
a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -5,6 +5,7 @@ //! the writer. Once written, add actions are returned by the writer. It's the users responsibility //! to create the transaction using those actions. +use std::collections::BTreeMap; use std::{collections::HashMap, sync::Arc}; use arrow::array::{Array, UInt32Array}; @@ -22,11 +23,11 @@ use uuid::Uuid; use super::stats::create_add; use super::utils::{ arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, - stringified_partition_value, PartitionPath, ShareableBuffer, + ShareableBuffer, }; use super::{DeltaWriter, DeltaWriterError}; use crate::errors::DeltaTableError; -use crate::kernel::{Add, StructType}; +use crate::kernel::{Add, PartitionsExt, Scalar, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; @@ -126,12 +127,11 @@ impl RecordBatchWriter { pub async fn write_partition( &mut self, record_batch: RecordBatch, - partition_values: &HashMap>, + partition_values: &BTreeMap, ) -> Result<(), DeltaTableError> { let arrow_schema = arrow_schema_without_partitions(&self.arrow_schema_ref, &self.partition_columns); - let partition_key = - PartitionPath::from_hashmap(&self.partition_columns, partition_values)?.into(); + let partition_key = partition_values.hive_partition_path(); let record_batch = record_batch_without_partitions(&record_batch, &self.partition_columns)?; @@ -190,9 +190,7 @@ impl DeltaWriter for RecordBatchWriter { for (_, writer) in writers { let metadata = writer.arrow_writer.close()?; - let prefix = - PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?; - let prefix = Path::parse(prefix)?; + let prefix = Path::parse(writer.partition_values.hive_partition_path())?; let uuid = Uuid::new_v4(); let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties); let obj_bytes = Bytes::from(writer.buffer.to_vec()); @@ -214,7 +212,7 @@ impl DeltaWriter for RecordBatchWriter { #[derive(Clone, Debug)] pub struct PartitionResult { /// values found in partition columns - pub partition_values: HashMap>, + pub partition_values: BTreeMap, /// remaining dataset with partition column values removed pub record_batch: RecordBatch, } @@ -224,14 +222,14 @@ struct PartitionWriter { writer_properties: WriterProperties, pub(super) buffer: ShareableBuffer, pub(super) arrow_writer: ArrowWriter, - pub(super) partition_values: HashMap>, + pub(super) partition_values: BTreeMap, pub(super) buffered_record_batch_count: usize, } impl PartitionWriter { pub fn new( arrow_schema: Arc, - partition_values: HashMap>, + partition_values: BTreeMap, writer_properties: WriterProperties, ) -> Result { let buffer = ShareableBuffer::default(); @@ -304,7 +302,7 @@ pub(crate) fn divide_by_partition_values( if partition_columns.is_empty() { partitions.push(PartitionResult { - partition_values: HashMap::new(), + partition_values: BTreeMap::new(), record_batch: values.clone(), }); return Ok(partitions); @@ -332,15 +330,20 @@ pub(crate) fn divide_by_partition_values( .map(|i| Some(indices.value(i))) .collect(); - let partition_key_iter = sorted_partition_columns.iter().map(|col| { - stringified_partition_value(&col.slice(range.start, range.end - range.start)) - }); - - let mut partition_values = HashMap::new(); - for (key, value) in partition_columns.clone().iter().zip(partition_key_iter) { - partition_values.insert(key.clone(), value?); - } + let partition_key_iter = sorted_partition_columns + .iter() + .map(|col| { 
+ Scalar::from_array(&col.slice(range.start, range.end - range.start), 0).ok_or( + DeltaWriterError::MissingPartitionColumn("failed to parse".into()), + ) + }) + .collect::, _>>()?; + let partition_values = partition_columns + .clone() + .into_iter() + .zip(partition_key_iter) + .collect(); let batch_data = arrow_schema .fields() .iter() @@ -372,10 +375,7 @@ fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array { #[cfg(test)] mod tests { use super::*; - use crate::writer::{ - test_utils::{create_initialized_table, get_record_batch}, - utils::PartitionPath, - }; + use crate::writer::test_utils::{create_initialized_table, get_record_batch}; use arrow::json::ReaderBuilder; use std::path::Path; @@ -417,7 +417,7 @@ mod tests { String::from("modified=2021-02-01"), String::from("modified=2021-02-02"), ]; - validate_partition_map(partitions, &partition_cols, expected_keys) + validate_partition_map(partitions, expected_keys) } /* @@ -484,10 +484,7 @@ mod tests { assert_eq!(partitions.len(), expected_keys.len()); for result in partitions { - let partition_key = - PartitionPath::from_hashmap(&partition_cols, &result.partition_values) - .unwrap() - .into(); + let partition_key = result.partition_values.hive_partition_path(); assert!(expected_keys.contains(&partition_key)); } } @@ -507,7 +504,7 @@ mod tests { String::from("modified=2021-02-02/id=A"), String::from("modified=2021-02-02/id=B"), ]; - validate_partition_map(partitions, &partition_cols.clone(), expected_keys) + validate_partition_map(partitions, expected_keys) } #[tokio::test] @@ -547,17 +544,10 @@ mod tests { } } - fn validate_partition_map( - partitions: Vec, - partition_cols: &[String], - expected_keys: Vec, - ) { + fn validate_partition_map(partitions: Vec, expected_keys: Vec) { assert_eq!(partitions.len(), expected_keys.len()); for result in partitions { - let partition_key = - PartitionPath::from_hashmap(partition_cols, &result.partition_values) - .unwrap() - .into(); + let partition_key = result.partition_values.hive_partition_path(); assert!(expected_keys.contains(&partition_key)); let ref_batch = get_record_batch(Some(partition_key.clone()), false); assert_eq!(ref_batch, result.record_batch); diff --git a/crates/deltalake-core/src/writer/stats.rs b/crates/deltalake-core/src/writer/stats.rs index 3663f3aa99..4ba217cc1e 100644 --- a/crates/deltalake-core/src/writer/stats.rs +++ b/crates/deltalake-core/src/writer/stats.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, ops::AddAssign}; @@ -11,12 +12,12 @@ use parquet::{ }; use super::*; -use crate::kernel::Add; +use crate::kernel::{Add, Scalar}; use crate::protocol::{ColumnValueStat, Stats}; /// Creates an [`Add`] log action struct. 
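The create_add change that follows (like the create_remove change earlier in optimize.rs) folds the typed Scalar partition values back into the Option-of-string map stored in the Delta log. Pulled out on its own, the conversion looks roughly like this; `to_log_partition_values` is a hypothetical helper name for illustration:

    use std::collections::{BTreeMap, HashMap};
    use deltalake_core::kernel::Scalar;

    // Null scalars map to None, everything else to its Delta partition-value string,
    // mirroring the conversion added in create_add/create_remove in this diff.
    fn to_log_partition_values(values: &BTreeMap<String, Scalar>) -> HashMap<String, Option<String>> {
        values
            .iter()
            .map(|(k, v)| {
                (
                    k.clone(),
                    if v.is_null() { None } else { Some(v.serialize()) },
                )
            })
            .collect()
    }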
pub fn create_add( - partition_values: &HashMap>, + partition_values: &BTreeMap, path: String, size: i64, file_metadata: &FileMetaData, @@ -32,7 +33,19 @@ pub fn create_add( Ok(Add { path, size, - partition_values: partition_values.to_owned(), + partition_values: partition_values + .iter() + .map(|(k, v)| { + ( + k.clone(), + if v.is_null() { + None + } else { + Some(v.serialize()) + }, + ) + }) + .collect(), modification_time, data_change: true, stats: Some(stats_string), @@ -46,7 +59,7 @@ pub fn create_add( } fn stats_from_file_metadata( - partition_values: &HashMap>, + partition_values: &BTreeMap, file_metadata: &FileMetaData, ) -> Result { let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice()); diff --git a/crates/deltalake-core/src/writer/utils.rs b/crates/deltalake-core/src/writer/utils.rs index 173340f368..3c95942993 100644 --- a/crates/deltalake-core/src/writer/utils.rs +++ b/crates/deltalake-core/src/writer/utils.rs @@ -1,109 +1,22 @@ //! Handle JSON messages when writing to delta tables -use std::collections::HashMap; -use std::fmt::Display; +//! + use std::io::Write; use std::sync::Arc; -use arrow::array::{ - as_boolean_array, as_generic_binary_array, as_largestring_array, as_primitive_array, - as_string_array, Array, -}; -use arrow::datatypes::{ - DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, - Int8Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, - TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, -}; +use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::json::ReaderBuilder; use arrow::record_batch::*; use object_store::path::Path; -use object_store::path::DELIMITER_BYTE; use parking_lot::RwLock; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; use parquet::schema::types::ColumnPath; -use percent_encoding::{percent_encode, AsciiSet, CONTROLS}; use serde_json::Value; use uuid::Uuid; use crate::errors::DeltaResult; use crate::writer::DeltaWriterError; -use crate::NULL_PARTITION_VALUE_DATA_PATH; - -const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d"; -const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S"; - -#[derive(Debug, Clone, Hash, PartialEq, Eq)] -pub(crate) struct PartitionPath { - path: String, -} - -impl PartitionPath { - pub fn from_hashmap( - partition_columns: &[String], - partition_values: &HashMap>, - ) -> Result { - let mut path_parts = vec![]; - for k in partition_columns.iter() { - let partition_value = partition_values - .get(k) - .ok_or_else(|| DeltaWriterError::MissingPartitionColumn(k.to_string()))?; - let path_part = if let Some(val) = partition_value.as_deref() { - let encoded = percent_encode(val.as_bytes(), INVALID).to_string(); - format!("{k}={encoded}") - } else { - format!("{k}={NULL_PARTITION_VALUE_DATA_PATH}") - }; - path_parts.push(path_part); - } - - Ok(PartitionPath { - path: path_parts.join("/"), - }) - } -} - -const INVALID: &AsciiSet = &CONTROLS - // everything object store needs encoded ... - .add(DELIMITER_BYTE) - .add(b'\\') - .add(b'{') - .add(b'^') - .add(b'}') - .add(b'%') - .add(b'`') - .add(b']') - .add(b'"') - .add(b'>') - .add(b'[') - .add(b'~') - .add(b'<') - .add(b'#') - .add(b'|') - .add(b'\r') - .add(b'\n') - .add(b'*') - .add(b'?') - //... 
and some more chars illegal on windows - .add(b':'); - -impl From for String { - fn from(path: PartitionPath) -> String { - path.path - } -} - -impl AsRef for PartitionPath { - fn as_ref(&self) -> &str { - &self.path - } -} - -impl Display for PartitionPath { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - self.path.fmt(f) - } -} /// Generate the name of the file to be written /// prefix: The location of the file to be written @@ -159,90 +72,6 @@ pub fn record_batch_from_message( .ok_or_else(|| DeltaWriterError::EmptyRecordBatch.into()) } -// very naive implementation for plucking the partition value from the first element of a column array. -// ideally, we would do some validation to ensure the record batch containing the passed partition column contains only distinct values. -// if we calculate stats _first_, we can avoid the extra iteration by ensuring max and min match for the column. -// however, stats are optional and can be added later with `dataChange` false log entries, and it may be more appropriate to add stats _later_ to speed up the initial write. -// a happy middle-road might be to compute stats for partition columns only on the initial write since we should validate partition values anyway, and compute additional stats later (at checkpoint time perhaps?). -// also this does not currently support nested partition columns and many other data types. -// TODO is this comment still valid, since we should be sure now, that the arrays where this -// gets applied have a single unique value -pub(crate) fn stringified_partition_value( - arr: &Arc, -) -> Result, DeltaWriterError> { - let data_type = arr.data_type(); - - if arr.is_null(0) { - return Ok(None); - } - - let s = match data_type { - DataType::Int8 => as_primitive_array::(arr).value(0).to_string(), - DataType::Int16 => as_primitive_array::(arr).value(0).to_string(), - DataType::Int32 => as_primitive_array::(arr).value(0).to_string(), - DataType::Int64 => as_primitive_array::(arr).value(0).to_string(), - DataType::UInt8 => as_primitive_array::(arr).value(0).to_string(), - DataType::UInt16 => as_primitive_array::(arr).value(0).to_string(), - DataType::UInt32 => as_primitive_array::(arr).value(0).to_string(), - DataType::UInt64 => as_primitive_array::(arr).value(0).to_string(), - DataType::Float32 => as_primitive_array::(arr).value(0).to_string(), - DataType::Float64 => as_primitive_array::(arr).value(0).to_string(), - DataType::Utf8 => as_string_array(arr).value(0).to_string(), - DataType::LargeUtf8 => as_largestring_array(arr).value(0).to_string(), - DataType::Boolean => as_boolean_array(arr).value(0).to_string(), - DataType::Date32 => as_primitive_array::(arr) - .value_as_date(0) - .unwrap() - .format(PARTITION_DATE_FORMAT) - .to_string(), - DataType::Date64 => as_primitive_array::(arr) - .value_as_date(0) - .unwrap() - .format(PARTITION_DATE_FORMAT) - .to_string(), - DataType::Timestamp(TimeUnit::Second, _) => as_primitive_array::(arr) - .value_as_datetime(0) - .unwrap() - .format(PARTITION_DATETIME_FORMAT) - .to_string(), - DataType::Timestamp(TimeUnit::Millisecond, _) => { - as_primitive_array::(arr) - .value_as_datetime(0) - .unwrap() - .format(PARTITION_DATETIME_FORMAT) - .to_string() - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - as_primitive_array::(arr) - .value_as_datetime(0) - .unwrap() - .format(PARTITION_DATETIME_FORMAT) - .to_string() - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - as_primitive_array::(arr) - .value_as_datetime(0) - .unwrap() - 
.format(PARTITION_DATETIME_FORMAT) - .to_string() - } - DataType::Binary => as_generic_binary_array::(arr) - .value(0) - .escape_ascii() - .to_string(), - DataType::LargeBinary => as_generic_binary_array::(arr) - .value(0) - .escape_ascii() - .to_string(), - // TODO: handle more types - _ => { - unimplemented!("Unimplemented data type: {:?}", data_type); - } - }; - - Ok(Some(s)) -} - /// Remove any partition related columns from the record batch pub(crate) fn record_batch_without_partitions( record_batch: &RecordBatch, @@ -331,70 +160,8 @@ impl Write for ShareableBuffer { #[cfg(test)] mod tests { use super::*; - use arrow::array::{ - BinaryArray, BooleanArray, Date32Array, Date64Array, Int16Array, Int32Array, Int64Array, - Int8Array, LargeBinaryArray, StringArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, - }; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; - #[test] - fn test_stringified_partition_value() { - let reference_pairs: Vec<(Arc, Option<&str>)> = vec![ - (Arc::new(Int8Array::from(vec![None])), None), - (Arc::new(Int8Array::from(vec![1])), Some("1")), - (Arc::new(Int16Array::from(vec![1])), Some("1")), - (Arc::new(Int32Array::from(vec![1])), Some("1")), - (Arc::new(Int64Array::from(vec![1])), Some("1")), - (Arc::new(UInt8Array::from(vec![1])), Some("1")), - (Arc::new(UInt16Array::from(vec![1])), Some("1")), - (Arc::new(UInt32Array::from(vec![1])), Some("1")), - (Arc::new(UInt64Array::from(vec![1])), Some("1")), - (Arc::new(UInt8Array::from(vec![1])), Some("1")), - (Arc::new(StringArray::from(vec!["1"])), Some("1")), - (Arc::new(BooleanArray::from(vec![true])), Some("true")), - (Arc::new(BooleanArray::from(vec![false])), Some("false")), - (Arc::new(Date32Array::from(vec![1])), Some("1970-01-02")), - ( - Arc::new(Date64Array::from(vec![86400000])), - Some("1970-01-02"), - ), - ( - Arc::new(TimestampSecondArray::from(vec![1])), - Some("1970-01-01 00:00:01"), - ), - ( - Arc::new(TimestampMillisecondArray::from(vec![1000])), - Some("1970-01-01 00:00:01"), - ), - ( - Arc::new(TimestampMicrosecondArray::from(vec![1000000])), - Some("1970-01-01 00:00:01"), - ), - ( - Arc::new(TimestampNanosecondArray::from(vec![1000000000])), - Some("1970-01-01 00:00:01"), - ), - (Arc::new(BinaryArray::from_vec(vec![b"1"])), Some("1")), - ( - Arc::new(BinaryArray::from_vec(vec![b"\x00\\"])), - Some("\\x00\\\\"), - ), - (Arc::new(LargeBinaryArray::from_vec(vec![b"1"])), Some("1")), - ( - Arc::new(LargeBinaryArray::from_vec(vec![b"\x00\\"])), - Some("\\x00\\\\"), - ), - ]; - for (vals, result) in reference_pairs { - assert_eq!( - stringified_partition_value(&vals).unwrap().as_deref(), - result - ) - } - } - #[test] fn test_data_path() { let prefix = Path::parse("x=0/y=0").unwrap(); diff --git a/crates/deltalake-core/tests/command_filesystem_check.rs b/crates/deltalake-core/tests/command_filesystem_check.rs index e05f088d16..d61970c188 100644 --- a/crates/deltalake-core/tests/command_filesystem_check.rs +++ b/crates/deltalake-core/tests/command_filesystem_check.rs @@ -23,20 +23,20 @@ async fn test_filesystem_check(context: &IntegrationContext) -> TestResult { let table = context.table_builder(TestTables::Simple).load().await?; let version = table.snapshot()?.version(); - let active = table.snapshot()?.files()?.len(); + let active = table.snapshot()?.file_actions()?.len(); // Validate a Dry run does not mutate the table log and indentifies orphaned add actions let op = 
DeltaOps::from(table); let (table, metrics) = op.filesystem_check().with_dry_run(true).await?; assert_eq!(version, table.snapshot()?.version()); - assert_eq!(active, table.snapshot()?.files()?.len()); + assert_eq!(active, table.snapshot()?.file_actions()?.len()); assert_eq!(vec![file.to_string()], metrics.files_removed); // Validate a run updates the table version with proper remove actions let op = DeltaOps::from(table); let (table, metrics) = op.filesystem_check().await?; assert_eq!(version + 1, table.snapshot()?.version()); - assert_eq!(active - 1, table.snapshot()?.files()?.len()); + assert_eq!(active - 1, table.snapshot()?.file_actions()?.len()); assert_eq!(vec![file.to_string()], metrics.files_removed); let remove = table @@ -51,7 +51,7 @@ async fn test_filesystem_check(context: &IntegrationContext) -> TestResult { let op = DeltaOps::from(table); let (table, metrics) = op.filesystem_check().await?; assert_eq!(version + 1, table.snapshot()?.version()); - assert_eq!(active - 1, table.snapshot()?.files()?.len()); + assert_eq!(active - 1, table.snapshot()?.file_actions()?.len()); assert!(metrics.files_removed.is_empty()); Ok(()) @@ -77,13 +77,13 @@ async fn test_filesystem_check_partitioned() -> TestResult { .await?; let version = table.snapshot()?.version(); - let active = table.snapshot()?.files()?.len(); + let active = table.snapshot()?.file_actions()?.len(); // Validate a run updates the table version with proper remove actions let op = DeltaOps::from(table); let (table, metrics) = op.filesystem_check().await?; assert_eq!(version + 1, table.snapshot()?.version()); - assert_eq!(active - 1, table.snapshot()?.files()?.len()); + assert_eq!(active - 1, table.snapshot()?.file_actions()?.len()); assert_eq!(vec![file.to_string()], metrics.files_removed); let remove = table diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs index 24f7f2767c..9e70e22bcf 100644 --- a/crates/deltalake-core/tests/command_optimize.rs +++ b/crates/deltalake-core/tests/command_optimize.rs @@ -275,7 +275,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); let other_dt = deltalake_core::open_table(uri).await?; - let add = &other_dt.snapshot()?.files()?[0]; + let add = &other_dt.snapshot()?.file_actions()?[0]; let remove = Remove { path: add.path.clone(), deletion_timestamp: Some( diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index 59f4ccf860..9e2431251a 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -103,7 +103,10 @@ async fn test_restore_by_version() -> Result<(), Box> { let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap(); let mut table = DeltaOps::try_from_uri(table_uri).await?; table.0.load_version(1).await?; - assert_eq!(table.0.snapshot()?.files()?, result.0.snapshot()?.files()?); + assert_eq!( + table.0.snapshot()?.file_actions()?, + result.0.snapshot()?.file_actions()? 
+ ); let result = DeltaOps(result.0) .restore() @@ -167,7 +170,7 @@ async fn test_restore_with_error_params() -> Result<(), Box> { async fn test_restore_file_missing() -> Result<(), Box> { let context = setup_test().await?; - for file in context.table.snapshot()?.files()?.iter() { + for file in context.table.snapshot()?.file_actions()?.iter() { let p = context.tmp_dir.path().join(file.clone().path); fs::remove_file(p).unwrap(); } @@ -194,7 +197,7 @@ async fn test_restore_file_missing() -> Result<(), Box> { async fn test_restore_allow_file_missing() -> Result<(), Box> { let context = setup_test().await?; - for file in context.table.snapshot()?.files()?.iter() { + for file in context.table.snapshot()?.file_actions()?.iter() { let p = context.tmp_dir.path().join(file.clone().path); fs::remove_file(p).unwrap(); } diff --git a/crates/deltalake-core/tests/read_delta_partitions_test.rs b/crates/deltalake-core/tests/read_delta_partitions_test.rs index 25882488e7..1516566faa 100644 --- a/crates/deltalake-core/tests/read_delta_partitions_test.rs +++ b/crates/deltalake-core/tests/read_delta_partitions_test.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "deltalake")] use std::collections::HashMap; use std::convert::TryFrom; diff --git a/python/deltalake/table.py b/python/deltalake/table.py index f1d3c41ef3..94eab6eafd 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -722,8 +722,7 @@ def _backwards_enumerate( yield n, elem n -= 1 - commits = list(reversed(self._table.history(limit))) - + commits = list(self._table.history(limit)) history = [] for version, commit_info_raw in _backwards_enumerate( commits, start_end=self._table.get_latest_version() diff --git a/python/src/lib.rs b/python/src/lib.rs index 7e516c9d13..e3395d2610 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,7 +25,7 @@ use deltalake::datafusion::datasource::provider::TableProvider; use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; -use deltalake::kernel::{Action, Add, Invariant, Remove, StructType}; +use deltalake::kernel::{Action, Add, FileStats, Invariant, Remove, Scalar, StructType}; use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; use deltalake::operations::delete::DeleteBuilder; @@ -45,7 +45,7 @@ use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::PyFrozenSet; +use pyo3::types::{PyDict, PyFrozenSet}; use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; @@ -717,40 +717,37 @@ impl RawDeltaTable { .map_err(PythonError::from)?) } - // pub fn dataset_partitions<'py>( - // &mut self, - // py: Python<'py>, - // schema: PyArrowType, - // partition_filters: Option>, - // ) -> PyResult)>> { - // let path_set = match partition_filters { - // Some(filters) => Some(HashSet::<_>::from_iter( - // self.files_by_partitions(filters)?.iter().cloned(), - // )), - // None => None, - // }; - // - // self._table - // .get_files_iter() - // .map_err(PythonError::from)? - // .map(|p| p.to_string()) - // .zip( - // self._table - // .get_partition_values() - // .map_err(PythonError::from)?, - // ) - // .zip(self._table.get_stats().map_err(PythonError::from)?) 
- // .filter(|((path, _), _)| match &path_set { - // Some(path_set) => path_set.contains(path), - // None => true, - // }) - // .map(|((path, partition_values), stats)| { - // let stats = stats.map_err(PythonError::from)?; - // let expression = filestats_to_expression(py, &schema, &partition_values, stats)?; - // Ok((path, expression)) - // }) - // .collect() - // } + pub fn dataset_partitions<'py>( + &mut self, + py: Python<'py>, + schema: PyArrowType, + partition_filters: Option>, + ) -> PyResult)>> { + let path_set = match partition_filters { + Some(filters) => Some(HashSet::<_>::from_iter( + self.files_by_partitions(filters)?.iter().cloned(), + )), + None => None, + }; + self._table + .snapshot() + .map_err(PythonError::from)? + .log_data() + .into_iter() + .filter_map(|f| { + let path = f.path().to_string(); + match &path_set { + Some(path_set) => path_set.contains(&path).then_some((path, f)), + None => Some((path, f)), + } + }) + .map(|(path, f)| { + let expression = filestats_to_expression_next(py, &schema, f)?; + println!("path: {:?}", path); + Ok((path, expression)) + }) + .collect() + } fn get_active_partitions<'py>( &self, @@ -810,14 +807,25 @@ impl RawDeltaTable { .map_err(PythonError::from)? .get_active_add_actions_by_partitions(&converted_filters) .map_err(PythonError::from)? - .collect::>(); - let active_partitions: HashSet)>> = adds + .collect::, _>>() + .map_err(PythonError::from)?; + let active_partitions: HashSet)>> = adds .iter() - .map(|add| { - partition_columns - .iter() - .map(|col| (*col, add.partition_values.get(*col).unwrap().as_deref())) - .collect() + .flat_map(|add| { + Ok::<_, PythonError>( + partition_columns + .iter() + .flat_map(|col| { + Ok::<_, PythonError>(( + *col, + add.partition_values() + .map_err(PythonError::from)? + .get(*col) + .map(|v| v.serialize()), + )) + }) + .collect(), + ) }) .collect(); @@ -862,17 +870,34 @@ impl RawDeltaTable { .map_err(PythonError::from)?; for old_add in add_actions { + let old_add = old_add.map_err(PythonError::from)?; let remove_action = Action::Remove(Remove { - path: old_add.path.clone(), + path: old_add.path().to_string(), deletion_timestamp: Some(current_timestamp()), data_change: true, - extended_file_metadata: Some(old_add.tags.is_some()), - partition_values: Some(old_add.partition_values.clone()), - size: Some(old_add.size), - deletion_vector: old_add.deletion_vector.clone(), - tags: old_add.tags.clone(), - base_row_id: old_add.base_row_id, - default_row_commit_version: old_add.default_row_commit_version, + extended_file_metadata: Some(true), + partition_values: Some( + old_add + .partition_values() + .map_err(PythonError::from)? 
+ .iter() + .map(|(k, v)| { + ( + k.to_string(), + if v.is_null() { + None + } else { + Some(v.serialize()) + }, + ) + }) + .collect(), + ), + size: Some(old_add.size()), + deletion_vector: None, + tags: None, + base_row_id: None, + default_row_commit_version: None, }); actions.push(remove_action); } @@ -1069,23 +1094,46 @@ fn convert_partition_filters<'a>( .collect() } -// fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject { -// match value { -// serde_json::Value::Null => py.None(), -// serde_json::Value::Bool(val) => val.to_object(py), -// serde_json::Value::Number(val) => { -// if val.is_f64() { -// val.as_f64().expect("not an f64").to_object(py) -// } else if val.is_i64() { -// val.as_i64().expect("not an i64").to_object(py) -// } else { -// val.as_u64().expect("not an u64").to_object(py) -// } -// } -// serde_json::Value::String(val) => val.to_object(py), -// _ => py.None(), -// } -// } +fn scalar_to_py(value: &Scalar, py_date: &PyAny, py: Python) -> PyResult { + use Scalar::*; + + let val = match value { + Null(_) => py.None(), + Boolean(val) => val.to_object(py), + Binary(val) => val.to_object(py), + String(val) => val.to_object(py), + Byte(val) => val.to_object(py), + Short(val) => val.to_object(py), + Integer(val) => val.to_object(py), + Long(val) => val.to_object(py), + Float(val) => val.to_object(py), + Double(val) => val.to_object(py), + // TODO: Since PyArrow 13.0.0, casting string -> timestamp fails if it ends with "Z" + // and the target type is timezone naive. The serialization does not produce "Z", + // but we need to consider timezones when doing timezone ntz. + Timestamp(_) => { + let value = value.serialize(); + println!("timestamp: {}", value); + value.to_object(py) + } + // NOTE: PyArrow 13.0.0 lost the ability to cast from string to date32, so + // we have to implement that manually. + Date(_) => { + let date = py_date.call_method1("fromisoformat", (value.serialize(),))?; + date.to_object(py) + } + Decimal(_, _, _) => value.serialize().to_object(py), + Struct(values, fields) => { + let py_struct = PyDict::new(py); + for (field, value) in fields.iter().zip(values.iter()) { + py_struct.set_item(field.name(), scalar_to_py(value, py_date, py)?)?; + } + py_struct.to_object(py) + } + }; + + Ok(val) +} /// Create expression that file statistics guarantee to be true. /// @@ -1099,129 +1147,129 @@ fn convert_partition_filters<'a>( /// /// Statistics are translated into inequalities. If there are null values, then /// they must be OR'd with is_null. -// fn filestats_to_expression<'py>( -// py: Python<'py>, -// schema: &PyArrowType, -// partitions_values: &HashMap>, -// stats: Option, -// ) -> PyResult> { -// let ds = PyModule::import(py, "pyarrow.dataset")?; -// let field = ds.getattr("field")?; -// let pa = PyModule::import(py, "pyarrow")?; -// let mut expressions: Vec> = Vec::new(); -// -// let cast_to_type = |column_name: &String, value: PyObject, schema: &ArrowSchema| { -// let column_type = schema -// .field_with_name(column_name) -// .map_err(|_| { -// PyValueError::new_err(format!("Column not found in schema: {column_name}")) -// })? -// .data_type() -// .clone(); -// -// let value = match column_type { -// // Since PyArrow 13.0.0, casting string -> timestamp fails if it ends with "Z" -// // and the target type is timezone naive. -// DataType::Timestamp(_, _) if value.extract::(py).is_ok() => { -// value.call_method1(py, "rstrip", ("Z",))? 
-// }
-// // PyArrow 13.0.0 lost the ability to cast from string to date32, so
-// // we have to implement that manually.
-// DataType::Date32 if value.extract::<String>(py).is_ok() => {
-// let date = Python::import(py, "datetime")?.getattr("date")?;
-// let date = date.call_method1("fromisoformat", (value,))?;
-// date.to_object(py)
-// }
-// _ => value,
-// };
-//
-// let column_type = PyArrowType(column_type).into_py(py);
-// pa.call_method1("scalar", (value,))?
-// .call_method1("cast", (column_type,))
-// };
-//
-// for (column, value) in partitions_values.iter() {
-// if let Some(value) = value {
-// // value is a string, but needs to be parsed into appropriate type
-// let converted_value = cast_to_type(column, value.into_py(py), &schema.0)?;
-// expressions.push(
-// field
-// .call1((column,))?
-// .call_method1("__eq__", (converted_value,)),
-// );
-// } else {
-// expressions.push(field.call1((column,))?.call_method0("is_null"));
-// }
-// }
-//
-// if let Some(stats) = stats {
-// let mut has_nulls_set: HashSet<String> = HashSet::new();
-//
-// for (col_name, null_count) in stats.null_count.iter().filter_map(|(k, v)| match v {
-// ColumnCountStat::Value(val) => Some((k, val)),
-// _ => None,
-// }) {
-// if *null_count == 0 {
-// expressions.push(field.call1((col_name,))?.call_method0("is_valid"));
-// } else if *null_count == stats.num_records {
-// expressions.push(field.call1((col_name,))?.call_method0("is_null"));
-// } else {
-// has_nulls_set.insert(col_name.clone());
-// }
-// }
-//
-// for (col_name, minimum) in stats.min_values.iter().filter_map(|(k, v)| match v {
-// ColumnValueStat::Value(val) => Some((k.clone(), json_value_to_py(val, py))),
-// // TODO(wjones127): Handle nested field statistics.
-// // Blocked on https://issues.apache.org/jira/browse/ARROW-11259
-// _ => None,
-// }) {
-// let maybe_minimum = cast_to_type(&col_name, minimum, &schema.0);
-// if let Ok(minimum) = maybe_minimum {
-// let field_expr = field.call1((&col_name,))?;
-// let expr = field_expr.call_method1("__ge__", (minimum,));
-// let expr = if has_nulls_set.contains(&col_name) {
-// // col >= min_value OR col is null
-// let is_null_expr = field_expr.call_method0("is_null");
-// expr?.call_method1("__or__", (is_null_expr?,))
-// } else {
-// // col >= min_value
-// expr
-// };
-// expressions.push(expr);
-// }
-// }
-//
-// for (col_name, maximum) in stats.max_values.iter().filter_map(|(k, v)| match v {
-// ColumnValueStat::Value(val) => Some((k.clone(), json_value_to_py(val, py))),
-// _ => None,
-// }) {
-// let maybe_maximum = cast_to_type(&col_name, maximum, &schema.0);
-// if let Ok(maximum) = maybe_maximum {
-// let field_expr = field.call1((&col_name,))?;
-// let expr = field_expr.call_method1("__le__", (maximum,));
-// let expr = if has_nulls_set.contains(&col_name) {
-// // col <= max_value OR col is null
-// let is_null_expr = field_expr.call_method0("is_null");
-// expr?.call_method1("__or__", (is_null_expr?,))
-// } else {
-// // col <= max_value
-// expr
-// };
-// expressions.push(expr);
-// }
-// }
-// }
-//
-// if expressions.is_empty() {
-// Ok(None)
-// } else {
-// expressions
-// .into_iter()
-// .reduce(|accum, item| accum?.call_method1("__and__", (item?,)))
-// .transpose()
-// }
-// }
+fn filestats_to_expression_next<'py>(
+ py: Python<'py>,
+ schema: &PyArrowType<ArrowSchema>,
+ file_info: FileStats<'_>,
+) -> PyResult<Option<&'py PyAny>> {
+ let ds = PyModule::import(py, "pyarrow.dataset")?;
+ let py_field = ds.getattr("field")?;
+ let pa = PyModule::import(py, "pyarrow")?;
+ let py_date = Python::import(py, "datetime")?.getattr("date")?;
"datetime")?.getattr("date")?; + let mut expressions: Vec> = Vec::new(); + + let cast_to_type = |column_name: &String, value: PyObject, schema: &ArrowSchema| { + let column_type = schema + .field_with_name(column_name) + .map_err(|_| { + PyValueError::new_err(format!("Column not found in schema: {column_name}")) + })? + .data_type() + .clone(); + let column_type = PyArrowType(column_type).into_py(py); + pa.call_method1("scalar", (value,))? + .call_method1("cast", (column_type,)) + }; + + if let Ok(partitions_values) = file_info.partition_values() { + println!("partition_values: {:?}", partitions_values); + for (column, value) in partitions_values.iter() { + let column = column.to_string(); + if !value.is_null() { + // value is a string, but needs to be parsed into appropriate type + let converted_value = + cast_to_type(&column, scalar_to_py(value, py_date, py)?, &schema.0)?; + expressions.push( + py_field + .call1((&column,))? + .call_method1("__eq__", (converted_value,)), + ); + } else { + expressions.push(py_field.call1((column,))?.call_method0("is_null")); + } + } + } + + let mut has_nulls_set: HashSet = HashSet::new(); + + // NOTE: null_counts should always return a struct scalar. + if let Some(Scalar::Struct(values, fields)) = file_info.null_counts() { + for (field, value) in fields.iter().zip(values.iter()) { + if let Scalar::Long(val) = value { + if *val == 0 { + expressions.push(py_field.call1((field.name(),))?.call_method0("is_valid")); + } else if Some(*val as usize) == file_info.num_records() { + expressions.push(py_field.call1((field.name(),))?.call_method0("is_null")); + } else { + has_nulls_set.insert(field.name().to_string()); + } + } + } + } + + // NOTE: min_values should always return a struct scalar. + if let Some(Scalar::Struct(values, fields)) = file_info.min_values() { + for (field, value) in fields.iter().zip(values.iter()) { + match value { + // TODO: Handle nested field statistics. + Scalar::Struct(_, _) => {} + _ => { + let maybe_minimum = + cast_to_type(field.name(), scalar_to_py(value, py_date, py)?, &schema.0); + if let Ok(minimum) = maybe_minimum { + let field_expr = py_field.call1((field.name(),))?; + let expr = field_expr.call_method1("__ge__", (minimum,)); + let expr = if has_nulls_set.contains(field.name()) { + // col >= min_value OR col is null + let is_null_expr = field_expr.call_method0("is_null"); + expr?.call_method1("__or__", (is_null_expr?,)) + } else { + // col >= min_value + expr + }; + expressions.push(expr); + } + } + } + } + } + + // NOTE: max_values should always return a struct scalar. + if let Some(Scalar::Struct(values, fields)) = file_info.max_values() { + for (field, value) in fields.iter().zip(values.iter()) { + match value { + // TODO: Handle nested field statistics. 
+ Scalar::Struct(_, _) => {} + _ => { + let maybe_maximum = + cast_to_type(field.name(), scalar_to_py(value, py_date, py)?, &schema.0); + if let Ok(maximum) = maybe_maximum { + let field_expr = py_field.call1((field.name(),))?; + let expr = field_expr.call_method1("__le__", (maximum,)); + let expr = if has_nulls_set.contains(field.name()) { + // col <= max_value OR col is null + let is_null_expr = field_expr.call_method0("is_null"); + expr?.call_method1("__or__", (is_null_expr?,)) + } else { + // col <= max_value + expr + }; + expressions.push(expr); + } + } + } + } + } + + if expressions.is_empty() { + Ok(None) + } else { + expressions + .into_iter() + .reduce(|accum, item| accum?.call_method1("__and__", (item?,))) + .transpose() + } +} #[pyfunction] fn rust_core_version() -> &'static str { diff --git a/python/tests/test_version.py b/python/tests/test_version.py index 8cf22d8045..df1442a66e 100644 --- a/python/tests/test_version.py +++ b/python/tests/test_version.py @@ -1,6 +1,6 @@ from deltalake import rust_core_version -def test_read_simple_table_to_dict() -> None: +def test_version() -> None: v = rust_core_version() assert len(v.split(".")) == 3
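
Note: the sketch below is an illustration of the expression shape that filestats_to_expression_next produces on the Python side, written directly against PyArrow; it is not part of the patch. The schema, column names, and per-file statistics are made up for the example. Only the translation rules mirror the change above: partition values become equality filters, min/max statistics become inequalities, and columns whose null count is neither zero nor the record count are OR'd with is_null.

import pyarrow as pa
import pyarrow.dataset as ds

# Hypothetical schema and per-file metadata, standing in for what the Rust
# side reads from the Delta log for a single data file.
schema = pa.schema([("year", pa.int32()), ("value", pa.float64())])
partition_values = {"year": "2021"}  # partition values arrive serialized as strings
min_values = {"value": 1.5}
max_values = {"value": 9.0}
null_counts = {"value": 3}
num_records = 10

parts = []

# Partition columns: equality against the casted partition value, or is_null().
for col, val in partition_values.items():
    typ = schema.field(col).type
    parts.append(
        ds.field(col).is_null()
        if val is None
        else ds.field(col) == pa.scalar(val).cast(typ)
    )

# Columns that contain both nulls and values need the "... OR col is null" form.
mixed_nulls = {c for c, n in null_counts.items() if 0 < n < num_records}
for col, n in null_counts.items():
    if n == 0:
        parts.append(ds.field(col).is_valid())
    elif n == num_records:
        parts.append(ds.field(col).is_null())

# Min/max statistics become inequalities that every row in the file satisfies.
for col, lo in min_values.items():
    expr = ds.field(col) >= pa.scalar(lo).cast(schema.field(col).type)
    parts.append(expr | ds.field(col).is_null() if col in mixed_nulls else expr)
for col, hi in max_values.items():
    expr = ds.field(col) <= pa.scalar(hi).cast(schema.field(col).type)
    parts.append(expr | ds.field(col).is_null() if col in mixed_nulls else expr)

# All per-column guarantees hold simultaneously for the file.
file_expr = None
for p in parts:
    file_expr = p if file_expr is None else file_expr & p
print(file_expr)

Combining the per-column expressions with & at the end corresponds to the final reduce over "__and__" in the Rust implementation, so the resulting expression is the conjunction of everything the file's statistics guarantee.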