diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs
index d0fbeb337f..a32c73fe44 100644
--- a/crates/deltalake-core/src/protocol/checkpoints.rs
+++ b/crates/deltalake-core/src/protocol/checkpoints.rs
@@ -10,7 +10,7 @@ use arrow_schema::{ArrowError, Schema as ArrowSchema};
 use chrono::{Datelike, Utc};
 use futures::{StreamExt, TryStreamExt};
 use lazy_static::lazy_static;
-use object_store::ObjectStore;
+use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
 use regex::Regex;
@@ -183,6 +183,19 @@ pub async fn cleanup_expired_logs_for(
             Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
     }
 
+    let object_store = log_store.object_store();
+    let maybe_last_checkpoint = object_store
+        .get(&log_store.log_path().child("_last_checkpoint"))
+        .await;
+
+    if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
+        return Ok(0);
+    }
+
+    let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
+    let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint)?;
+    let until_version = i64::min(until_version, last_checkpoint.version);
+
     // Feed a stream of candidate deletion files directly into the delete_stream
     // function to try to improve the speed of cleanup and reduce the need for
     // intermediate memory.
@@ -506,10 +519,15 @@ fn apply_stats_conversion(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
+    use arrow_array::{ArrayRef, RecordBatch};
+    use chrono::Duration;
     use lazy_static::lazy_static;
     use serde_json::json;
 
+    use crate::logstore;
     use crate::operations::DeltaOps;
     use crate::writer::test_utils::get_delta_schema;
     use object_store::path::Path;
@@ -702,6 +720,95 @@ mod tests {
         );
     }
 
+    async fn setup_table() -> DeltaTable {
+        use arrow_schema::{DataType, Field};
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "id",
+            DataType::Utf8,
+            false,
+        )]));
+
+        let data =
+            vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
+        let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];
+
+        let table = DeltaOps::new_in_memory()
+            .write(batches.clone())
+            .await
+            .unwrap();
+
+        DeltaOps(table)
+            .write(batches)
+            .with_save_mode(crate::protocol::SaveMode::Overwrite)
+            .await
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_cleanup_no_checkpoints() {
+        // Test that metadata clean up does not corrupt the table when no checkpoints exist
+        let table = setup_table().await;
+
+        let log_retention_timestamp = (Utc::now().timestamp_millis()
+            + Duration::days(31).num_milliseconds())
+            - table
+                .get_state()
+                .table_config()
+                .log_retention_duration()
+                .as_millis() as i64;
+        let count = cleanup_expired_logs_for(
+            table.version(),
+            table.log_store().as_ref(),
+            log_retention_timestamp,
+        )
+        .await
+        .unwrap();
+        assert_eq!(count, 0);
+        println!("{:?}", count);
+
+        let path = Path::from("_delta_log/00000000000000000000.json");
+        let res = table.log_store().object_store().get(&path).await;
+        assert!(res.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_cleanup_with_checkpoints() {
+        let table = setup_table().await;
+        create_checkpoint(&table).await.unwrap();
+
+        let log_retention_timestamp = (Utc::now().timestamp_millis()
+            + Duration::days(32).num_milliseconds())
+            - table
+                .get_state()
+                .table_config()
+                .log_retention_duration()
+                .as_millis() as i64;
+        let count = cleanup_expired_logs_for(
+            table.version(),
+            table.log_store().as_ref(),
+            log_retention_timestamp,
+        )
+        .await
+        .unwrap();
+        assert_eq!(count, 1);
+
+        let log_store = table.log_store();
+
+        let path = log_store.log_path().child("00000000000000000000.json");
+        let res = table.log_store().object_store().get(&path).await;
+        assert!(res.is_err());
+
+        let path = log_store
+            .log_path()
+            .child("00000000000000000001.checkpoint.parquet");
+        let res = table.log_store().object_store().get(&path).await;
+        assert!(res.is_ok());
+
+        let path = log_store.log_path().child("00000000000000000001.json");
+        let res = table.log_store().object_store().get(&path).await;
+        assert!(res.is_ok());
+    }
+
     #[test]
     fn apply_stats_conversion_test() {
         let mut stats = STATS_JSON.clone();
diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py
index 01c958034e..fa1ae6a8ae 100644
--- a/python/tests/test_checkpoint.py
+++ b/python/tests/test_checkpoint.py
@@ -27,7 +27,7 @@ def test_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
     assert checkpoint_path.exists()
 
 
-def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
+def setup_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
     tmp_table_path = tmp_path / "path" / "to" / "table"
     first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
     second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
@@ -53,9 +53,33 @@ def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
     assert first_log_path.exists()
     assert second_log_path.exists()
     assert third_log_path.exists()
+    return delta_table
 
+
+def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
+    delta_table = setup_cleanup_metadata(tmp_path, sample_data)
+    delta_table.create_checkpoint()
     delta_table.cleanup_metadata()
 
+    tmp_table_path = tmp_path / "path" / "to" / "table"
+    first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
+    second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
+    third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"
+
    assert not first_log_path.exists()
     assert second_log_path.exists()
     assert third_log_path.exists()
+
+
+def test_cleanup_metadata_no_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
+    delta_table = setup_cleanup_metadata(tmp_path, sample_data)
+    delta_table.cleanup_metadata()
+
+    tmp_table_path = tmp_path / "path" / "to" / "table"
+    first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
+    second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
+    third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"
+
+    assert first_log_path.exists()
+    assert second_log_path.exists()
+    assert third_log_path.exists()
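
For reviewers trying the change end to end, here is a minimal usage sketch (not part of the patch) of the behavior the new `_last_checkpoint` guard provides through the Python bindings; the local table path and sample data below are made up for illustration:

```python
# Sketch of the user-facing behavior after this fix (assumed local path/data).
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

table_path = "/tmp/example_table"  # hypothetical path
data = pa.table({"id": ["A", "B", "C", "D"]})

write_deltalake(table_path, data)                    # commit 0
write_deltalake(table_path, data, mode="overwrite")  # commit 1

dt = DeltaTable(table_path)
dt.cleanup_metadata()   # no checkpoint exists yet -> no log files are removed
assert DeltaTable(table_path).version() == 1  # table still loads

dt.create_checkpoint()  # checkpoint the latest version
dt.cleanup_metadata()   # now only commits older than the retention window
                        # and at or below the checkpoint version are eligible
```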