diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index 90b5f49b3..5450877ba 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -4,20 +4,42 @@
 use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
 use crate::path::ParsedLogPath;
 use crate::schema::SchemaRef;
-use crate::{DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta};
-
+use crate::snapshot::CheckpointMetadata;
+use crate::utils::require;
+use crate::{
+    DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
+};
 use itertools::Itertools;
+use std::cmp::Ordering;
 use std::sync::{Arc, LazyLock};
+use tracing::warn;
 use url::Url;
 
+/// A [`LogSegment`] represents a contiguous section of the log and is made up of checkpoint files
+/// and commit files. It is built with `LogSegmentBuilder`, and guarantees the following:
+/// 1. Commit/checkpoint file versions will be less than or equal to `end_version` if specified.
+/// 2. Commit file versions will be greater than or equal to `start_version` if specified.
+/// 3. If one or more checkpoints are present in the range, only commits with versions greater
+///    than the most recent checkpoint version are retained. Checkpoints can be omitted (and this
+///    rule skipped) when the `LogSegment` is created. See
+///    `LogSegmentBuilder::without_checkpoint_files`.
+///
+/// [`LogSegment`] is used in both [`Snapshot`] and in `TableChanges` to hold commit files and
+/// checkpoint files.
+/// - For a `Snapshot` at version `n`: its `LogSegment` is made up of zero or one checkpoint, and
+///   all commits between the checkpoint and the end version `n`.
+/// - For a `TableChanges` between versions `a` and `b`: its `LogSegment` is made up of zero
+///   checkpoints and all commits between versions `a` and `b` (inclusive).
+///
+/// [`Snapshot`]: crate::snapshot::Snapshot
 #[derive(Debug)]
 #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
 pub(crate) struct LogSegment {
+    pub end_version: Version,
     pub log_root: Url,
-    /// Reverse order sorted commit files in the log segment
+    /// Sorted commit files in the log segment (see `LogSegmentBuilder` for the sort order)
     pub commit_files: Vec<ParsedLogPath>,
-    /// checkpoint files in the log segment.
-    pub checkpoint_files: Vec<ParsedLogPath>,
+    /// Checkpoint files in the log segment.
+    pub checkpoint_parts: Vec<ParsedLogPath>,
 }
 
 impl LogSegment {
@@ -40,30 +62,24 @@ impl LogSegment {
         checkpoint_read_schema: SchemaRef,
         meta_predicate: Option<ExpressionRef>,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
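+        // Each yielded batch is tagged with a bool: `true` for batches read from commit files,
+        // `false` for batches read from checkpoint files (see the `map_ok` calls below).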
+        let commit_files: Vec<_> = self
+            .commit_files
+            .iter()
+            .map(|f| f.location.clone())
+            .collect();
         let commit_stream = engine
             .get_json_handler()
-            .read_json_files(
-                &self
-                    .commit_files
-                    .iter()
-                    .map(|f| f.location.clone())
-                    .collect::<Vec<_>>(),
-                commit_read_schema,
-                meta_predicate.clone(),
-            )?
+            .read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
             .map_ok(|batch| (batch, true));
 
+        let checkpoint_parts: Vec<_> = self
+            .checkpoint_parts
+            .iter()
+            .map(|f| f.location.clone())
+            .collect();
         let checkpoint_stream = engine
             .get_parquet_handler()
-            .read_parquet_files(
-                &self
-                    .checkpoint_files
-                    .iter()
-                    .map(|f| f.location.clone())
-                    .collect::<Vec<_>>(),
-                checkpoint_read_schema,
-                meta_predicate,
-            )?
+            .read_parquet_files(&checkpoint_parts, checkpoint_read_schema, meta_predicate)?
             .map_ok(|batch| (batch, false));
 
         Ok(commit_stream.chain(checkpoint_stream))
@@ -112,13 +128,280 @@ impl LogSegment {
     }
 }
 
+/// Builder for [`LogSegment`] from `start_version` to `end_version` inclusive
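+///
+/// # Example
+///
+/// A sketch of typical usage; `fs_client` (`&dyn FileSystemClient`) and `table_root` (`&Url`)
+/// are placeholders supplied by the caller:
+///
+/// ```ignore
+/// let log_segment = LogSegmentBuilder::new()
+///     .with_start_version(2)
+///     .with_end_version(5)
+///     .with_sort_commit_files_ascending()
+///     .build(fs_client, table_root)?;
+/// ```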
+#[derive(Default)]
+pub(crate) struct LogSegmentBuilder {
+    start_checkpoint: Option<CheckpointMetadata>,
+    start_version: Option<Version>,
+    end_version: Option<Version>,
+    /// When `sort_commit_files_ascending` is `true`, commit files are sorted by version in
+    /// ascending order. When it is `false` (the default), they are sorted in descending order.
+    sort_commit_files_ascending: bool,
+    without_checkpoint_files: bool,
+}
+impl LogSegmentBuilder {
+    pub(crate) fn new() -> Self {
+        Self::default()
+    }
+    /// Provide checkpoint metadata to start the log segment from (e.g. from reading the
+    /// `_last_checkpoint` file).
+    ///
+    /// Note: Only one of `start_version` and `start_checkpoint` may be specified. Attempting to
+    /// build a [`LogSegment`] with both will result in an error.
+    #[allow(unused)]
+    pub(crate) fn with_start_checkpoint(
+        mut self,
+        start_checkpoint: impl Into<Option<CheckpointMetadata>>,
+    ) -> Self {
+        self.start_checkpoint = start_checkpoint.into();
+        self
+    }
+    /// Provide a `start_version` (inclusive) of the [`LogSegment`]. This ensures that all commit
+    /// files are at or above this version.
+    ///
+    /// Note: Only one of `start_version` and `start_checkpoint` may be specified. Attempting to
+    /// build a [`LogSegment`] with both will result in an error.
+    #[allow(unused)]
+    pub(crate) fn with_start_version(mut self, version: impl Into<Option<Version>>) -> Self {
+        self.start_version = version.into();
+        self
+    }
+    /// Provide an `end_version` (inclusive) of the [`LogSegment`]. This ensures that all commit
+    /// and checkpoint files are at or below the end version.
+    #[allow(unused)]
+    pub(crate) fn with_end_version(mut self, version: impl Into<Option<Version>>) -> Self {
+        self.end_version = version.into();
+        self
+    }
+    /// Specify that the [`LogSegment`] will not have any checkpoint files. It will only be made
+    /// up of commit files.
+    #[allow(unused)]
+    pub(crate) fn without_checkpoint_files(mut self) -> Self {
+        self.without_checkpoint_files = true;
+        self
+    }
+    /// Specify that the commits in the [`LogSegment`] will be sorted by version in ascending
+    /// order. By default, commits are sorted by version in descending order.
+    #[allow(unused)]
+    pub(crate) fn with_sort_commit_files_ascending(mut self) -> Self {
+        self.sort_commit_files_ascending = true;
+        self
+    }
+    /// Build the [`LogSegment`].
+    ///
+    /// This fetches checkpoint and commit files using the `fs_client`. It returns an error if
+    /// both `start_version` and `start_checkpoint` are specified, or if `start_version` is
+    /// greater than `end_version`.
+    pub(crate) fn build(
+        self,
+        fs_client: &dyn FileSystemClient,
+        table_root: &Url,
+    ) -> DeltaResult<LogSegment> {
+        if self.start_version.is_some() && self.start_checkpoint.is_some() {
+            return Err(Error::generic("Failed to build LogSegment: cannot specify both `start_version` and `start_checkpoint`"));
+        }
+        if let (Some(start_version), Some(end_version)) = (self.start_version, self.end_version) {
+            if start_version > end_version {
+                return Err(Error::generic("Failed to build LogSegment: `start_version` cannot be greater than `end_version`"));
+            }
+        }
+        let Self {
+            start_checkpoint,
+            start_version,
+            end_version,
+            sort_commit_files_ascending,
+            without_checkpoint_files,
+        } = self;
+        let log_root = table_root.join("_delta_log/").unwrap();
+        let (mut sorted_commit_files, mut checkpoint_parts) =
+            match (start_checkpoint, start_version, end_version) {
+                (Some(cp), None, None) => {
+                    list_log_files_with_checkpoint(&cp, fs_client, &log_root, end_version)?
+                }
+                (Some(cp), None, Some(end_version)) if cp.version <= end_version => {
+                    list_log_files_with_checkpoint(&cp, fs_client, &log_root, Some(end_version))?
+                }
+                (None, Some(start_version), _) => list_log_files_with_version(
+                    fs_client,
+                    &log_root,
+                    Some(start_version),
+                    end_version,
+                )?,
+                _ => list_log_files_with_version(fs_client, &log_root, None, end_version)?,
+            };
+
+        if without_checkpoint_files {
+            checkpoint_parts.clear();
+        }
+
+        // Commit file versions must satisfy the following:
+        // - be greater than or equal to the start version
+        // - be greater than the most recent checkpoint version, if one exists
+        // - be less than or equal to the end version
+        if let Some(start_version) = start_version {
+            sorted_commit_files.retain(|log_path| log_path.version >= start_version);
+        }
+        if let Some(end_version) = end_version {
+            sorted_commit_files.retain(|log_path| log_path.version <= end_version);
+        }
+        if let Some(checkpoint_file) = checkpoint_parts.first() {
+            sorted_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
+        }
+
+        // After (possibly) omitting checkpoint files and filtering commits, the remaining commits
+        // must be contiguous. In other words, there must be no gap between commit versions.
+        let ordered_commits = sorted_commit_files
+            .windows(2)
+            .all(|cfs| cfs[0].version + 1 == cfs[1].version);
+        if !ordered_commits {
+            return Err(Error::generic("Expected ordered contiguous commit files"));
+        }
+
+        // Get the effective version from the chosen files
+        let version_eff = sorted_commit_files
+            .last()
+            .or(checkpoint_parts.first())
+            .ok_or(Error::MissingVersion)? // TODO: A more descriptive error
+            .version;
+        if let Some(end_version) = end_version {
+            require!(
+                version_eff == end_version,
+                Error::generic(format!(
+                    "effective version {} does not match requested end_version {}",
+                    version_eff, end_version
+                )) // TODO: a more descriptive error
+            );
+        }
+
+        // Commit files are listed in ascending order. If `sort_commit_files_ascending` is false,
+        // reverse them so the order is descending.
+        if !sort_commit_files_ascending {
+            sorted_commit_files.reverse();
+        }
+
+        Ok(LogSegment {
+            end_version: version_eff,
+            log_root,
+            commit_files: sorted_commit_files,
+            checkpoint_parts,
+        })
+    }
+}
+
+/// List all commit and checkpoint files with versions at or above the provided `version`. If
+/// successful, this returns a tuple `(sorted_commit_files_paths, checkpoint_parts):
+/// (Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
+/// ascending order by version. The elements of `checkpoint_parts` are all the parts of the same
+/// checkpoint: checkpoint parts share the same version.
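+///
+/// For example, given a (hypothetical) log containing `0.json`, `1.json`, `1.checkpoint.parquet`,
+/// and `2.json`, calling this with `version = Some(1)` returns the commits at versions `[1, 2]`
+/// and the single checkpoint part at version `1`.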
+fn list_log_files_with_version(
+    fs_client: &dyn FileSystemClient,
+    log_root: &Url,
+    version: Option<Version>,
+    end_version: Option<Version>,
+) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
+    let begin_version = version.unwrap_or(0);
+    let version_prefix = format!("{:020}", begin_version);
+    let start_from = log_root.join(&version_prefix)?;
+
+    let mut max_checkpoint_version = version;
+    let mut checkpoint_parts = vec![];
+    // We expect 10 commit files per checkpoint, so start with that size. We could adjust this
+    // based on config at some point
+    let mut commit_files = Vec::with_capacity(10);
+
+    for meta_res in fs_client.list_from(&start_from)? {
+        let meta = meta_res?;
+        // TODO: this filters out .crc files etc. which start with "." - how do we want to use
+        // these kinds of files?
+        let Some(parsed_path) = ParsedLogPath::try_from(meta)? else {
+            continue;
+        };
+        if parsed_path.is_commit() {
+            commit_files.push(parsed_path);
+        } else if parsed_path.is_checkpoint() {
+            let path_version = parsed_path.version;
+            if end_version.is_some_and(|end_version| path_version > end_version) {
+                continue;
+            }
+            match max_checkpoint_version {
+                None => {
+                    checkpoint_parts.push(parsed_path);
+                    max_checkpoint_version = Some(path_version);
+                }
+                Some(checkpoint_version) => match path_version.cmp(&checkpoint_version) {
+                    Ordering::Greater => {
+                        max_checkpoint_version = Some(path_version);
+                        checkpoint_parts.clear();
+                        checkpoint_parts.push(parsed_path);
+                    }
+                    Ordering::Equal => checkpoint_parts.push(parsed_path),
+                    Ordering::Less => {}
+                },
+            }
+        }
+    }
+
+    debug_assert!(
+        commit_files
+            .windows(2)
+            .all(|cfs| cfs[0].version <= cfs[1].version),
+        "fs_client.list_from() didn't return a sorted listing! {:?}",
+        commit_files
+    );
+
+    Ok((commit_files, checkpoint_parts))
+}
+
+/// List all commit and checkpoint files after the provided checkpoint.
+/// See [`list_log_files_with_version`] for details on the return type.
+fn list_log_files_with_checkpoint(
+    checkpoint_metadata: &CheckpointMetadata,
+    fs_client: &dyn FileSystemClient,
+    log_root: &Url,
+    end_version: Option<Version>,
+) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
+    let (commit_files, checkpoint_parts) = list_log_files_with_version(
+        fs_client,
+        log_root,
+        Some(checkpoint_metadata.version),
+        end_version,
+    )?;
+
+    let Some(latest_checkpoint) = checkpoint_parts.last() else {
+        // TODO: We could potentially recover here
+        return Err(Error::generic(
+            "Had a _last_checkpoint hint but didn't find any checkpoints",
+        ));
+    };
+
+    if latest_checkpoint.version != checkpoint_metadata.version {
+        warn!(
+            "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
+            checkpoint_metadata.version,
+            latest_checkpoint.version
+        );
+    } else if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
+        return Err(Error::Generic(format!(
+            "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
+            checkpoint_metadata.parts.unwrap_or(1),
+            checkpoint_parts.len()
+        )));
+    }
+    Ok((commit_files, checkpoint_parts))
+}
+
 #[cfg(test)]
 mod tests {
-    use std::path::PathBuf;
+    use std::{path::PathBuf, sync::Arc};
 
     use itertools::Itertools;
+    use object_store::{memory::InMemory, path::Path, ObjectStore};
+    use url::Url;
 
-    use crate::{engine::sync::SyncEngine, Table};
+    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
+    use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
+    use crate::engine::sync::SyncEngine;
+    use crate::log_segment::LogSegmentBuilder;
+    use crate::snapshot::CheckpointMetadata;
+    use crate::{FileSystemClient, Table};
+    use test_utils::delta_path_for_version;
 
     // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies
     // that the parquet reader properly infers nullcount = rowcount for missing columns. The two
@@ -162,4 +445,330 @@ mod tests {
         // missing columns, but can still skip part 3 because has valid nullcount stats for P&M.
         assert_eq!(data.len(), 4);
     }
+
+    // Utility method to build a log using a list of log paths and an optional checkpoint hint.
+    // The CheckpointMetadata is written to `_delta_log/_last_checkpoint`.
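+    // The file contents are dummy bytes: these tests only exercise listing and version
+    // arithmetic, so nothing ever reads the files themselves.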
+    fn build_log_with_paths_and_checkpoint(
+        paths: &[Path],
+        checkpoint_metadata: Option<&CheckpointMetadata>,
+    ) -> (Box<dyn FileSystemClient>, Url) {
+        let store = Arc::new(InMemory::new());
+
+        let data = bytes::Bytes::from("kernel-data");
+
+        // add log files to store
+        tokio::runtime::Runtime::new()
+            .expect("create tokio runtime")
+            .block_on(async {
+                for path in paths {
+                    store
+                        .put(path, data.clone().into())
+                        .await
+                        .expect("put log file in store");
+                }
+                if let Some(checkpoint_metadata) = checkpoint_metadata {
+                    let checkpoint_str =
+                        serde_json::to_string(checkpoint_metadata).expect("Serialize checkpoint");
+                    store
+                        .put(
+                            &Path::from("_delta_log/_last_checkpoint"),
+                            checkpoint_str.into(),
+                        )
+                        .await
+                        .expect("Write _last_checkpoint");
+                }
+            });
+
+        let client = ObjectStoreFileSystemClient::new(
+            store,
+            false, // don't have ordered listing
+            Path::from("/"),
+            Arc::new(TokioBackgroundExecutor::new()),
+        );
+
+        let table_root = Url::parse("memory:///").expect("valid url");
+        (Box::new(client), table_root)
+    }
+
+    #[test]
+    fn test_read_log_with_out_of_date_last_checkpoint() {
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 3,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            Some(&checkpoint_metadata),
+        );
+
+        let log_segment = LogSegmentBuilder::new()
+            .with_start_checkpoint(checkpoint_metadata)
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        let (commit_files, checkpoint_parts) =
+            (log_segment.commit_files, log_segment.checkpoint_parts);
+
+        assert_eq!(checkpoint_parts.len(), 1);
+        assert_eq!(commit_files.len(), 2);
+        assert_eq!(checkpoint_parts[0].version, 5);
+        assert_eq!(commit_files[0].version, 7);
+        assert_eq!(commit_files[1].version, 6);
+    }
+    #[test]
+    fn test_read_log_with_correct_last_checkpoint() {
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 5,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(1, "json"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(3, "json"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(5, "json"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            Some(&checkpoint_metadata),
+        );
+
+        let log_segment = LogSegmentBuilder::new()
+            .with_start_checkpoint(checkpoint_metadata)
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        let (commit_files, checkpoint_parts) =
+            (log_segment.commit_files, log_segment.checkpoint_parts);
+
+        assert_eq!(checkpoint_parts.len(), 1);
+        assert_eq!(commit_files.len(), 2);
+        assert_eq!(checkpoint_parts[0].version, 5);
+        assert_eq!(commit_files[0].version, 7);
+        assert_eq!(commit_files[1].version, 6);
+    }
+
+    #[test]
+    fn test_builder_without_checkpoints() {
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            None,
+        );
+        let log_segment = LogSegmentBuilder::new()
+            .without_checkpoint_files()
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        let (commit_files, checkpoint_parts) =
+            (log_segment.commit_files, log_segment.checkpoint_parts);
+
+        // Checkpoints should be omitted
+        assert_eq!(checkpoint_parts.len(), 0);
+
+        // All commit files should still be there
+        let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+        let expected_versions = (0..=7).rev().collect_vec();
+        assert_eq!(versions, expected_versions);
+    }
+    #[test]
+    fn test_log_segment_commit_versions() {
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            None,
+        );
+
+        // --------------------------------------------------------------------------------
+        // | Specify start version and end version                                        |
+        // --------------------------------------------------------------------------------
+        let log_segment = LogSegmentBuilder::new()
+            .with_end_version(5)
+            .with_start_version(2)
+            .without_checkpoint_files()
+            .with_sort_commit_files_ascending()
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        let (commit_files, checkpoint_parts) =
+            (log_segment.commit_files, log_segment.checkpoint_parts);
+
+        // Checkpoints should be omitted
+        assert_eq!(checkpoint_parts.len(), 0);
+
+        // Commits between 2 and 5 (inclusive) should be returned
+        let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+        let expected_versions = (2..=5).collect_vec();
+        assert_eq!(versions, expected_versions);
+
+        // --------------------------------------------------------------------------------
+        // | Specify no start or end version                                              |
+        // --------------------------------------------------------------------------------
+        let log_segment = LogSegmentBuilder::new()
+            .without_checkpoint_files()
+            .with_sort_commit_files_ascending()
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        let (commit_files, checkpoint_parts) =
+            (log_segment.commit_files, log_segment.checkpoint_parts);
+
+        // Checkpoints should be omitted
+        assert_eq!(checkpoint_parts.len(), 0);
+
+        // Commits between 0 and 7 (inclusive) should be returned
+        let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
+        let expected_versions = (0..=7).collect_vec();
+        assert_eq!(versions, expected_versions);
+    }
+
+    #[test]
+    fn test_non_contiguous_log() {
+        // Commit with version 1 is missing
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(2, "json"),
+            ],
+            None,
+        );
+        let log_segment_res = LogSegmentBuilder::new().build(client.as_ref(), &table_root);
+        assert!(log_segment_res.is_err());
+    }
+
+    #[test]
+    fn test_larger_start_version_fails() {
+        // A `start_version` greater than `end_version` should fail
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "json"),
+            ],
+            None,
+        );
+        let log_segment_res = LogSegmentBuilder::new()
+            .with_start_version(1)
+            .with_end_version(0)
+            .build(client.as_ref(), &table_root);
+        assert!(log_segment_res.is_err());
+
+        let log_segment_res = LogSegmentBuilder::new()
+            .with_start_version(0)
+            .with_end_version(0)
+            .build(client.as_ref(), &table_root);
+        assert!(log_segment_res.is_ok());
+    }
+
+    #[test]
+    fn test_build_with_start_version_and_checkpoint_fails() {
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 3,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            Some(&checkpoint_metadata),
+        );
+
+        let log_segment_res = LogSegmentBuilder::new()
+            .with_start_checkpoint(checkpoint_metadata)
+            .with_start_version(5)
+            .build(client.as_ref(), &table_root);
+        assert!(log_segment_res.is_err());
+    }
+
+    #[test]
+    fn test_build_with_start_checkpoint_and_end_version() {
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 3,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+
+        let (client, table_root) = build_log_with_paths_and_checkpoint(
+            &[
+                delta_path_for_version(0, "json"),
+                delta_path_for_version(1, "checkpoint.parquet"),
+                delta_path_for_version(2, "json"),
+                delta_path_for_version(3, "checkpoint.parquet"),
+                delta_path_for_version(4, "json"),
+                delta_path_for_version(5, "checkpoint.parquet"),
+                delta_path_for_version(6, "json"),
+                delta_path_for_version(7, "json"),
+            ],
+            Some(&checkpoint_metadata),
+        );
+
+        let log_segment = LogSegmentBuilder::new()
+            .with_start_checkpoint(checkpoint_metadata)
+            .with_end_version(4)
+            .build(client.as_ref(), &table_root)
+            .unwrap();
+        assert_eq!(log_segment.checkpoint_parts.len(), 1);
+        assert_eq!(log_segment.checkpoint_parts[0].version, 3);
+        assert_eq!(log_segment.commit_files.len(), 1);
+        assert_eq!(log_segment.commit_files[0].version, 4);
+    }
 }
diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index 9931cd325..2305c2421 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -2,20 +2,16 @@
 //! has schema etc.)
 //!
-use std::cmp::Ordering;
-use std::sync::Arc;
-
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 use tracing::{debug, warn};
 use url::Url;
 
 use crate::actions::{Metadata, Protocol};
 use crate::features::{ColumnMappingMode, COLUMN_MAPPING_MODE_KEY};
-use crate::log_segment::LogSegment;
-use crate::path::ParsedLogPath;
+use crate::log_segment::{LogSegment, LogSegmentBuilder};
 use crate::scan::ScanBuilder;
 use crate::schema::Schema;
-use crate::utils::require;
 use crate::{DeltaResult, Engine, Error, FileSystemClient, Version};
 
 const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
@@ -28,7 +24,6 @@
 pub struct Snapshot {
     pub(crate) table_root: Url,
     pub(crate) log_segment: LogSegment,
-    version: Version,
     metadata: Metadata,
     protocol: Protocol,
     schema: Schema,
@@ -45,7 +40,7 @@ impl std::fmt::Debug for Snapshot {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Snapshot")
             .field("path", &self.log_segment.log_root.as_str())
-            .field("version", &self.version)
+            .field("version", &self.version())
             .field("metadata", &self.metadata)
             .finish()
     }
@@ -67,55 +62,18 @@ impl Snapshot {
         let fs_client = engine.get_file_system_client();
         let log_url = table_root.join("_delta_log/").unwrap();
 
-        // List relevant files from log
-        let (mut commit_files, checkpoint_files) =
-            match (read_last_checkpoint(fs_client.as_ref(), &log_url)?, version) {
-                (Some(cp), None) => {
-                    list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
-                }
-                (Some(cp), Some(version)) if cp.version >= version => {
-                    list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
-                }
-                _ => list_log_files(fs_client.as_ref(), &log_url)?,
-            };
-
-        // remove all files above requested version
-        if let Some(version) = version {
-            commit_files.retain(|log_path| log_path.version <= version);
-        }
-        // only keep commit files above the checkpoint we found
-        if let Some(checkpoint_file) = checkpoint_files.first() {
-            commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
-        }
-
-        // get the effective version from chosen files
-        let version_eff = commit_files
-            .first()
-            .or(checkpoint_files.first())
-            .ok_or(Error::MissingVersion)? // TODO: A more descriptive error
-            .version;
-
-        if let Some(v) = version {
-            require!(
-                version_eff == v,
-                Error::MissingVersion // TODO more descriptive error
-            );
-        }
-
-        let log_segment = LogSegment {
-            log_root: log_url,
-            commit_files,
-            checkpoint_files,
-        };
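+        // `build` lists the log files, applies the `_last_checkpoint` hint, filters commits to
+        // the requested version, and verifies the segment ends at that version.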
+        let log_segment = LogSegmentBuilder::new()
+            .with_end_version(version)
+            .with_start_checkpoint(read_last_checkpoint(fs_client.as_ref(), &log_url)?)
+            .build(fs_client.as_ref(), &table_root)?;
 
-        Self::try_new_from_log_segment(table_root, log_segment, version_eff, engine)
+        Self::try_new_from_log_segment(table_root, log_segment, engine)
     }
 
     /// Create a new [`Snapshot`] instance.
     pub(crate) fn try_new_from_log_segment(
         location: Url,
         log_segment: LogSegment,
-        version: Version,
         engine: &dyn Engine,
     ) -> DeltaResult<Self> {
         let (metadata, protocol) = log_segment.read_metadata(engine)?;
@@ -127,7 +85,6 @@ impl Snapshot {
         Ok(Self {
             table_root: location,
             log_segment,
-            version,
             metadata,
             protocol,
             schema,
@@ -147,7 +104,7 @@ impl Snapshot {
 
     /// Version of this `Snapshot` in the table.
     pub fn version(&self) -> Version {
-        self.version
+        self.log_segment.end_version
     }
 
     /// Table [`Schema`] at this `Snapshot`s version.
@@ -230,131 +187,6 @@ fn read_last_checkpoint(
     }
 }
 
-/// List all log files after a given checkpoint.
-fn list_log_files_with_checkpoint(
-    checkpoint_metadata: &CheckpointMetadata,
-    fs_client: &dyn FileSystemClient,
-    log_root: &Url,
-) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
-    let version_prefix = format!("{:020}", checkpoint_metadata.version);
-    let start_from = log_root.join(&version_prefix)?;
-
-    let mut max_checkpoint_version = checkpoint_metadata.version;
-    let mut checkpoint_files = vec![];
-    // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
-    // on config at some point
-    let mut commit_files = Vec::with_capacity(10);
-
-    for meta_res in fs_client.list_from(&start_from)? {
-        let meta = meta_res?;
-        let parsed_path = ParsedLogPath::try_from(meta)?;
-        // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
-        if let Some(parsed_path) = parsed_path {
-            if parsed_path.is_commit() {
-                commit_files.push(parsed_path);
-            } else if parsed_path.is_checkpoint() {
-                match parsed_path.version.cmp(&max_checkpoint_version) {
-                    Ordering::Greater => {
-                        max_checkpoint_version = parsed_path.version;
-                        checkpoint_files.clear();
-                        checkpoint_files.push(parsed_path);
-                    }
-                    Ordering::Equal => checkpoint_files.push(parsed_path),
-                    Ordering::Less => {}
-                }
-            }
-        }
-    }
-
-    if checkpoint_files.is_empty() {
-        // TODO: We could potentially recover here
-        return Err(Error::generic(
-            "Had a _last_checkpoint hint but didn't find any checkpoints",
-        ));
-    }
-
-    if max_checkpoint_version != checkpoint_metadata.version {
-        warn!(
-            "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
-            checkpoint_metadata.version,
-            max_checkpoint_version
-        );
-        // we (may) need to drop commits that are before the _actual_ last checkpoint (that
-        // is, commits between a stale _last_checkpoint and the _actual_ last checkpoint)
-        commit_files.retain(|parsed_path| parsed_path.version > max_checkpoint_version);
-    } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) {
-        return Err(Error::Generic(format!(
-            "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
-            checkpoint_metadata.parts.unwrap_or(1),
-            checkpoint_files.len()
-        )));
-    }
-
-    debug_assert!(
-        commit_files
-            .windows(2)
-            .all(|cfs| cfs[0].version <= cfs[1].version),
-        "fs_client.list_from() didn't return a sorted listing! {:?}",
-        commit_files
-    );
-    // We assume listing returned ordered, we want reverse order
-    let commit_files = commit_files.into_iter().rev().collect();
-
-    Ok((commit_files, checkpoint_files))
-}
-
-/// List relevant log files.
-///
-/// Relevant files are the max checkpoint found and all subsequent commits.
-fn list_log_files(
-    fs_client: &dyn FileSystemClient,
-    log_root: &Url,
-) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
-    let version_prefix = format!("{:020}", 0);
-    let start_from = log_root.join(&version_prefix)?;
-
-    let mut max_checkpoint_version = -1_i64;
-    let mut commit_files = Vec::new();
-    let mut checkpoint_files = Vec::with_capacity(10);
-
-    let log_paths = fs_client
-        .list_from(&start_from)?
-        .flat_map(|file| file.and_then(ParsedLogPath::try_from).transpose());
-    for log_path in log_paths {
-        let log_path = log_path?;
-        if log_path.is_checkpoint() {
-            let version = log_path.version as i64;
-            match version.cmp(&max_checkpoint_version) {
-                Ordering::Greater => {
-                    max_checkpoint_version = version;
-                    checkpoint_files.clear();
-                    checkpoint_files.push(log_path);
-                }
-                Ordering::Equal => {
-                    checkpoint_files.push(log_path);
-                }
-                _ => {}
-            }
-        } else if log_path.is_commit() {
-            commit_files.push(log_path);
-        }
-    }
-
-    commit_files.retain(|f| f.version as i64 > max_checkpoint_version);
-
-    debug_assert!(
-        commit_files
-            .windows(2)
-            .all(|cfs| cfs[0].version <= cfs[1].version),
-        "fs_client.list_from() didn't return a sorted listing! {:?}",
-        commit_files
-    );
-    // We assume listing returned ordered, we want reverse order
-    let commit_files = commit_files.into_iter().rev().collect();
-
-    Ok((commit_files, checkpoint_files))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -366,11 +198,11 @@ mod tests {
     use object_store::memory::InMemory;
     use object_store::path::Path;
     use object_store::ObjectStore;
-    use test_utils::delta_path_for_version;
 
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
     use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
     use crate::engine::sync::SyncEngine;
+    use crate::path::ParsedLogPath;
     use crate::schema::StructType;
 
     #[test]
@@ -437,69 +269,6 @@ mod tests {
         assert!(cp.is_none())
     }
 
-    #[test]
-    fn test_read_log_with_out_of_date_last_checkpoint() {
-        let store = Arc::new(InMemory::new());
-
-        let data = bytes::Bytes::from("kernel-data");
-
-        let checkpoint_metadata = CheckpointMetadata {
-            version: 3,
-            size: 10,
-            parts: None,
-            size_in_bytes: None,
-            num_of_add_files: None,
-            checkpoint_schema: None,
-            checksum: None,
-        };
-
-        // add log files to store
-        tokio::runtime::Runtime::new()
-            .expect("create tokio runtime")
-            .block_on(async {
-                for path in [
-                    delta_path_for_version(0, "json"),
-                    delta_path_for_version(1, "checkpoint.parquet"),
-                    delta_path_for_version(2, "json"),
-                    delta_path_for_version(3, "checkpoint.parquet"),
-                    delta_path_for_version(4, "json"),
-                    delta_path_for_version(5, "checkpoint.parquet"),
-                    delta_path_for_version(6, "json"),
-                    delta_path_for_version(7, "json"),
-                ] {
-                    store
-                        .put(&path, data.clone().into())
-                        .await
-                        .expect("put log file in store");
-                }
-                let checkpoint_str =
-                    serde_json::to_string(&checkpoint_metadata).expect("Serialize checkpoint");
-                store
-                    .put(
-                        &Path::from("_delta_log/_last_checkpoint"),
-                        checkpoint_str.into(),
-                    )
-                    .await
-                    .expect("Write _last_checkpoint");
-            });
-
-        let client = ObjectStoreFileSystemClient::new(
-            store,
-            false, // don't have ordered listing
-            Path::from("/"),
-            Arc::new(TokioBackgroundExecutor::new()),
-        );
-
-        let url = Url::parse("memory:///_delta_log/").expect("valid url");
-        let (commit_files, checkpoint_files) =
-            list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap();
-        assert_eq!(checkpoint_files.len(), 1);
-        assert_eq!(commit_files.len(), 2);
-        assert_eq!(checkpoint_files[0].version, 5);
-        assert_eq!(commit_files[0].version, 7);
-        assert_eq!(commit_files[1].version, 6);
-    }
-
     fn valid_last_checkpoint() -> Vec<u8> {
         r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
     }
@@ -552,9 +321,9 @@ mod tests {
         let engine = SyncEngine::new();
         let snapshot = Snapshot::try_new(location, &engine, None).unwrap();
 
-        assert_eq!(snapshot.log_segment.checkpoint_files.len(), 1);
+        assert_eq!(snapshot.log_segment.checkpoint_parts.len(), 1);
         assert_eq!(
-            ParsedLogPath::try_from(snapshot.log_segment.checkpoint_files[0].location.clone())
+            ParsedLogPath::try_from(snapshot.log_segment.checkpoint_parts[0].location.clone())
                 .unwrap()
                 .unwrap()
                 .version,