diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 5e8a6222d..c6d4b0024 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -127,40 +127,50 @@ impl LogSegment { } /// Builder for [`LogSegment`] from from `start_version` to `end_version` inclusive -pub(crate) struct LogSegmentBuilder<'a> { - fs_client: &'a dyn FileSystemClient, - table_root: &'a Url, +pub(crate) struct LogSegmentBuilder { start_checkpoint: Option, start_version: Option, end_version: Option, - /// When `commit_files_sorted_ascending` is set to `true`, the commit files are sorted in + /// When `sort_commit_files_ascending` is set to `true`, the commit files are sorted in /// ascending order. Otherwise if it is set to `false`, the commit files are sorted in /// descending order. This is set to `false` by default. - commit_files_sorted_ascending: bool, + sort_commit_files_ascending: bool, omit_checkpoint_parts: bool, } -impl<'a> LogSegmentBuilder<'a> { - pub(crate) fn new(fs_client: &'a dyn FileSystemClient, table_root: &'a Url) -> Self { +impl LogSegmentBuilder { + pub(crate) fn new() -> Self { LogSegmentBuilder { - fs_client, - table_root, start_checkpoint: None, start_version: None, end_version: None, - commit_files_sorted_ascending: false, + sort_commit_files_ascending: false, omit_checkpoint_parts: false, } } - /// Optionally provide checkpoint metadata to start the log segment from (e.g. from reading the `last_checkpoint` file). + /// Provide checkpoint metadata to start the log segment from (e.g. from reading the `last_checkpoint` file). /// /// Note: Either `start_version` or `start_checkpoint` may be specified. Attempting to build a [`LogSegment`] /// with both will result in an error. + #[allow(unused)] pub(crate) fn with_start_checkpoint(mut self, start_checkpoint: CheckpointMetadata) -> Self { self.start_checkpoint = Some(start_checkpoint); self } - /// Optionally set the start version of the [`LogSegment`]. This ensures that all commit files + /// Provide checkpoint metadata to start the log segment. See [`LogSegmentBuilder::with_start_checkpoint`] + /// for details. If `start_checkpoint` is `None`, this is a no-op. + /// + /// Note: Either `start_version` or `start_checkpoint` may be specified. Attempting to build a [`LogSegment`] + /// with both will result in an error. + pub(crate) fn with_start_checkpoint_opt( + mut self, + start_checkpoint: Option, + ) -> Self { + self.start_checkpoint = start_checkpoint; + self + } + + /// Provide a `start_version` (inclusive) of the [`LogSegment`] that ensures that all commit files /// are at this version or above it. /// /// Note: Either `start_version` or `start_checkpoint` may be specified. Attempting to build a [`LogSegment`] @@ -170,51 +180,69 @@ impl<'a> LogSegmentBuilder<'a> { self.start_version = Some(version); self } - /// Optionally set the end version (inclusive) of the [`LogSegment`]. This ensures that all commit files - /// and checkpoints are at or below the end version. + /// Optionally provide the `start_version` of the [`LogSegment`]. See [`LogSegmentBuilder::with_start_version`] + /// for details. If `start_version` is `None`, this is a no-op. + #[allow(unused)] + pub(crate) fn with_start_version_opt(mut self, version: Option) -> Self { + self.start_version = version; + self + } + + /// Provide an `end_version` (inclusive) of the [`LogSegment`]. This ensures that all commit and + /// checkpoint files are at or below the end version. + #[allow(unused)] pub(crate) fn with_end_version(mut self, version: Version) -> Self { self.end_version = Some(version); self } - /// Optionally specify that the [`LogSegment`] will not have any checkpoint files. It will only - /// be made up of commit files. + /// Optionally set the end version (inclusive) of the [`LogSegment`]. See [`LogSegmentBuilder::with_end_version`] + /// for details. If `end_version` is `None`, this is a no-op. + pub(crate) fn with_end_version_opt(mut self, version: Option) -> Self { + self.end_version = version; + self + } + + /// Specify that the [`LogSegment`] will not have any checkpoint files. It will only be made + /// up of commit files. #[allow(unused)] pub(crate) fn with_omit_checkpoint_parts(mut self) -> Self { self.omit_checkpoint_parts = true; self } - /// Optionally specify that the commits in the [`LogSegment`] will be sorted by version in ascending - /// order. By default, commits are sorted in descending order of versions. + /// Specify that the commits in the [`LogSegment`] will be sorted by version in ascending + /// order. By default, commits are sorted by version in descending order. #[allow(unused)] - pub(crate) fn with_commit_files_sorted_ascending(mut self) -> Self { - self.commit_files_sorted_ascending = true; + pub(crate) fn with_sort_commit_files_ascending(mut self) -> Self { + self.sort_commit_files_ascending = true; self } /// Build the [`LogSegment`] /// /// This fetches checkpoint and commit files using the `fs_client`. - pub(crate) fn build(self) -> DeltaResult { + pub(crate) fn build( + self, + fs_client: &dyn FileSystemClient, + table_root: &Url, + ) -> DeltaResult { if self.start_version.is_some() && self.start_checkpoint.is_some() { return Err(Error::generic("Failed to build LogSegment: Cannot specify both start_version and start_checkpoint")); } let Self { - fs_client, - table_root, start_checkpoint, start_version, end_version, - commit_files_sorted_ascending, + sort_commit_files_ascending, omit_checkpoint_parts, } = self; let log_root = table_root.join("_delta_log/").unwrap(); let (mut sorted_commit_files, mut checkpoint_parts) = match (start_checkpoint, end_version) { - (Some(cp), None) => Self::list_log_files_with_checkpoint(&cp, fs_client, &log_root)?, + (Some(cp), None) => list_log_files_with_checkpoint(&cp, fs_client, &log_root)?, (Some(cp), Some(version)) if cp.version <= version => { - Self::list_log_files_with_checkpoint(&cp, fs_client, &log_root)? + list_log_files_with_checkpoint(&cp, fs_client, &log_root)? } - _ => Self::list_log_files_from_version(fs_client, &log_root, None)?, + _ => list_log_files_from_version(fs_client, &log_root, None)?, }; if omit_checkpoint_parts { @@ -256,9 +284,9 @@ impl<'a> LogSegmentBuilder<'a> { ); } - // We assume commit files are sorted in ascending order. If `commit_files_sorted_ascending` + // We assume commit files are sorted in ascending order. If `sort_commit_files_ascending` // is false, reverse to make it descending. - if !commit_files_sorted_ascending { + if !sort_commit_files_ascending { sorted_commit_files.reverse(); } @@ -269,96 +297,94 @@ impl<'a> LogSegmentBuilder<'a> { checkpoint_parts, }) } +} - pub(crate) fn list_log_files_from_version( - fs_client: &dyn FileSystemClient, - log_root: &Url, - version: Option, - ) -> DeltaResult<(Vec, Vec)> { - let begin_version = version.unwrap_or(0); - let version_prefix = format!("{:020}", begin_version); - let start_from = log_root.join(&version_prefix)?; - - let mut max_checkpoint_version = version; - let mut checkpoint_parts = vec![]; - // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based - // on config at some point - let mut commit_files = Vec::with_capacity(10); - - for meta_res in fs_client.list_from(&start_from)? { - let meta = meta_res?; - // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files? - let Some(parsed_path) = ParsedLogPath::try_from(meta)? else { - continue; - }; - if parsed_path.is_commit() { - commit_files.push(parsed_path); - } else if parsed_path.is_checkpoint() { - let path_version = parsed_path.version; - match max_checkpoint_version { - None => { - checkpoint_parts.push(parsed_path); +fn list_log_files_from_version( + fs_client: &dyn FileSystemClient, + log_root: &Url, + version: Option, +) -> DeltaResult<(Vec, Vec)> { + let begin_version = version.unwrap_or(0); + let version_prefix = format!("{:020}", begin_version); + let start_from = log_root.join(&version_prefix)?; + + let mut max_checkpoint_version = version; + let mut checkpoint_parts = vec![]; + // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based + // on config at some point + let mut commit_files = Vec::with_capacity(10); + + for meta_res in fs_client.list_from(&start_from)? { + let meta = meta_res?; + // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files? + let Some(parsed_path) = ParsedLogPath::try_from(meta)? else { + continue; + }; + if parsed_path.is_commit() { + commit_files.push(parsed_path); + } else if parsed_path.is_checkpoint() { + let path_version = parsed_path.version; + match max_checkpoint_version { + None => { + checkpoint_parts.push(parsed_path); + max_checkpoint_version = Some(path_version); + } + Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) { + Ordering::Greater => { max_checkpoint_version = Some(path_version); + checkpoint_parts.clear(); + checkpoint_parts.push(parsed_path); } - Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) { - Ordering::Greater => { - max_checkpoint_version = Some(path_version); - checkpoint_parts.clear(); - checkpoint_parts.push(parsed_path); - } - Ordering::Equal => checkpoint_parts.push(parsed_path), - Ordering::Less => {} - }, - } + Ordering::Equal => checkpoint_parts.push(parsed_path), + Ordering::Less => {} + }, } } - - debug_assert!( - commit_files - .windows(2) - .all(|cfs| cfs[0].version <= cfs[1].version), - "fs_client.list_from() didn't return a sorted listing! {:?}", - commit_files - ); - - Ok((commit_files, checkpoint_parts)) } - /// List all log files after a given checkpoint. - pub(crate) fn list_log_files_with_checkpoint( - checkpoint_metadata: &CheckpointMetadata, - fs_client: &dyn FileSystemClient, - log_root: &Url, - ) -> DeltaResult<(Vec, Vec)> { - let (commit_files, checkpoint_parts) = Self::list_log_files_from_version( - fs_client, - log_root, - Some(checkpoint_metadata.version), - )?; + debug_assert!( + commit_files + .windows(2) + .all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", + commit_files + ); - let Some(latest_checkpoint) = checkpoint_parts.last() else { - // TODO: We could potentially recover here - return Err(Error::generic( - "Had a _last_checkpoint hint but didn't find any checkpoints", - )); - }; + Ok((commit_files, checkpoint_parts)) +} - if latest_checkpoint.version != checkpoint_metadata.version { - warn!( +/// List all log files after a given checkpoint. +fn list_log_files_with_checkpoint( + checkpoint_metadata: &CheckpointMetadata, + fs_client: &dyn FileSystemClient, + log_root: &Url, +) -> DeltaResult<(Vec, Vec)> { + let (commit_files, checkpoint_parts) = + list_log_files_from_version(fs_client, log_root, Some(checkpoint_metadata.version))?; + + let Some(latest_checkpoint) = checkpoint_parts.last() else { + // TODO: We could potentially recover here + return Err(Error::generic( + "Had a _last_checkpoint hint but didn't find any checkpoints", + )); + }; + + if latest_checkpoint.version != checkpoint_metadata.version { + warn!( "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}", checkpoint_metadata.version, latest_checkpoint.version ); - } else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) { - return Err(Error::Generic(format!( - "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}", - checkpoint_metadata.parts.unwrap_or(1), - checkpoint_parts.len() - ))); - } - Ok((commit_files, checkpoint_parts)) + } else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) { + return Err(Error::Generic(format!( + "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}", + checkpoint_metadata.parts.unwrap_or(1), + checkpoint_parts.len() + ))); } + Ok((commit_files, checkpoint_parts)) } + #[cfg(test)] mod tests { use std::{path::PathBuf, sync::Arc}; @@ -492,9 +518,9 @@ mod tests { Some(&checkpoint_metadata), ); - let log_segment = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment = LogSegmentBuilder::new() .with_start_checkpoint(checkpoint_metadata) - .build() + .build(client.as_ref(), &table_root) .unwrap(); let (commit_files, checkpoint_parts) = (log_segment.commit_files, log_segment.checkpoint_parts); @@ -534,9 +560,9 @@ mod tests { Some(&checkpoint_metadata), ); - let log_segment = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment = LogSegmentBuilder::new() .with_start_checkpoint(checkpoint_metadata) - .build() + .build(client.as_ref(), &table_root) .unwrap(); let (commit_files, checkpoint_parts) = (log_segment.commit_files, log_segment.checkpoint_parts); @@ -566,9 +592,9 @@ mod tests { ], None, ); - let log_segment = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment = LogSegmentBuilder::new() .with_omit_checkpoint_parts() - .build() + .build(client.as_ref(), &table_root) .unwrap(); let (commit_files, checkpoint_parts) = (log_segment.commit_files, log_segment.checkpoint_parts); @@ -603,12 +629,12 @@ mod tests { // -------------------------------------------------------------------------------- // | Specify start version and end version | // -------------------------------------------------------------------------------- - let log_segment = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment = LogSegmentBuilder::new() .with_end_version(5) .with_start_version(2) .with_omit_checkpoint_parts() - .with_commit_files_sorted_ascending() - .build() + .with_sort_commit_files_ascending() + .build(client.as_ref(), &table_root) .unwrap(); let (commit_files, checkpoint_parts) = (log_segment.commit_files, log_segment.checkpoint_parts); @@ -624,10 +650,10 @@ mod tests { // -------------------------------------------------------------------------------- // | Specify no start or end version | // -------------------------------------------------------------------------------- - let log_segment = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment = LogSegmentBuilder::new() .with_omit_checkpoint_parts() - .with_commit_files_sorted_ascending() - .build() + .with_sort_commit_files_ascending() + .build(client.as_ref(), &table_root) .unwrap(); let (commit_files, checkpoint_parts) = (log_segment.commit_files, log_segment.checkpoint_parts); @@ -645,12 +671,12 @@ mod tests { fn test_non_contiguous_log() { let (client, table_root) = build_log_with_paths_and_checkpoint(&[get_path(0, "json"), get_path(2, "json")], None); - let log_segment_res = LogSegmentBuilder::new(client.as_ref(), &table_root).build(); + let log_segment_res = LogSegmentBuilder::new().build(client.as_ref(), &table_root); assert!(log_segment_res.is_err()); } #[test] - fn test_start_version_and_checkpoint() { + fn test_build_with_start_version_and_checkpoint_fails() { let checkpoint_metadata = CheckpointMetadata { version: 3, size: 10, @@ -675,10 +701,10 @@ mod tests { Some(&checkpoint_metadata), ); - let log_segment_res = LogSegmentBuilder::new(client.as_ref(), &table_root) + let log_segment_res = LogSegmentBuilder::new() .with_start_checkpoint(checkpoint_metadata) .with_start_version(5) - .build(); + .build(client.as_ref(), &table_root); assert!(log_segment_res.is_err()); } } diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 4bd8f3986..d6375d51f 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -62,14 +62,10 @@ impl Snapshot { let fs_client = engine.get_file_system_client(); let log_url = table_root.join("_delta_log/").unwrap(); - let mut builder = LogSegmentBuilder::new(fs_client.as_ref(), &table_root); - if let Some(version) = version { - builder = builder.with_end_version(version); - } - if let Some(checkpoint) = read_last_checkpoint(fs_client.as_ref(), &log_url)? { - builder = builder.with_start_checkpoint(checkpoint); - } - let log_segment = builder.build()?; + let log_segment = LogSegmentBuilder::new() + .with_end_version_opt(version) + .with_start_checkpoint_opt(read_last_checkpoint(fs_client.as_ref(), &log_url)?) + .build(fs_client.as_ref(), &table_root)?; Self::try_new_from_log_segment(table_root, log_segment, engine) }