From d4f18b3ae9d616e771b5d0e0fa498d0086fd91eb Mon Sep 17 00:00:00 2001 From: Justin Jossick Date: Wed, 13 Nov 2024 12:50:12 -0800 Subject: [PATCH] fix: jsonwriter should checkpoint by default This is a fix aimed to enable jsonwriter to checkpoint in accordance with delta.checkpointInterval. It changes the default commitbuilder to set a post_commit_hook so that checkpointing will be done by default. Potentially we could also expose CommitProperties as an argument to flush_and_commit, but that would require a change to the function signature and would be a breaking change. Signed-off-by: Justin Jossick --- crates/core/src/writer/json.rs | 49 ++++++++++++++++++++++++++++++++++ crates/core/src/writer/mod.rs | 4 +-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 2cf7f6a950..a04dceb3bd 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -659,4 +659,53 @@ mod tests { assert_eq!(table.version(), 1); } } + + #[tokio::test] + async fn test_json_write_checkpoint() { + use crate::operations::create::CreateBuilder; + use std::fs; + + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap<String, Option<String>> = vec![ + ( + "delta.checkpointInterval".to_string(), + Some("5".to_string()), + ), + ("delta.checkpointPolicy".to_string(), Some("v2".to_string())), + ] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + for _ in 1..6 { +
writer.write(vec![data.clone()]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + } + let dir_path = path + "/_delta_log"; + + let target_file = "00000000000000000004.checkpoint.parquet"; + let entries: Vec<_> = fs::read_dir(dir_path) + .unwrap() + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.file_name().into_string().unwrap() == target_file) + .collect(); + assert_eq!(entries.len(), 1); + } } diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs index d3fe529a89..cd87459c2f 100644 --- a/crates/core/src/writer/mod.rs +++ b/crates/core/src/writer/mod.rs @@ -8,7 +8,7 @@ use serde_json::Value; use crate::errors::DeltaTableError; use crate::kernel::{Action, Add}; -use crate::operations::transaction::CommitBuilder; +use crate::operations::transaction::{CommitBuilder, CommitProperties}; use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode}; use crate::DeltaTable; @@ -174,7 +174,7 @@ pub(crate) async fn flush_and_commit( predicate: None, }; - let version = CommitBuilder::default() + let version = CommitBuilder::from(CommitProperties::default()) .with_actions(adds) .build(Some(snapshot), table.log_store.clone(), operation) .await?