Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle unrecognized file types in Delta log #575

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
8 changes: 6 additions & 2 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,13 @@ fn list_log_files(

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
.map(|meta| meta.map(|m| ParsedLogPath::try_from(m).ok().and_then(identity)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit: .and_then(identity) is flattening a nested option right? Could use .flatten() in that case.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, i actually didn't know about this -- will update

.filter_map_ok(identity)
.filter(|path_res| {
path_res
.as_ref()
.map_or(true, |path| path.is_commit() || path.is_checkpoint())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps commit/checkpoint (map) and errors (or), right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's right

})
.take_while(move |path_res| match path_res {
Ok(path) => !end_version.is_some_and(|end_version| end_version < path.version),
Err(_) => true,
Expand Down
40 changes: 40 additions & 0 deletions kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,43 @@ fn table_changes_fails_with_larger_start_version_than_end() {
let log_segment_res = LogSegment::for_table_changes(client.as_ref(), log_root, 1, Some(0));
assert!(log_segment_res.is_err());
}

#[test]
fn test_list_files_with_unusual_patterns() {
// Create paths for all the unusual patterns
let paths = vec![
// hex instead of decimal
Path::from("_delta_log/00000000deadbeef.commit.json"),
// bogus part numbering
Path::from("_delta_log/00000000000000000000.checkpoint.0000000010.0000000000.parquet"),
// v2 checkpoint
Path::from(
"_delta_log/00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json",
),
// compacted log file
Path::from("_delta_log/00000000000000000004.00000000000000000006.compacted.json"),
// CRC file
Path::from("_delta_log/00000000000000000001.crc"),
// Valid commit file for comparison
Path::from("_delta_log/00000000000000000002.json"),
];

let (client, log_root) = build_log_with_paths_and_checkpoint(&paths, None);

// Test that list_log_files doesn't fail
let result = super::list_log_files(&*client, &log_root, None, None);
assert!(result.is_ok(), "list_log_files should not fail");

// Collect the results and verify
let paths: Vec<_> = result.unwrap().try_collect().unwrap();

// We should find exactly one file (the valid commit)
assert_eq!(paths.len(), 1, "Should find exactly one valid file");

// Verify that the valid commit file is included
let has_valid_commit = paths.iter().any(|p| p.version == 2 && p.is_commit());
assert!(has_valid_commit, "Should find the valid commit file");

// All other files should have been filtered out by ParsedLogPath::try_from
// and the filter_map_ok(identity) call
}
Loading