diff --git a/rust/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs similarity index 100% rename from rust/src/logstore/default_logstore.rs rename to crates/deltalake-core/src/logstore/default_logstore.rs diff --git a/rust/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs similarity index 100% rename from rust/src/logstore/mod.rs rename to crates/deltalake-core/src/logstore/mod.rs diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 7f4bb41cd0..a950345a11 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -36,9 +36,8 @@ use serde_json::{Map, Value}; use super::transaction::commit; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::{Action, DeltaOperation}; // Txn CommitInfo -use crate::logstore::LogStoreRef; -use crate::storage::DeltaObjectStore; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -238,7 +237,9 @@ impl std::future::IntoFuture for VacuumBuilder { )); } - let metrics = plan.execute(&this.log_store.object_store()).await?; + let metrics = plan + .execute(this.log_store.as_ref(), &this.snapshot) + .await?; Ok(( DeltaTable::new_with_state(this.log_store, this.snapshot), metrics, @@ -265,7 +266,7 @@ impl VacuumPlan { /// Execute the vacuum plan and delete files from underlying storage pub async fn execute( self, - store: &DeltaObjectStore, + store: &dyn LogStore, snapshot: &DeltaTableState, ) -> Result { if self.files_to_delete.is_empty() { @@ -314,6 +315,7 @@ impl VacuumPlan { .boxed(); let files_deleted = store + .object_store() .delete_stream(locations) .map(|res| match res { Ok(path) => Ok(path.to_string()), diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index 930b8af5b5..577e17f74e 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -1,12 +1,11 @@ #![allow(dead_code, unused_variables)] use bytes::Bytes; +use deltalake_core::logstore::LogStore; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{self, Add, DeltaOperation, Remove, SaveMode}; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::DeltaTableBuilder; -use deltalake::logstore::LogStore; use deltalake_core::{DeltaTable, Schema}; use object_store::{path::Path, ObjectStore}; use std::any::Any; diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index fe45ad4d38..e8af4d4442 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -45,7 +45,7 @@ use std::error::Error; mod common; mod local { - use deltalake::{writer::JsonWriter, SchemaTypeMap}; + use deltalake_core::{writer::JsonWriter, SchemaTypeMap}; use super::*; #[tokio::test] @@ -153,7 +153,7 @@ mod local { #[tokio::test] async fn test_datafusion_simple_query_partitioned() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/delta-0.8.0-partitioned") + let table = deltalake_core::open_table("./tests/data/delta-0.8.0-partitioned") .await .unwrap(); ctx.register_table("demo", Arc::new(table))?; @@ -182,7 +182,7 @@ mod local { let source_scan_bytes = { let ctx = SessionContext::new(); let state = ctx.state(); - let source_table = deltalake::open_table("./tests/data/delta-0.8.0-date").await?; + let source_table = deltalake_core::open_table("./tests/data/delta-0.8.0-date").await?; let source_scan = source_table.scan(&state, None, &[], None).await?; physical_plan_to_bytes_with_extension_codec(source_scan, &DeltaPhysicalCodec {})? }; @@ -261,7 +261,7 @@ mod local { #[tokio::test] async fn test_datafusion_date_column() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/delta-0.8.0-date") + let table = deltalake_core::open_table("./tests/data/delta-0.8.0-date") .await .unwrap(); ctx.register_table("dates", Arc::new(table))?; @@ -282,7 +282,7 @@ mod local { #[tokio::test] async fn test_datafusion_stats() -> Result<()> { - let table = deltalake::open_table("./tests/data/delta-0.8.0") + let table = deltalake_core::open_table("./tests/data/delta-0.8.0") .await .unwrap(); let statistics = table.state.datafusion_table_statistics(); @@ -734,7 +734,7 @@ mod local { assert_eq!(metrics.num_scanned_files(), 1); // Ensure that tables without stats and partition columns can be pruned for just partitions - // let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?; + // let table = deltalake_core::open_table("./tests/data/delta-0.8.0-null-partition").await?; /* // Logically this should prune. See above @@ -764,7 +764,7 @@ mod local { #[tokio::test] async fn test_datafusion_partitioned_types() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types") + let table = deltalake_core::open_table("./tests/data/delta-2.2.0-partitioned-types") .await .unwrap(); ctx.register_table("demo", Arc::new(table))?; @@ -813,7 +813,7 @@ mod local { #[tokio::test] async fn test_datafusion_scan_timestamps() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/table_with_edge_timestamps") + let table = deltalake_core::open_table("./tests/data/table_with_edge_timestamps") .await .unwrap(); ctx.register_table("demo", Arc::new(table))?; @@ -837,7 +837,7 @@ mod local { #[tokio::test] async fn test_issue_1292_datafusion_sql_projection() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/http_requests") + let table = deltalake_core::open_table("./tests/data/http_requests") .await .unwrap(); ctx.register_table("http_requests", Arc::new(table))?; @@ -868,7 +868,7 @@ mod local { #[tokio::test] async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/http_requests") + let table = deltalake_core::open_table("./tests/data/http_requests") .await .unwrap(); ctx.register_table("http_requests", Arc::new(table))?; @@ -901,7 +901,7 @@ mod local { #[tokio::test] async fn test_issue_1374() -> Result<()> { let ctx = SessionContext::new(); - let table = deltalake::open_table("./tests/data/issue_1374") + let table = deltalake_core::open_table("./tests/data/issue_1374") .await .unwrap(); ctx.register_table("t", Arc::new(table))?; @@ -948,14 +948,15 @@ mod local { true, HashMap::new(), )]; - let schema = deltalake::Schema::new(fields); - let table = deltalake::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?; + let schema = deltalake_core::Schema::new(fields); + let table = + deltalake_core::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?; let _ = DeltaOps::from(table) .create() .with_columns(schema.get_fields().to_owned()) .await?; - let mut table = deltalake::open_table("./tests/data/issue-1619").await?; + let mut table = deltalake_core::open_table("./tests/data/issue-1619").await?; let mut writer = JsonWriter::for_table(&table).unwrap(); writer diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index 5abdaa5b38..08a4163bed 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -60,7 +60,7 @@ mod local { assert_eq!(table.get_files(), vec![Path::from(a.path.clone())]); // Remove added file. - let r = deltalake::protocol::Remove { + let r = deltalake_core::protocol::Remove { path: a.path.clone(), deletion_timestamp: Some(chrono::Utc::now().timestamp_millis()), data_change: false, @@ -211,7 +211,7 @@ async fn read_simple_table(integration: &IntegrationContext) -> TestResult { ); let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 31); - assert!(tombstones.contains(&deltalake::protocol::Remove { + assert!(tombstones.contains(&deltalake_core::protocol::Remove { path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), deletion_timestamp: Some(1587968596250), data_change: true, @@ -247,7 +247,7 @@ async fn read_simple_table_with_version(integration: &IntegrationContext) -> Tes ); let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 29); - assert!(tombstones.contains(&deltalake::protocol::Remove { + assert!(tombstones.contains(&deltalake_core::protocol::Remove { path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), deletion_timestamp: Some(1587968596250), data_change: true, @@ -292,7 +292,7 @@ mod gcs { #[tokio::test] async fn test_gcs_simple() { let bucket = std::env::var("GCS_DELTA_BUCKET").unwrap(); - let table = deltalake::open_table(format!("gs://{}/simple_table", bucket).as_str()) + let table = deltalake_core::open_table(format!("gs://{}/simple_table", bucket).as_str()) .await .unwrap(); assert_eq!(table.version(), 4); @@ -310,7 +310,7 @@ mod gcs { ); let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 31); - assert!(tombstones.contains(&deltalake::protocol::Remove { + assert!(tombstones.contains(&deltalake_core::protocol::Remove { path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), deletion_timestamp: Some(1587968596250), data_change: true, diff --git a/python/src/lib.rs b/python/src/lib.rs index e505038935..54fb2352a5 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -859,9 +859,8 @@ impl RawDeltaTable { /// have been deleted or are malformed #[pyo3(signature = (dry_run = true))] pub fn repair(&mut self, dry_run: bool) -> PyResult { - let cmd = - FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) - .with_dry_run(dry_run); + let cmd = FileSystemCheckBuilder::new(self._table.log_store(), self._table.state.clone()) + .with_dry_run(dry_run); let (table, metrics) = rt()? .block_on(cmd.into_future())