Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce require_files for tracking the add files in table state #594

Merged
merged 1 commit into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,21 +429,30 @@ impl Default for DeltaVersion {
}
}

/// configuration options for delta table
/// Configuration options for delta table
#[derive(Debug)]
pub struct DeltaTableConfig {
/// indicates whether our use case requires tracking tombstones.
/// read-only applications never require tombstones. Tombstones
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true`
///
/// Read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
/// defaults to true as a safe default.
pub require_tombstones: bool,

/// Indicates whether DeltaTable should track files.
/// This defaults to `true`
///
/// Some append-only applications might have no need of tracking any files.
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,
}

impl Default for DeltaTableConfig {
fn default() -> Self {
Self {
require_tombstones: true,
require_files: true,
}
}
}
Expand All @@ -455,15 +464,22 @@ pub struct DeltaTableLoadOptions {
pub table_uri: String,
/// backend to access storage system
pub storage_backend: Box<dyn StorageBackend>,
/// indicates whether our use case requires tracking tombstones.
/// read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
/// defaults to true as a safe default.
pub require_tombstones: bool,
/// specify the version we are going to load: a time stamp, a version, or just the newest
/// available version
pub version: DeltaVersion,
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true`
///
/// Read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
pub require_tombstones: bool,
/// Indicates whether DeltaTable should track files.
/// This defaults to `true`
///
/// Some append-only applications might have no need of tracking any files.
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,
}

impl DeltaTableLoadOptions {
Expand All @@ -473,6 +489,7 @@ impl DeltaTableLoadOptions {
table_uri: table_uri.to_string(),
storage_backend: storage::get_backend_for_uri(table_uri)?,
require_tombstones: true,
require_files: true,
version: DeltaVersion::default(),
})
}
Expand All @@ -485,20 +502,26 @@ pub struct DeltaTableBuilder {
}

