diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index f48fbfbd76..a4cc1b66c7 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -38,6 +38,10 @@ enum CheckpointError { #[error("Partition value {0} cannot be parsed from string.")] PartitionValueNotParseable(String), + /// Caller attempt to create a checkpoint for a version which does not exist on the table state + #[error("Attempted to create a checkpoint for a version {0} that does not match the table state {1}")] + StaleTableVersion(i64, i64), + /// Error returned when the parquet writer fails while writing the checkpoint. #[error("Failed to write parquet: {}", .source)] Parquet { @@ -60,6 +64,7 @@ impl From for ProtocolError { match value { CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()), CheckpointError::Arrow { source } => Self::Arrow { source }, + CheckpointError::StaleTableVersion(..) => Self::Generic(value.to_string()), CheckpointError::Parquet { source } => Self::ParquetParseError { source }, } } @@ -117,6 +122,14 @@ pub async fn create_checkpoint_for( state: &DeltaTableState, log_store: &dyn LogStore, ) -> Result<(), ProtocolError> { + if version != state.version() { + error!( + "create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded", + state.version() + ); + return Err(CheckpointError::StaleTableVersion(version, state.version()).into()); + } + // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for // an appropriate split point yet though so only writing a single part currently. // See https://github.com/delta-io/delta-rs/issues/288 @@ -486,6 +499,72 @@ mod tests { use lazy_static::lazy_static; use serde_json::json; + use crate::operations::DeltaOps; + use crate::writer::test_utils::get_delta_schema; + use object_store::path::Path; + + #[tokio::test] + async fn test_create_checkpoint_for() { + let table_schema = get_delta_schema(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().clone()) + .with_save_mode(crate::protocol::SaveMode::Ignore) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_metadata().unwrap().schema, table_schema); + let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await; + assert!(res.is_ok()); + + // Look at the "files" and verify that the _last_checkpoint has the right version + let path = Path::from("_delta_log/_last_checkpoint"); + let last_checkpoint = table + .object_store() + .get(&path) + .await + .expect("Failed to get the _last_checkpoint") + .bytes() + .await + .expect("Failed to get bytes for _last_checkpoint"); + let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail"); + assert_eq!(last_checkpoint.version, 0); + } + + #[tokio::test] + async fn test_create_checkpoint_for_invalid_version() { + let table_schema = get_delta_schema(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().clone()) + .with_save_mode(crate::protocol::SaveMode::Ignore) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_metadata().unwrap().schema, table_schema); + match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await { + Ok(_) => { + /* + * If a checkpoint is allowed to be created here, it will use the passed in + * version, but _last_checkpoint is generated from the table state will point to a + * version 0 checkpoint. + * E.g. + * + * Path { raw: "_delta_log/00000000000000000000.json" } + * Path { raw: "_delta_log/00000000000000000001.checkpoint.parquet" } + * Path { raw: "_delta_log/_last_checkpoint" } + * + */ + panic!( + "We should not allow creating a checkpoint for a version which doesn't exist!" + ); + } + Err(_) => { /* We should expect an error in the "right" case */ } + } + } + #[test] fn typed_partition_value_from_string_test() { let string_value: Value = "Hello World!".into();