diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs
index a32c73fe44..52ea4e6de3 100644
--- a/crates/deltalake-core/src/protocol/checkpoints.rs
+++ b/crates/deltalake-core/src/protocol/checkpoints.rs
@@ -527,7 +527,6 @@ mod tests {
     use lazy_static::lazy_static;
     use serde_json::json;
 
-    use crate::logstore;
     use crate::operations::DeltaOps;
     use crate::writer::test_utils::get_delta_schema;
     use object_store::path::Path;
@@ -561,6 +560,87 @@ mod tests {
         assert_eq!(last_checkpoint.version, 0);
     }
 
+    /// This test validates that a checkpoint can be written and re-read with the minimum viable
+    /// Metadata. There was a bug which didn't handle the optionality of createdTime.
+    #[tokio::test]
+    async fn test_create_checkpoint_with_metadata() {
+        let table_schema = get_delta_schema();
+
+        let mut table = DeltaOps::new_in_memory()
+            .create()
+            .with_columns(table_schema.fields().clone())
+            .with_save_mode(crate::protocol::SaveMode::Ignore)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+        assert_eq!(table.get_schema().unwrap(), &table_schema);
+
+        let part_cols: Vec<String> = vec![];
+        let schema_string: String =
+            serde_json::to_string(&table.state.schema()).expect("Failed to create schema string");
+        let metadata = Metadata::new(
+            "test-id".to_string(),
+            Format::default(),
+            schema_string,
+            part_cols,
+            None,
+        );
+        let actions = vec![Action::Metadata(metadata)];
+
+        let epoch_id = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_millis() as i64;
+
+        let v = crate::operations::transaction::commit(
+            table.log_store.as_ref(),
+            &actions,
+            crate::protocol::DeltaOperation::StreamingUpdate {
+                output_mode: crate::protocol::OutputMode::Append,
+                query_id: "test".into(),
+                epoch_id,
+            },
+            &table.state,
+            None,
+        )
+        .await
+        .expect("Failed to commit");
+
+        assert_eq!(1, v, "Expected the commit to create table version 1");
+        table.load().await.expect("Failed to reload table");
+        assert_eq!(
+            table.version(),
+            1,
+            "The loaded version of the table is not up to date"
+        );
+
+        create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref())
+            .await
+            .expect("Failed to create the checkpoint");
+
+        // Look at the "files" and verify that the _last_checkpoint has the right version
+        let path = Path::from("_delta_log/_last_checkpoint");
+        let last_checkpoint = table
+            .object_store()
+            .get(&path)
+            .await
+            .expect("Failed to get the _last_checkpoint")
+            .bytes()
+            .await
+            .expect("Failed to get bytes for _last_checkpoint");
+        let last_checkpoint: CheckPoint =
+            serde_json::from_slice(&last_checkpoint).expect("Failed to parse _last_checkpoint");
+        assert_eq!(last_checkpoint.version, 1);
+
+        // If the regression exists, this will fail
+        table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
+        assert_eq!(
+            1,
+            table.version(),
+            "The reloaded table doesn't have the right version"
+        );
+    }
+
     #[tokio::test]
     async fn test_create_checkpoint_for_invalid_version() {
         let table_schema = get_delta_schema();
diff --git a/crates/deltalake-core/src/protocol/parquet_read/mod.rs b/crates/deltalake-core/src/protocol/parquet_read/mod.rs
index d6f0ac7979..6acf1a2fa7 100644
--- a/crates/deltalake-core/src/protocol/parquet_read/mod.rs
+++ b/crates/deltalake-core/src/protocol/parquet_read/mod.rs
@@ -434,12 +434,8 @@ impl Metadata {
                         .map_err(|_| gen_action_type_error("metaData", "schemaString", "string"))?
                         .clone();
                 }
-                "createdTime" => {
-                    re.created_time =
-                        Some(record.get_long(i).map_err(|_| {
-                            gen_action_type_error("metaData", "createdTime", "long")
-                        })?);
-                }
+                // createdTime is optional: store None when it cannot be read as a long.
+                "createdTime" => re.created_time = record.get_long(i).ok(),
                 "configuration" => {
                     let configuration_map = record
                         .get_map(i)