Add methods for constructing LogSegment for Snapshot and for TableChanges #495

Merged · 107 commits · Nov 15, 2024

Commits
c7913dc
Move log segment into separate module
OussamaSaoudi-db Oct 28, 2024
6b331ac
Fix tests, make fields pub
OussamaSaoudi-db Oct 28, 2024
f1f9886
Improve comments
OussamaSaoudi-db Oct 28, 2024
8122113
Remove table changes
OussamaSaoudi-db Oct 28, 2024
46185ae
Merge branch 'main' into snapshot_cleanup
OussamaSaoudi-db Oct 28, 2024
471a858
change visibility
OussamaSaoudi-db Oct 28, 2024
4ac35da
Merge branch 'main' into snapshot_cleanup
OussamaSaoudi-db Nov 6, 2024
6297805
Merge branch 'main' into snapshot_cleanup
OussamaSaoudi-db Nov 6, 2024
e569719
Merge remote-tracking branch 'refs/remotes/origin/snapshot_cleanup' i…
OussamaSaoudi-db Nov 6, 2024
5edf4db
Merge branch 'main' into snapshot_cleanup
OussamaSaoudi-db Nov 6, 2024
0b8463a
Remove old log segment
OussamaSaoudi-db Nov 6, 2024
5300a7b
fix failing tests
OussamaSaoudi-db Nov 6, 2024
81d0de0
Get rid of warnings
OussamaSaoudi-db Nov 6, 2024
6b85932
Fix failing tests
OussamaSaoudi-db Nov 6, 2024
1384ea3
Apply suggestions from code review
OussamaSaoudi-db Nov 6, 2024
0182326
Address more pr comments
OussamaSaoudi-db Nov 6, 2024
d053a77
fix imports
OussamaSaoudi-db Nov 6, 2024
52f57e5
rebase onto git changes
OussamaSaoudi-db Nov 6, 2024
aa6c9f4
address nits
OussamaSaoudi-db Nov 6, 2024
dca491c
fix visibility issue
OussamaSaoudi-db Nov 6, 2024
bf5cdd4
Use LogSegmentBuilder
OussamaSaoudi-db Oct 30, 2024
748bab9
Introduce start and end versions
OussamaSaoudi-db Oct 30, 2024
2a6eb3e
Remove old code
OussamaSaoudi-db Nov 1, 2024
841f17f
Fix failing tests
OussamaSaoudi-db Nov 1, 2024
1ce29d8
Most up to date logsegment
OussamaSaoudi-db Nov 6, 2024
a2f9810
Fix failing test and remove unnecessary code
OussamaSaoudi-db Nov 6, 2024
5f7a680
remove table changes from this commit
OussamaSaoudi-db Nov 6, 2024
6d8e35f
remove table_changes
OussamaSaoudi-db Nov 6, 2024
6b78d56
Merge code
OussamaSaoudi-db Nov 8, 2024
c959b1d
Update log segment to latest version
OussamaSaoudi-db Nov 8, 2024
f121f67
Add doc comments
OussamaSaoudi-db Nov 8, 2024
6fbecb7
Fix tests and refactor
OussamaSaoudi-db Nov 9, 2024
7338834
small changes
OussamaSaoudi-db Nov 9, 2024
362900b
Move out checkpoint retain files
OussamaSaoudi-db Nov 9, 2024
de941f7
Merge branch 'main' into list_log_cleanup
OussamaSaoudi-db Nov 9, 2024
8c5a218
Address nits, upgrade to prasedlogpath
OussamaSaoudi-db Nov 11, 2024
a9c529d
merge main
OussamaSaoudi-db Nov 12, 2024
5dcff78
Address ommit
OussamaSaoudi-db Nov 12, 2024
ba079e6
Fix omit
OussamaSaoudi-db Nov 12, 2024
ccaed09
Address nit
OussamaSaoudi-db Nov 12, 2024
62be3d7
Apply suggestions from code review
OussamaSaoudi-db Nov 12, 2024
d22d661
address nit
OussamaSaoudi-db Nov 12, 2024
c7042ce
merge
OussamaSaoudi-db Nov 12, 2024
4b79a33
fix comment
OussamaSaoudi-db Nov 12, 2024
1a2fcb9
Merge branch 'main' into list_log_cleanup
OussamaSaoudi-db Nov 12, 2024
8aec522
Add docs and bring back omit
OussamaSaoudi-db Nov 12, 2024
d88e31e
Apply suggestions from code review
OussamaSaoudi-db Nov 13, 2024
4370737
Address pr comments
OussamaSaoudi-db Nov 13, 2024
6fccf58
Change checkpoint parts
OussamaSaoudi-db Nov 13, 2024
b83c74b
Fix test
OussamaSaoudi-db Nov 13, 2024
87b3247
address more comments
OussamaSaoudi-db Nov 13, 2024
83436d2
Added tests for log segment builder
OussamaSaoudi-db Nov 13, 2024
a7c2461
Add tests
OussamaSaoudi-db Nov 13, 2024
f7f05f2
Address comments
OussamaSaoudi-db Nov 13, 2024
11032a5
small nits
OussamaSaoudi-db Nov 13, 2024
84027cb
change checkpoint parts to checkpoint files
OussamaSaoudi-db Nov 13, 2024
4b7ff17
Make visibility the same between builder and logsegment
OussamaSaoudi-db Nov 13, 2024
4c9c96b
fix comment
OussamaSaoudi-db Nov 13, 2024
a53f38a
Update comments
OussamaSaoudi-db Nov 13, 2024
b0b6514
error message
OussamaSaoudi-db Nov 13, 2024
5654a6d
Merge branch 'main' into list_log_cleanup
OussamaSaoudi-db Nov 13, 2024
2630a9e
Use delta_path_for_version instead of get_path
OussamaSaoudi-db Nov 13, 2024
db5dddb
Add reference import
OussamaSaoudi-db Nov 13, 2024
6f9b459
Merge branch 'main' into list_log_cleanup
OussamaSaoudi-db Nov 13, 2024
a1531ee
Apply suggestions from code review
OussamaSaoudi-db Nov 14, 2024
33c4039
Update kernel/src/log_segment.rs
OussamaSaoudi-db Nov 14, 2024
1f077b0
More nits
OussamaSaoudi-db Nov 14, 2024
bee5659
Merge branch 'main' into list_log_cleanup
OussamaSaoudi-db Nov 14, 2024
9d0a24f
Return dev visibility
OussamaSaoudi-db Nov 14, 2024
f8457d9
make builder methods match builder visibility
OussamaSaoudi-db Nov 14, 2024
9c44a69
Fix doc issue
OussamaSaoudi-db Nov 14, 2024
3a214de
Address nits
OussamaSaoudi-db Nov 14, 2024
d5adae1
Update kernel/src/log_segment.rs
OussamaSaoudi-db Nov 14, 2024
6a5484c
add test, fix bug
OussamaSaoudi-db Nov 14, 2024
1f5b7f1
Merge branch 'list_log_cleanup' of github.com:OussamaSaoudi-db/delta-…
OussamaSaoudi-db Nov 14, 2024
669ae84
New log segment implementation
OussamaSaoudi-db Nov 14, 2024
1fbb563
fix naming
OussamaSaoudi-db Nov 14, 2024
dd7bd7d
add more tests for checkpoint parts
OussamaSaoudi-db Nov 15, 2024
673381f
make sure parts is checked
OussamaSaoudi-db Nov 15, 2024
9873380
naming
OussamaSaoudi-db Nov 15, 2024
b1e12a6
Merge branch 'main' into log_builder_new
OussamaSaoudi-db Nov 15, 2024
04a7ecf
Fix spacing
OussamaSaoudi-db Nov 15, 2024
a16c71c
Fix bug
OussamaSaoudi-db Nov 15, 2024
8cbf2ff
Add doc comment explaining properties
OussamaSaoudi-db Nov 15, 2024
a2ef014
Fix more docs
OussamaSaoudi-db Nov 15, 2024
3995579
Remove expected_end_version from try_new
OussamaSaoudi-db Nov 15, 2024
fc296a4
Fix spacing
OussamaSaoudi-db Nov 15, 2024
6926857
change test util name
OussamaSaoudi-db Nov 15, 2024
e8f3405
Apply suggestions from code review
OussamaSaoudi-db Nov 15, 2024
f31ec8a
Address pr reviews
OussamaSaoudi-db Nov 15, 2024
634661a
Move log segment tests to separate file
OussamaSaoudi-db Nov 15, 2024
b94e2dd
Address more comments
OussamaSaoudi-db Nov 15, 2024
ff8ad65
Make naming consistent
OussamaSaoudi-db Nov 15, 2024
60a8a08
Address more nits
OussamaSaoudi-db Nov 15, 2024
04d2665
Merge branch 'main' into log_builder_new
OussamaSaoudi-db Nov 15, 2024
2f02d19
Update kernel/src/log_segment/mod.rs
OussamaSaoudi-db Nov 15, 2024
38fb7d3
address more pr comments
OussamaSaoudi-db Nov 15, 2024
6a7a624
Merge branch 'log_builder_new' of github.com:OussamaSaoudi-db/delta-k…
OussamaSaoudi-db Nov 15, 2024
13da708
Move back log_segment.rs
OussamaSaoudi-db Nov 15, 2024
8d20941
fix typo
OussamaSaoudi-db Nov 15, 2024
7c45564
Apply suggestions from code review
OussamaSaoudi-db Nov 15, 2024
1718d85
Apply suggestions from code review
OussamaSaoudi-db Nov 15, 2024
5819e49
more comments
OussamaSaoudi-db Nov 15, 2024
ef28095
Merge branch 'log_builder_new' of github.com:OussamaSaoudi-db/delta-k…
OussamaSaoudi-db Nov 15, 2024
e49fdfa
Address nits
OussamaSaoudi-db Nov 15, 2024
b83dafe
fix clippy
OussamaSaoudi-db Nov 15, 2024
358ed16
Merge branch 'main' into log_builder_new
OussamaSaoudi-db Nov 15, 2024
361 changes: 288 additions & 73 deletions kernel/src/log_segment.rs
@@ -4,23 +4,184 @@
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::path::ParsedLogPath;
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
};
use itertools::Itertools;
use std::cmp::Ordering;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
use url::Url;

#[cfg(test)]
mod tests;

/// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files
/// and commit files. It guarantees the following:
/// 1. Commit file versions will not have any gaps between them.
/// 2. If checkpoint(s) is/are present in the range, only commits with versions greater than the most
/// recent checkpoint version are retained. There will not be a gap between the checkpoint
/// version and the first commit version.
/// 3. All checkpoint parts must belong to the same checkpoint version, and must form a complete
/// checkpoint. Multi-part checkpoints must have all of their parts.
///
/// [`LogSegment`] is used in [`Snapshot`] when built with [`LogSegment::for_snapshot`], and in
/// `TableChanges` when built with [`LogSegment::for_table_changes`].
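///
/// For example (a sketch): checkpoint parts at version 3 followed by commit files for versions
/// 4, 5, and 6 form a valid [`LogSegment`] ending at version 6; commit files for versions 4 and
/// 6 without 5 would violate guarantee 1.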
///
/// [`Snapshot`]: crate::snapshot::Snapshot
#[derive(Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct LogSegment {
pub end_version: Version,
pub log_root: Url,
/// Sorted commit files in the log segment (ascending)
pub ascending_commit_files: Vec<ParsedLogPath>,
/// Checkpoint files in the log segment.
pub checkpoint_parts: Vec<ParsedLogPath>,
}

impl LogSegment {
fn try_new(
ascending_commit_files: Vec<ParsedLogPath>,
checkpoint_parts: Vec<ParsedLogPath>,
log_root: Url,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// We require that commits are contiguous. In other words, there must be no gap between commit versions.
require!(
ascending_commit_files
.windows(2)
.all(|cfs| cfs[0].version + 1 == cfs[1].version),
Error::generic(format!(
"Expected ordered contiguous commit files {:?}",
ascending_commit_files
))
);

// There must be no gap between a checkpoint and the first commit version. Note that
// all checkpoint parts share the same version.
if let (Some(checkpoint_file), Some(commit_file)) =
(checkpoint_parts.first(), ascending_commit_files.first())
{
require!(
checkpoint_file.version + 1 == commit_file.version,
Error::generic(format!(
"Gap between checkpoint version {} and next commit {}",
checkpoint_file.version, commit_file.version,
))
)
}

// Get the effective version from chosen files
let version_eff = ascending_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::generic("No files in log segment"))?
.version;
if let Some(end_version) = end_version {
require!(
version_eff == end_version,
Error::generic(format!(
"LogSegment end version {} not the same as the specified end version {}",
version_eff, end_version
))
);
}
Ok(LogSegment {
end_version: version_eff,
log_root,
ascending_commit_files,
checkpoint_parts,
})
}

/// Constructs a [`LogSegment`] to be used for [`Snapshot`]. For a `Snapshot` at version `n`:
/// its LogSegment is made of zero or one checkpoint, and all commits after the checkpoint (if
/// any) up to and including the end version `n`. Note that a checkpoint may be made of multiple
/// parts. All these parts will have the same checkpoint version.
///
/// The options for constructing a LogSegment for Snapshot are as follows:
/// - `checkpoint_hint`: a `CheckpointMetadata` to start the log segment from (e.g. from reading the `_last_checkpoint` file).
/// - `time_travel_version`: The version of the log that the Snapshot will be at.
///
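/// # Example
///
/// A minimal usage sketch (hypothetical: assumes `fs_client: Arc<dyn FileSystemClient>` and the
/// table's `log_root` URL are already in hand):
///
/// ```ignore
/// // Latest snapshot, no checkpoint hint:
/// let segment = LogSegment::for_snapshot(fs_client.as_ref(), log_root.clone(), None, None)?;
/// // Time travel to version 10, seeded by a `_last_checkpoint` hint if one was read:
/// let segment = LogSegment::for_snapshot(fs_client.as_ref(), log_root, checkpoint_hint, 10)?;
/// ```
///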
/// [`Snapshot`]: crate::snapshot::Snapshot
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_snapshot(
fs_client: &dyn FileSystemClient,
log_root: Url,
checkpoint_hint: impl Into<Option<CheckpointMetadata>>,
time_travel_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let time_travel_version = time_travel_version.into();

let (mut ascending_commit_files, checkpoint_parts) =
match (checkpoint_hint.into(), time_travel_version) {
(Some(cp), None) => {
list_log_files_with_checkpoint(&cp, fs_client, &log_root, None)?
}
(Some(cp), Some(end_version)) if cp.version <= end_version => {
list_log_files_with_checkpoint(&cp, fs_client, &log_root, Some(end_version))?
}
_ => list_log_files_with_version(fs_client, &log_root, None, time_travel_version)?,
};

// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

LogSegment::try_new(
ascending_commit_files,
checkpoint_parts,
log_root,
time_travel_version,
)
}

/// Constructs a [`LogSegment`] to be used for `TableChanges`. For a `TableChanges` between
/// versions `start_version` and `end_version`: its LogSegment is made of zero checkpoints and
/// all commits between versions `start_version` (inclusive) and `end_version` (inclusive). If
/// no `end_version` is specified, it defaults to the most recent version.
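///
/// # Example
///
/// A minimal usage sketch (hypothetical `fs_client` and `log_root`):
///
/// ```ignore
/// // Commits for versions 3 through 7, inclusive:
/// let segment = LogSegment::for_table_changes(fs_client.as_ref(), log_root.clone(), 3, 7)?;
/// // Commits from version 3 through the latest version:
/// let segment = LogSegment::for_table_changes(fs_client.as_ref(), log_root, 3, None)?;
/// ```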
#[allow(unused)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_table_changes(
fs_client: &dyn FileSystemClient,
log_root: Url,
start_version: Version,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let end_version = end_version.into();
if let Some(end_version) = end_version {
if start_version > end_version {
return Err(Error::generic(
"Failed to build LogSegment: start_version cannot be greater than end_version",
));
}
}

let ascending_commit_files: Vec<_> =
list_log_files(fs_client, &log_root, start_version, end_version)?
.filter_ok(|x| x.is_commit())
.try_collect()?;

// - Here we check that the start version is correct.
// - [`LogSegment::try_new`] will verify that the `end_version` is correct if present.
// - [`LogSegment::try_new`] also checks that there are no gaps between commits.
// If all three are satisfied, this implies that all the desired commits are present.
require!(
ascending_commit_files
.first()
.is_some_and(|first_commit| first_commit.version == start_version),
Error::generic(format!(
"Expected the first commit to have version {}",
start_version
))
);
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
}
/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
@@ -40,30 +201,27 @@ impl LogSegment {
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// `replay` expects commit files to be sorted in descending order, so we reverse the sorted
// commit files
let commit_files: Vec<_> = self
.ascending_commit_files
.iter()
.rev()
.map(|f| f.location.clone())
.collect();
let commit_stream = engine
.get_json_handler()
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

let checkpoint_parts: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();
let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
.map_ok(|batch| (batch, false));

Ok(commit_stream.chain(checkpoint_stream))
@@ -112,54 +270,111 @@ impl LogSegment {
}
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

use itertools::Itertools;

use crate::{engine::sync::SyncEngine, Table};

// NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
// that the parquet reader properly infers nullcount = rowcount for missing columns. The two
// checkpoint part files that contain transaction app ids have truncated schemas that would
// otherwise fail skipping due to their missing nullcount stat:
//
// Row group 0: count: 1 total(compressed): 111 B total(uncompressed):107 B
// --------------------------------------------------------------------------------
// type nulls min / max
// txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..."
// txn.version INT64 0 "4390" / "4390"
#[test]
fn test_replay_for_metadata() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let data: Vec<_> = snapshot
.log_segment
.replay_for_metadata(&engine)
.unwrap()
.try_collect()
.unwrap();

// The checkpoint has five parts, each containing one action:
// 1. txn (physically missing P&M columns)
// 2. metaData
// 3. protocol
// 4. add
// 5. txn (physically missing P&M columns)
//
// The parquet reader should skip parts 1, 3, and 5. Note that the actual `read_metadata`
// always skips parts 4 and 5 because it terminates the iteration after finding both P&M.
//
// NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group.
//
// WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- We currently
// read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for
// missing columns, but can still skip part 3 because has valid nullcount stats for P&M.
assert_eq!(data.len(), 4);
}
}

/// Returns a fallible iterator of [`ParsedLogPath`] that are between the provided `start_version` (inclusive)
/// and `end_version` (inclusive). [`ParsedLogPath`] may be a commit or a checkpoint. If `start_version` is
/// not specified, the files will begin from version number 0. If `end_version` is not specified, files up to
/// the most recent version will be included.
///
/// Note: this calls [`FileSystemClient::list_from`] to get the list of log files.
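///
/// For example, a sketch of the prefix computation used below (versions are zero-padded to 20
/// digits, so listing from version 10 starts at `<log_root>/00000000000000000010`):
///
/// ```ignore
/// assert_eq!(format!("{:020}", 10u64), "00000000000000000010");
/// ```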
fn list_log_files(
fs_client: &dyn FileSystemClient,
log_root: &Url,
start_version: impl Into<Option<Version>>,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ParsedLogPath>>> {
let start_version = start_version.into().unwrap_or(0);
let end_version = end_version.into();
let version_prefix = format!("{:020}", start_version);
let start_from = log_root.join(&version_prefix)?;

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
Collaborator: Do we even use or care about .crc files? They seem to be something pyspark associates with every write? Would we ever try to read one?

Collaborator: right now, no. this is a comment that was moved over from old code - perhaps we leave this as-is and revisit in the future? (and perhaps the answer is we do nothing and just document that)

Collaborator: @OussamaSaoudi-db can you create an issue for follow-up?

Collaborator: looks like #496 ? thanks!

.filter_map_ok(identity)
.take_while(move |path_res| match path_res {
Ok(path) => !end_version.is_some_and(|end_version| end_version < path.version),
Err(_) => true,
}))
}
/// List all commit and checkpoint files with versions at or above the provided `start_version` (if one is given).
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
/// ascending order by version. The elements of `checkpoint_parts` are all the parts of the same
/// checkpoint. Checkpoint parts share the same version.
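///
/// For example (a sketch): if the log contains single-part checkpoints at versions 5 and 10
/// plus commits for versions 0 through 12, `checkpoint_parts` will hold only the version-10
/// checkpoint file, while the commit vector holds every listed commit; callers such as
/// [`LogSegment::for_snapshot`] then drop commits at or below the checkpoint version.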
fn list_log_files_with_version(
fs_client: &dyn FileSystemClient,
log_root: &Url,
start_version: Option<Version>,
end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
// We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
// on config at some point
let mut commit_files = Vec::with_capacity(10);
let mut checkpoint_parts = vec![];
let mut max_checkpoint_version = start_version;

for parsed_path in list_log_files(fs_client, log_root, start_version, end_version)? {
let parsed_path = parsed_path?;
if parsed_path.is_commit() {
commit_files.push(parsed_path);
} else if parsed_path.is_checkpoint() {
let path_version = parsed_path.version;
match max_checkpoint_version {
None => {
checkpoint_parts.push(parsed_path);
max_checkpoint_version = Some(path_version);
}
Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
Ordering::Greater => {
max_checkpoint_version = Some(path_version);
checkpoint_parts.clear();
checkpoint_parts.push(parsed_path);
}
Ordering::Equal => checkpoint_parts.push(parsed_path),
Ordering::Less => {}
},
}
}
}

Ok((commit_files, checkpoint_parts))
}

/// List all commit and checkpoint files at or after the provided checkpoint version. It is guaranteed that all
/// the returned [`ParsedLogPath`]s will have a version less than or equal to the `end_version`.
/// See [`list_log_files_with_version`] for details on the return type.
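///
/// For example (a sketch): if `_last_checkpoint` names version 10 but the newest checkpoint
/// actually listed is version 8, a warning is logged and version 8 is used; if the versions
/// match but the number of parts found differs from the `parts` field in the hint, an error is
/// returned.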
fn list_log_files_with_checkpoint(
checkpoint_metadata: &CheckpointMetadata,
fs_client: &dyn FileSystemClient,
log_root: &Url,
end_version: Option<Version>,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
let (commit_files, checkpoint_parts) = list_log_files_with_version(
fs_client,
log_root,
Some(checkpoint_metadata.version),
end_version,
)?;

let Some(latest_checkpoint) = checkpoint_parts.last() else {
// TODO: We could potentially recover here
return Err(Error::generic(
"Had a _last_checkpoint hint but didn't find any checkpoints",
));
};
if latest_checkpoint.version != checkpoint_metadata.version {
warn!(
"_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
checkpoint_metadata.version,
latest_checkpoint.version
);
} else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
return Err(Error::Generic(format!(
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
checkpoint_metadata.parts.unwrap_or(1),
checkpoint_parts.len()
)));
}
Ok((commit_files, checkpoint_parts))
}
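
As an end-to-end sketch (hedged: `segment` is a `LogSegment` built as above, `engine` implements `Engine`, and the read method is assumed to be named `replay`, as the body comment in this diff suggests), replaying just the protocol and metadata actions could look like:

let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
let batches = segment.replay(&engine, schema.clone(), schema, None)?;
for batch in batches {
    let (data, is_commit) = batch?;
    // `is_commit` is true for batches read from commit (JSON) files and false for
    // batches read from checkpoint (parquet) files, matching the streams chained above.
    let _ = (data, is_commit);
}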