Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failing test test_restore_by_datetime #2000

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ async fn execute(
table.version()
}
};

if version >= snapshot.version() {
return Err(DeltaTableError::from(RestoreError::TooLargeRestoreVersion(
version,
Expand Down
27 changes: 21 additions & 6 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ impl DeltaTable {

debug!("merging table state with version: {new_version}");
let s = DeltaTableState::from_actions(actions, new_version)?;

self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
if self.version() == max_version {
Expand Down Expand Up @@ -637,17 +638,31 @@ impl DeltaTable {
&mut self,
version: i64,
) -> Result<i64, DeltaTableError> {
// Use a cached timestamp info if available
match self.version_timestamp.get(&version) {
Some(ts) => Ok(*ts),
None => {
let meta = self
.object_store()
.head(&commit_uri_from_version(version))
.await?;
let ts = meta.last_modified.timestamp_millis();
// Load the version specified
if self.version() != version {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of loading the version each time, we could also just do this

self.state.commit_infos().get(version).unwrap().timestamp

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ion-elgreco Setting aside the concerns I have shared with reliance on commitInfo, I believe that invocation is not guaranteed to return the right versioned information if the version has not been loaded already.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking aloud here, but in general you shouldn't be able to restore to a version of a table that's above the currently loaded table version

self.load_version(version).await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@r3stl355 I am not as familiar with the callers of get_version_timestamp but this call will actually reset the self table state to the version specified. That seems like it would be a concerning side effect.

I'm honestly not sure if load_version is more/less work than just creating a new DeltaTable at that specific version, and then returning newtable.get_version_timestamp(version) to preserve self here 🤔

}
let timestamp: Option<i64> = if !self.state.commit_infos().is_empty() {
self.state.commit_infos().last().unwrap().timestamp
Comment on lines +649 to +650
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a concept I've discussed with @ion-elgreco in Slack a bit. The value of the commitInfo is not governed by the protocol and therefore any timestamp field should not be relied upon for anything.

Since the protocol doesn't dictate the format here, this could be epoch, or any other random timestamp, and we have no guarantees other than Delta/Spark's convention that it will actually represent the timestamp of the last written version.

All that said, I like that this is a sort of guarded optimization, I'm really not sure how to make this much safer though 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we just check the clientVersion and only take the timestamp from engines that we know use timestamp defined in the same format

} else {
None
};
let ts = match timestamp {
Some(ts) => ts,
_ => {
let meta = self
.object_store()
.head(&commit_uri_from_version(version))
.await?;
meta.last_modified.timestamp_millis()
}
};
// also cache timestamp for version
self.version_timestamp.insert(version, ts);

Ok(ts)
}
}
Expand Down
27 changes: 18 additions & 9 deletions crates/deltalake-core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use rand::Rng;
use std::error::Error;
use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempdir::TempDir;

#[derive(Debug)]
Expand Down Expand Up @@ -44,21 +42,18 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
.await?;

let batch = get_record_batch();
thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.await
.unwrap();

thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();

thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
Expand Down Expand Up @@ -123,13 +118,27 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> {
let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap();
let datetime: DateTime<Utc> = Utc.from_utc_datetime(&naive);

let result = DeltaOps(table)
let (table, metrics) = DeltaOps(table)
.restore()
.with_datetime_to_restore(datetime)
.await?;
assert_eq!(result.1.num_restored_file, 1);
assert_eq!(result.1.num_removed_file, 2);
assert_eq!(result.0.state.version(), 4);
assert_eq!(metrics.num_restored_file, 1);
assert_eq!(metrics.num_removed_file, 2);
assert_eq!(table.state.version(), 4);

// Pick another version right before the restore
let timestamp = history.get(3).unwrap().timestamp.unwrap();
let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap();
let datetime: DateTime<Utc> = Utc.from_utc_datetime(&naive);

let (table, metrics) = DeltaOps(table)
.restore()
.with_datetime_to_restore(datetime)
.await?;
assert_eq!(metrics.num_restored_file, 2);
assert_eq!(metrics.num_removed_file, 1);
assert_eq!(table.state.version(), 5);

Ok(())
}

Expand Down
20 changes: 15 additions & 5 deletions crates/deltalake-core/tests/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@ async fn time_travel_by_ds() {
("00000000000000000003.json", "2020-05-04T22:47:31-07:00"),
("00000000000000000004.json", "2020-05-05T22:47:31-07:00"),
];

// Need to set both the commit timestamp in each log file and, as a fallback, the modified time of the file in case the timestamp is not defined
let log_dir_path = Path::new(log_dir);
for (fname, ds) in log_mtime_pair {
let ts = ds_to_ts(ds);
utime::set_file_times(Path::new(log_dir).join(fname), ts, ts).unwrap();
let (file_ts, commit_ts) = ds_to_ts(ds);
let file = log_dir_path.join(fname);
let contents = std::fs::read_to_string(&file).unwrap();
let my_reg = regex::Regex::new(r#"("timestamp"):\s*\d+([\s,}])"#).unwrap();
let new_contents = my_reg
.replace(&contents, format!("$1:{}$2", commit_ts))
.to_string();
std::fs::write(&file, new_contents).unwrap();
utime::set_file_times(file, file_ts, file_ts).unwrap();
}

let mut table = deltalake_core::open_table_with_ds(
Expand All @@ -23,7 +33,6 @@ async fn time_travel_by_ds() {
)
.await
.unwrap();

assert_eq!(table.version(), 0);

table = deltalake_core::open_table_with_ds(
Expand Down Expand Up @@ -83,7 +92,8 @@ async fn time_travel_by_ds() {
assert_eq!(table.version(), 4);
}

fn ds_to_ts(ds: &str) -> i64 {
fn ds_to_ts(ds: &str) -> (i64, i64) {
let fixed_dt = DateTime::<FixedOffset>::parse_from_rfc3339(ds).unwrap();
DateTime::<Utc>::from(fixed_dt).timestamp()
let td_datetime = DateTime::<Utc>::from(fixed_dt);
(td_datetime.timestamp(), td_datetime.timestamp_millis())
}
24 changes: 18 additions & 6 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
from datetime import date, datetime
from pathlib import Path
from threading import Barrier, Thread
Expand Down Expand Up @@ -74,15 +75,26 @@ def test_read_simple_table_using_options_to_dict():
def test_load_as_version_datetime(date_value: str, expected_version):
log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log"
log_mtime_pair = [
("00000000000000000000.json", 1588398451.0),
("00000000000000000001.json", 1588484851.0),
("00000000000000000002.json", 1588571251.0),
("00000000000000000003.json", 1588657651.0),
("00000000000000000004.json", 1588744051.0),
("00000000000000000000.json", 1588398451000),
("00000000000000000001.json", 1588484851000),
("00000000000000000002.json", 1588571251000),
("00000000000000000003.json", 1588657651000),
("00000000000000000004.json", 1588744051000),
]

# Need to set both the commit timestamp in each log file and, as a fallback, the modified time of the file in case the timestamp is not defined
for file_name, dt_epoch in log_mtime_pair:
file_path = os.path.join(log_dir, file_name)
os.utime(file_path, (dt_epoch, dt_epoch))
with open(file_path, "r") as out:
contents = out.read()
regex = r'("timestamp"):\s*\d+([\s,}])'
new_contents = re.sub(regex, rf"\1:{dt_epoch}\2", contents)
with open(file_path, "w") as out:
out.write(new_contents)

# File timestamps are in seconds
file_ts = dt_epoch / 1000
os.utime(file_path, (file_ts, file_ts))

table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
Expand Down