Fixing snapshot creation for earlier versions than the latest checkpoint #322

Closed
wants to merge 16 commits into from
4 changes: 4 additions & 0 deletions kernel/src/error.rs
@@ -155,6 +155,10 @@ pub enum Error {
/// Expressions did not parse or evaluate correctly
#[error("Invalid expression evaluation: {0}")]
InvalidExpressionEvaluation(String),

/// An error indicating an invalid Delta Log structure
#[error("Invalid Delta Log: {0}")]
InvalidDeltaLog(String),
}

// Convenience constructors for Error types that take a String argument
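The context line above mentions the file's convenience constructors for String-carrying variants; a matching helper for the new variant might look like the following. This is a sketch following that existing convention — the diff itself only adds the variant:

impl Error {
    /// Hypothetical convenience constructor, mirroring the existing ones in error.rs
    pub(crate) fn invalid_delta_log(msg: impl ToString) -> Self {
        Self::InvalidDeltaLog(msg.to_string())
    }
}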
260 changes: 260 additions & 0 deletions kernel/src/group_iterator.rs
@@ -0,0 +1,260 @@
use std::rc::Rc;
use std::cell::RefCell;
use crate::FileMeta;
use crate::path::LogPath;
use crate::{DeltaResult, Error};

/// This iterator groups Delta log files into checkpoint nodes. It handles various scenarios including:
/// - Single-part checkpoints
/// - Multi-part checkpoints
/// - Multiple checkpoints for the same version
/// - Commits without checkpoints
///
/// The iterator creates a linked list of CheckpointNodes, where each node represents a checkpoint
/// (if present) and all commits up to the next checkpoint.
///
/// Sample Delta log structures and resulting node structures:
///
/// 1. Simple scenario with single-part checkpoints:
/// Files:
/// 00000000000000000000.json
/// 00000000000000000001.json
/// 00000000000000000002.checkpoint.parquet
/// 00000000000000000003.json
/// 00000000000000000004.checkpoint.parquet
/// 00000000000000000005.json
///
/// Resulting nodes:
/// Node 1: {checkpoint_version: None, checkpoint_files: None, commits: [0.json, 1.json]}
/// Node 2: {checkpoint_version: 2, checkpoint_files: [2.checkpoint.parquet], commits: [3.json]}
/// Node 3: {checkpoint_version: 4, checkpoint_files: [4.checkpoint.parquet], commits: [5.json]}
///
/// 2. Scenario with multi-part checkpoints:
/// Files:
/// 00000000000000000000.json
/// 00000000000000000001.json
/// 00000000000000000002.checkpoint.0000000001.0000000002.parquet
/// 00000000000000000002.checkpoint.0000000002.0000000002.parquet
/// 00000000000000000003.json
/// 00000000000000000004.checkpoint.parquet
/// 00000000000000000005.json
///
/// Resulting nodes:
/// Node 1: {checkpoint_version: None, checkpoint_files: None, commits: [0.json, 1.json]}
/// Node 2: {checkpoint_version: 2, checkpoint_files: [2.checkpoint.*.parquet], multi_part: true, commits: [3.json]}
/// Node 3: {checkpoint_version: 4, checkpoint_files: [4.checkpoint.parquet], commits: [5.json]}
///
/// 3. Scenario with multiple checkpoints for the same version:
/// Files:
/// 00000000000000000000.json
/// 00000000000000000001.checkpoint.0000000001.0000000002.parquet
/// 00000000000000000001.checkpoint.0000000002.0000000002.parquet
/// 00000000000000000001.checkpoint.0000000001.0000000003.parquet
/// 00000000000000000001.checkpoint.0000000002.0000000003.parquet
/// 00000000000000000001.checkpoint.0000000003.0000000003.parquet
/// 00000000000000000002.json
///
/// Resulting node:
/// Node 1: {
/// checkpoint_version: 1,
/// checkpoint_files: [1.checkpoint.0000000001.0000000003.parquet, 1.checkpoint.0000000002.0000000003.parquet, 1.checkpoint.0000000003.0000000003.parquet],
/// other_multipart_checkpoints: [[1.checkpoint.0000000001.0000000002.parquet, 1.checkpoint.0000000002.0000000002.parquet]],
/// multi_part: true,
/// commits: [2.json]
/// }

#[derive(Clone)]
pub struct CheckpointNode {
pub checkpoint_version: Option<u64>,
pub checkpoint_files: Option<Vec<FileMeta>>,
pub other_multipart_checkpoints: Vec<Vec<FileMeta>>,
pub multi_part: bool,
pub commits: Vec<FileMeta>,
pub next: Option<Rc<RefCell<CheckpointNode>>>,
}

pub struct DeltaLogGroupingIterator {
pub head: Option<Rc<RefCell<CheckpointNode>>>,
pub current: Option<Rc<RefCell<CheckpointNode>>>,
}

impl DeltaLogGroupingIterator {
pub fn new(files: Vec<FileMeta>, beginning: Option<bool>) -> DeltaResult<Self> {
// Pair each file with its parsed version, dropping files whose name carries no version
let mut versioned_files: Vec<(u64, FileMeta)> = files
.into_iter()
.filter_map(|file| {
let log_path = LogPath::new(&file.location);
log_path.version.map(|v| (v, file))
})
.collect();

// Sort files: first by version, then by type (checkpoints before commits),
// then by location so multi-part checkpoint parts line up in part order
// (the zero-padded filenames make lexicographic order match numeric order)
versioned_files.sort_unstable_by(|(v1, f1), (v2, f2)| {
v1.cmp(v2).then_with(|| {
let is_checkpoint1 = LogPath::new(&f1.location).is_checkpoint;
let is_checkpoint2 = LogPath::new(&f2.location).is_checkpoint;
is_checkpoint2
.cmp(&is_checkpoint1) // Checkpoints before commits
.then_with(|| f1.location.cmp(&f2.location))
})
});

// Initialize variables for building the linked list of nodes
let mut current_version: Option<u64> = None;
let mut head_node: Option<Rc<RefCell<CheckpointNode>>> = None;
let mut last_node: Option<Rc<RefCell<CheckpointNode>>> = None;

// Temporary storage for building the current node
let mut current_node = CheckpointNode {
checkpoint_version: None,
checkpoint_files: None,
other_multipart_checkpoints: Vec::new(),
multi_part: false,
commits: Vec::new(),
next: None,
};

let mut iter = versioned_files.into_iter().peekable();

while let Some((file_version, file_meta)) = iter.next() {
let log_path = LogPath::new(&file_meta.location);

// Handle version gaps and ensure we start from version 0 if beginning is true
match current_version {
Some(v) if v + 1 < file_version => {
return Err(Error::InvalidDeltaLog(format!(
"Version gap detected between versions {} and {}",
v, file_version
)));
}
None if file_version > 0 && beginning == Some(true) => {
return Err(Error::InvalidDeltaLog(format!(
"Missing commits before version {}",
file_version
)));
}
_ => (),
}

current_version = Some(file_version);

if log_path.is_checkpoint {
// Finalize the current node if it contains data and start a new one
// This happens when we encounter a new checkpoint
if current_node.checkpoint_version.is_some() || !current_node.commits.is_empty() {
let new_node = Rc::new(RefCell::new(CheckpointNode {
checkpoint_version: current_node.checkpoint_version,
checkpoint_files: current_node.checkpoint_files,
other_multipart_checkpoints: current_node.other_multipart_checkpoints,
multi_part: current_node.multi_part,
commits: current_node.commits,
next: None,
}));

// Link the new node to the previous one
if let Some(ref last) = last_node {
last.borrow_mut().next = Some(Rc::clone(&new_node));
} else {
head_node = Some(Rc::clone(&new_node));
}

last_node = Some(new_node);

// Reset current node for the new checkpoint
current_node = CheckpointNode {
checkpoint_version: None,
checkpoint_files: None,
other_multipart_checkpoints: Vec::new(),
multi_part: false,
commits: Vec::new(),
next: None,
};
}

// Start a new node with this checkpoint
current_node.checkpoint_version = Some(file_version);

if log_path.is_multi_part_checkpoint() {
// Handle multi-part checkpoints
current_node.multi_part = true;
let mut parts = vec![file_meta.clone()];

// Extract the expected number of parts; is_multi_part_checkpoint() already
// verified the filename parses, so this cannot fail
let (_, num_parts) = log_path
.get_checkpoint_part_numbers()
.expect("multi-part checkpoint filename must contain part numbers");

// Collect the remaining parts of the multi-part checkpoint. A file only
// belongs to this checkpoint if it shares the version and declares the
// same total part count; anything else leaves the checkpoint incomplete.
for _ in 1..num_parts {
if let Some((next_version, next_file_meta)) = iter.peek() {
let next_log_path = LogPath::new(&next_file_meta.location);
let same_group = *next_version == file_version
&& next_log_path
.get_checkpoint_part_numbers()
.map_or(false, |(_, total)| total == num_parts);
if same_group {
parts.push(next_file_meta.clone());
iter.next(); // Consume the part we just peeked at
} else {
return Err(Error::InvalidDeltaLog(format!(
"Incomplete multi-part checkpoint at version {}",
file_version
)));
}
} else {
return Err(Error::InvalidDeltaLog(format!(
"Incomplete multi-part checkpoint at version {}",
file_version
)));
}
}

current_node.checkpoint_files = Some(parts);
} else {
// Handle single-part checkpoint
current_node.checkpoint_files = Some(vec![file_meta.clone()]);
current_node.multi_part = false;
}
} else if log_path.is_commit {
// Add commit file to the current node
current_node.commits.push(file_meta.clone());
} else {
// Skip unknown file types
continue;
}
}

// Finalize the last node
if current_node.checkpoint_version.is_some() || !current_node.commits.is_empty() {
let new_node = Rc::new(RefCell::new(CheckpointNode {
checkpoint_version: current_node.checkpoint_version,
checkpoint_files: current_node.checkpoint_files,
other_multipart_checkpoints: current_node.other_multipart_checkpoints,
multi_part: current_node.multi_part,
commits: current_node.commits,
next: None,
}));

if let Some(ref last) = last_node {
last.borrow_mut().next = Some(Rc::clone(&new_node));
} else {
head_node = Some(Rc::clone(&new_node));
}
}

Ok(DeltaLogGroupingIterator {
head: head_node,
current: None,
})
}
}

// Implement Iterator for DeltaLogGroupingIterator
impl Iterator for DeltaLogGroupingIterator {
type Item = Rc<RefCell<CheckpointNode>>;

fn next(&mut self) -> Option<Self::Item> {
// On the first call, start from the head (taking it so an exhausted
// iterator stays exhausted instead of restarting); afterwards follow
// the `next` links.
self.current = match self.current.take() {
None => self.head.take(),
Some(node) => node.borrow().next.clone(),
};
self.current.clone()
}
}
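To show how snapshot creation for a version earlier than the latest checkpoint might consume these nodes, here is a minimal sketch: walk the nodes in version order and keep the last one whose checkpoint does not exceed the requested version. The `node_for_version` helper and the `delta_kernel` crate path are assumptions for illustration, not part of this diff:

use std::cell::RefCell;
use std::rc::Rc;

use delta_kernel::group_iterator::{CheckpointNode, DeltaLogGroupingIterator};

/// Hypothetical helper: pick the node whose checkpoint (if any) is the
/// newest one at or below `target_version`.
fn node_for_version(
    nodes: DeltaLogGroupingIterator,
    target_version: u64,
) -> Option<Rc<RefCell<CheckpointNode>>> {
    let mut best = None;
    for node in nodes {
        // Nodes come out in version order; stop once a checkpoint passes the target
        let checkpoint_version = node.borrow().checkpoint_version;
        if matches!(checkpoint_version, Some(v) if v > target_version) {
            break;
        }
        best = Some(node);
    }
    best
}

The chosen node carries the checkpoint files to load plus the commits to replay on top of it, which is exactly the unit a snapshot at that version needs.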
1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -67,6 +67,7 @@ pub(crate) mod path;
pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod group_iterator;
pub mod table;
pub mod transaction;
pub(crate) mod utils;
64 changes: 64 additions & 0 deletions kernel/src/path.rs
@@ -120,6 +120,70 @@ impl<'a> LogPath<'a> {
}
})
}

/// Determines if the file is a multi-part checkpoint.
///
/// Sample checkpoint files:
/// - Single-part: "00000000000000000010.checkpoint.parquet"
/// - Multi-part: "00000000000000000010.checkpoint.0000000001.0000000003.parquet"
///
/// For the multi-part example:
/// - "00000000000000000010" is the version
/// - "checkpoint" indicates it's a checkpoint file
/// - "0000000001" is the part number
/// - "0000000003" is the total number of parts
/// - "parquet" is the file extension
pub(crate) fn is_multi_part_checkpoint(&self) -> bool {
// If it's not a checkpoint at all, it can't be a multi-part checkpoint
if !self.is_checkpoint {
return false;
}

self.filename
.and_then(|f| f.split_once(".checkpoint."))
// After splitting at ".checkpoint.", we focus on the part after it
.map(|(_, rest)| {
// Count the number of parts after ".checkpoint."
// A multi-part checkpoint should have 3 parts:
// 1. Part number
// 2. Total number of parts
// 3. File extension (e.g., "parquet")
rest.split('.').count() == 3
})
.unwrap_or(false)
}

/// Extracts the part number and total number of parts for a multi-part checkpoint.
///
/// For a multi-part checkpoint file like "00000000000000000010.checkpoint.0000000001.0000000003.parquet":
/// - This method would return `Some((1, 3))`, indicating it's part 1 of 3.
///
/// Returns `None` for single-part checkpoints or non-checkpoint files.
pub(crate) fn get_checkpoint_part_numbers(&self) -> Option<(u64, u64)> {
// First, check if it's a multi-part checkpoint
if !self.is_multi_part_checkpoint() {
return None;
}

// Split the filename into parts
let parts: Vec<&str> = self.filename?.split('.').collect();

// A valid multi-part checkpoint filename should have 5 parts:
// 1. Version
// 2. "checkpoint"
// 3. Part number
// 4. Total number of parts
// 5. File extension
if parts.len() != 5 {
return None;
}

// Parse the part number (index 2) and total number (index 3) into u64 integers
let part = parts[2].parse().ok()?;
let total = parts[3].parse().ok()?;

Some((part, total))
}
}

impl<'a> AsRef<Url> for LogPath<'a> {
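As a quick check of the two helpers above, a hypothetical unit test (assuming `LogPath::new` takes a parsed `Url`, as the call sites in group_iterator.rs do; this test is not part of the diff):

#[cfg(test)]
mod multi_part_tests {
    use super::*;
    use url::Url;

    #[test]
    fn parses_multi_part_checkpoint_names() {
        let multi = Url::parse(
            "file:///tmp/_delta_log/00000000000000000010.checkpoint.0000000001.0000000003.parquet",
        )
        .unwrap();
        let path = LogPath::new(&multi);
        assert!(path.is_multi_part_checkpoint());
        // Part 1 of 3
        assert_eq!(path.get_checkpoint_part_numbers(), Some((1, 3)));

        let single = Url::parse(
            "file:///tmp/_delta_log/00000000000000000010.checkpoint.parquet",
        )
        .unwrap();
        let path = LogPath::new(&single);
        assert!(!path.is_multi_part_checkpoint());
        assert_eq!(path.get_checkpoint_part_numbers(), None);
    }
}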