Skip to content

Commit

Permalink
Fix command_filesystem_check
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Dec 30, 2023
1 parent 3f78db8 commit f03a9b4
Showing 1 changed file with 8 additions and 69 deletions.
77 changes: 8 additions & 69 deletions crates/deltalake-core/tests/command_filesystem_check.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,19 @@
#![cfg(feature = "integration_test")]

use deltalake_core::test_utils::{
set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables,
};
use deltalake_core::Path;
use deltalake_core::{errors::DeltaTableError, DeltaOps};
use deltalake_test::utils::*;
use serial_test::serial;

#[tokio::test]
#[serial]
async fn test_filesystem_check_local() -> TestResult {
test_filesystem_check(StorageIntegration::Local).await
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[tokio::test]
#[serial]
async fn test_filesystem_check_aws() -> TestResult {
set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none");
test_filesystem_check(StorageIntegration::Amazon).await
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_azure() -> TestResult {
test_filesystem_check(StorageIntegration::Microsoft).await
}

#[cfg(feature = "gcs")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_gcp() -> TestResult {
test_filesystem_check(StorageIntegration::Google).await
}

#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_hdfs() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Hdfs).await?)
let storage = Box::new(LocalStorageIntegration::default());
let context = IntegrationContext::new(storage)?;
test_filesystem_check(&context).await
}

async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
async fn test_filesystem_check(context: &IntegrationContext) -> TestResult {
context.load_table(TestTables::Simple).await?;
let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
Expand Down Expand Up @@ -86,7 +55,7 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
#[tokio::test]
#[serial]
async fn test_filesystem_check_partitioned() -> TestResult {
let storage = StorageIntegration::Local;
let storage = Box::new(LocalStorageIntegration::default());
let context = IntegrationContext::new(storage)?;
context
.load_table(TestTables::Delta0_8_0Partitioned)
Expand Down Expand Up @@ -120,7 +89,8 @@ async fn test_filesystem_check_partitioned() -> TestResult {
#[serial]
async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {
// Validate failure when a non dry only executes on the latest version
let context = IntegrationContext::new(StorageIntegration::Local)?;
let storage = Box::new(LocalStorageIntegration::default());
let context = IntegrationContext::new(storage)?;
context.load_table(TestTables::Simple).await?;
let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
Expand All @@ -142,34 +112,3 @@ async fn test_filesystem_check_fails_for_concurrent_delete() -> TestResult {

Ok(())
}

#[tokio::test]
#[serial]
#[ignore = "should this actually fail? with conflict resolution, we are re-trying again."]
async fn test_filesystem_check_outdated() -> TestResult {
// Validate failure when a non dry only executes on the latest version
let context = IntegrationContext::new(StorageIntegration::Local)?;
context.load_table(TestTables::Simple).await?;
let file = "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);

// Delete an active file from underlying storage without an update to the log to simulate an external fault
context.object_store().delete(&path).await?;

let table = context
.table_builder(TestTables::Simple)
.with_version(2)
.load()
.await?;

let op = DeltaOps::from(table);
let res = op.filesystem_check().with_dry_run(false).await;
println!("{:?}", res);
if let Err(DeltaTableError::VersionAlreadyExists(version)) = res {
assert!(version == 3);
} else {
panic!();
}

Ok(())
}

0 comments on commit f03a9b4

Please sign in to comment.