diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 21ae333952..42f1a081c6 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -429,7 +429,7 @@ impl DeltaTableState { } /// merges new state information into our state - pub fn merge(&mut self, mut new_state: DeltaTableState) { + pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) { self.files.append(&mut new_state.files); if !new_state.tombstones.is_empty() { @@ -442,7 +442,11 @@ impl DeltaTableState { self.files .retain(|a| !new_removals.contains(a.path.as_str())); } - self.tombstones.append(&mut new_state.tombstones); + + if require_tombstones { + self.tombstones.append(&mut new_state.tombstones); + } + if new_state.min_reader_version > 0 { self.min_reader_version = new_state.min_reader_version; self.min_writer_version = new_state.min_writer_version; @@ -486,12 +490,147 @@ fn extract_rel_path<'a, 'b>( } } +/// possible version specifications for loading a delta table +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DeltaVersion { + /// load the newest version + Newest, + /// specify the version to laod + Version(DeltaDataTypeVersion), + /// specify the timestamp in UTC + Timestamp(DateTime), +} + +impl Default for DeltaVersion { + fn default() -> Self { + DeltaVersion::Newest + } +} + +/// configuration options for delta table +#[derive(Debug)] +pub struct DeltaTableConfig { + /// indicates whether our use case requires tracking tombstones. + /// read-only applications never require tombstones. Tombstones + /// are only required when writing checkpoints, so even many writers + /// may want to skip them. + /// defaults to true as a safe default. + pub require_tombstones: bool, +} + +impl Default for DeltaTableConfig { + fn default() -> Self { + Self { + require_tombstones: true, + } + } +} + +/// Load-time delta table configuration options +#[derive(Debug)] +pub struct DeltaTableLoadOptions { + /// table root uri + pub table_uri: String, + /// backend to access storage system + pub storage_backend: Box, + /// indicates whether our use case requires tracking tombstones. + /// read-only applications never require tombstones. Tombstones + /// are only required when writing checkpoints, so even many writers + /// may want to skip them. + /// defaults to true as a safe default. + pub require_tombstones: bool, + /// specify the version we are going to load: a time stamp, a version, or just the newest + /// available version + pub version: DeltaVersion, +} + +impl DeltaTableLoadOptions { + /// create default table load options for a table uri + pub fn new(table_uri: &str) -> Result { + Ok(Self { + table_uri: table_uri.to_string(), + storage_backend: storage::get_backend_for_uri(table_uri)?, + require_tombstones: true, + version: DeltaVersion::default(), + }) + } +} + +/// builder for configuring a delta table load. +#[derive(Debug)] +pub struct DeltaTableBuilder { + options: DeltaTableLoadOptions, +} + +impl DeltaTableBuilder { + /// TODO + pub fn from_uri(table_uri: &str) -> Result { + Ok(DeltaTableBuilder { + options: DeltaTableLoadOptions::new(table_uri)?, + }) + } + + /// TODO + pub fn without_tombstones(mut self) -> Self { + self.options.require_tombstones = false; + self + } + + /// TODO + pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self { + self.options.version = DeltaVersion::Version(version); + self + } + + /// specify the timestamp given as ISO-8601/RFC-3339 timestamp + pub fn with_datestring(self, date_string: &str) -> Result { + let datetime = + DateTime::::from(DateTime::::parse_from_rfc3339(date_string)?); + Ok(self.with_timestamp(datetime)) + } + + /// specify a timestamp + pub fn with_timestamp(mut self, timestamp: DateTime) -> Self { + self.options.version = DeltaVersion::Timestamp(timestamp); + self + } + + /// explicitely set a backend (override backend derived from `table_uri`) + pub fn with_storage_backend(mut self, storage: Box) -> Self { + self.options.storage_backend = storage; + self + } + + /// finally load the table + pub async fn load(self) -> Result { + let config = DeltaTableConfig { + require_tombstones: self.options.require_tombstones, + }; + + let mut table = DeltaTable::new( + &self.options.table_uri, + self.options.storage_backend, + config, + )?; + + match self.options.version { + DeltaVersion::Newest => table.load().await?, + DeltaVersion::Version(v) => table.load_version(v).await?, + DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?, + } + + Ok(table) + } +} + /// In memory representation of a Delta Table pub struct DeltaTable { /// The version of the table as of the most recent loaded Delta log entry. pub version: DeltaDataTypeVersion, /// The URI the DeltaTable was loaded from. pub table_uri: String, + /// the load options used during load + pub config: DeltaTableConfig, state: DeltaTableState, @@ -609,10 +748,10 @@ impl DeltaTable { let mut new_state = DeltaTableState::default(); for line in reader.lines() { let action: Action = serde_json::from_str(line?.as_str())?; - process_action(&mut new_state, action)?; + process_action(&mut new_state, action, true)?; } - self.state.merge(new_state); + self.state.merge(new_state, self.config.require_tombstones); Ok(()) } @@ -642,6 +781,7 @@ impl DeltaTable { process_action( &mut self.state, Action::from_parquet_record(schema, &record)?, + self.config.require_tombstones, )?; } } @@ -1130,6 +1270,7 @@ impl DeltaTable { pub fn new( table_uri: &str, storage_backend: Box, + config: DeltaTableConfig, ) -> Result { let table_uri = storage_backend.trim_path(table_uri); let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log"); @@ -1138,6 +1279,7 @@ impl DeltaTable { state: DeltaTableState::default(), storage: storage_backend, table_uri, + config, last_check_point: None, log_uri: log_uri_normalized, version_timestamp: HashMap::new(), @@ -1179,7 +1321,7 @@ impl DeltaTable { // Mutate the DeltaTable's state using process_action() // in order to get most up-to-date state based on the commit above for action in actions { - let _ = process_action(&mut self.state, action)?; + let _ = process_action(&mut self.state, action, self.config.require_tombstones)?; } Ok(()) @@ -1511,14 +1653,20 @@ fn log_entry_from_actions(actions: &[Action]) -> Result Result<(), ApplyLogError> { +fn process_action( + state: &mut DeltaTableState, + action: Action, + handle_tombstones: bool, +) -> Result<(), ApplyLogError> { match action { Action::add(v) => { state.files.push(v.path_decoded()?); } Action::remove(v) => { - let v = v.path_decoded()?; - state.tombstones.push(v); + if handle_tombstones { + let v = v.path_decoded()?; + state.tombstones.push(v); + } } Action::protocol(v) => { state.min_reader_version = v.min_reader_version; @@ -1548,9 +1696,7 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), App /// Creates and loads a DeltaTable from the given path with current metadata. /// Infers the storage backend to use from the scheme in the given table path. pub async fn open_table(table_uri: &str) -> Result { - let storage_backend = storage::get_backend_for_uri(table_uri)?; - let mut table = DeltaTable::new(table_uri, storage_backend)?; - table.load().await?; + let table = DeltaTableBuilder::from_uri(table_uri)?.load().await?; Ok(table) } @@ -1561,10 +1707,10 @@ pub async fn open_table_with_version( table_uri: &str, version: DeltaDataTypeVersion, ) -> Result { - let storage_backend = storage::get_backend_for_uri(table_uri)?; - let mut table = DeltaTable::new(table_uri, storage_backend)?; - table.load_version(version).await?; - + let table = DeltaTableBuilder::from_uri(table_uri)? + .with_version(version) + .load() + .await?; Ok(table) } @@ -1572,11 +1718,10 @@ pub async fn open_table_with_version( /// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp. /// Infers the storage backend to use from the scheme in the given table path. pub async fn open_table_with_ds(table_uri: &str, ds: &str) -> Result { - let datetime = DateTime::::from(DateTime::::parse_from_rfc3339(ds)?); - let storage_backend = storage::get_backend_for_uri(table_uri)?; - let mut table = DeltaTable::new(table_uri, storage_backend)?; - table.load_with_datetime(datetime).await?; - + let table = DeltaTableBuilder::from_uri(table_uri)? + .with_datestring(ds)? + .load() + .await?; Ok(table) } @@ -1614,7 +1759,7 @@ mod tests { last_updated: Some(0), }); - let _ = process_action(&mut state, txn_action).unwrap(); + let _ = process_action(&mut state, txn_action, false).unwrap(); assert_eq!(2, *state.app_transaction_version.get("abc").unwrap()); assert_eq!(1, *state.app_transaction_version.get("xyz").unwrap()); @@ -1631,7 +1776,7 @@ mod tests { .iter() { let be = storage::get_backend_for_uri(table_uri).unwrap(); - let table = DeltaTable::new(table_uri, be).unwrap(); + let table = DeltaTable::new(table_uri, be, DeltaTableConfig::default()).unwrap(); assert_eq!(table.table_uri, "s3://tests/data/delta-0.8.0"); } } @@ -1715,7 +1860,8 @@ mod tests { let backend = Box::new(storage::file::FileStorageBackend::new( tmp_dir.path().to_str().unwrap(), )); - let mut dt = DeltaTable::new(path, backend).unwrap(); + let mut dt = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap(); + // let mut dt = DeltaTable::new(path, backend, DeltaTableLoadOptions::default()).unwrap(); let mut commit_info = Map::::new(); commit_info.insert( diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index 4558399a26..566eef8949 100644 --- a/rust/tests/fs_common/mod.rs +++ b/rust/tests/fs_common/mod.rs @@ -1,5 +1,5 @@ use deltalake::action::{Action, Protocol}; -use deltalake::{storage, DeltaTable, DeltaTableMetaData, Schema}; +use deltalake::{storage, DeltaTable, DeltaTableConfig, DeltaTableMetaData, Schema}; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::schema::types::Type; use std::collections::HashMap; @@ -26,7 +26,7 @@ pub async fn create_test_table( config: HashMap>, ) -> DeltaTable { let backend = storage::get_backend_for_uri(path).unwrap(); - let mut table = DeltaTable::new(path, backend).unwrap(); + let mut table = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap(); let md = DeltaTableMetaData::new(None, None, None, schema, Vec::new(), config); let protocol = Protocol { min_reader_version: 1, diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 944f86146c..a4c748ad37 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -1,6 +1,7 @@ extern crate deltalake; use deltalake::storage::file::FileStorageBackend; +use deltalake::DeltaTableBuilder; use deltalake::StorageBackend; use pretty_assertions::assert_eq; use std::collections::HashMap; @@ -52,6 +53,28 @@ async fn read_delta_table_with_update() { ); } +#[tokio::test] +async fn read_delta_table_ignoring_tombstones() { + let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0") + .unwrap() + .without_tombstones() + .load() + .await + .unwrap(); + assert!( + table.get_state().all_tombstones().is_empty(), + "loading without tombstones should skip tombstones" + ); + + assert_eq!( + table.get_files(), + vec![ + "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet", + "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet" + ] + ); +} + #[tokio::test] async fn read_delta_2_0_table_with_version() { let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0) diff --git a/rust/tests/s3_test.rs b/rust/tests/s3_test.rs index efa662137a..77f252afc8 100644 --- a/rust/tests/s3_test.rs +++ b/rust/tests/s3_test.rs @@ -33,7 +33,9 @@ mod s3 { }, ) .unwrap(); - let mut table = deltalake::DeltaTable::new(table_uri, storage).unwrap(); + let mut table = + deltalake::DeltaTable::new(table_uri, storage, deltalake::DeltaTableConfig::default()) + .unwrap(); table.load().await.unwrap(); println!("{}", table);