impl DeltaTableBuilder {
/// TODO
/// Creates `DeltaTableBuilder` from table uri
pub fn from_uri(table_uri: &str) -> Result<Self, DeltaTableError> {
Ok(DeltaTableBuilder {
options: DeltaTableLoadOptions::new(table_uri)?,
})
}

/// TODO
/// Sets `require_tombstones=false` to the builder
pub fn without_tombstones(mut self) -> Self {
self.options.require_tombstones = false;
self
}

/// TODO
/// Sets `require_files=false` to the builder
pub fn without_files(mut self) -> Self {
self.options.require_files = false;
self
}

/// Sets `version` to the builder
pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self {
self.options.version = DeltaVersion::Version(version);
self
Expand Down Expand Up @@ -527,6 +550,7 @@ impl DeltaTableBuilder {
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
require_files: self.options.require_files,
};

let mut table = DeltaTable::new(
Expand Down Expand Up @@ -708,15 +732,17 @@ impl DeltaTable {

async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> {
let new_state = DeltaTableState::from_commit(self, version).await?;
self.state.merge(new_state, self.config.require_tombstones);
self.state.merge(
new_state,
self.config.require_tombstones,
self.config.require_files,
);

Ok(())
}

async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
self.state =
DeltaTableState::from_checkpoint(self, &check_point, self.config.require_tombstones)
.await?;
self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;

Ok(())
}
Expand Down Expand Up @@ -810,7 +836,8 @@ impl DeltaTable {
}

let s = DeltaTableState::from_actions(actions)?;
self.state.merge(s, self.config.require_tombstones);
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
self.version = new_version;

Ok(())
Expand Down Expand Up @@ -1325,7 +1352,11 @@ impl DeltaTable {
let committed_version = self.try_commit_transaction(&prepared_commit, 0).await?;

let new_state = DeltaTableState::from_commit(self, committed_version).await?;
self.state.merge(new_state, self.config.require_tombstones);
self.state.merge(
new_state,
self.config.require_tombstones,
self.config.require_files,
);

Ok(())
}
Expand Down
38 changes: 26 additions & 12 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl DeltaTableState {
let mut new_state = DeltaTableState::default();
for line in reader.lines() {
let action: action::Action = serde_json::from_str(line?.as_str())?;
new_state.process_action(action, true)?;
new_state.process_action(
action,
table.config.require_tombstones,
table.config.require_files,
)?;
}

Ok(new_state)
Expand All @@ -58,7 +62,7 @@ impl DeltaTableState {
pub fn from_actions(actions: Vec<Action>) -> Result<Self, ApplyLogError> {
let mut new_state = DeltaTableState::default();
for action in actions {
new_state.process_action(action, true)?;
new_state.process_action(action, true, true)?;
}
Ok(new_state)
}
Expand All @@ -67,7 +71,6 @@ impl DeltaTableState {
pub async fn from_checkpoint(
table: &DeltaTable,
check_point: &CheckPoint,
require_tombstones: bool,
) -> Result<Self, DeltaTableError> {
let checkpoint_data_paths = table.get_checkpoint_data_paths(check_point);
// process actions from checkpoint
Expand All @@ -85,7 +88,8 @@ impl DeltaTableState {
for record in preader.get_row_iter(None)? {
new_state.process_action(
action::Action::from_parquet_record(schema, &record)?,
require_tombstones,
table.config.require_tombstones,
table.config.require_files,
)?;
}
}
Expand Down Expand Up @@ -154,14 +158,19 @@ impl DeltaTableState {
self.current_metadata.as_ref()
}

/// merges new state information into our state
pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) {
/// Merges new state information into our state
pub fn merge(
&mut self,
mut new_state: DeltaTableState,
require_tombstones: bool,
require_files: bool,
) {
if !new_state.tombstones.is_empty() {
self.files
.retain(|a| !new_state.tombstones.contains(a.path.as_str()));
}

if require_tombstones {
if require_tombstones && require_files {
new_state.tombstones.into_iter().for_each(|r| {
self.tombstones.insert(r);
});
Expand All @@ -173,7 +182,9 @@ impl DeltaTableState {
}
}

self.files.append(&mut new_state.files);
if require_files {
self.files.append(&mut new_state.files);
}

if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
Expand Down Expand Up @@ -206,14 +217,17 @@ impl DeltaTableState {
fn process_action(
&mut self,
action: action::Action,
handle_tombstones: bool,
require_tombstones: bool,
require_files: bool,
) -> Result<(), ApplyLogError> {
match action {
action::Action::add(v) => {
self.files.push(v.path_decoded()?);
if require_files {
self.files.push(v.path_decoded()?);
}
}
action::Action::remove(v) => {
if handle_tombstones {
if require_tombstones && require_files {
let v = v.path_decoded()?;
self.tombstones.insert(v);
}
Expand Down Expand Up @@ -280,7 +294,7 @@ mod tests {
last_updated: Some(0),
});

let _ = state.process_action(txn_action, false).unwrap();
let _ = state.process_action(txn_action, false, true).unwrap();

assert_eq!(2, *state.app_transaction_version().get("abc").unwrap());
assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap());
Expand Down
8 changes: 1 addition & 7 deletions rust/src/writer/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,7 @@ mod tests {
options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let backend = crate::get_backend_for_uri_with_options(table_uri, options)?;
let mut table = DeltaTable::new(
table_uri,
backend,
crate::DeltaTableConfig {
require_tombstones: true,
},
)?;
let mut table = DeltaTable::new(table_uri, backend, crate::DeltaTableConfig::default())?;
table.load().await?;
Ok(table)
}
Expand Down
42 changes: 42 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ async fn read_delta_table_ignoring_tombstones() {
);
}

#[tokio::test]
async fn read_delta_table_ignoring_files() {
let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
.unwrap()
.without_files()
.load()
.await
.unwrap();

assert!(table.get_files().is_empty(), "files should be empty");
assert!(
table.get_tombstones().next().is_none(),
"tombstones should be empty"
);
}

#[tokio::test]
async fn read_delta_table_with_ignoring_files_on_apply_log() {
let mut table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
.unwrap()
.with_version(0)
.without_files()
.load()
.await
.unwrap();

assert_eq!(table.version, 0);
assert!(table.get_files().is_empty(), "files should be empty");
assert!(
table.get_tombstones().next().is_none(),
"tombstones should be empty"
);

table.update().await.unwrap();
assert_eq!(table.version, 1);
assert!(table.get_files().is_empty(), "files should be empty");
assert!(
table.get_tombstones().next().is_none(),
"tombstones should be empty"
);
}

#[tokio::test]
async fn read_delta_2_0_table_with_version() {
let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)
Expand Down