Skip to content

Commit

Permalink
fix: allow checkpoints to contain metadata actions without a createdT…
Browse files Browse the repository at this point in the history
…ime value

From the protocol, createdTime is of type Option[Long] and:

    The time when this metadata action is created, in milliseconds since the Unix epoch

The field is not required, but the bug only shows up when a checkpoint is
created and then read back in on the next table load.
  • Loading branch information
rtyler committed Jan 8, 2024
1 parent f7c303b commit edd8d78
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
82 changes: 81 additions & 1 deletion crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ mod tests {
use lazy_static::lazy_static;
use serde_json::json;

use crate::logstore;
use crate::operations::DeltaOps;
use crate::writer::test_utils::get_delta_schema;
use object_store::path::Path;
Expand Down Expand Up @@ -561,6 +560,87 @@ mod tests {
assert_eq!(last_checkpoint.version, 0);
}

/// This test validates that a checkpoint can be written and re-read with the minimum viable
/// Metadata. There was a bug which didn't handle the optionality of createdTime.
///
/// Flow: create an in-memory table (version 0), commit a bare `Metadata` action (version 1),
/// write a checkpoint for version 1, verify `_last_checkpoint`, then reload the table so the
/// checkpoint itself is read back.
#[tokio::test]
async fn test_create_checkpoint_with_metadata() {
    let table_schema = get_delta_schema();

    let mut table = DeltaOps::new_in_memory()
        .create()
        .with_columns(table_schema.fields().clone())
        .with_save_mode(crate::protocol::SaveMode::Ignore)
        .await
        .unwrap();
    assert_eq!(table.version(), 0);
    assert_eq!(table.get_schema().unwrap(), &table_schema);

    // Build a Metadata action from only the constructor arguments.
    // NOTE(review): per the regression under test this should leave createdTime unset —
    // confirm against `Metadata::new`.
    let part_cols: Vec<String> = vec![];
    let schema_string: String =
        serde_json::to_string(&table.state.schema()).expect("Failed to create schema string");
    let metadata = Metadata::new(
        "test-id".to_string(),
        Format::default(),
        schema_string,
        part_cols,
        None,
    );
    let actions = vec![Action::Metadata(metadata)];

    // Checked conversion: `as_millis()` yields a u128, and a bare `as i64` would silently
    // truncate instead of failing loudly.
    let epoch_id = i64::try_from(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis(),
    )
    .expect("Milliseconds since the Unix epoch do not fit in an i64");

    let v = crate::operations::transaction::commit(
        table.log_store().as_ref(),
        &actions,
        crate::protocol::DeltaOperation::StreamingUpdate {
            output_mode: crate::protocol::OutputMode::Append,
            query_id: "test".into(),
            epoch_id,
        },
        &table.state,
        None,
    )
    .await
    .expect("Failed to commit");

    assert_eq!(1, v, "Expected the commit to create table version 1");
    table.load().await.expect("Failed to reload table");
    assert_eq!(
        table.version(),
        1,
        "The loaded version of the table is not up to date"
    );

    // Use `expect` rather than `assert!(res.is_ok())` so the underlying error is printed
    // when checkpoint creation fails.
    create_checkpoint_for(table.version(), table.get_state(), table.log_store.as_ref())
        .await
        .expect("Failed to create the checkpoint");

    // Look at the "files" and verify that the _last_checkpoint has the right version
    let path = Path::from("_delta_log/_last_checkpoint");
    let last_checkpoint = table
        .object_store()
        .get(&path)
        .await
        .expect("Failed to get the _last_checkpoint")
        .bytes()
        .await
        .expect("Failed to get bytes for _last_checkpoint");
    let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
    assert_eq!(last_checkpoint.version, 1);

    // If the regression exists, this will fail
    table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
    assert_eq!(
        1,
        table.version(),
        "The reloaded table doesn't have the right version"
    );
}

#[tokio::test]
async fn test_create_checkpoint_for_invalid_version() {
let table_schema = get_delta_schema();
Expand Down
10 changes: 4 additions & 6 deletions crates/deltalake-core/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,10 @@ impl Metadata {
.map_err(|_| gen_action_type_error("metaData", "schemaString", "string"))?
.clone();
}
"createdTime" => {
re.created_time =
Some(record.get_long(i).map_err(|_| {
gen_action_type_error("metaData", "createdTime", "long")
})?);
}
"createdTime" => match record.get_long(i) {
Ok(s) => re.created_time = Some(s.clone()),
_ => re.created_time = None,
},
"configuration" => {
let configuration_map = record
.get_map(i)
Expand Down

0 comments on commit edd8d78

Please sign in to comment.