Skip to content

Commit

Permalink
fix: ensure metadata cleanup do not corrupt tables without checkpoints (
Browse files Browse the repository at this point in the history
#2044)

# Description
When metadata cleanup is executed on a delta table without checkpoints
it will corrupt the table and prevent further loading. This is a high
risk for people who use delta-rs since our writers do not automatically
create checkpoints.

# Related Issue(s)
- closes #2016
  • Loading branch information
Blajda authored Jan 7, 2024
1 parent 70fc6c0 commit 25040b8
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 2 deletions.
109 changes: 108 additions & 1 deletion crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow_schema::{ArrowError, Schema as ArrowSchema};
use chrono::{Datelike, Utc};
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use object_store::ObjectStore;
use object_store::{Error, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use regex::Regex;
Expand Down Expand Up @@ -183,6 +183,19 @@ pub async fn cleanup_expired_logs_for(
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
}

let object_store = log_store.object_store();
let maybe_last_checkpoint = object_store
.get(&log_store.log_path().child("_last_checkpoint"))
.await;

if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
return Ok(0);
}

let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint)?;
let until_version = i64::min(until_version, last_checkpoint.version);

// Feed a stream of candidate deletion files directly into the delete_stream
// function to try to improve the speed of cleanup and reduce the need for
// intermediate memory.
Expand Down Expand Up @@ -506,10 +519,15 @@ fn apply_stats_conversion(

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use arrow_array::{ArrayRef, RecordBatch};
use chrono::Duration;
use lazy_static::lazy_static;
use serde_json::json;

use crate::logstore;
use crate::operations::DeltaOps;
use crate::writer::test_utils::get_delta_schema;
use object_store::path::Path;
Expand Down Expand Up @@ -702,6 +720,95 @@ mod tests {
);
}

async fn setup_table() -> DeltaTable {
use arrow_schema::{DataType, Field};
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"id",
DataType::Utf8,
false,
)]));

let data =
vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

let table = DeltaOps::new_in_memory()
.write(batches.clone())
.await
.unwrap();

DeltaOps(table)
.write(batches)
.with_save_mode(crate::protocol::SaveMode::Overwrite)
.await
.unwrap()
}

#[tokio::test]
async fn test_cleanup_no_checkpoints() {
// Test that metadata clean up does not corrupt the table when no checkpoints exist
let table = setup_table().await;

let log_retention_timestamp = (Utc::now().timestamp_millis()
+ Duration::days(31).num_milliseconds())
- table
.get_state()
.table_config()
.log_retention_duration()
.as_millis() as i64;
let count = cleanup_expired_logs_for(
table.version(),
table.log_store().as_ref(),
log_retention_timestamp,
)
.await
.unwrap();
assert_eq!(count, 0);
println!("{:?}", count);

let path = Path::from("_delta_log/00000000000000000000.json");
let res = table.log_store().object_store().get(&path).await;
assert!(res.is_ok());
}

#[tokio::test]
async fn test_cleanup_with_checkpoints() {
let table = setup_table().await;
create_checkpoint(&table).await.unwrap();

let log_retention_timestamp = (Utc::now().timestamp_millis()
+ Duration::days(32).num_milliseconds())
- table
.get_state()
.table_config()
.log_retention_duration()
.as_millis() as i64;
let count = cleanup_expired_logs_for(
table.version(),
table.log_store().as_ref(),
log_retention_timestamp,
)
.await
.unwrap();
assert_eq!(count, 1);

let log_store = table.log_store();

let path = log_store.log_path().child("00000000000000000000.json");
let res = table.log_store().object_store().get(&path).await;
assert!(res.is_err());

let path = log_store
.log_path()
.child("00000000000000000001.checkpoint.parquet");
let res = table.log_store().object_store().get(&path).await;
assert!(res.is_ok());

let path = log_store.log_path().child("00000000000000000001.json");
let res = table.log_store().object_store().get(&path).await;
assert!(res.is_ok());
}

#[test]
fn apply_stats_conversion_test() {
let mut stats = STATS_JSON.clone();
Expand Down
26 changes: 25 additions & 1 deletion python/tests/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
assert checkpoint_path.exists()


def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
def setup_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
tmp_table_path = tmp_path / "path" / "to" / "table"
first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
Expand All @@ -53,9 +53,33 @@ def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
assert first_log_path.exists()
assert second_log_path.exists()
assert third_log_path.exists()
return delta_table


def test_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
delta_table = setup_cleanup_metadata(tmp_path, sample_data)
delta_table.create_checkpoint()
delta_table.cleanup_metadata()

tmp_table_path = tmp_path / "path" / "to" / "table"
first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"

assert not first_log_path.exists()
assert second_log_path.exists()
assert third_log_path.exists()


def test_cleanup_metadata_no_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
delta_table = setup_cleanup_metadata(tmp_path, sample_data)
delta_table.cleanup_metadata()

tmp_table_path = tmp_path / "path" / "to" / "table"
first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
second_log_path = tmp_table_path / "_delta_log" / "00000000000000000001.json"
third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"

assert first_log_path.exists()
assert second_log_path.exists()
assert third_log_path.exists()

0 comments on commit 25040b8

Please sign in to comment.