diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 91b9bf8721..4c81031ea7 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -429,21 +429,30 @@ impl Default for DeltaVersion { } } -/// configuration options for delta table +/// Configuration options for delta table #[derive(Debug)] pub struct DeltaTableConfig { - /// indicates whether our use case requires tracking tombstones. - /// read-only applications never require tombstones. Tombstones + /// Indicates whether our use case requires tracking tombstones. + /// This defaults to `true` + /// + /// Read-only applications never require tombstones. Tombstones /// are only required when writing checkpoints, so even many writers /// may want to skip them. - /// defaults to true as a safe default. pub require_tombstones: bool, + + /// Indicates whether DeltaTable should track files. + /// This defaults to `true` + /// + /// Some append-only applications might have no need of tracking any files. + /// Hence, DeltaTable will be loaded with significant memory reduction. + pub require_files: bool, } impl Default for DeltaTableConfig { fn default() -> Self { Self { require_tombstones: true, + require_files: true, } } } @@ -455,15 +464,22 @@ pub struct DeltaTableLoadOptions { pub table_uri: String, /// backend to access storage system pub storage_backend: Box, - /// indicates whether our use case requires tracking tombstones. - /// read-only applications never require tombstones. Tombstones - /// are only required when writing checkpoints, so even many writers - /// may want to skip them. - /// defaults to true as a safe default. - pub require_tombstones: bool, /// specify the version we are going to load: a time stamp, a version, or just the newest /// available version pub version: DeltaVersion, + /// Indicates whether our use case requires tracking tombstones. + /// This defaults to `true` + /// + /// Read-only applications never require tombstones. Tombstones + /// are only required when writing checkpoints, so even many writers + /// may want to skip them. + pub require_tombstones: bool, + /// Indicates whether DeltaTable should track files. + /// This defaults to `true` + /// + /// Some append-only applications might have no need of tracking any files. + /// Hence, DeltaTable will be loaded with significant memory reduction. + pub require_files: bool, } impl DeltaTableLoadOptions { @@ -473,6 +489,7 @@ impl DeltaTableLoadOptions { table_uri: table_uri.to_string(), storage_backend: storage::get_backend_for_uri(table_uri)?, require_tombstones: true, + require_files: true, version: DeltaVersion::default(), }) } @@ -485,20 +502,26 @@ pub struct DeltaTableBuilder { } impl DeltaTableBuilder { - /// TODO + /// Creates `DeltaTableBuilder` from table uri pub fn from_uri(table_uri: &str) -> Result { Ok(DeltaTableBuilder { options: DeltaTableLoadOptions::new(table_uri)?, }) } - /// TODO + /// Sets `require_tombstones=false` to the builder pub fn without_tombstones(mut self) -> Self { self.options.require_tombstones = false; self } - /// TODO + /// Sets `require_files=false` to the builder + pub fn without_files(mut self) -> Self { + self.options.require_files = false; + self + } + + /// Sets `version` to the builder pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self { self.options.version = DeltaVersion::Version(version); self @@ -527,6 +550,7 @@ impl DeltaTableBuilder { pub async fn load(self) -> Result { let config = DeltaTableConfig { require_tombstones: self.options.require_tombstones, + require_files: self.options.require_files, }; let mut table = DeltaTable::new( @@ -708,15 +732,17 @@ impl DeltaTable { async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> { let new_state = DeltaTableState::from_commit(self, version).await?; - self.state.merge(new_state, self.config.require_tombstones); + self.state.merge( + new_state, + self.config.require_tombstones, + self.config.require_files, + ); Ok(()) } async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> { - self.state = - DeltaTableState::from_checkpoint(self, &check_point, self.config.require_tombstones) - .await?; + self.state = DeltaTableState::from_checkpoint(self, &check_point).await?; Ok(()) } @@ -810,7 +836,8 @@ impl DeltaTable { } let s = DeltaTableState::from_actions(actions)?; - self.state.merge(s, self.config.require_tombstones); + self.state + .merge(s, self.config.require_tombstones, self.config.require_files); self.version = new_version; Ok(()) @@ -1325,7 +1352,11 @@ impl DeltaTable { let committed_version = self.try_commit_transaction(&prepared_commit, 0).await?; let new_state = DeltaTableState::from_commit(self, committed_version).await?; - self.state.merge(new_state, self.config.require_tombstones); + self.state.merge( + new_state, + self.config.require_tombstones, + self.config.require_files, + ); Ok(()) } diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 63152f3b0f..e7cffe83b0 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -48,7 +48,11 @@ impl DeltaTableState { let mut new_state = DeltaTableState::default(); for line in reader.lines() { let action: action::Action = serde_json::from_str(line?.as_str())?; - new_state.process_action(action, true)?; + new_state.process_action( + action, + table.config.require_tombstones, + table.config.require_files, + )?; } Ok(new_state) @@ -58,7 +62,7 @@ impl DeltaTableState { pub fn from_actions(actions: Vec) -> Result { let mut new_state = DeltaTableState::default(); for action in actions { - new_state.process_action(action, true)?; + new_state.process_action(action, true, true)?; } Ok(new_state) } @@ -67,7 +71,6 @@ impl DeltaTableState { pub async fn from_checkpoint( table: &DeltaTable, check_point: &CheckPoint, - require_tombstones: bool, ) -> Result { let checkpoint_data_paths = table.get_checkpoint_data_paths(check_point); // process actions from checkpoint @@ -85,7 +88,8 @@ impl DeltaTableState { for record in preader.get_row_iter(None)? { new_state.process_action( action::Action::from_parquet_record(schema, &record)?, - require_tombstones, + table.config.require_tombstones, + table.config.require_files, )?; } } @@ -154,14 +158,19 @@ impl DeltaTableState { self.current_metadata.as_ref() } - /// merges new state information into our state - pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) { + /// Merges new state information into our state + pub fn merge( + &mut self, + mut new_state: DeltaTableState, + require_tombstones: bool, + require_files: bool, + ) { if !new_state.tombstones.is_empty() { self.files .retain(|a| !new_state.tombstones.contains(a.path.as_str())); } - if require_tombstones { + if require_tombstones && require_files { new_state.tombstones.into_iter().for_each(|r| { self.tombstones.insert(r); }); @@ -173,7 +182,9 @@ impl DeltaTableState { } } - self.files.append(&mut new_state.files); + if require_files { + self.files.append(&mut new_state.files); + } if new_state.min_reader_version > 0 { self.min_reader_version = new_state.min_reader_version; @@ -206,14 +217,17 @@ impl DeltaTableState { fn process_action( &mut self, action: action::Action, - handle_tombstones: bool, + require_tombstones: bool, + require_files: bool, ) -> Result<(), ApplyLogError> { match action { action::Action::add(v) => { - self.files.push(v.path_decoded()?); + if require_files { + self.files.push(v.path_decoded()?); + } } action::Action::remove(v) => { - if handle_tombstones { + if require_tombstones && require_files { let v = v.path_decoded()?; self.tombstones.insert(v); } @@ -280,7 +294,7 @@ mod tests { last_updated: Some(0), }); - let _ = state.process_action(txn_action, false).unwrap(); + let _ = state.process_action(txn_action, false, true).unwrap(); assert_eq!(2, *state.app_transaction_version().get("abc").unwrap()); assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap()); diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index d018d8897a..927f57524d 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -542,13 +542,7 @@ mod tests { options: HashMap, ) -> Result { let backend = crate::get_backend_for_uri_with_options(table_uri, options)?; - let mut table = DeltaTable::new( - table_uri, - backend, - crate::DeltaTableConfig { - require_tombstones: true, - }, - )?; + let mut table = DeltaTable::new(table_uri, backend, crate::DeltaTableConfig::default())?; table.load().await?; Ok(table) } diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index be96141842..741259dafc 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -77,6 +77,48 @@ async fn read_delta_table_ignoring_tombstones() { ); } +#[tokio::test] +async fn read_delta_table_ignoring_files() { + let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0") + .unwrap() + .without_files() + .load() + .await + .unwrap(); + + assert!(table.get_files().is_empty(), "files should be empty"); + assert!( + table.get_tombstones().next().is_none(), + "tombstones should be empty" + ); +} + +#[tokio::test] +async fn read_delta_table_with_ignoring_files_on_apply_log() { + let mut table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0") + .unwrap() + .with_version(0) + .without_files() + .load() + .await + .unwrap(); + + assert_eq!(table.version, 0); + assert!(table.get_files().is_empty(), "files should be empty"); + assert!( + table.get_tombstones().next().is_none(), + "tombstones should be empty" + ); + + table.update().await.unwrap(); + assert_eq!(table.version, 1); + assert!(table.get_files().is_empty(), "files should be empty"); + assert!( + table.get_tombstones().next().is_none(), + "tombstones should be empty" + ); +} + #[tokio::test] async fn read_delta_2_0_table_with_version() { let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)