From 9812bec7d2e109b8c1a3d357ed4b5111d95226c8 Mon Sep 17 00:00:00 2001
From: Robin Lin <128118209+RobinLin666@users.noreply.github.com>
Date: Fri, 15 Mar 2024 16:39:57 +0800
Subject: [PATCH] fix: support writing to local file systems that do not
 support hard links (#1868)

Make writes compatible with local file systems that do not support hard
links.

# Description
When we write to the local file system, hard links are sometimes not
supported, for example on blobfuse, goofys, or s3fs, so handle that case for
compatibility. Note that blobfuse has a second, separate problem: renames
fail because the file handle is not released before the rename. See
https://github.com/delta-io/delta-rs/issues/1765 for details.

A matching change is required in arrow-rs, for example:
https://github.com/GlareDB/arrow-rs/pull/2/files
Because object_store 0.8 introduces many breaking changes, that part is left
unchanged for now and will be fixed after the upgrade to 0.8:
https://github.com/delta-io/delta-rs/issues/1858

# Related Issue(s)
#1765 #1376

# Documentation
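A rough usage sketch from Rust (illustrative only, not shipped in this
patch). It assumes the new `mount` feature is enabled on the `deltalake`
crate and tokio with the `macros` feature; `/mnt/blobfuse/my_table` is a
hypothetical existing table on a FUSE mount:

    use std::collections::HashMap;

    #[tokio::main]
    async fn main() -> Result<(), deltalake::DeltaTableError> {
        // Route `file://` (and `dbfs://`) URLs through the new mount backend.
        deltalake::mount::register_handlers(None);

        // Opt in to plain, non-atomic renames; needed on mounts such as
        // blobfuse/goofys/s3fs where hard links are not supported.
        let mut storage_options = HashMap::new();
        storage_options.insert("allow_unsafe_rename".to_string(), "true".to_string());

        // Hypothetical table location on a blobfuse mount.
        let table = deltalake::open_table_with_storage_options(
            "file:///mnt/blobfuse/my_table",
            storage_options,
        )
        .await?;
        println!("loaded table at version {}", table.version());
        Ok(())
    }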
---
 crates/deltalake/Cargo.toml       |   4 +-
 crates/deltalake/src/lib.rs       |   2 +
 crates/mount/Cargo.toml           |  43 ++++
 crates/mount/src/config.rs        |  80 +++++++
 crates/mount/src/error.rs         |  33 +++
 crates/mount/src/file.rs          | 349 ++++++++++++++++++++++++++++++
 crates/mount/src/lib.rs           | 102 +++++++++
 crates/mount/tests/context.rs     |  85 ++++++++
 crates/mount/tests/integration.rs |  39 ++++
 python/Cargo.toml                 |   2 +-
 python/src/lib.rs                 |   1 +
 python/tests/test_writer.py       |  24 ++
 12 files changed, 762 insertions(+), 2 deletions(-)
 create mode 100644 crates/mount/Cargo.toml
 create mode 100644 crates/mount/src/config.rs
 create mode 100644 crates/mount/src/error.rs
 create mode 100644 crates/mount/src/file.rs
 create mode 100644 crates/mount/src/lib.rs
 create mode 100644 crates/mount/tests/context.rs
 create mode 100644 crates/mount/tests/integration.rs

diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml
index c541cc1284..ba3e26cbf9 100644
--- a/crates/deltalake/Cargo.toml
+++ b/crates/deltalake/Cargo.toml
@@ -14,7 +14,7 @@ rust-version.workspace = true
 [package.metadata.docs.rs]
 # We cannot use all_features because TLS features are mutually exclusive.
 # We cannot use hdfs feature because it requires Java to be installed.
-features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]
+features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3", "unity-experimental"]
 
 [dependencies]
 deltalake-core = { version = "0.17.1", path = "../core" }
@@ -22,6 +22,7 @@ deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false,
 deltalake-azure = { version = "0.1.0", path = "../azure", optional = true }
 deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true }
 deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true }
+deltalake-mount = { version = "0.1.0", path = "../mount", optional = true }
 
 [features]
 # All of these features are just reflected into the core crate until that
@@ -34,6 +35,7 @@ gcs = ["deltalake-gcp"]
 glue = ["deltalake-catalog-glue"]
 hdfs = []
 json = ["deltalake-core/json"]
+mount = ["deltalake-mount"]
 python = ["deltalake-core/python"]
 s3-native-tls = ["deltalake-aws/native-tls"]
 s3 = ["deltalake-aws/rustls"]
diff --git a/crates/deltalake/src/lib.rs b/crates/deltalake/src/lib.rs
index 38dc5d52dc..c72a72e8bf 100644
--- a/crates/deltalake/src/lib.rs
+++ b/crates/deltalake/src/lib.rs
@@ -9,3 +9,5 @@ pub use deltalake_aws as aws;
 pub use deltalake_azure as azure;
 #[cfg(feature = "gcs")]
 pub use deltalake_gcp as gcp;
+#[cfg(feature = "mount")]
+pub use deltalake_mount as mount;
diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml
new file mode 100644
index 0000000000..979a19592a
--- /dev/null
+++ b/crates/mount/Cargo.toml
@@ -0,0 +1,43 @@
+[package]
+name = "deltalake-mount"
+version = "0.1.0"
+authors.workspace = true
+keywords.workspace = true
+readme.workspace = true
+edition.workspace = true
+homepage.workspace = true
+description.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+deltalake-core = { version = "0.17.0", path = "../core", features = [
+    "datafusion",
+] }
+lazy_static = "1"
+errno = "0.3"
+
+# workspace dependencies
+async-trait = { workspace = true }
+bytes = { workspace = true }
+futures = { workspace = true }
+tracing = { workspace = true }
+object_store = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+regex = { workspace = true }
+url = { workspace = true }
+
+[dev-dependencies]
+chrono = { workspace = true }
+serial_test = "3"
+deltalake-test = { path = "../test" }
+pretty_env_logger = "0.5.0"
+rand = "0.8"
+serde_json = { workspace = true }
+tempfile = "3"
+fs_extra = "1.3.0"
+
+[features]
+integration_test = []
diff --git a/crates/mount/src/config.rs b/crates/mount/src/config.rs
new file mode 100644
index 0000000000..79dbfc88d0
--- /dev/null
+++ b/crates/mount/src/config.rs
@@ -0,0 +1,80 @@
+//! Auxiliary module for generating a valid Mount configuration.
+use std::collections::{hash_map::Entry, HashMap};
+use std::str::FromStr;
+
+use crate::error::{Error, Result};
+
+/// Typed property keys that can be defined on a mounted path
+#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
+#[non_exhaustive]
+pub enum MountConfigKey {
+    /// If set to "true", allows creating commits without concurrent writer protection.
+    /// Only safe if there is one writer to a given table.
+    AllowUnsafeRename,
+}
+
+impl AsRef<str> for MountConfigKey {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::AllowUnsafeRename => "mount_allow_unsafe_rename",
+        }
+    }
+}
+
+impl FromStr for MountConfigKey {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "mount_allow_unsafe_rename" | "allow_unsafe_rename" => Ok(Self::AllowUnsafeRename),
+            _ => Err(Error::UnknownConfigKey(s.to_string())),
+        }
+    }
+}
+
+/// Helper struct to create full configuration from passed options and environment
+pub(crate) struct MountConfigHelper {
+    config: HashMap<MountConfigKey, String>,
+    env_config: HashMap<MountConfigKey, String>,
+}
+
+impl MountConfigHelper {
+    /// Create a new [`MountConfigHelper`]
+    pub fn try_new(
+        config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
+    ) -> Result<Self> {
+        let mut env_config = HashMap::new();
+        for (os_key, os_value) in std::env::vars_os() {
+            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
+                if key.starts_with("MOUNT_") {
+                    if let Ok(config_key) = MountConfigKey::from_str(&key.to_ascii_lowercase()) {
+                        env_config.insert(config_key, value.to_string());
+                    }
+                }
+            }
+        }
+
+        Ok(Self {
+            config: config
+                .into_iter()
+                .map(|(key, value)| Ok((MountConfigKey::from_str(key.as_ref())?, value.into())))
+                .collect::<Result<_, Error>>()?,
+            env_config,
+        })
+    }
+
+    /// Generate a configuration augmented with options from the environment
+    pub fn build(mut self) -> Result<HashMap<MountConfigKey, String>> {
+        // Add keys from the environment to the configuration, as e.g. client configuration options.
+        // NOTE We have to specifically configure omitting keys, since workload identity can
+        // work purely using defaults, but partial config may be present in the environment.
+        // Preference of conflicting configs (e.g. msi resource id vs. client id) is handled in object store.
+        for key in self.env_config.keys() {
+            if let Entry::Vacant(e) = self.config.entry(*key) {
+                e.insert(self.env_config.get(key).unwrap().to_owned());
+            }
+        }
+
+        Ok(self.config)
+    }
+}
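As a reading aid (not part of the patch): the `FromStr` impl above accepts
both the `MOUNT_`-style prefixed spelling and the bare option name. A unit
test along these lines could be appended to config.rs to pin that down; it
is a sketch, and since `MountConfigKey` lives in a private module it would
have to sit inside the crate:

    #[cfg(test)]
    mod tests {
        use super::*;
        use std::str::FromStr;

        #[test]
        fn parses_both_spellings() {
            // Both the env-style and the bare option name map to the same key.
            assert_eq!(
                MountConfigKey::from_str("mount_allow_unsafe_rename").unwrap(),
                MountConfigKey::AllowUnsafeRename
            );
            assert_eq!(
                MountConfigKey::from_str("allow_unsafe_rename").unwrap(),
                MountConfigKey::AllowUnsafeRename
            );
            // Unknown keys surface as errors instead of being silently dropped.
            assert!(MountConfigKey::from_str("mount_unknown").is_err());
        }
    }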
diff --git a/crates/mount/src/error.rs b/crates/mount/src/error.rs
new file mode 100644
index 0000000000..3693b0be07
--- /dev/null
+++ b/crates/mount/src/error.rs
@@ -0,0 +1,33 @@
+use deltalake_core::errors::DeltaTableError;
+
+pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[allow(dead_code)]
+    #[error("failed to parse config: {0}")]
+    Parse(String),
+
+    /// Unknown configuration key
+    #[error("Unknown configuration key: {0}")]
+    UnknownConfigKey(String),
+
+    #[error("The `allow_unsafe_rename` parameter must be specified")]
+    AllowUnsafeRenameNotSpecified,
+
+    #[error(transparent)]
+    ObjectStore(#[from] object_store::Error),
+}
+
+impl From<Error> for DeltaTableError {
+    fn from(e: Error) -> Self {
+        match e {
+            Error::Parse(msg) => DeltaTableError::Generic(msg),
+            Error::UnknownConfigKey(msg) => DeltaTableError::Generic(msg),
+            Error::AllowUnsafeRenameNotSpecified => DeltaTableError::Generic(
+                "The `allow_unsafe_rename` parameter must be specified".to_string(),
+            ),
+            Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
+        }
+    }
+}
diff --git a/crates/mount/src/file.rs b/crates/mount/src/file.rs
new file mode 100644
index 0000000000..02e5f82e6c
--- /dev/null
+++ b/crates/mount/src/file.rs
@@ -0,0 +1,349 @@
+//! Mount file storage backend. This backend reads and writes objects on a mounted filesystem.
+//!
+//! The mount file storage backend is not multi-writer safe.
+
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use object_store::{
+    local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetOptions,
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
+    Result as ObjectStoreResult,
+};
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::io::AsyncWrite;
+use url::Url;
+
+pub(crate) const STORE_NAME: &str = "MountObjectStore";
+
+/// Error raised by storage lock client
+#[derive(thiserror::Error, Debug)]
+#[allow(dead_code)]
+pub enum LocalFileSystemError {
+    /// Object exists already at path
+    #[error("Object exists already at path: {} ({:?})", path, source)]
+    AlreadyExists {
+        /// Path of the already existing file
+        path: String,
+        /// Originating error
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    /// Object not found at the given path
+    #[error("Object not found at path: {} ({:?})", path, source)]
+    NotFound {
+        /// Provided path which does not exist
+        path: String,
+        /// Originating error
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    /// Invalid argument sent to OS call
+    #[error("Invalid argument in OS call for path: {} ({:?})", path, source)]
+    InvalidArgument {
+        /// Provided path
+        path: String,
+        /// Originating error
+        source: errno::Errno,
+    },
+
+    /// Null error for path for FFI
+    #[error("Null error in FFI for path: {} ({:?})", path, source)]
+    NullError {
+        /// Given path
+        path: String,
+        /// Originating error
+        source: std::ffi::NulError,
+    },
+
+    /// Generic catch-all error for this store
+    #[error("Generic error in store: {} ({:?})", store, source)]
+    Generic {
+        /// String name of the object store
+        store: &'static str,
+        /// Originating error
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    /// Errors from the Tokio runtime
+    #[error("Error executing async task for path: {} ({:?})", path, source)]
+    Tokio {
+        /// Path
+        path: String,
+        /// Originating error
+        source: tokio::task::JoinError,
+    },
+}
+
+impl From<LocalFileSystemError> for ObjectStoreError {
+    fn from(e: LocalFileSystemError) -> Self {
+        match e {
+            LocalFileSystemError::AlreadyExists { path, source } => {
+                ObjectStoreError::AlreadyExists { path, source }
+            }
+            LocalFileSystemError::NotFound { path, source } => {
+                ObjectStoreError::NotFound { path, source }
+            }
+            LocalFileSystemError::InvalidArgument { source, .. } => ObjectStoreError::Generic {
+                store: STORE_NAME,
+                source: Box::new(source),
+            },
+            LocalFileSystemError::NullError { source, .. } => ObjectStoreError::Generic {
+                store: STORE_NAME,
+                source: Box::new(source),
+            },
+            LocalFileSystemError::Tokio { source, .. } => ObjectStoreError::Generic {
+                store: STORE_NAME,
+                source: Box::new(source),
+            },
+            LocalFileSystemError::Generic { store, source } => {
+                ObjectStoreError::Generic { store, source }
+            }
+        }
+    }
+}
+
+/// Mount File Storage Backend.
+/// Note that writes are non-atomic and may leave the filesystem in an inconsistent state if they fail.
+#[derive(Debug)]
+pub struct MountFileStorageBackend {
+    inner: Arc<LocalFileSystem>,
+    root_url: Arc<Url>,
+}
+
+impl MountFileStorageBackend {
+    /// Creates a new MountFileStorageBackend.
+    pub fn try_new(path: impl AsRef<std::path::Path>) -> ObjectStoreResult<Self> {
+        Ok(Self {
+            root_url: Arc::new(Self::path_to_root_url(path.as_ref())?),
+            inner: Arc::new(LocalFileSystem::new_with_prefix(path)?),
+        })
+    }
+
+    fn path_to_root_url(path: &std::path::Path) -> ObjectStoreResult<Url> {
+        let root_path =
+            std::fs::canonicalize(path).map_err(|e| object_store::Error::InvalidPath {
+                source: object_store::path::Error::Canonicalize {
+                    path: path.into(),
+                    source: e,
+                },
+            })?;
+
+        Url::from_file_path(root_path).map_err(|_| object_store::Error::InvalidPath {
+            source: object_store::path::Error::InvalidPath { path: path.into() },
+        })
+    }
+
+    /// Return an absolute filesystem path of the given location
+    fn path_to_filesystem(&self, location: &ObjectStorePath) -> String {
+        let mut url = self.root_url.as_ref().clone();
+        url.path_segments_mut()
+            .expect("url path")
+            // technically not necessary as Path ignores empty segments
+            // but avoids creating paths with "//" which look odd in error messages.
+            .pop_if_empty()
+            .extend(location.parts());
+
+        url.to_file_path().unwrap().to_str().unwrap().to_owned()
+    }
+}
+
+impl std::fmt::Display for MountFileStorageBackend {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MountFileStorageBackend")
+    }
+}
+
+#[async_trait::async_trait]
+impl ObjectStore for MountFileStorageBackend {
+    async fn put(&self, location: &ObjectStorePath, bytes: Bytes) -> ObjectStoreResult<PutResult> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn put_opts(
+        &self,
+        location: &ObjectStorePath,
+        bytes: Bytes,
+        options: PutOptions,
+    ) -> ObjectStoreResult<PutResult> {
+        self.inner.put_opts(location, bytes, options).await
+    }
+
+    async fn get(&self, location: &ObjectStorePath) -> ObjectStoreResult<GetResult> {
+        self.inner.get(location).await
+    }
+
+    async fn get_opts(
+        &self,
+        location: &ObjectStorePath,
+        options: GetOptions,
+    ) -> ObjectStoreResult<GetResult> {
+        self.inner.get_opts(location, options).await
+    }
+
+    async fn get_range(
+        &self,
+        location: &ObjectStorePath,
+        range: Range<usize>,
+    ) -> ObjectStoreResult<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &ObjectStorePath) -> ObjectStoreResult<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &ObjectStorePath) -> ObjectStoreResult<()> {
+        self.inner.delete(location).await
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&ObjectStorePath>,
+    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    fn list_with_offset(
+        &self,
+        prefix: Option<&ObjectStorePath>,
+        offset: &ObjectStorePath,
+    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list_with_offset(prefix, offset)
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&ObjectStorePath>,
+    ) -> ObjectStoreResult<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &ObjectStorePath, to: &ObjectStorePath) -> ObjectStoreResult<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &ObjectStorePath,
+        to: &ObjectStorePath,
+    ) -> ObjectStoreResult<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+
+    async fn rename_if_not_exists(
+        &self,
+        from: &ObjectStorePath,
+        to: &ObjectStorePath,
+    ) -> ObjectStoreResult<()> {
+        let path_from = self.path_to_filesystem(from);
+        let path_to = self.path_to_filesystem(to);
+        Ok(regular_rename(path_from.as_ref(), path_to.as_ref()).await?)
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &ObjectStorePath,
+    ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &ObjectStorePath,
+        multipart_id: &MultipartId,
+    ) -> ObjectStoreResult<()> {
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+}
+
+/// Renames `from` to `to` with a regular, non-atomic rename.
+/// `from` must exist and `to` must not, otherwise the operation fails.
+/// It is not atomic and cannot be called in parallel with other operations on the same file.
+#[inline]
+async fn regular_rename(from: &str, to: &str) -> Result<(), LocalFileSystemError> {
+    let from_path = String::from(from);
+    let to_path = String::from(to);
+
+    tokio::task::spawn_blocking(move || {
+        if std::fs::metadata(&to_path).is_ok() {
+            Err(LocalFileSystemError::AlreadyExists {
+                path: to_path,
+                source: Box::new(std::io::Error::new(
+                    std::io::ErrorKind::AlreadyExists,
+                    "Already exists",
+                )),
+            })
+        } else {
+            std::fs::rename(&from_path, &to_path).map_err(|err| {
+                if err.kind() == std::io::ErrorKind::NotFound {
+                    LocalFileSystemError::NotFound {
+                        path: from_path.clone(),
+                        source: Box::new(err),
+                    }
+                } else {
+                    LocalFileSystemError::Generic {
+                        store: STORE_NAME,
+                        source: Box::new(err),
+                    }
+                }
+            })?;
+            Ok(())
+        }
+    })
+    .await
+    .unwrap()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::fs::File;
+    use std::io::Write;
+    use std::path::{Path, PathBuf};
+
+    #[tokio::test]
+    async fn test_regular_rename() {
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let a = create_file(tmp_dir.path(), "a");
+        let b = create_file(tmp_dir.path(), "b");
+        let c = &tmp_dir.path().join("c");
+
+        // unsuccessful move of not_exists to C; not_exists is missing
+        let result = regular_rename("not_exists", c.to_str().unwrap()).await;
+        assert!(matches!(
+            result.expect_err("nonexistent should fail"),
+            LocalFileSystemError::NotFound { .. }
+        ));
+
+        // successful move of A to C
+        assert!(a.exists());
+        assert!(!c.exists());
+        match regular_rename(a.to_str().unwrap(), c.to_str().unwrap()).await {
+            Err(LocalFileSystemError::InvalidArgument { source, .. }) =>
+                panic!("expected success, got: {source:?}. Note: atomically renaming Windows files from WSL2 is not supported."),
+            Err(e) => panic!("expected success, got: {e:?}"),
+            _ => {}
+        }
+        assert!(!a.exists());
+        assert!(c.exists());
+
+        // unsuccessful move of B to C; C already exists, B is not deleted
+        assert!(b.exists());
+        match regular_rename(b.to_str().unwrap(), c.to_str().unwrap()).await {
+            Err(LocalFileSystemError::AlreadyExists { path, .. }) => {
+                assert_eq!(path, c.to_str().unwrap())
+            }
+            _ => panic!("unexpected"),
+        }
+        assert!(b.exists());
+        assert_eq!(std::fs::read_to_string(c).unwrap(), "a");
+    }
+
+    fn create_file(dir: &Path, name: &str) -> PathBuf {
+        let path = dir.join(name);
+        let mut file = File::create(&path).unwrap();
+        file.write_all(name.as_bytes()).unwrap();
+        path
+    }
+}
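To make the failure mode concrete (illustrative, not part of the patch):
the default `LocalFileSystem` commit path relies on hard links, which
FUSE-backed mounts such as blobfuse/goofys/s3fs typically reject, and that
is exactly what `regular_rename` above avoids. A standalone probe using
only the standard library, with hypothetical file names:

    use std::fs::{self, File};
    use std::io;

    /// Probe whether `dir` supports hard links. Hypothetical helper: on
    /// FUSE mounts this usually returns Ok(false), which is why the mount
    /// backend falls back to a plain rename.
    fn supports_hard_links(dir: &std::path::Path) -> io::Result<bool> {
        let src = dir.join(".hardlink_probe_src");
        let dst = dir.join(".hardlink_probe_dst");
        File::create(&src)?;
        let ok = fs::hard_link(&src, &dst).is_ok();
        // Best-effort cleanup of the probe files.
        let _ = fs::remove_file(&src);
        let _ = fs::remove_file(&dst);
        Ok(ok)
    }

    fn main() -> io::Result<()> {
        let dir = std::env::temp_dir();
        println!(
            "{} supports hard links: {}",
            dir.display(),
            supports_hard_links(&dir)?
        );
        Ok(())
    }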
diff --git a/crates/mount/src/lib.rs b/crates/mount/src/lib.rs
new file mode 100644
index 0000000000..2decb92b6c
--- /dev/null
+++ b/crates/mount/src/lib.rs
@@ -0,0 +1,102 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
+use deltalake_core::storage::{
+    factories, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
+};
+use deltalake_core::{DeltaResult, DeltaTableError, Path};
+use object_store::local::LocalFileSystem;
+use url::Url;
+
+mod config;
+pub mod error;
+mod file;
+
+trait MountOptions {
+    fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String>;
+}
+
+impl MountOptions for StorageOptions {
+    fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String> {
+        self.0
+            .iter()
+            .filter_map(|(key, value)| {
+                Some((
+                    config::MountConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
+                    value.clone(),
+                ))
+            })
+            .collect()
+    }
+}
+
+#[derive(Clone, Default, Debug)]
+pub struct MountFactory {}
+
+impl ObjectStoreFactory for MountFactory {
+    fn parse_url_opts(
+        &self,
+        url: &Url,
+        options: &StorageOptions,
+    ) -> DeltaResult<(ObjectStoreRef, Path)> {
+        let config = config::MountConfigHelper::try_new(options.as_mount_options())?.build()?;
+
+        let allow_unsafe_rename = str_is_truthy(
+            config
+                .get(&config::MountConfigKey::AllowUnsafeRename)
+                .unwrap_or(&String::new()),
+        );
+
+        match url.scheme() {
+            "dbfs" => {
+                if !allow_unsafe_rename {
+                    // Just let the user know that they need to set the allow_unsafe_rename option
+                    return Err(error::Error::AllowUnsafeRenameNotSpecified.into());
+                }
+                // We need to convert the dbfs url to a file url
+                let new_url = Url::parse(&format!("file:///dbfs{}", url.path())).unwrap();
+                let store = Arc::new(file::MountFileStorageBackend::try_new(
+                    new_url.to_file_path().unwrap(),
+                )?) as ObjectStoreRef;
+                Ok((store, Path::from("/")))
+            }
+            "file" => {
+                if allow_unsafe_rename {
+                    let store = Arc::new(file::MountFileStorageBackend::try_new(
+                        url.to_file_path().unwrap(),
+                    )?) as ObjectStoreRef;
+                    Ok((store, Path::from("/")))
+                } else {
+                    let store = Arc::new(LocalFileSystem::new_with_prefix(
+                        url.to_file_path().unwrap(),
+                    )?) as ObjectStoreRef;
+                    Ok((store, Path::from("/")))
+                }
+            }
+            _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
+        }
+    }
+}
+
+impl LogStoreFactory for MountFactory {
+    fn with_options(
+        &self,
+        store: ObjectStoreRef,
+        location: &Url,
+        options: &StorageOptions,
+    ) -> DeltaResult<Arc<dyn LogStore>> {
+        Ok(default_logstore(store, location, options))
+    }
+}
+
+/// Register an [ObjectStoreFactory] for common Mount [Url] schemes
+pub fn register_handlers(_additional_prefixes: Option<Url>) {
+    let factory = Arc::new(MountFactory {});
+    for scheme in ["dbfs", "file"].iter() {
+        let url = Url::parse(&format!("{}://", scheme)).unwrap();
+        factories().insert(url.clone(), factory.clone());
+        logstores().insert(url.clone(), factory.clone());
+    }
+}
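For reference (not part of the patch), the `dbfs` branch of
`parse_url_opts` above rewrites the URL onto the local `/dbfs` mount point.
A sketch of that translation on a Unix-like host, using the same `format!`
expression and a hypothetical table path:

    use url::Url;

    // Mirrors the translation in `MountFactory::parse_url_opts`: a `dbfs:`
    // URL is mapped onto the local `/dbfs` mount point.
    fn main() {
        let url = Url::parse("dbfs:/mnt/gold/events").unwrap();
        let translated = Url::parse(&format!("file:///dbfs{}", url.path())).unwrap();
        assert_eq!(translated.as_str(), "file:///dbfs/mnt/gold/events");
        assert_eq!(
            translated.to_file_path().unwrap(),
            std::path::PathBuf::from("/dbfs/mnt/gold/events")
        );
    }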
diff --git a/crates/mount/tests/context.rs b/crates/mount/tests/context.rs
new file mode 100644
index 0000000000..d7977b36de
--- /dev/null
+++ b/crates/mount/tests/context.rs
@@ -0,0 +1,85 @@
+use deltalake_mount::register_handlers;
+use deltalake_test::utils::{set_env_if_not_set, StorageIntegration};
+use fs_extra::dir::{copy, CopyOptions};
+use std::process::ExitStatus;
+use tempfile::{tempdir, TempDir};
+
+pub struct MountIntegration {
+    tmp_dir: TempDir,
+}
+
+impl Default for MountIntegration {
+    fn default() -> Self {
+        register_handlers(None);
+        Self {
+            tmp_dir: tempdir().expect("Failed to make temp dir"),
+        }
+    }
+}
+
+impl StorageIntegration for MountIntegration {
+    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
+        Ok(ExitStatus::default())
+    }
+
+    fn prepare_env(&self) {
+        set_env_if_not_set("MOUNT_ALLOW_UNSAFE_RENAME", "true");
+    }
+    fn bucket_name(&self) -> String {
+        self.tmp_dir.as_ref().to_str().unwrap().to_owned()
+    }
+    fn root_uri(&self) -> String {
+        format!("file://{}", self.bucket_name())
+    }
+    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
+        let mut options = CopyOptions::new();
+        options.content_only = true;
+        let dest_path = self.tmp_dir.path().join(destination);
+        std::fs::create_dir_all(&dest_path)?;
+        copy(source, &dest_path, &options).expect("Failed to copy");
+        Ok(ExitStatus::default())
+    }
+}
+
+pub struct DbfsIntegration {
+    tmp_dir: TempDir,
+}
+
+impl Default for DbfsIntegration {
+    fn default() -> Self {
+        register_handlers(None);
+        Self {
+            tmp_dir: tempdir().expect("Failed to make temp dir"),
+        }
+    }
+}
+
+impl StorageIntegration for DbfsIntegration {
+    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
+        Ok(ExitStatus::default())
+    }
+
+    fn prepare_env(&self) {
+        set_env_if_not_set("MOUNT_ALLOW_UNSAFE_RENAME", "true");
+        std::fs::create_dir_all(format!("/dbfs{}", self.tmp_dir.as_ref().to_str().unwrap()))
+            .expect("Failed to create dir");
+    }
+    fn bucket_name(&self) -> String {
+        self.tmp_dir.as_ref().to_str().unwrap().to_owned()
+    }
+    fn root_uri(&self) -> String {
+        format!("dbfs:{}", self.bucket_name())
+    }
+    fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
+        let mut options = CopyOptions::new();
+        options.content_only = true;
+        let dest_path = format!(
+            "/dbfs{}/{}",
+            self.tmp_dir.as_ref().to_str().unwrap(),
+            destination
+        );
+        std::fs::create_dir_all(&dest_path)?;
+        copy(source, &dest_path, &options).expect("Failed to copy");
+        Ok(ExitStatus::default())
+    }
+}
diff --git a/crates/mount/tests/integration.rs b/crates/mount/tests/integration.rs
new file mode 100644
index 0000000000..14fcbcdc95
--- /dev/null
+++ b/crates/mount/tests/integration.rs
@@ -0,0 +1,39 @@
+#![cfg(feature = "integration_test")]
+
+use deltalake_test::read::read_table_paths;
+use
+deltalake_test::{test_read_tables, IntegrationContext, TestResult};
+use serial_test::serial;
+
+mod context;
+use context::*;
+
+static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];
+
+#[tokio::test]
+#[serial]
+async fn test_integration_local() -> TestResult {
+    let context = IntegrationContext::new(Box::<MountIntegration>::default())?;
+
+    test_read_tables(&context).await?;
+
+    for prefix in TEST_PREFIXES {
+        read_table_paths(&context, prefix, prefix).await?;
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+#[serial]
+#[ignore = "The DBFS tests currently hang because the CI pipeline cannot write to /dbfs"]
+async fn test_integration_dbfs() -> TestResult {
+    let context = IntegrationContext::new(Box::<DbfsIntegration>::default())?;
+
+    test_read_tables(&context).await?;
+
+    for prefix in TEST_PREFIXES {
+        read_table_paths(&context, prefix, prefix).await?;
+    }
+
+    Ok(())
+}
diff --git a/python/Cargo.toml b/python/Cargo.toml
index c7fa5ca2b1..486f37d695 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -45,7 +45,7 @@ features = ["extension-module", "abi3", "abi3-py38"]
 [dependencies.deltalake]
 path = "../crates/deltalake"
 version = "0"
-features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]
+features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "mount"]
 
 [features]
 default = ["rustls"]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 03326a1671..445ef921ea 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1674,6 +1674,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     deltalake::aws::register_handlers(None);
     deltalake::azure::register_handlers(None);
     deltalake::gcp::register_handlers(None);
+    deltalake::mount::register_handlers(None);
 
     m.add("DeltaError", py.get_type::<DeltaError>())?;
     m.add("CommitFailedError", py.get_type::<CommitFailedError>())?;
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 42f0cd825e..dfd124a73d 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -322,6 +322,30 @@ def test_local_path(
     assert table == sample_data
 
 
+@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
+def test_local_path_with_unsafe_rename(
+    tmp_path: pathlib.Path,
+    sample_data: pa.Table,
+    monkeypatch,
+    engine: Literal["pyarrow", "rust"],
+):
+    monkeypatch.chdir(tmp_path)  # Make tmp_path the working directory
+    (tmp_path / "path/to/table").mkdir(parents=True)
+
+    local_path = "./path/to/table"
+    storage_opts = {
+        "allow_unsafe_rename": "true",
+    }
+    write_deltalake(
+        local_path, sample_data, storage_options=storage_opts, engine=engine
+    )
+    delta_table = DeltaTable(local_path, storage_options=storage_opts)
+    assert delta_table.schema().to_pyarrow() == sample_data.schema
+
+    table = delta_table.to_pyarrow_table()
+    assert table == sample_data
+
+
 @pytest.mark.parametrize("engine", ["pyarrow", "rust"])
 def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engine):
     write_deltalake(