From 5583247bae897fc2d25244d670a4f0dc300c6399 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Tue, 7 Nov 2023 14:37:27 +0100 Subject: [PATCH] Remove `DeltaObjectStore`, in favor of `LogStore` + `ObjectStore` --- .../src/logstore/default_logstore.rs | 34 ++- crates/deltalake-core/src/logstore/mod.rs | 75 +++++- .../src/operations/transaction/mod.rs | 12 +- crates/deltalake-core/src/storage/config.rs | 6 +- crates/deltalake-core/src/storage/mod.rs | 237 +----------------- crates/deltalake-core/src/table/builder.rs | 12 +- crates/deltalake-core/src/table/mod.rs | 23 +- crates/deltalake-core/src/test_utils.rs | 3 +- crates/deltalake-core/src/writer/json.rs | 4 +- .../deltalake-core/src/writer/record_batch.rs | 4 +- crates/deltalake-core/tests/fs_common/mod.rs | 9 +- .../tests/integration_checkpoint.rs | 18 +- .../tests/integration_datafusion.rs | 6 +- .../tests/repair_s3_rename_test.rs | 3 +- 14 files changed, 145 insertions(+), 301 deletions(-) diff --git a/crates/deltalake-core/src/logstore/default_logstore.rs b/crates/deltalake-core/src/logstore/default_logstore.rs index 5cf9b9d7c9..6fca5a95a0 100644 --- a/crates/deltalake-core/src/logstore/default_logstore.rs +++ b/crates/deltalake-core/src/logstore/default_logstore.rs @@ -5,21 +5,24 @@ use std::sync::Arc; use bytes::Bytes; #[cfg(feature = "datafusion")] use datafusion::execution::object_store::ObjectStoreUrl; -use object_store::path::Path; +use object_store::{path::Path, ObjectStore}; use url::Url; -use super::LogStore; +use super::{LogStore, LogStoreConfig}; use crate::{ operations::transaction::TransactionError, - storage::{config::StorageOptions, DeltaObjectStore, ObjectStoreRef}, + storage::{ + config::{self, StorageOptions}, + ObjectStoreRef, + }, DeltaResult, }; /// Default [`LogStore`] implementation #[derive(Debug, Clone)] pub struct DefaultLogStore { - pub(crate) storage: ObjectStoreRef, - location: Url, + pub(crate) storage: Arc, + config: LogStoreConfig, } impl DefaultLogStore { @@ -29,16 +32,17 @@ impl DefaultLogStore { /// /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). /// * `location` - A url corresponding to the storage location of `storage`. 
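Since `DefaultLogStore` now carries a plain `Arc<dyn ObjectStore>` plus a `LogStoreConfig` rather than wrapping a `DeltaObjectStore`, constructing one by hand is just a matter of supplying those two values. A minimal sketch against an in-memory store, assuming the `new(storage, config)` constructor shown just below and that the crate-internal paths in this patch are reachable as `deltalake_core::logstore::*`:

use std::{collections::HashMap, sync::Arc};

use deltalake_core::logstore::{default_logstore::DefaultLogStore, LogStoreConfig};
use object_store::{memory::InMemory, ObjectStore};
use url::Url;

fn in_memory_log_store() -> DefaultLogStore {
    // Any ObjectStore rooted at the table location works; InMemory keeps the sketch self-contained.
    let storage: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let config = LogStoreConfig {
        // Placeholder table-root URL; `new` simply records it in the config.
        location: Url::parse("mem://sketch/table").unwrap(),
        // No backend-specific options are needed for the in-memory store.
        options: HashMap::new().into(),
    };
    DefaultLogStore::new(storage, config)
}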
- pub fn new(storage: ObjectStoreRef, location: Url) -> Self { - DefaultLogStore { storage, location } + pub fn new(storage: ObjectStoreRef, config: LogStoreConfig) -> Self { + DefaultLogStore { storage, config } } /// Create log store pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { - let object_store = DeltaObjectStore::try_new(location.clone(), options.clone())?; + let mut options = options.into(); + let storage = config::configure_store(&location, &mut options)?; Ok(Self { - storage: Arc::new(object_store), - location, + storage: Arc::new(storage), + config: LogStoreConfig { location, options }, }) } } @@ -66,16 +70,20 @@ impl LogStore for DefaultLogStore { super::get_latest_version(self, current_version).await } - fn object_store(&self) -> ObjectStoreRef { + fn object_store(&self) -> Arc { self.storage.clone() } fn to_uri(&self, location: &Path) -> String { - super::to_uri(&self.location, location) + super::to_uri(&self.config.location, location) } #[cfg(feature = "datafusion")] fn object_store_url(&self) -> ObjectStoreUrl { - super::object_store_url(&self.location) + super::object_store_url(&self.config.location) + } + + fn config(&self) -> &LogStoreConfig { + &self.config } } diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 3e543046f0..3b9b38e56a 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -2,14 +2,19 @@ use futures::StreamExt; use lazy_static::lazy_static; use regex::Regex; -use std::{cmp::max, sync::Arc}; +use serde::{ + de::{Error, SeqAccess, Visitor}, + ser::SerializeSeq, + Deserialize, Serialize, +}; +use std::{cmp::max, collections::HashMap, sync::Arc}; use url::Url; use crate::{ errors::DeltaResult, operations::transaction::TransactionError, protocol::{get_last_checkpoint, ProtocolError}, - storage::{commit_uri_from_version, DeltaObjectStore, ObjectStoreRef}, + storage::{commit_uri_from_version, config::StorageOptions}, DeltaTableError, }; use bytes::Bytes; @@ -30,6 +35,15 @@ lazy_static! { static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); } +/// Configuration parameters for a log store +#[derive(Debug, Clone)] +pub struct LogStoreConfig { + /// url corresponding to the storage location. + pub location: Url, + /// Options used for configuring backend storage + pub options: StorageOptions, +} + /// Trait for critical operations required to read and write commit entries in Delta logs. /// /// The correctness is predicated on the atomicity and durability guarantees of @@ -60,7 +74,7 @@ pub trait LogStore: Sync + Send { async fn get_latest_version(&self, start_version: i64) -> DeltaResult; /// Get underlying object store. - fn object_store(&self) -> ObjectStoreRef; + fn object_store(&self) -> Arc; /// [Path] to Delta log fn to_uri(&self, location: &Path) -> String; @@ -95,7 +109,7 @@ pub trait LogStore: Sync + Send { /// Generate a unique enough url to identify the store in datafusion. /// The DF object store registry only cares about the scheme and the host of the url for /// registering/fetching. In our case the scheme is hard-coded to "delta-rs", so to get a unique - /// host we convert the location from this `DeltaObjectStore` to a valid name, combining the + /// host we convert the location from this `LogStore` to a valid name, combining the /// original scheme, host and path with invalid characters replaced. 
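`DefaultLogStore::try_new` now resolves its backing store through `config::configure_store` instead of first building a `DeltaObjectStore`, and the `LogStoreConfig` it was created from stays reachable through the trait. A hedged sketch with a placeholder local path, assuming a plain `HashMap` still converts into `StorageOptions`:

use std::collections::HashMap;

use deltalake_core::logstore::{default_logstore::DefaultLogStore, LogStore};
use url::Url;

fn open_local_log_store() -> deltalake_core::DeltaResult<()> {
    // file:///tmp/delta_table is purely illustrative; point this at an existing table root.
    let location = Url::parse("file:///tmp/delta_table").unwrap();
    let log_store = DefaultLogStore::try_new(location, HashMap::<String, String>::new())?;
    // The location/options pair that produced the store is kept on the log store itself.
    assert_eq!(log_store.config().location.scheme(), "file");
    Ok(())
}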
fn object_store_url(&self) -> ObjectStoreUrl; @@ -111,6 +125,9 @@ pub trait LogStore: Sync + Send { } Ok(()) } + + /// Get configuration representing configured log store. + fn config(&self) -> &LogStoreConfig; } // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders @@ -120,6 +137,54 @@ impl std::fmt::Debug for dyn LogStore + '_ { } } +impl Serialize for LogStoreConfig { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(None)?; + seq.serialize_element(&self.location.to_string())?; + seq.serialize_element(&self.options.0)?; + seq.end() + } +} + +impl<'de> Deserialize<'de> for LogStoreConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct LogStoreConfigVisitor {} + + impl<'de> Visitor<'de> for LogStoreConfigVisitor { + type Value = LogStoreConfig; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("struct LogStoreConfig") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let location_str: String = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let options: HashMap = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let location = Url::parse(&location_str).unwrap(); + Ok(LogStoreConfig { + location, + options: options.into(), + }) + } + } + + deserializer.deserialize_seq(LogStoreConfigVisitor {}) + } +} + lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r"(\d{20})\.(json|checkpoint).*$").unwrap(); } @@ -228,7 +293,7 @@ async fn read_commit_entry(storage: &dyn ObjectStore, version: i64) -> DeltaResu } async fn write_commit_entry( - storage: &DeltaObjectStore, + storage: &dyn ObjectStore, version: i64, tmp_commit: &Path, ) -> Result<(), TransactionError> { diff --git a/crates/deltalake-core/src/operations/transaction/mod.rs b/crates/deltalake-core/src/operations/transaction/mod.rs index 6524d1e520..e5e808d2d5 100644 --- a/crates/deltalake-core/src/operations/transaction/mod.rs +++ b/crates/deltalake-core/src/operations/transaction/mod.rs @@ -243,8 +243,7 @@ mod tests { use self::test_utils::{create_remove_action, init_table_actions}; use super::*; use crate::{ - logstore::default_logstore::DefaultLogStore, - storage::{commit_uri_from_version, DeltaObjectStore}, + logstore::default_logstore::DefaultLogStore, storage::commit_uri_from_version, DeltaConfigKey, }; use object_store::memory::InMemory; @@ -294,8 +293,13 @@ mod tests { async fn test_try_commit_transaction() { let store = Arc::new(InMemory::new()); let url = Url::parse("mem://what/is/this").unwrap(); - let delta_store = DeltaObjectStore::new(store.clone(), url.clone()); - let log_store = DefaultLogStore::new(Arc::new(delta_store), url); + let log_store = DefaultLogStore::new( + store.clone(), + crate::logstore::LogStoreConfig { + location: url, + options: HashMap::new().into(), + }, + ); let tmp_path = Path::from("_delta_log/tmp"); let version_path = Path::from("_delta_log/00000000000000000000.json"); store.put(&tmp_path, bytes::Bytes::new()).await.unwrap(); diff --git a/crates/deltalake-core/src/storage/config.rs b/crates/deltalake-core/src/storage/config.rs index 16a9b0eb4b..1ddf7d9c8a 100644 --- a/crates/deltalake-core/src/storage/config.rs +++ b/crates/deltalake-core/src/storage/config.rs @@ -227,7 +227,8 @@ impl From> for StorageOptions { } } -pub(crate) fn configure_log_store( +/// Configure a [`LogStoreRef`] for 
the given url and configuration +pub fn configure_log_store( url: Url, options: impl Into + Clone, ) -> DeltaResult { @@ -236,7 +237,8 @@ pub(crate) fn configure_log_store( Ok(Arc::new(DefaultLogStore::try_new(url, options)?)) } -pub(crate) fn configure_store( +/// Configure an instance of an [`ObjectStore`] for the given url and configuration +pub fn configure_store( url: &Url, options: &mut StorageOptions, ) -> DeltaResult> { diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index ee3bab140e..b571905f8b 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -1,22 +1,8 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data -use std::collections::HashMap; -use std::fmt; -use std::ops::Range; use std::sync::Arc; -use bytes::Bytes; -use futures::stream::BoxStream; use lazy_static::lazy_static; -use object_store::GetOptions; -use serde::de::{Error, SeqAccess, Visitor}; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use tokio::io::AsyncWrite; -use url::Url; - -use self::config::StorageOptions; -use crate::errors::DeltaResult; pub mod config; pub mod file; @@ -42,224 +28,5 @@ pub(crate) fn commit_uri_from_version(version: i64) -> Path { DELTA_LOG_PATH.child(version.as_str()) } -/// Sharable reference to [`DeltaObjectStore`] -pub type ObjectStoreRef = Arc; - -/// Object Store implementation for DeltaTable. -/// -/// The [DeltaObjectStore] implements the [object_store::ObjectStore] trait to facilitate -/// interoperability with the larger rust / arrow ecosystem. Specifically it can directly -/// be registered as store within datafusion. -/// -/// The table root is treated as the root of the object store. -/// All [Path] are reported relative to the table root. -#[derive(Debug, Clone)] -pub struct DeltaObjectStore { - storage: Arc, - location: Url, - options: StorageOptions, -} - -impl std::fmt::Display for DeltaObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DeltaObjectStore({})", self.location.as_ref()) - } -} - -impl DeltaObjectStore { - /// Create a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). - /// * `location` - A url corresponding to the storage location of `storage`. - pub fn new(storage: Arc, location: Url) -> Self { - Self { - storage, - location, - options: HashMap::new().into(), - } - } - - /// Try creating a new instance of [`DeltaObjectStore`] - /// - /// # Arguments - /// - /// * `location` - A url pointing to the root of the delta table. - /// * `options` - Options passed to underlying builders. 
See [`with_storage_options`](crate::table::builder::DeltaTableBuilder::with_storage_options) - pub fn try_new(location: Url, options: impl Into + Clone) -> DeltaResult { - let mut options = options.into(); - let storage = config::configure_store(&location, &mut options)?; - Ok(Self { - storage, - location, - options, - }) - } - - /// Get a reference to the underlying storage backend - pub fn storage_backend(&self) -> Arc { - self.storage.clone() - } - - /// Storage options used to initialize storage backend - pub fn storage_options(&self) -> &StorageOptions { - &self.options - } - - pub(crate) fn location(&self) -> Url { - self.location.clone() - } -} - -#[async_trait::async_trait] -impl ObjectStore for DeltaObjectStore { - /// Save the provided bytes to the specified location. - async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { - self.storage.put(location, bytes).await - } - - /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Path) -> ObjectStoreResult { - self.storage.get(location).await - } - - /// Perform a get request with options - /// - /// Note: options.range will be ignored if [`object_store::GetResultPayload::File`] - async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { - self.storage.get_opts(location, options).await - } - - /// Return the bytes that are stored at the specified location - /// in the given byte range - async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { - self.storage.get_range(location, range).await - } - - /// Return the metadata for the specified location - async fn head(&self, location: &Path) -> ObjectStoreResult { - self.storage.head(location).await - } - - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { - self.storage.delete(location).await - } - - /// List all the objects with the given prefix. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list( - &self, - prefix: Option<&Path>, - ) -> ObjectStoreResult>> { - self.storage.list(prefix).await - } - - /// List all the objects with the given prefix and a location greater than `offset` - /// - /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce - /// the number of network requests required - async fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> ObjectStoreResult>> { - self.storage.list_with_offset(prefix, offset).await - } - - /// List objects with the given prefix and an implementation specific - /// delimiter. Returns common prefixes (directories) in addition to object - /// metadata. - /// - /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of - /// `foo/bar_baz/x`. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { - self.storage.list_with_delimiter(prefix).await - } - - /// Copy an object from one path to another in the same object store. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy(from, to).await - } - - /// Copy an object from one path to another, only if destination is empty. - /// - /// Will return an error if the destination already has an object. 
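All of this pass-through plumbing can go because callers now take the backing store straight from the log store and use the `object_store` API on it directly; paths stay relative to the table root exactly as before. A small sketch probing the first commit file, assuming a log store obtained as in the earlier sketches:

use deltalake_core::logstore::LogStore;
use object_store::{path::Path, ObjectStore};

async fn first_commit_exists(log_store: &dyn LogStore) -> bool {
    let store = log_store.object_store();
    // Same relative-path convention the removed wrapper documented: "/" is the table root.
    store
        .head(&Path::from("_delta_log/00000000000000000000.json"))
        .await
        .is_ok()
}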
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.copy_if_not_exists(from, to).await - } - - /// Move an object from one path to another in the same object store. - /// - /// Will return an error if the destination already has an object. - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - self.storage.rename_if_not_exists(from, to).await - } - - async fn put_multipart( - &self, - location: &Path, - ) -> ObjectStoreResult<(MultipartId, Box)> { - self.storage.put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> ObjectStoreResult<()> { - self.storage.abort_multipart(location, multipart_id).await - } -} - -impl Serialize for DeltaObjectStore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut seq = serializer.serialize_seq(None)?; - seq.serialize_element(&self.location.to_string())?; - seq.serialize_element(&self.options.0)?; - seq.end() - } -} - -impl<'de> Deserialize<'de> for DeltaObjectStore { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct DeltaObjectStoreVisitor {} - - impl<'de> Visitor<'de> for DeltaObjectStoreVisitor { - type Value = DeltaObjectStore; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> fmt::Result { - formatter.write_str("struct DeltaObjectStore") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: SeqAccess<'de>, - { - let location_str: String = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let options: HashMap = seq - .next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?; - let location = Url::parse(&location_str).unwrap(); - let table = DeltaObjectStore::try_new(location, options) - .map_err(|_| A::Error::custom("Failed deserializing DeltaObjectStore"))?; - Ok(table) - } - } - - deserializer.deserialize_seq(DeltaObjectStoreVisitor {}) - } -} +/// Sharable reference to [`ObjectStore`] +pub type ObjectStoreRef = Arc; diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index a8e7d15e01..2a4f8aca41 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -12,9 +12,8 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::default_logstore::DefaultLogStore; -use crate::logstore::LogStoreRef; +use crate::logstore::{LogStoreConfig, LogStoreRef}; use crate::storage::config::{self, StorageOptions}; -use crate::storage::DeltaObjectStore; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -249,8 +248,13 @@ impl DeltaTableBuilder { match self.options.storage_backend { Some((storage, location)) => { let location = ensure_table_uri(location.as_str())?; - let storage = DeltaObjectStore::new(storage, location.clone()); - Ok(Arc::new(DefaultLogStore::new(Arc::new(storage), location))) + Ok(Arc::new(DefaultLogStore::new( + Arc::new(storage), + LogStoreConfig { + location, + options: HashMap::new().into(), + }, + ))) } None => { let location = ensure_table_uri(&self.options.table_uri)?; diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index e4110a29f8..cd0f1808f5 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -5,7 +5,6 @@ use std::convert::TryFrom; use std::fmt; use std::fmt::Formatter; use std::io::{BufRead, 
BufReader, Cursor}; -use std::sync::Arc; use std::{cmp::max, cmp::Ordering, collections::HashSet}; use chrono::{DateTime, Utc}; @@ -26,11 +25,13 @@ use crate::kernel::{ Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, WriterFeatures, }; -use crate::logstore::{default_logstore::DefaultLogStore, LogStoreRef}; +use crate::logstore::LogStoreConfig; +use crate::logstore::LogStoreRef; use crate::partitions::PartitionFilter; use crate::protocol::{ find_latest_check_point_for_version, get_last_checkpoint, ProtocolError, Stats, }; +use crate::storage::config::configure_log_store; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; pub mod builder; @@ -266,8 +267,7 @@ impl Serialize for DeltaTable { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.state)?; seq.serialize_element(&self.config)?; - // tp::TODO (de)serialize actual log store instead - seq.serialize_element(self.object_store().as_ref())?; + seq.serialize_element(self.log_store.config())?; seq.serialize_element(&self.last_check_point)?; seq.serialize_element(&self.version_timestamp)?; seq.end() @@ -298,15 +298,12 @@ impl<'de> Deserialize<'de> for DeltaTable { let config = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; - // TODO twh; proper (de-)serialization - let storage = Arc::new( - seq.next_element()? - .ok_or_else(|| A::Error::invalid_length(0, &self))?, - ); - let log_store = Arc::new(DefaultLogStore::new( - Arc::clone(&storage), - storage.location().to_owned(), - )); + let storage_config: LogStoreConfig = seq + .next_element()? + .ok_or_else(|| A::Error::invalid_length(0, &self))?; + let log_store = + configure_log_store(storage_config.location, storage_config.options) + .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; let last_check_point = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 76b6c61820..ce121106c4 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -88,8 +88,7 @@ impl IntegrationContext { _ => DeltaTableBuilder::from_uri(store_uri) .with_allow_http(true) .build_storage()? - .object_store() - .storage_backend(), + .object_store(), }; Ok(Self { diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 0239dee2f9..7fec11fad2 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -26,14 +26,14 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; +use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; -use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; type BadValue = (Value, ParquetError); /// Writes messages to a delta lake table. 
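`DeltaTable`'s serde stream now embeds the `LogStoreConfig` and rebuilds the log store through `configure_log_store` on deserialization. The config itself round-trips as a two-element sequence of location string and options map; a sketch using `serde_json`, with a placeholder location:

use std::collections::HashMap;

use deltalake_core::logstore::LogStoreConfig;
use url::Url;

fn round_trip_config() {
    let config = LogStoreConfig {
        location: Url::parse("file:///tmp/delta_table").unwrap(),
        options: HashMap::new().into(),
    };
    // Serialized form: ["file:///tmp/delta_table", {}]
    let json = serde_json::to_string(&config).unwrap();
    let restored: LogStoreConfig = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.location, config.location);
}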
pub struct JsonWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index 6e23313860..49b5dfebc9 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -29,11 +29,11 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; use crate::table::DeltaTableMetaData; -use crate::{storage::DeltaObjectStore, DeltaTable}; +use crate::DeltaTable; /// Writes messages to a delta lake table. pub struct RecordBatchWriter { - storage: Arc, + storage: Arc, arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index 4de3ff1546..73593f26b1 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -5,7 +5,8 @@ use deltalake_core::kernel::{ use deltalake_core::operations::create::CreateBuilder; use deltalake_core::operations::transaction::commit; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::storage::{DeltaObjectStore, GetResult, ObjectStoreResult}; +use deltalake_core::storage::config::configure_store; +use deltalake_core::storage::{GetResult, ObjectStoreResult}; use deltalake_core::DeltaTable; use object_store::path::Path as StorePath; use object_store::ObjectStore; @@ -13,6 +14,7 @@ use serde_json::Value; use std::collections::HashMap; use std::fs; use std::path::Path; +use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -133,7 +135,7 @@ pub async fn commit_actions( #[derive(Debug)] pub struct SlowStore { - inner: DeltaObjectStore, + inner: Arc, } impl std::fmt::Display for SlowStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -147,8 +149,9 @@ impl SlowStore { location: Url, options: impl Into + Clone, ) -> deltalake_core::DeltaResult { + let mut options = options.into(); Ok(Self { - inner: DeltaObjectStore::try_new(location, options).unwrap(), + inner: configure_store(&location, &mut options).unwrap(), }) } } diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 219474765a..56d253eb85 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -59,16 +59,12 @@ async fn cleanup_metadata_hdfs_test() -> TestResult { // test to run longer but reliable async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { let table_uri = context.root_uri(); - let object_store = DeltaTableBuilder::from_uri(table_uri) + let log_store = DeltaTableBuilder::from_uri(table_uri) .with_allow_http(true) - .build_storage()? 
- .object_store(); + .build_storage()?; + let object_store = log_store.object_store(); - let log_path = |version| { - object_store - .log_path() - .child(format!("{:020}.json", version)) - }; + let log_path = |version| log_store.log_path().child(format!("{:020}.json", version)); // we don't need to actually populate files with content as cleanup works only with file's metadata object_store @@ -99,7 +95,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { assert!(retention_timestamp > v1time.timestamp_millis()); assert!(retention_timestamp < v2time.timestamp_millis()); - let removed = cleanup_expired_logs_for(3, object_store.as_ref(), retention_timestamp).await?; + let removed = cleanup_expired_logs_for(3, log_store.as_ref(), retention_timestamp).await?; assert_eq!(removed, 2); assert!(object_store.head(&log_path(0)).await.is_err()); @@ -143,7 +139,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { // Should delete v1 but not v2 or v2.checkpoint.parquet cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; @@ -185,7 +181,7 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { cleanup_expired_logs_for( table.version(), - table.object_store().as_ref(), + table.log_store().as_ref(), ts.timestamp_millis(), ) .await?; diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index c9293f187e..7a9c38463f 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -34,11 +34,11 @@ use deltalake_core::delta_datafusion::{DeltaPhysicalCodec, DeltaScan}; use deltalake_core::kernel::{DataType, MapType, PrimitiveType, StructField, StructType}; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::protocol::SaveMode; -use deltalake_core::storage::DeltaObjectStore; use deltalake_core::writer::{DeltaWriter, RecordBatchWriter}; use deltalake_core::{ open_table, operations::{write::WriteBuilder, DeltaOps}, + storage::config::configure_log_store, DeltaTable, DeltaTableError, }; use std::error::Error; @@ -227,12 +227,12 @@ mod local { .table_uri .clone(); let source_location = Url::parse(&source_uri).unwrap(); - let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap(); + let source_store = configure_log_store(source_location, HashMap::new()).unwrap(); let object_store_url = source_store.object_store_url(); let source_store_url: &Url = object_store_url.as_ref(); state .runtime_env() - .register_object_store(source_store_url, Arc::from(source_store)); + .register_object_store(source_store_url, source_store.object_store()); // Execute write to the target table with the proper state let target_table = WriteBuilder::new(target_table.log_store(), target_table.state.clone()) diff --git a/crates/deltalake-core/tests/repair_s3_rename_test.rs b/crates/deltalake-core/tests/repair_s3_rename_test.rs index 25eb9704ee..ecfba0fbbe 100644 --- a/crates/deltalake-core/tests/repair_s3_rename_test.rs +++ b/crates/deltalake-core/tests/repair_s3_rename_test.rs @@ -117,8 +117,7 @@ fn create_s3_backend( .with_allow_http(true) .build_storage() .unwrap() - .object_store() - .storage_backend(); + .object_store(); let delayed_store = DelayedObjectStore { inner: store,
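Test and integration code follows the same pattern throughout this patch: `build_storage()` now hands back a log store, and the raw `ObjectStore` is pulled from it on demand rather than through the removed `storage_backend()`. A sketch with a placeholder URI, assuming `build_storage()` returns a `LogStoreRef` and `log_path()` is available on the trait as used in the checkpoint test above:

use deltalake_core::logstore::LogStore;
use deltalake_core::DeltaTableBuilder;

fn storage_for_tests() -> deltalake_core::DeltaResult<()> {
    // file:///tmp/delta_table stands in for whatever table the test targets.
    let log_store = DeltaTableBuilder::from_uri("file:///tmp/delta_table")
        .with_allow_http(true)
        .build_storage()?;
    // Raw object store, scoped to the table root, for direct put/get/list calls.
    let _object_store = log_store.object_store();
    // Commit paths are still derived from the log store's `_delta_log` path.
    let _first_commit = log_store.log_path().child("00000000000000000000.json");
    Ok(())
}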