Skip to content

Commit

Permalink
Prevent writing checkpoints with a version that does not exist in tab…
Browse files Browse the repository at this point in the history
…le state

I have seen this in a production environment where the same writer is issuing
append transactions using the operations API, which returns the newly created
version, such as 10.

If the caller then attempts to create a checkpoint for version 10, the operation
will produce an inconsistency in the `_last_checkpoint` file, if the callers
in-memory table state has *not* been reloaded since the append operation
completed.

In this scenario the _delta_log/ directory may contain:
.
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    ├── 00000000000000000003.json
    ├── 00000000000000000004.json
    ├── 00000000000000000005.json
    ├── 00000000000000000006.json
    ├── 00000000000000000007.json
    ├── 00000000000000000008.json
    ├── 00000000000000000009.json
    ├── 00000000000000000010.checkpoint.parquet
    ├── 00000000000000000010.json
    └── _last_checkpoint

While `_last_checkpoint` contains the following:
    {"num_of_add_files":null,"parts":null,"size":342,"size_in_bytes":95104,"version":9}

This will result in an error on any attempts to read the Delta table:

    >>> from deltalake import DeltaTable
    >>> dt = DeltaTable('.')
    [2023-11-14T18:05:59Z DEBUG deltalake_core::protocol] loading checkpoint from _delta_log/_last_checkpoint
    [2023-11-14T18:05:59Z DEBUG deltalake_core::table] update with latest checkpoint CheckPoint { version: 9, size: 342, parts: None, size_in_bytes: Some(95104), num_of_add_files: None }
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/home/tyler/venv/lib64/python3.11/site-packages/deltalake/table.py", line 253, in __init__
        self._table = RawDeltaTable(
                    ^^^^^^^^^^^^^^
    FileNotFoundError: Object at location /home/tyler/corrupted-table/_delta_log/00000000000000000009.checkpoint.parquet not found: No such file or directory (os error 2)
    >>>

To prevent this error condition, the create_checkpoint_for() function should ensure
that the provided checkpoint version (used to write the `.checkpoint.parquet` file) matches
the table state's version (used to write the `_last_checkpoint` file).

This has the added benefit of helping prevent the caller from passing in a
nonsensical version number that could also lead to a broken table.

Sponsored-by: Scribd Inc
  • Loading branch information
rtyler committed Nov 14, 2023
1 parent 1a2c95e commit cdf52df
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ enum CheckpointError {
#[error("Partition value {0} cannot be parsed from string.")]
PartitionValueNotParseable(String),

/// Caller attempt to create a checkpoint for a version which does not exist on the table state
#[error("Attempted to create a checkpoint for a version {0} that does not match the table state {1}")]
StaleTableVersion(i64, i64),

/// Error returned when the parquet writer fails while writing the checkpoint.
#[error("Failed to write parquet: {}", .source)]
Parquet {
Expand All @@ -60,6 +64,7 @@ impl From<CheckpointError> for ProtocolError {
match value {
CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
CheckpointError::Arrow { source } => Self::Arrow { source },
CheckpointError::StaleTableVersion(..) => Self::Generic(value.to_string()),
CheckpointError::Parquet { source } => Self::ParquetParseError { source },
}
}
Expand Down Expand Up @@ -117,6 +122,14 @@ pub async fn create_checkpoint_for(
state: &DeltaTableState,
log_store: &dyn LogStore,
) -> Result<(), ProtocolError> {
if version != state.version() {
error!(
"create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
state.version()
);
return Err(CheckpointError::StaleTableVersion(version, state.version()).into());
}

// TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
// an appropriate split point yet though so only writing a single part currently.
// See https://github.com/delta-io/delta-rs/issues/288
Expand Down Expand Up @@ -486,6 +499,72 @@ mod tests {
use lazy_static::lazy_static;
use serde_json::json;

use crate::operations::DeltaOps;
use crate::writer::test_utils::get_delta_schema;
use object_store::path::Path;

#[tokio::test]
async fn test_create_checkpoint_for() {
let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().clone())
.with_save_mode(crate::protocol::SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await;
assert!(res.is_ok());

// Look at the "files" and verify that the _last_checkpoint has the right version
let path = Path::from("_delta_log/_last_checkpoint");
let last_checkpoint = table
.object_store()
.get(&path)
.await
.expect("Failed to get the _last_checkpoint")
.bytes()
.await
.expect("Failed to get bytes for _last_checkpoint");
let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
assert_eq!(last_checkpoint.version, 0);
}

#[tokio::test]
async fn test_create_checkpoint_for_invalid_version() {
let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().clone())
.with_save_mode(crate::protocol::SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await {
Ok(_) => {
/*
* If a checkpoint is allowed to be created here, it will use the passed in
* version, but _last_checkpoint is generated from the table state will point to a
* version 0 checkpoint.
* E.g.
*
* Path { raw: "_delta_log/00000000000000000000.json" }
* Path { raw: "_delta_log/00000000000000000001.checkpoint.parquet" }
* Path { raw: "_delta_log/_last_checkpoint" }
*
*/
panic!(
"We should not allow creating a checkpoint for a version which doesn't exist!"
);
}
Err(_) => { /* We should expect an error in the "right" case */ }
}
}

#[test]
fn typed_partition_value_from_string_test() {
let string_value: Value = "Hello World!".into();
Expand Down

0 comments on commit cdf52df

Please sign in to comment.