Skip to content

Commit

Permalink
fix: raise during checkpoint creating with without_files
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Dec 15, 2024
1 parent dc1c556 commit d5da51e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
6 changes: 6 additions & 0 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use serde_json::Value;
use tracing::warn;

use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
Expand Down Expand Up @@ -702,6 +703,11 @@ impl<'a> PostCommit<'a> {
log_store: &LogStoreRef,
version: i64,
) -> DeltaResult<()> {
if !table_state.load_config().require_files {
warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files.");
return Ok(())
}

let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
create_checkpoint_for(version, table_state, log_store.as_ref()).await?
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ pub async fn create_checkpoint_for(
state: &DeltaTableState,
log_store: &dyn LogStore,
) -> Result<(), ProtocolError> {
if !state.load_config().require_files {
return Err(ProtocolError::Generic(
"Table has not yet been initialized with files, therefore creating a checkpoint is not possible.".to_string()
));
}

if version != state.version() {
error!(
"create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
Expand Down
40 changes: 40 additions & 0 deletions python/tests/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError
from deltalake.table import PostCommitHookProperties


Expand All @@ -32,6 +33,45 @@ def test_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
assert checkpoint_path.exists()


def test_checkpoint_without_files(tmp_path: pathlib.Path, sample_data: pa.Table):
tmp_table_path = tmp_path / "path" / "to" / "table"
checkpoint_path = tmp_table_path / "_delta_log" / "_last_checkpoint"
last_checkpoint_path = (
tmp_table_path / "_delta_log" / "00000000000000000000.checkpoint.parquet"
)

# TODO: Include binary after fixing issue "Json error: binary type is not supported"
sample_data = sample_data.drop(["binary"])
write_deltalake(
str(tmp_table_path),
sample_data,
configuration={"delta.checkpointInterval": "2"},
)

assert not checkpoint_path.exists()

delta_table = DeltaTable(str(tmp_table_path), without_files=True)
with pytest.raises(
DeltaError,
match="Table has not yet been initialized with files, therefore creating a checkpoint is not possible.",
):
delta_table.create_checkpoint()

for i in range(3):
write_deltalake(delta_table, sample_data, mode="append")

assert not checkpoint_path.exists()

delta_table = DeltaTable(str(tmp_table_path), without_files=False)
delta_table.create_checkpoint()

assert checkpoint_path.exists()
last_checkpoint_path = (
tmp_table_path / "_delta_log" / "00000000000000000003.checkpoint.parquet"
)
assert last_checkpoint_path.exists()


def setup_cleanup_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
tmp_table_path = tmp_path / "path" / "to" / "table"
first_log_path = tmp_table_path / "_delta_log" / "00000000000000000000.json"
Expand Down

0 comments on commit d5da51e

Please sign in to comment.