Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failing test est_restore_by_datetime #2000

Closed
wants to merge 8 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix tests loading table by datetime
Signed-off-by: Nikolay Ulmasov <[email protected]>
r3stl355 committed Dec 31, 2023
commit a8d46ee8a4a499d8d799c640cc83032eeef47af6
6 changes: 5 additions & 1 deletion crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -638,9 +638,14 @@ impl DeltaTable {
&mut self,
version: i64,
) -> Result<i64, DeltaTableError> {
// Use a cached timestamp info if available
match self.version_timestamp.get(&version) {
Some(ts) => Ok(*ts),
None => {
// Load the version specified
if self.version() != version {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of loading the version each time, we could also just do this

self.state.commit_infos().get(version).unwrap().timestamp

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ion-elgreco Setting aside the concerns I have shared with reliance on commitInfo, I believe that invocation is not guaranteed to return the right versioned information if the version has not been loaded already.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just think aloud here but in general you shouldn't be able to restore to a version of a table that's above the current loaded table version

self.load_version(version).await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@r3stl355 I am not as familiar with the callers of get_version_timestamp but this call will actually reset the self table state to the version specified. That seems like it would be a concerning side effect.

I'm honestly not sure if load_version is more/less work than just creating a new DeltaTable at that specific version, and then returning newtable.get_version_timestamp(version) to preserve self here 🤔

}
let timestamp: Option<i64> = if !self.state.commit_infos().is_empty() {
self.state.commit_infos().last().unwrap().timestamp
} else {
@@ -891,7 +896,6 @@ impl DeltaTable {
while min_version <= max_version {
let pivot = (max_version + min_version) / 2;
version = pivot;
self.load_version(version).await?;
let pts = self.get_version_timestamp(pivot).await?;
match pts.cmp(&target_ts) {
Ordering::Equal => {
14 changes: 12 additions & 2 deletions crates/deltalake-core/tests/time_travel.rs
Original file line number Diff line number Diff line change
@@ -12,9 +12,19 @@ async fn time_travel_by_ds() {
("00000000000000000003.json", "2020-05-04T22:47:31-07:00"),
("00000000000000000004.json", "2020-05-05T22:47:31-07:00"),
];

// Need to set both the commit timestamp in each log file and, as a secondary, the modifed time of the file in case the timestamp is not defined
let log_dir_path = Path::new(log_dir);
for (fname, ds) in log_mtime_pair {
let ts = ds_to_ts(ds);
utime::set_file_times(Path::new(log_dir).join(fname), ts, ts).unwrap();
let file = log_dir_path.join(fname);
let contents = std::fs::read_to_string(&file).unwrap();
let my_reg = regex::Regex::new(r#"("timestamp"):\s*\d+([\s,}])"#).unwrap();
let new_contents = my_reg
.replace(&contents, format!("$1:{}$2", ts))
.to_string();
std::fs::write(&file, new_contents).unwrap();
utime::set_file_times(file, ts, ts).unwrap();
}

let mut table = deltalake_core::open_table_with_ds(
@@ -85,5 +95,5 @@ async fn time_travel_by_ds() {

fn ds_to_ts(ds: &str) -> i64 {
let fixed_dt = DateTime::<FixedOffset>::parse_from_rfc3339(ds).unwrap();
DateTime::<Utc>::from(fixed_dt).timestamp()
DateTime::<Utc>::from(fixed_dt).timestamp_millis()
}
19 changes: 14 additions & 5 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
from datetime import date, datetime
from pathlib import Path
from threading import Barrier, Thread
@@ -74,14 +75,22 @@ def test_read_simple_table_using_options_to_dict():
def test_load_as_version_datetime(date_value: str, expected_version):
log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log"
log_mtime_pair = [
("00000000000000000000.json", 1588398451.0),
("00000000000000000001.json", 1588484851.0),
("00000000000000000002.json", 1588571251.0),
("00000000000000000003.json", 1588657651.0),
("00000000000000000004.json", 1588744051.0),
("00000000000000000000.json", 1588398451000),
("00000000000000000001.json", 1588484851000),
("00000000000000000002.json", 1588571251000),
("00000000000000000003.json", 1588657651000),
("00000000000000000004.json", 1588744051000),
]

# Need to set both the commit timestamp in each log file and, as a secondary, the modifed time of the file in case the timestamp is not defined
for file_name, dt_epoch in log_mtime_pair:
file_path = os.path.join(log_dir, file_name)
with open(file_path, "r") as out:
contents = out.read()
regex = r'("timestamp"):\s*\d+([\s,}])'
new_contents = re.sub(regex, rf"\1:{dt_epoch}\2", contents)
with open(file_path, "w") as out:
out.write(new_contents)
os.utime(file_path, (dt_epoch, dt_epoch))

table_path = "../crates/deltalake-core/tests/data/simple_table"