From 2d9a5eaba7ba9c254439a2b6b986e85496b96c67 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Mon, 15 Jan 2024 23:18:20 +0100
Subject: [PATCH] feat: improve logical file handling

---
 crates/benchmarks/src/bin/merge.rs                |   2 +-
 .../src/delta_datafusion/mod.rs                   |   2 +-
 .../src/kernel/actions/types.rs                   |   5 +
 crates/deltalake-core/src/kernel/arrow/mod.rs     |   4 +-
 crates/deltalake-core/src/kernel/schema.rs        |   2 +-
 .../src/kernel/snapshot/log_data.rs               | 185 +++++++++++++++---
 .../deltalake-core/src/kernel/snapshot/mod.rs     |  19 +-
 .../src/kernel/snapshot/replay.rs                 |  95 +++++----
 crates/deltalake-core/src/lib.rs                  |   4 +-
 .../src/operations/merge/mod.rs                   |  26 +--
 .../src/operations/transaction/state.rs           |   2 +-
 crates/deltalake-core/src/operations/write.rs     |  31 +--
 .../src/protocol/checkpoints.rs                   |   4 +-
 crates/deltalake-core/src/protocol/mod.rs         |  20 +-
 crates/deltalake-core/src/table/mod.rs            |   9 +-
 crates/deltalake-core/src/table/state.rs          |   4 +-
 .../tests/command_filesystem_check.rs             |  12 +-
 .../deltalake-core/tests/command_optimize.rs      |  26 +--
 .../deltalake-core/tests/command_restore.rs       |   8 +-
 python/src/lib.rs                                 |   4 +-
 python/tests/test_writer.py                       |   2 +-
 21 files changed, 290 insertions(+), 176 deletions(-)

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index 2f68fdc398..ea43171052 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -193,7 +193,7 @@ async fn benchmark_merge_tpcds(
     merge: fn(DataFrame, DeltaTable) -> Result,
 ) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
     let table = DeltaTableBuilder::from_uri(path).load().await?;
-    let file_count = table.snapshot()?.file_actions()?.len();
+    let file_count = table.snapshot()?.files_count();
 
     let provider = DeltaTableProvider::try_new(
         table.snapshot()?.clone(),
diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs
index 85d6a4df34..443d5ef4b5 100644
--- a/crates/deltalake-core/src/delta_datafusion/mod.rs
+++ b/crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -481,7 +481,7 @@ impl TableProvider for DeltaTable {
     }
 
     fn statistics(&self) -> Option<Statistics> {
-        self.get_state()?.datafusion_table_statistics()
+        self.snapshot().ok()?.datafusion_table_statistics()
     }
 }
 
diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs
index a324457ae6..b3321ade68 100644
--- a/crates/deltalake-core/src/kernel/actions/types.rs
+++ b/crates/deltalake-core/src/kernel/actions/types.rs
@@ -661,6 +661,11 @@ impl Remove {
     pub fn dv_unique_id(&self) -> Option<String> {
         self.deletion_vector.clone().map(|dv| dv.unique_id())
     }
+
+    /// Convert into Action::Remove
+    pub fn into_action(self) -> super::Action {
+        super::Action::Remove(self)
+    }
 }
 
 /// Delta AddCDCFile action that describes a parquet CDC data file.
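Note on the new helper: `into_action` exists purely for call-site ergonomics, replacing explicit `Action::Remove(remove)` wrapping. A minimal sketch of the resulting pattern follows — the `tombstone` function is illustrative and not part of this patch; its field set mirrors the `remove_action` constructor added to `LogicalFile` further down:

    use chrono::Utc;
    use deltalake_core::kernel::{Action, Remove};

    // Build a tombstone for a data file and convert it into a commit action.
    // Before this patch the final expression would read `Action::Remove(remove)`.
    fn tombstone(path: &str, size: i64) -> Action {
        Remove {
            path: path.to_string(),
            data_change: true,
            deletion_timestamp: Some(Utc::now().timestamp_millis()),
            extended_file_metadata: Some(true),
            size: Some(size),
            partition_values: None,
            deletion_vector: None,
            tags: None,
            base_row_id: None,
            default_row_commit_version: None,
        }
        .into_action()
    }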
diff --git a/crates/deltalake-core/src/kernel/arrow/mod.rs b/crates/deltalake-core/src/kernel/arrow/mod.rs
index a8bac82fe1..4dc1e7d2f4 100644
--- a/crates/deltalake-core/src/kernel/arrow/mod.rs
+++ b/crates/deltalake-core/src/kernel/arrow/mod.rs
@@ -18,7 +18,7 @@ pub mod schemas;
 const MAP_ROOT_DEFAULT: &str = "entries";
 const MAP_KEY_DEFAULT: &str = "keys";
 const MAP_VALUE_DEFAULT: &str = "values";
-const LIST_ROOT_DEFAULT: &str = "element";
+const LIST_ROOT_DEFAULT: &str = "item";
 
 impl TryFrom<&StructField> for ArrowField {
     type Error = ArrowError;
@@ -71,7 +71,7 @@ impl TryFrom<&ArrayType> for ArrowField {
             LIST_ROOT_DEFAULT,
             ArrowDataType::try_from(a.element_type())?,
             // TODO check how to handle nullability
-            true, // a.contains_null(),
+            a.contains_null(),
         ))
     }
 }
diff --git a/crates/deltalake-core/src/kernel/schema.rs b/crates/deltalake-core/src/kernel/schema.rs
index c7c7f3cdb4..882889a91c 100644
--- a/crates/deltalake-core/src/kernel/schema.rs
+++ b/crates/deltalake-core/src/kernel/schema.rs
@@ -544,7 +544,7 @@ impl Display for PrimitiveType {
             PrimitiveType::Date => write!(f, "date"),
             PrimitiveType::Timestamp => write!(f, "timestamp"),
             PrimitiveType::Decimal(precision, scale) => {
-                write!(f, "decimal({}, {})", precision, scale)
+                write!(f, "decimal({},{})", precision, scale)
             }
         }
     }
diff --git a/crates/deltalake-core/src/kernel/snapshot/log_data.rs b/crates/deltalake-core/src/kernel/snapshot/log_data.rs
index 8ecceeddcc..47885805eb 100644
--- a/crates/deltalake-core/src/kernel/snapshot/log_data.rs
+++ b/crates/deltalake-core/src/kernel/snapshot/log_data.rs
@@ -2,14 +2,16 @@ use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
-use arrow_array::{Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
+use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
 use chrono::{NaiveDateTime, TimeZone, Utc};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use percent_encoding::percent_decode_str;
 
-use super::extract::extract_and_cast;
-use crate::kernel::{DataType, Metadata, Scalar, StructField, StructType};
+use super::extract::{extract_and_cast, extract_and_cast_opt};
+use crate::kernel::{
+    DataType, DeletionVectorDescriptor, Metadata, Remove, Scalar, StructField, StructType,
+};
 use crate::{DeltaResult, DeltaTableError};
 
 const COL_NUM_RECORDS: &str = "numRecords";
@@ -58,27 +60,101 @@ impl<T: PartitionsExt> PartitionsExt for Arc<T> {
     }
 }
 
-/// A view into the log data for a single logical file.
+/// Raw arrays defining a deletion vector.
+#[derive(Debug, PartialEq, Clone)]
+pub struct DeletionVector<'a> {
+    storage_type: &'a StringArray,
+    path_or_inline_dv: &'a StringArray,
+    size_in_bytes: &'a Int32Array,
+    cardinality: &'a Int64Array,
+    offset: Option<&'a Int32Array>,
+}
+
+/// A view into deletion vector data.
+#[derive(Debug)]
+pub struct DeletionVectorView<'a> {
+    data: &'a DeletionVector<'a>,
+    /// Pointer to a specific row in the log data.
+    index: usize,
+}
+
+impl<'a> DeletionVectorView<'a> {
+    /// Get a unique identifier for the deletion vector.
+    pub fn unique_id(&self) -> String {
+        if let Some(offset) = self.offset() {
+            format!(
+                "{}{}@{offset}",
+                self.storage_type(),
+                self.path_or_inline_dv()
+            )
+        } else {
+            format!("{}{}", self.storage_type(), self.path_or_inline_dv())
+        }
+    }
+
+    fn descriptor(&self) -> DeletionVectorDescriptor {
+        DeletionVectorDescriptor {
+            storage_type: self.storage_type().parse().unwrap(),
+            path_or_inline_dv: self.path_or_inline_dv().to_string(),
+            size_in_bytes: self.size_in_bytes(),
+            cardinality: self.cardinality(),
+            offset: self.offset(),
+        }
+    }
+
+    fn storage_type(&self) -> &str {
+        self.data.storage_type.value(self.index)
+    }
+    fn path_or_inline_dv(&self) -> &str {
+        self.data.path_or_inline_dv.value(self.index)
+    }
+    fn size_in_bytes(&self) -> i32 {
+        self.data.size_in_bytes.value(self.index)
+    }
+    fn cardinality(&self) -> i64 {
+        self.data.cardinality.value(self.index)
+    }
+    fn offset(&self) -> Option<i32> {
+        self.data
+            .offset
+            .and_then(|a| (!a.is_null(self.index)).then(|| a.value(self.index)))
+    }
+}
+
+/// A view into the log data representing a single logical file.
+///
+/// This struct holds a pointer to a specific row in the log data and provides access to the
+/// information stored in that row by tracking references to the underlying arrays.
+///
+/// Additionally, references to some table metadata are tracked to provide higher level
+/// functionality, e.g. parsing partition values.
 #[derive(Debug, PartialEq)]
-pub struct FileStats<'a> {
+pub struct LogicalFile<'a> {
     path: &'a StringArray,
+    /// The on-disk size of this data file in bytes
     size: &'a Int64Array,
+    /// Last modification time of the file in milliseconds since the epoch.
     modification_time: &'a Int64Array,
+    /// The partition values for this logical file.
     partition_values: &'a MapArray,
-    partition_fields: PartitionFields<'a>,
+    /// Struct containing all available statistics for the columns in this file.
     stats: &'a StructArray,
+    /// Array containing the deletion vector data.
+    deletion_vector: Option<DeletionVector<'a>>,
+
+    /// Pointer to a specific row in the log data.
     index: usize,
+    /// Schema fields the table is partitioned by.
+    partition_fields: PartitionFields<'a>,
 }
 
-impl FileStats<'_> {
+impl LogicalFile<'_> {
     /// Path to the file's storage location.
     pub fn path(&self) -> Cow<'_, str> {
-        percent_decode_str(self.path.value(self.index))
-            .decode_utf8()
-            .unwrap()
+        percent_decode_str(self.path.value(self.index)).decode_utf8_lossy()
     }
 
-    /// an object store [`Path`] to the file.
+    /// An object store [`Path`] to the file.
     ///
     /// This tries to parse the file string; if that fails, it returns the string as is.
     // TODO assert consistent handling of the paths encoding when reading log data so this logic can be removed.
@@ -114,7 +190,6 @@
     }
 
     /// The partition values for this logical file.
-    // TODO make this fallible
    pub fn partition_values(&self) -> DeltaResult<BTreeMap<&str, Scalar>> {
        if self.partition_fields.is_empty() {
            return Ok(BTreeMap::new());
        }
@@ -165,6 +240,18 @@
            .collect::<DeltaResult<BTreeMap<_, _>>>()
    }
 
+    /// The deletion vector for this logical file, if present.
+    pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
+        self.deletion_vector.as_ref().and_then(|arr| {
+            arr.storage_type
+                .is_valid(self.index)
+                .then(|| DeletionVectorView {
+                    data: arr,
+                    index: self.index,
+                })
+        })
+    }
+
+    /// The number of records stored in the data file.
    pub fn num_records(&self) -> Option<usize> {
        self.stats
@@ -193,19 +280,46 @@
            .column_by_name(COL_MAX_VALUES)
            .and_then(|c| Scalar::from_array(c.as_ref(), self.index))
    }
+
+    /// Create a remove action for this logical file.
+    pub fn remove_action(&self, data_change: bool) -> Remove {
+        Remove {
+            // TODO use the raw (still encoded) path here once we reconciled serde ...
+            path: self.path().to_string(),
+            data_change,
+            deletion_timestamp: Some(Utc::now().timestamp_millis()),
+            extended_file_metadata: Some(true),
+            size: Some(self.size()),
+            partition_values: self.partition_values().ok().map(|pv| {
+                pv.iter()
+                    .map(|(k, v)| {
+                        (
+                            k.to_string(),
+                            if v.is_null() {
+                                None
+                            } else {
+                                Some(v.serialize())
+                            },
+                        )
+                    })
+                    .collect()
+            }),
+            deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
+            tags: None,
+            base_row_id: None,
+            default_row_commit_version: None,
+        }
+    }
 }
 
-impl<'a> TryFrom<&FileStats<'a>> for ObjectMeta {
+impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
     type Error = DeltaTableError;
 
-    fn try_from(file_stats: &FileStats<'a>) -> Result<Self, Self::Error> {
-        let location = file_stats.object_store_path();
-        let size = file_stats.size() as usize;
-        let last_modified = file_stats.modification_datetime()?;
+    fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
         Ok(ObjectMeta {
-            location,
-            size,
-            last_modified,
+            location: file_stats.object_store_path(),
+            size: file_stats.size() as usize,
+            last_modified: file_stats.modification_datetime()?,
             version: None,
             e_tag: None,
         })
@@ -219,6 +333,7 @@ pub struct FileStatsAccessor<'a> {
     sizes: &'a Int64Array,
     modification_times: &'a Int64Array,
     stats: &'a StructArray,
+    deletion_vector: Option<DeletionVector<'a>>,
     partition_values: &'a MapArray,
     length: usize,
     pointer: usize,
@@ -242,39 +357,57 @@ impl<'a> FileStatsAccessor<'a> {
                .map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?)))
                .collect::<DeltaResult<HashMap<_, _>>>()?,
        );
+        let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
+        let deletion_vector = deletion_vector.and_then(|dv| {
+            let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
+            let path_or_inline_dv = extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
+            let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
+            let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
+            let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
+            Some(DeletionVector {
+                storage_type,
+                path_or_inline_dv,
+                size_in_bytes,
+                cardinality,
+                offset,
+            })
+        });
+
        Ok(Self {
            partition_fields,
            paths,
            sizes,
            modification_times,
            stats,
+            deletion_vector,
            partition_values,
            length: data.num_rows(),
            pointer: 0,
        })
    }
 
    pub(crate) fn get(&self, index: usize) -> DeltaResult<LogicalFile<'a>> {
        if index >= self.length {
            return Err(DeltaTableError::Generic(format!(
                "index out of bounds: {} >= {}",
                index, self.length
            )));
        }
-        Ok(FileStats {
+        Ok(LogicalFile {
            path: self.paths,
            size: self.sizes,
            modification_time: self.modification_times,
            partition_values: self.partition_values,
            partition_fields: self.partition_fields.clone(),
            stats: self.stats,
+            deletion_vector: self.deletion_vector.clone(),
            index,
        })
    }
}

 impl<'a> Iterator for FileStatsAccessor<'a> {
-    type Item = FileStats<'a>;
+    type Item = LogicalFile<'a>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.pointer >= self.length {
@@ -312,7 +445,7 @@ impl<'a> LogDataHandler<'a> {
     }
 
 impl<'a> IntoIterator for LogDataHandler<'a> {
-    type Item = FileStats<'a>;
+    type Item = LogicalFile<'a>;
     type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
 
     fn into_iter(self) -> Self::IntoIter {
@@ -557,7 +690,7 @@ mod tests {
             .snapshot()
             .unwrap()
             .snapshot
-            .file_stats()
+            .files()
             .find(|f| {
                 f.path().ends_with(
                     "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
@@ -569,7 +702,7 @@
             .snapshot()
             .unwrap()
             .snapshot
-            .file_stats()
+            .files()
             .find(|f| {
                 f.path().ends_with(
                     "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
diff --git a/crates/deltalake-core/src/kernel/snapshot/mod.rs b/crates/deltalake-core/src/kernel/snapshot/mod.rs
index 2f24247bf5..02e70e2931 100644
--- a/crates/deltalake-core/src/kernel/snapshot/mod.rs
+++ b/crates/deltalake-core/src/kernel/snapshot/mod.rs
@@ -10,6 +10,10 @@
 //!
 //! The sub modules provide structures and methods that aid in generating
 //! and consuming snapshots.
+//!
+//! ## Reading the log
+//!
+//!
 
 use std::io::{BufRead, BufReader, Cursor};
 use std::sync::Arc;
@@ -23,7 +27,7 @@ use object_store::ObjectStore;
 
 use self::log_segment::{CommitData, LogSegment, PathExt};
 use self::parse::{read_adds, read_removes};
-use self::replay::{LogReplayScanner, ReplayStream};
+use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
 use super::{Action, Add, CommitInfo, Metadata, Protocol, Remove};
 use crate::kernel::StructType;
 use crate::table::config::TableConfig;
@@ -316,6 +320,11 @@ impl EagerSnapshot {
         let mut files = Vec::new();
         let mut scanner = LogReplayScanner::new();
         files.push(scanner.process_files_batch(&batch, true)?);
+        let mapper = LogMapper::try_new(snapshot.schema(), snapshot.config.clone())?;
+        files = files
+            .into_iter()
+            .map(|b| mapper.map_batch(b))
+            .collect::<DeltaResult<Vec<_>>>()?;
         Ok(Self { snapshot, files })
     }
 
@@ -350,12 +359,14 @@ impl EagerSnapshot {
             )
             .boxed()
         };
+        let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?;
         let files = ReplayStream::try_new(
             log_stream,
             checkpoint_stream,
             self.schema(),
             self.snapshot.config.clone(),
         )?
+        .map(|batch| batch.and_then(|b| mapper.map_batch(b)))
         .try_collect()
         .await?;
 
@@ -423,7 +434,7 @@ impl EagerSnapshot {
     }
 
     /// Get a file action iterator for the given version
-    pub fn file_stats(&self) -> impl Iterator<Item = FileStats<'_>> {
+    pub fn files(&self) -> impl Iterator<Item = LogicalFile<'_>> {
         self.log_data().into_iter()
     }
 
@@ -464,6 +475,7 @@ impl EagerSnapshot {
             files.push(scanner.process_files_batch(&batch?, true)?);
         }
 
+        let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?;
         self.files = files
             .into_iter()
             .chain(
@@ -471,7 +483,8 @@
                     .iter()
                     .flat_map(|batch| scanner.process_files_batch(batch, false)),
             )
-            .collect();
+            .map(|b| mapper.map_batch(b))
+            .collect::<DeltaResult<Vec<_>>>()?;
 
         if let Some(metadata) = metadata {
             self.snapshot.metadata = metadata;
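The renamed accessor is the intended entry point for log inspection: `files()` (formerly `file_stats()`) yields borrowed `LogicalFile` views instead of materialized actions. A consumer-side sketch, assuming a table already exists at the given URI (the path argument and helper name are placeholders):

    use deltalake_core::{DeltaResult, DeltaTableBuilder};

    // Walk the active files of a table without materializing Add actions.
    async fn list_files(uri: &str) -> DeltaResult<()> {
        let table = DeltaTableBuilder::from_uri(uri).load().await?;
        for file in table.snapshot()?.log_data() {
            // Each accessor reads straight from the underlying arrow arrays.
            println!(
                "{} ({} bytes, {:?} records)",
                file.path(),
                file.size(),
                file.num_records()
            );
        }
        Ok(())
    }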
diff --git a/crates/deltalake-core/src/kernel/snapshot/replay.rs b/crates/deltalake-core/src/kernel/snapshot/replay.rs
index 3b2066af85..cffd1d1670 100644
--- a/crates/deltalake-core/src/kernel/snapshot/replay.rs
+++ b/crates/deltalake-core/src/kernel/snapshot/replay.rs
@@ -15,6 +15,7 @@ use arrow_select::filter::filter_record_batch;
 use futures::Stream;
 use hashbrown::HashSet;
 use itertools::Itertools;
+use percent_encoding::percent_decode_str;
 use pin_project_lite::pin_project;
 use tracing::debug;
 
@@ -29,9 +30,7 @@ pin_project! {
     pub struct ReplayStream<S> {
         scanner: LogReplayScanner,
 
-        stats_schema: ArrowSchemaRef,
-
-        config: DeltaTableConfig,
+        mapper: Arc<LogMapper>,
 
         #[pin]
         commits: S,
@@ -58,6 +57,32 @@ fn to_count_field(field: &StructField) -> Option<StructField> {
     }
 }
 
+pub(super) fn get_stats_schema(table_schema: &StructType) -> DeltaResult<ArrowSchemaRef> {
+    let data_fields: Vec<_> = table_schema
+        .fields
+        .iter()
+        .enumerate()
+        .filter_map(|(idx, f)| match f.data_type() {
+            DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
+            // TODO: the number of stats fields should be configurable?
+            // or rather we should likely read all stats if we parse JSON?
+            _ if idx < 32 => Some(StructField::new(f.name(), f.data_type().clone(), true)),
+            _ => None,
+        })
+        .collect();
+    let stats_schema = StructType::new(vec![
+        StructField::new("numRecords", DataType::LONG, true),
+        StructField::new("minValues", StructType::new(data_fields.clone()), true),
+        StructField::new("maxValues", StructType::new(data_fields.clone()), true),
+        StructField::new(
+            "nullCount",
+            StructType::new(data_fields.iter().filter_map(to_count_field).collect()),
+            true,
+        ),
+    ]);
+    Ok(std::sync::Arc::new((&stats_schema).try_into()?))
+}
+
 impl<S> ReplayStream<S> {
     pub(super) fn try_new(
         commits: S,
@@ -65,40 +90,39 @@ impl<S> ReplayStream<S> {
         table_schema: &Schema,
         config: DeltaTableConfig,
     ) -> DeltaResult<Self> {
-        let data_fields: Vec<_> = table_schema
-            .fields
-            .iter()
-            .enumerate()
-            .filter_map(|(idx, f)| match f.data_type() {
-                DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
-                // TODO: the number of stats fields should be configurable?
-                // or rather we should likely read all stats if we parse JSON?
-                _ if idx < 32 => Some(StructField::new(f.name(), f.data_type().clone(), true)),
-                _ => None,
-            })
-            .collect();
-        let stats_schema = StructType::new(vec![
-            StructField::new("numRecords", DataType::LONG, true),
-            StructField::new("minValues", StructType::new(data_fields.clone()), true),
-            StructField::new("maxValues", StructType::new(data_fields.clone()), true),
-            StructField::new(
-                "nullCount",
-                StructType::new(data_fields.iter().filter_map(to_count_field).collect()),
-                true,
-            ),
-        ]);
-        let stats_schema = std::sync::Arc::new((&stats_schema).try_into()?);
+        let stats_schema = get_stats_schema(table_schema)?;
+        let mapper = Arc::new(LogMapper {
+            stats_schema,
+            config,
+        });
         Ok(Self {
             commits,
             checkpoint,
-            stats_schema,
-            config,
+            mapper,
             scanner: LogReplayScanner::new(),
         })
     }
 }
 
-fn map_batch(
+pub(super) struct LogMapper {
+    stats_schema: ArrowSchemaRef,
+    config: DeltaTableConfig,
+}
+
+impl LogMapper {
+    pub(super) fn try_new(table_schema: &Schema, config: DeltaTableConfig) -> DeltaResult<Self> {
+        Ok(Self {
+            stats_schema: get_stats_schema(table_schema)?,
+            config,
+        })
+    }
+
+    pub fn map_batch(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
+        map_batch(batch, self.stats_schema.clone(), &self.config)
+    }
+}
+
+pub(super) fn map_batch(
     batch: RecordBatch,
     stats_schema: ArrowSchemaRef,
     config: &DeltaTableConfig,
@@ -162,7 +186,7 @@ where
         let this = self.project();
         let res = this.commits.poll_next(cx).map(|b| match b {
             Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) {
-                Ok(filtered) => Some(map_batch(filtered, this.stats_schema.clone(), this.config)),
+                Ok(filtered) => Some(this.mapper.map_batch(filtered)),
                 Err(e) => Some(Err(e)),
             },
             Some(Err(e)) => Some(Err(e)),
@@ -171,9 +195,7 @@
         if matches!(res, Poll::Ready(None)) {
             this.checkpoint.poll_next(cx).map(|b| match b {
                 Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, false) {
-                    Ok(filtered) => {
-                        Some(map_batch(filtered, this.stats_schema.clone(), this.config))
-                    }
+                    Ok(filtered) => Some(this.mapper.map_batch(filtered)),
                     Err(e) => Some(Err(e)),
                 },
                 Some(Err(e)) => Some(Err(e)),
@@ -210,17 +232,18 @@ pub(super) struct DVInfo<'a> {
 }
 
 fn seen_key(info: &FileInfo<'_>) -> String {
+    let path = percent_decode_str(info.path).decode_utf8_lossy();
     if let Some(dv) = &info.dv {
         if let Some(offset) = &dv.offset {
             format!(
                 "{}::{}{}@{offset}",
-                info.path, dv.storage_type, dv.path_or_inline_dv
+                path, dv.storage_type, dv.path_or_inline_dv
             )
         } else {
-            format!("{}::{}{}", info.path, dv.storage_type, dv.path_or_inline_dv)
+            format!("{}::{}{}", path, dv.storage_type, dv.path_or_inline_dv)
         }
     } else {
-        info.path.to_string()
+        path.to_string()
     }
 }
diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs
index dad58fd1b9..329cfab3af 100644
--- a/crates/deltalake-core/src/lib.rs
+++ b/crates/deltalake-core/src/lib.rs
@@ -189,7 +189,7 @@ mod tests {
             ]
         );
         let tombstones = table
-            .get_state()
+            .snapshot()
             .unwrap()
             .all_tombstones(table.object_store().clone())
             .await
@@ -307,7 +307,7 @@ mod tests {
         null_counts.values().iter().for_each(|x| assert_eq!(*x, 0));
 
         let tombstones = table
-            .get_state()
+            .snapshot()
             .unwrap()
             .all_tombstones(table.object_store().clone())
             .await
diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs
index 70403c4091..8e1bc9ad6c 100644
--- a/crates/deltalake-core/src/operations/merge/mod.rs
+++ b/crates/deltalake-core/src/operations/merge/mod.rs
@@ -30,7 +30,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
+use std::time::Instant;
 
 use arrow_schema::Schema as ArrowSchema;
 use async_trait::async_trait;
@@ -72,7 +72,7 @@ use crate::delta_datafusion::{
     execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig,
     DeltaTableProvider,
 };
-use crate::kernel::{Action, Remove};
+use crate::kernel::Action;
 use crate::logstore::LogStoreRef;
 use crate::operations::merge::barrier::find_barrier_node;
 use crate::operations::write::write_execution_plan;
@@ -1318,11 +1318,6 @@ async fn execute(
 
     metrics.rewrite_time_ms = Instant::now().duration_since(rewrite_start).as_millis() as u64;
 
-    let deletion_timestamp = SystemTime::now()
-        .duration_since(UNIX_EPOCH)
-        .unwrap()
-        .as_millis() as i64;
-
     let mut actions: Vec<Action> = add_actions.into_iter().map(Action::Add).collect();
     metrics.num_target_files_added = actions.len();
 
@@ -1334,21 +1329,10 @@ async fn execute(
 
     {
         let lock = survivors.lock().unwrap();
-        for action in snapshot.file_actions()? {
-            if lock.contains(&action.path) {
+        for action in snapshot.log_data() {
+            if lock.contains(action.path().as_ref()) {
                 metrics.num_target_files_removed += 1;
-                actions.push(Action::Remove(Remove {
-                    path: action.path.clone(),
-                    deletion_timestamp: Some(deletion_timestamp),
-                    data_change: true,
-                    extended_file_metadata: Some(true),
-                    partition_values: Some(action.partition_values.clone()),
-                    deletion_vector: action.deletion_vector.clone(),
-                    size: Some(action.size),
-                    tags: None,
-                    base_row_id: action.base_row_id,
-                    default_row_commit_version: action.default_row_commit_version,
-                }))
+                actions.push(action.remove_action(true).into_action())
             }
         }
     }
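The write path below applies the same replacement: tombstones are now derived from the log-data view via `remove_action`, so the hand-rolled `Remove` construction (and its `SystemTime` plumbing) disappears. The same pattern as a standalone sketch — the function name is illustrative, and `DeltaTableState` is assumed to be importable as in the crate code above:

    use deltalake_core::kernel::Action;
    use deltalake_core::table::state::DeltaTableState;

    // Tombstone every currently active file, e.g. for an overwrite commit.
    fn removes_for_overwrite(snapshot: &DeltaTableState) -> Vec<Action> {
        snapshot
            .log_data()
            .into_iter()
            .map(|file| file.remove_action(true).into_action())
            .collect()
    }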
diff --git a/crates/deltalake-core/src/operations/transaction/state.rs b/crates/deltalake-core/src/operations/transaction/state.rs
index 6a48ea729f..d3f680fcea 100644
--- a/crates/deltalake-core/src/operations/transaction/state.rs
+++ b/crates/deltalake-core/src/operations/transaction/state.rs
@@ -320,7 +320,7 @@ impl PruningStatistics for DeltaTableState {
     /// return the number of containers (e.g. row groups) being
     /// pruned with these statistics
     fn num_containers(&self) -> usize {
-        self.file_actions().unwrap().len()
+        self.files_count()
     }
 
     /// return the number of null values for the named column as an
diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs
index 739ad18b42..ed0676fc7c 100644
--- a/crates/deltalake-core/src/operations/write.rs
+++ b/crates/deltalake-core/src/operations/write.rs
@@ -27,7 +27,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow_array::RecordBatch;
 use arrow_cast::can_cast_types;
@@ -43,7 +42,7 @@ use super::writer::{DeltaWriter, WriterConfig};
 use super::{transaction::commit, CreateBuilder};
 use crate::delta_datafusion::DeltaDataChecker;
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType};
+use crate::kernel::{Action, Add, PartitionsExt, StructType};
 use crate::logstore::LogStoreRef;
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::storage::ObjectStoreRef;
@@ -501,27 +500,6 @@ impl std::future::IntoFuture for WriteBuilder {
                         metadata.schema_string = serde_json::to_string(&delta_schema)?;
                         actions.push(Action::Metadata(metadata));
                     }
-                    // This should never error, since now() will always be larger than UNIX_EPOCH
-                    let deletion_timestamp = SystemTime::now()
-                        .duration_since(UNIX_EPOCH)
-                        .unwrap()
-                        .as_millis() as i64;
-
-                    let to_remove_action = |add: &Add| {
-                        Action::Remove(Remove {
-                            path: add.path.clone(),
-                            deletion_timestamp: Some(deletion_timestamp),
-                            data_change: true,
-                            extended_file_metadata: Some(false),
-                            partition_values: Some(add.partition_values.clone()),
-                            size: Some(add.size),
-                            // TODO add file metadata to remove action (tags missing)
-                            tags: None,
-                            deletion_vector: add.deletion_vector.clone(),
-                            base_row_id: add.base_row_id,
-                            default_row_commit_version: add.default_row_commit_version,
-                        })
-                    };
 
                     match this.predicate {
                         Some(_pred) => {
@@ -532,10 +510,9 @@
                         }
                         _ => {
                             let remove_actions = snapshot
-                                .file_actions()?
-                                .iter()
-                                .map(to_remove_action)
-                                .collect::<Vec<_>>();
+                                .log_data()
+                                .into_iter()
+                                .map(|p| p.remove_action(true).into_action());
                             actions.extend(remove_actions);
                         }
                     }
diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs
index dd11c93e79..e4a155e477 100644
--- a/crates/deltalake-core/src/protocol/checkpoints.rs
+++ b/crates/deltalake-core/src/protocol/checkpoints.rs
@@ -755,7 +755,7 @@ mod tests {
         let log_retention_timestamp = (Utc::now().timestamp_millis()
             + Duration::days(31).num_milliseconds())
             - table
-                .get_state()
+                .snapshot()
                 .unwrap()
                 .table_config()
                 .log_retention_duration()
@@ -783,7 +783,7 @@ mod tests {
         let log_retention_timestamp = (Utc::now().timestamp_millis()
             + Duration::days(32).num_milliseconds())
             - table
-                .get_state()
+                .snapshot()
                 .unwrap()
                 .table_config()
                 .log_retention_duration()
diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs
index 92049daa55..266ef605e7 100644
--- a/crates/deltalake-core/src/protocol/mod.rs
+++ b/crates/deltalake-core/src/protocol/mod.rs
@@ -864,7 +864,7 @@ mod tests {
         // test table with partitions
         let path = "../deltalake-test/tests/data/delta-0.8.0-null-partition";
         let table = crate::open_table(path).await.unwrap();
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         let mut expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -883,7 +883,7 @@
 
         assert_eq!(expected, actions);
 
-        let actions = table.get_state().unwrap().add_actions_table(false).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         expected_columns[4] = (
@@ -903,7 +903,7 @@
         // test table with partitions
         let path = "../deltalake-test/tests/data/table_with_deletion_logs";
         let table = crate::open_table(path).await.unwrap();
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
         let actions = actions
             .project(&[
@@ -961,7 +961,7 @@
 
         assert_eq!(expected, actions);
 
-        let actions = table.get_state().unwrap().add_actions_table(false).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
         let actions = actions
             .project(&[
@@ -1010,7 +1010,7 @@
 
         let path = "../deltalake-test/tests/data/simple_table";
         let table = crate::open_table(path).await.unwrap();
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -1049,7 +1049,7 @@
 
         assert_eq!(expected, actions);
 
-        let actions = table.get_state().unwrap().add_actions_table(false).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         // For now, this column is ignored.
@@ -1067,7 +1067,7 @@
         // test table with column mapping and partitions
         let path = "../deltalake-test/tests/data/table_with_column_mapping";
         let table = crate::open_table(path).await.unwrap();
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
             (
                 "path",
@@ -1141,7 +1141,7 @@
         // test table with stats
         let path = "../deltalake-test/tests/data/delta-0.8.0";
         let table = crate::open_table(path).await.unwrap();
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
         let actions = sort_batch_by(&actions, "path").unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
@@ -1187,7 +1187,7 @@
         let mut table = crate::open_table(path).await.unwrap();
         table.load_version(1).await.unwrap();
 
-        let actions = table.get_state().unwrap().add_actions_table(true).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
 
         let expected_columns: Vec<(&str, ArrayRef)> = vec![
             (
@@ -1368,7 +1368,7 @@
         );
         assert_eq!(expected, actions);
 
-        let actions = table.get_state().unwrap().add_actions_table(false).unwrap();
+        let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
 
         // For brevity, just checking a few nested columns in stats
         assert_eq!(
diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs
index 662bd32419..ad3133a112 100644
--- a/crates/deltalake-core/src/table/mod.rs
+++ b/crates/deltalake-core/src/table/mod.rs
@@ -16,7 +16,7 @@ use tracing::debug;
 use self::builder::DeltaTableConfig;
 use self::state::DeltaTableState;
 use crate::kernel::{
-    Action, CommitInfo, DataCheck, DataType, FileStats, Metadata, Protocol, StructType,
+    Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType,
 };
 use crate::logstore::{self, LogStoreConfig, LogStoreRef};
 use crate::partitions::PartitionFilter;
@@ -412,7 +412,7 @@ impl DeltaTable {
     pub fn get_active_add_actions_by_partitions<'a>(
         &'a self,
         filters: &'a [PartitionFilter],
-    ) -> Result<impl Iterator<Item = DeltaResult<FileStats<'a>>> + '_, DeltaTableError> {
+    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>> + '_, DeltaTableError> {
         self.state
             .as_ref()
             .ok_or(DeltaTableError::NoMetadata)?
@@ -476,11 +476,6 @@ impl DeltaTable {
         self.state.as_ref().ok_or(DeltaTableError::NotInitialized)
     }
 
-    /// Returns the currently loaded state snapshot.
-    pub fn get_state(&self) -> Option<&DeltaTableState> {
-        self.state.as_ref()
-    }
-
     /// Returns current table protocol
     pub fn protocol(&self) -> DeltaResult<&Protocol> {
         Ok(self
diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs
index e6be024715..104ba2bd32 100644
--- a/crates/deltalake-core/src/table/state.rs
+++ b/crates/deltalake-core/src/table/state.rs
@@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
 use super::config::TableConfig;
 use super::{get_partition_col_data_types, DeltaTableConfig};
 use crate::kernel::{
-    Action, Add, DataType, EagerSnapshot, FileStats, LogDataHandler, Metadata, Protocol, Remove,
+    Action, Add, DataType, EagerSnapshot, LogDataHandler, LogicalFile, Metadata, Protocol, Remove,
     StructType,
 };
 use crate::partitions::{DeltaTablePartition, PartitionFilter};
@@ -207,7 +207,7 @@ impl DeltaTableState {
     pub fn get_active_add_actions_by_partitions<'a>(
         &'a self,
         filters: &'a [PartitionFilter],
-    ) -> Result<impl Iterator<Item = DeltaResult<FileStats<'a>>> + '_, DeltaTableError> {
+    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>> + '_, DeltaTableError> {
         let current_metadata = self.metadata();
 
         let nonpartitioned_columns: Vec<String> = filters
diff --git a/crates/deltalake-core/tests/command_filesystem_check.rs b/crates/deltalake-core/tests/command_filesystem_check.rs
index d61970c188..fdc1e6fae7 100644
--- a/crates/deltalake-core/tests/command_filesystem_check.rs
+++ b/crates/deltalake-core/tests/command_filesystem_check.rs
@@ -23,20 +23,20 @@ async fn test_filesystem_check(context: &IntegrationContext) -> TestResult {
     let table = context.table_builder(TestTables::Simple).load().await?;
     let version = table.snapshot()?.version();
-    let active = table.snapshot()?.file_actions()?.len();
+    let active = table.snapshot()?.files_count();
 
     // Validate a dry run does not mutate the table log and identifies orphaned add actions
     let op = DeltaOps::from(table);
     let (table, metrics) = op.filesystem_check().with_dry_run(true).await?;
     assert_eq!(version, table.snapshot()?.version());
-    assert_eq!(active, table.snapshot()?.file_actions()?.len());
+    assert_eq!(active, table.snapshot()?.files_count());
     assert_eq!(vec![file.to_string()], metrics.files_removed);
 
     // Validate a run updates the table version with proper remove actions
     let op = DeltaOps::from(table);
     let (table, metrics) = op.filesystem_check().await?;
     assert_eq!(version + 1, table.snapshot()?.version());
-    assert_eq!(active - 1, table.snapshot()?.file_actions()?.len());
+    assert_eq!(active - 1, table.snapshot()?.files_count());
     assert_eq!(vec![file.to_string()], metrics.files_removed);
 
     let remove = table
@@ -51,7 +51,7 @@ async fn test_filesystem_check(context: &IntegrationContext) -> TestResult {
     let op = DeltaOps::from(table);
     let (table, metrics) = op.filesystem_check().await?;
     assert_eq!(version + 1, table.snapshot()?.version());
-    assert_eq!(active - 1, table.snapshot()?.file_actions()?.len());
+    assert_eq!(active - 1, table.snapshot()?.files_count());
     assert!(metrics.files_removed.is_empty());
 
     Ok(())
@@ -77,13 +77,13 @@ async fn test_filesystem_check_partitioned() -> TestResult {
         .await?;
 
     let version = table.snapshot()?.version();
-    let active = table.snapshot()?.file_actions()?.len();
+    let active = table.snapshot()?.files_count();
 
     // Validate a run updates the table version with proper remove actions
     let op = DeltaOps::from(table);
     let (table, metrics) = op.filesystem_check().await?;
     assert_eq!(version + 1, table.snapshot()?.version());
-    assert_eq!(active - 1, table.snapshot()?.file_actions()?.len());
+    assert_eq!(active - 1, table.snapshot()?.files_count());
     assert_eq!(vec![file.to_string()], metrics.files_removed);
 
     let remove = table
diff --git a/crates/deltalake-core/tests/command_optimize.rs b/crates/deltalake-core/tests/command_optimize.rs
index 9e70e22bcf..75788c6792 100644
--- a/crates/deltalake-core/tests/command_optimize.rs
+++ b/crates/deltalake-core/tests/command_optimize.rs
@@ -1,11 +1,11 @@
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use std::{collections::HashMap, error::Error, sync::Arc};
+use std::time::Duration;
+use std::{error::Error, sync::Arc};
 
 use arrow_array::{Int32Array, RecordBatch, StringArray};
 use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
 use arrow_select::concat::concat_batches;
 use deltalake_core::errors::DeltaTableError;
-use deltalake_core::kernel::{Action, DataType, PrimitiveType, Remove, StructField};
+use deltalake_core::kernel::{Action, DataType, PrimitiveType, StructField};
 use deltalake_core::operations::optimize::{
     create_merge_plan, MetricDetails, Metrics, OptimizeType,
 };
@@ -275,24 +275,8 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
     let other_dt = deltalake_core::open_table(uri).await?;
-    let add = &other_dt.snapshot()?.file_actions()?[0];
-    let remove = Remove {
-        path: add.path.clone(),
-        deletion_timestamp: Some(
-            SystemTime::now()
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_millis() as i64,
-        ),
-        data_change: true,
-        extended_file_metadata: None,
-        size: Some(add.size),
-        partition_values: Some(add.partition_values.clone()),
-        tags: Some(HashMap::new()),
-        deletion_vector: add.deletion_vector.clone(),
-        base_row_id: add.base_row_id,
-        default_row_commit_version: add.default_row_commit_version,
-    };
+    let add = &other_dt.snapshot()?.log_data().into_iter().next().unwrap();
+    let remove = add.remove_action(true);
 
     let operation = DeltaOperation::Delete { predicate: None };
     commit(
diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs
index 9e2431251a..1e49132d23 100644
--- a/crates/deltalake-core/tests/command_restore.rs
+++ b/crates/deltalake-core/tests/command_restore.rs
@@ -170,8 +170,8 @@ async fn test_restore_with_error_params() -> Result<(), Box<dyn Error>> {
 async fn test_restore_file_missing() -> Result<(), Box<dyn Error>> {
     let context = setup_test().await?;
 
-    for file in context.table.snapshot()?.file_actions()?.iter() {
-        let p = context.tmp_dir.path().join(file.clone().path);
+    for file in context.table.snapshot()?.log_data() {
+        let p = context.tmp_dir.path().join(file.path().as_ref());
         fs::remove_file(p).unwrap();
     }
 
@@ -197,8 +197,8 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {
     let context = setup_test().await?;
 
-    for file in context.table.snapshot()?.file_actions()?.iter() {
-        let p = context.tmp_dir.path().join(file.clone().path);
+    for file in context.table.snapshot()?.log_data() {
+        let p = context.tmp_dir.path().join(file.path().as_ref());
         fs::remove_file(p).unwrap();
     }
 
diff --git a/python/src/lib.rs b/python/src/lib.rs
index e3395d2610..5e7df875d9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,7 +25,7 @@ use deltalake::datafusion::datasource::provider::TableProvider;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
-use deltalake::kernel::{Action, Add, FileStats, Invariant, Remove, Scalar, StructType};
+use deltalake::kernel::{Action, Add, Invariant, LogicalFile, Remove, Scalar, StructType};
 use deltalake::operations::constraints::ConstraintBuilder;
 use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
 use deltalake::operations::delete::DeleteBuilder;
@@ -1150,7 +1150,7 @@ fn scalar_to_py(value: &Scalar, py_date: &PyAny, py: Python) -> PyResult(
     py: Python<'py>,
     schema: &PyArrowType<ArrowSchema>,
-    file_info: FileStats<'_>,
+    file_info: LogicalFile<'_>,
 ) -> PyResult<Option<&'py PyAny>> {
     let ds = PyModule::import(py, "pyarrow.dataset")?;
     let py_field = ds.getattr("field")?;
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 49177782ff..337d68f931 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -237,7 +237,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engin
             "float32",
             "float64",
             "bool",
-            "binary",
+            # "binary",
             "date32",
             "timestamp",
         ],
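A closing note on the stats schema: `get_stats_schema` skips map, array, and binary columns and caps stats at the first 32 columns, which is presumably why `binary` is commented out of the metadata round-trip test above — binary columns no longer carry min/max statistics. The pruning rule in isolation, as a sketch over a plain field slice (the helper name is illustrative):

    use deltalake_core::kernel::{DataType, StructField};

    // Columns eligible for per-file min/max statistics: nested and binary
    // types are skipped, and only the first 32 table columns are considered.
    fn stats_eligible(fields: &[StructField]) -> Vec<StructField> {
        fields
            .iter()
            .enumerate()
            .filter_map(|(idx, f)| match f.data_type() {
                DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
                _ if idx < 32 => Some(StructField::new(f.name(), f.data_type().clone(), true)),
                _ => None,
            })
            .collect()
    }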