From 9dc7cccd4fdf3e081426e27d7be8159cea550c7d Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Mon, 11 Mar 2024 11:10:12 +0800 Subject: [PATCH 1/5] feat: support deltalake-mount for writing the mounted path --- crates/deltalake/Cargo.toml | 2 + crates/mount/Cargo.toml | 43 ++++ crates/mount/src/config.rs | 80 +++++++ crates/mount/src/error.rs | 33 +++ crates/mount/src/file.rs | 349 ++++++++++++++++++++++++++++++ crates/mount/src/lib.rs | 103 +++++++++ crates/mount/tests/context.rs | 79 +++++++ crates/mount/tests/integration.rs | 38 ++++ 8 files changed, 727 insertions(+) create mode 100644 crates/mount/Cargo.toml create mode 100644 crates/mount/src/config.rs create mode 100644 crates/mount/src/error.rs create mode 100644 crates/mount/src/file.rs create mode 100644 crates/mount/src/lib.rs create mode 100644 crates/mount/tests/context.rs create mode 100644 crates/mount/tests/integration.rs diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index c541cc1284..92dcdfab9d 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -22,6 +22,7 @@ deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false, deltalake-azure = { version = "0.1.0", path = "../azure", optional = true } deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true } deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true } +deltalake-mount = { version = "0.1.0", path = "../mount", optional = true } [features] # All of these features are just reflected into the core crate until that @@ -38,6 +39,7 @@ python = ["deltalake-core/python"] s3-native-tls = ["deltalake-aws/native-tls"] s3 = ["deltalake-aws/rustls"] unity-experimental = ["deltalake-core/unity-experimental"] +mount = ["deltalake-mount"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml new file mode 100644 index 0000000000..979a19592a --- /dev/null 
+++ b/crates/mount/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "deltalake-mount" +version = "0.1.0" +authors.workspace = true +keywords.workspace = true +readme.workspace = true +edition.workspace = true +homepage.workspace = true +description.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +deltalake-core = { version = "0.17.0", path = "../core", features = [ + "datafusion", +] } +lazy_static = "1" +errno = "0.3" + +# workspace dependencies +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +tracing = { workspace = true } +object_store = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +regex = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +chrono = { workspace = true } +serial_test = "3" +deltalake-test = { path = "../test" } +pretty_env_logger = "0.5.0" +rand = "0.8" +serde_json = { workspace = true } +tempfile = "3" +fs_extra = "1.3.0" + +[features] +integration_test = [] diff --git a/crates/mount/src/config.rs new file mode 100644 index 0000000000..79dbfc88d0 --- /dev/null +++ b/crates/mount/src/config.rs @@ -0,0 +1,80 @@ +//! Auxiliary module for generating a valid Mount configuration. +use std::collections::{hash_map::Entry, HashMap}; +use std::str::FromStr; + +use crate::error::{Error, Result}; + +/// Typed property keys that can be defined on a mounted path +#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)] +#[non_exhaustive] +pub enum MountConfigKey { + /// If set to "true", allows creating commits without concurrent writer protection. + /// Only safe if there is one writer to a given table. 
+ AllowUnsafeRename, +} + +impl AsRef for MountConfigKey { + fn as_ref(&self) -> &str { + match self { + Self::AllowUnsafeRename => "mount_allow_unsafe_rename", + } + } +} + +impl FromStr for MountConfigKey { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "mount_allow_unsafe_rename" | "allow_unsafe_rename" => Ok(Self::AllowUnsafeRename), + _ => Err(Error::UnknownConfigKey(s.to_string())), + } + } +} + +/// Helper struct to create full configuration from passed options and environment +pub(crate) struct MountConfigHelper { + config: HashMap, + env_config: HashMap, +} + +impl MountConfigHelper { + /// Create a new [`MountConfigHelper`] + pub fn try_new( + config: impl IntoIterator, impl Into)>, + ) -> Result { + let mut env_config = HashMap::new(); + for (os_key, os_value) in std::env::vars_os() { + if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { + if key.starts_with("MOUNT_") { + if let Ok(config_key) = MountConfigKey::from_str(&key.to_ascii_lowercase()) { + env_config.insert(config_key, value.to_string()); + } + } + } + } + + Ok(Self { + config: config + .into_iter() + .map(|(key, value)| Ok((MountConfigKey::from_str(key.as_ref())?, value.into()))) + .collect::>()?, + env_config, + }) + } + + /// Generate a configuration augmented with options from the environment + pub fn build(mut self) -> Result> { + // Add keys from the environment to the configuration, as e.g. client configuration options. + // NOTE We have to specifically configure omitting keys, since workload identity can + // work purely using defaults, but partial config may be present in the environment. + // Preference of conflicting configs (e.g. msi resource id vs. 
client id is handled in object store) + for key in self.env_config.keys() { + if let Entry::Vacant(e) = self.config.entry(*key) { + e.insert(self.env_config.get(key).unwrap().to_owned()); + } + } + + Ok(self.config) + } +} diff --git a/crates/mount/src/error.rs b/crates/mount/src/error.rs new file mode 100644 index 0000000000..3693b0be07 --- /dev/null +++ b/crates/mount/src/error.rs @@ -0,0 +1,33 @@ +use deltalake_core::errors::DeltaTableError; + +pub(crate) type Result = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[allow(dead_code)] + #[error("failed to parse config: {0}")] + Parse(String), + + /// Unknown configuration key + #[error("Unknown configuration key: {0}")] + UnknownConfigKey(String), + + #[error("The `allow_unsafe_rename` parameter must be specified")] + AllowUnsafeRenameNotSpecified, + + #[error(transparent)] + ObjectStore(#[from] object_store::Error), +} + +impl From for DeltaTableError { + fn from(e: Error) -> Self { + match e { + Error::Parse(msg) => DeltaTableError::Generic(msg), + Error::UnknownConfigKey(msg) => DeltaTableError::Generic(msg), + Error::AllowUnsafeRenameNotSpecified => DeltaTableError::Generic( + "The `allow_unsafe_rename` parameter must be specified".to_string(), + ), + Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e }, + } + } +} diff --git a/crates/mount/src/file.rs b/crates/mount/src/file.rs new file mode 100644 index 0000000000..02e5f82e6c --- /dev/null +++ b/crates/mount/src/file.rs @@ -0,0 +1,349 @@ +//! Mount file storage backend. This backend read and write objects from mounted filesystem. +//! +//! The mount file storage backend is not multi-writer safe. 
+ +use bytes::Bytes; +use futures::stream::BoxStream; +use object_store::{ + local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetOptions, + GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result as ObjectStoreResult, +}; +use std::ops::Range; +use std::sync::Arc; +use tokio::io::AsyncWrite; +use url::Url; + +pub(crate) const STORE_NAME: &str = "MountObjectStore"; + +/// Error raised by storage lock client +#[derive(thiserror::Error, Debug)] +#[allow(dead_code)] +pub enum LocalFileSystemError { + /// Object exists already at path + #[error("Object exists already at path: {} ({:?})", path, source)] + AlreadyExists { + /// Path of the already existing file + path: String, + /// Originating error + source: Box, + }, + + /// Object not found at the given path + #[error("Object not found at path: {} ({:?})", path, source)] + NotFound { + /// Provided path which does not exist + path: String, + /// Originating error + source: Box, + }, + + /// Invalid argument sent to OS call + #[error("Invalid argument in OS call for path: {} ({:?})", path, source)] + InvalidArgument { + /// Provided path + path: String, + /// Originating error + source: errno::Errno, + }, + + /// Null error for path for FFI + #[error("Null error in FFI for path: {} ({:?})", path, source)] + NullError { + /// Given path + path: String, + /// Originating error + source: std::ffi::NulError, + }, + + /// Generic catch-all error for this store + #[error("Generic error in store: {} ({:?})", store, source)] + Generic { + /// String name of the object store + store: &'static str, + /// Originating error + source: Box, + }, + + /// Errors from the Tokio runtime + #[error("Error executing async task for path: {} ({:?})", path, source)] + Tokio { + /// Path + path: String, + /// Originating error + source: tokio::task::JoinError, + }, +} + +impl From for ObjectStoreError { + fn from(e: LocalFileSystemError) -> Self { + match e { + 
LocalFileSystemError::AlreadyExists { path, source } => { + ObjectStoreError::AlreadyExists { path, source } + } + LocalFileSystemError::NotFound { path, source } => { + ObjectStoreError::NotFound { path, source } + } + LocalFileSystemError::InvalidArgument { source, .. } => ObjectStoreError::Generic { + store: STORE_NAME, + source: Box::new(source), + }, + LocalFileSystemError::NullError { source, .. } => ObjectStoreError::Generic { + store: STORE_NAME, + source: Box::new(source), + }, + LocalFileSystemError::Tokio { source, .. } => ObjectStoreError::Generic { + store: STORE_NAME, + source: Box::new(source), + }, + LocalFileSystemError::Generic { store, source } => { + ObjectStoreError::Generic { store, source } + } + } + } +} + +/// Mount File Storage Backend. +/// Note that it's non-atomic writing and may leave the filesystem in an inconsistent state if it fails. +#[derive(Debug)] +pub struct MountFileStorageBackend { + inner: Arc, + root_url: Arc, +} + +impl MountFileStorageBackend { + /// Creates a new MountFileStorageBackend. 
+ pub fn try_new(path: impl AsRef) -> ObjectStoreResult { + Ok(Self { + root_url: Arc::new(Self::path_to_root_url(path.as_ref())?), + inner: Arc::new(LocalFileSystem::new_with_prefix(path)?), + }) + } + + fn path_to_root_url(path: &std::path::Path) -> ObjectStoreResult { + let root_path = + std::fs::canonicalize(path).map_err(|e| object_store::Error::InvalidPath { + source: object_store::path::Error::Canonicalize { + path: path.into(), + source: e, + }, + })?; + + Url::from_file_path(root_path).map_err(|_| object_store::Error::InvalidPath { + source: object_store::path::Error::InvalidPath { path: path.into() }, + }) + } + + /// Return an absolute filesystem path of the given location + fn path_to_filesystem(&self, location: &ObjectStorePath) -> String { + let mut url = self.root_url.as_ref().clone(); + url.path_segments_mut() + .expect("url path") + // technically not necessary as Path ignores empty segments + // but avoids creating paths with "//" which look odd in error messages. + .pop_if_empty() + .extend(location.parts()); + + url.to_file_path().unwrap().to_str().unwrap().to_owned() + } +} + +impl std::fmt::Display for MountFileStorageBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MountFileStorageBackend") + } +} + +#[async_trait::async_trait] +impl ObjectStore for MountFileStorageBackend { + async fn put(&self, location: &ObjectStorePath, bytes: Bytes) -> ObjectStoreResult { + self.inner.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &ObjectStorePath, + bytes: Bytes, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn get(&self, location: &ObjectStorePath) -> ObjectStoreResult { + self.inner.get(location).await + } + + async fn get_opts( + &self, + location: &ObjectStorePath, + options: GetOptions, + ) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + + async fn get_range( + &self, + 
location: &ObjectStorePath, + range: Range, + ) -> ObjectStoreResult { + self.inner.get_range(location, range).await + } + + async fn head(&self, location: &ObjectStorePath) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn delete(&self, location: &ObjectStorePath) -> ObjectStoreResult<()> { + self.inner.delete(location).await + } + + fn list( + &self, + prefix: Option<&ObjectStorePath>, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&ObjectStorePath>, + offset: &ObjectStorePath, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&ObjectStorePath>, + ) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &ObjectStorePath, to: &ObjectStorePath) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> ObjectStoreResult<()> { + let path_from = self.path_to_filesystem(from); + let path_to = self.path_to_filesystem(to); + Ok(regular_rename(path_from.as_ref(), path_to.as_ref()).await?) + } + + async fn put_multipart( + &self, + location: &ObjectStorePath, + ) -> ObjectStoreResult<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &ObjectStorePath, + multipart_id: &MultipartId, + ) -> ObjectStoreResult<()> { + self.inner.abort_multipart(location, multipart_id).await + } +} + +/// Regular renames `from` to `to`. +/// `from` has to exist, but `to` is not, otherwise the operation will fail. 
+/// It's not atomic and cannot be called in parallel with other operations on the same file. +#[inline] +async fn regular_rename(from: &str, to: &str) -> Result<(), LocalFileSystemError> { + let from_path = String::from(from); + let to_path = String::from(to); + + tokio::task::spawn_blocking(move || { + if std::fs::metadata(&to_path).is_ok() { + Err(LocalFileSystemError::AlreadyExists { + path: to_path, + source: Box::new(std::io::Error::new( + std::io::ErrorKind::AlreadyExists, + "Already exists", + )), + }) + } else { + std::fs::rename(&from_path, &to_path).map_err(|err| { + if err.kind() == std::io::ErrorKind::NotFound { + LocalFileSystemError::NotFound { + path: from_path.clone(), + source: Box::new(err), + } + } else { + LocalFileSystemError::Generic { + store: STORE_NAME, + source: Box::new(err), + } + } + })?; + Ok(()) + } + }) + .await + .unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use std::io::Write; + use std::path::{Path, PathBuf}; + + #[tokio::test] + async fn test_regular_rename() { + let tmp_dir = tempfile::tempdir().unwrap(); + let a = create_file(tmp_dir.path(), "a"); + let b = create_file(tmp_dir.path(), "b"); + let c = &tmp_dir.path().join("c"); + + // unsuccessful move not_exists to C, not_exists is missing + let result = regular_rename("not_exists", c.to_str().unwrap()).await; + assert!(matches!( + result.expect_err("nonexistent should fail"), + LocalFileSystemError::NotFound { .. } + )); + + // successful move A to C + assert!(a.exists()); + assert!(!c.exists()); + match regular_rename(a.to_str().unwrap(), c.to_str().unwrap()).await { + Err(LocalFileSystemError::InvalidArgument {source, ..}) => + panic!("expected success, got: {source:?}. 
Note: atomically renaming Windows files from WSL2 is not supported."), + Err(e) => panic!("expected success, got: {e:?}"), + _ => {} + } + assert!(!a.exists()); + assert!(c.exists()); + + // unsuccessful move B to C, C already exists, B is not deleted + assert!(b.exists()); + match regular_rename(b.to_str().unwrap(), c.to_str().unwrap()).await { + Err(LocalFileSystemError::AlreadyExists { path, .. }) => { + assert_eq!(path, c.to_str().unwrap()) + } + _ => panic!("unexpected"), + } + assert!(b.exists()); + assert_eq!(std::fs::read_to_string(c).unwrap(), "a"); + } + + fn create_file(dir: &Path, name: &str) -> PathBuf { + let path = dir.join(name); + let mut file = File::create(&path).unwrap(); + file.write_all(name.as_bytes()).unwrap(); + path + } +} diff --git a/crates/mount/src/lib.rs b/crates/mount/src/lib.rs new file mode 100644 index 0000000000..10c3596ce8 --- /dev/null +++ b/crates/mount/src/lib.rs @@ -0,0 +1,103 @@ +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; + +use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; +use deltalake_core::storage::{ + factories, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions, +}; +use deltalake_core::{DeltaResult, DeltaTableError, Path}; +use object_store::local::LocalFileSystem; +use url::Url; + +mod config; +pub mod error; +mod file; + +trait MountOptions { + fn as_mount_options(&self) -> HashMap; +} + +impl MountOptions for StorageOptions { + fn as_mount_options(&self) -> HashMap { + self.0 + .iter() + .filter_map(|(key, value)| { + Some(( + config::MountConfigKey::from_str(&key.to_ascii_lowercase()).ok()?, + value.clone(), + )) + }) + .collect() + } +} + +#[derive(Clone, Default, Debug)] +pub struct MountFactory {} + +impl ObjectStoreFactory for MountFactory { + fn parse_url_opts( + &self, + url: &Url, + options: &StorageOptions, + ) -> DeltaResult<(ObjectStoreRef, Path)> { + let config = 
config::MountConfigHelper::try_new(options.as_mount_options())?.build()?; + + let allow_unsafe_rename = str_is_truthy( + config + .get(&config::MountConfigKey::AllowUnsafeRename) + .unwrap_or(&String::new()), + ); + println!("allow_unsafe_rename: {}", allow_unsafe_rename); + println!("url: {}", url); + + match url.scheme() { + "dbfs" => { + if !allow_unsafe_rename { + // Just let the user know that they need to set the allow_unsafe_rename option + return Err(error::Error::AllowUnsafeRenameNotSpecified.into()); + } + let new_url = Url::parse(&format!("file:///dbfs{}", url.path())).unwrap(); + let store = Arc::new(file::MountFileStorageBackend::try_new( + new_url.to_file_path().unwrap(), + )?) as ObjectStoreRef; + Ok((store, Path::from("/"))) + } + "file" => { + if allow_unsafe_rename { + let store = Arc::new(file::MountFileStorageBackend::try_new( + url.to_file_path().unwrap(), + )?) as ObjectStoreRef; + Ok((store, Path::from("/"))) + } else { + let store = Arc::new(LocalFileSystem::new_with_prefix( + url.to_file_path().unwrap(), + )?) 
as ObjectStoreRef; + Ok((store, Path::from("/"))) + } + } + _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())), + } + } +} + +impl LogStoreFactory for MountFactory { + fn with_options( + &self, + store: ObjectStoreRef, + location: &Url, + options: &StorageOptions, + ) -> DeltaResult> { + Ok(default_logstore(store, location, options)) + } +} + +/// Register an [ObjectStoreFactory] for common Mount [Url] schemes +pub fn register_handlers(_additional_prefixes: Option) { + let factory = Arc::new(MountFactory {}); + for scheme in ["dbfs", "file"].iter() { + let url = Url::parse(&format!("{}://", scheme)).unwrap(); + factories().insert(url.clone(), factory.clone()); + logstores().insert(url.clone(), factory.clone()); + } +} diff --git a/crates/mount/tests/context.rs b/crates/mount/tests/context.rs new file mode 100644 index 0000000000..227c89f1cd --- /dev/null +++ b/crates/mount/tests/context.rs @@ -0,0 +1,79 @@ +use deltalake_mount::register_handlers; +use deltalake_test::utils::{set_env_if_not_set, StorageIntegration}; +use fs_extra::dir::{copy, CopyOptions}; +use std::process::ExitStatus; +use tempfile::{tempdir, TempDir}; + +pub struct MountIntegration { + tmp_dir: TempDir, +} + +impl Default for MountIntegration { + fn default() -> Self { + register_handlers(None); + Self { + tmp_dir: tempdir().expect("Failed to make temp dir"), + } + } +} + +impl StorageIntegration for MountIntegration { + fn create_bucket(&self) -> std::io::Result { + Ok(ExitStatus::default()) + } + + fn prepare_env(&self) { + set_env_if_not_set("MOUNT_ALLOW_UNSAFE_RENAME", "true"); + } + fn bucket_name(&self) -> String { + self.tmp_dir.as_ref().to_str().unwrap().to_owned() + } + fn root_uri(&self) -> String { + format!("file://{}", self.bucket_name()) + } + fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result { + let mut options = CopyOptions::new(); + options.content_only = true; + let dest_path = self.tmp_dir.path().join(destination); + 
std::fs::create_dir_all(&dest_path)?; + copy(source, &dest_path, &options).expect("Failed to copy"); + Ok(ExitStatus::default()) + } +} + +pub struct DbfsIntegration { + tmp_dir: TempDir, +} + +impl Default for DbfsIntegration { + fn default() -> Self { + register_handlers(None); + Self { + tmp_dir: TempDir::with_prefix("/dbfs/").expect("Failed to make temp dir"), + } + } +} + +impl StorageIntegration for DbfsIntegration { + fn create_bucket(&self) -> std::io::Result { + Ok(ExitStatus::default()) + } + + fn prepare_env(&self) { + set_env_if_not_set("MOUNT_ALLOW_UNSAFE_RENAME", "true"); + } + fn bucket_name(&self) -> String { + self.tmp_dir.as_ref().to_str().unwrap().to_owned() + } + fn root_uri(&self) -> String { + format!("dbfs:{}", self.bucket_name()) + } + fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result { + let mut options = CopyOptions::new(); + options.content_only = true; + let dest_path = self.tmp_dir.path().join(destination); + std::fs::create_dir_all(&dest_path)?; + copy(source, &dest_path, &options).expect("Failed to copy"); + Ok(ExitStatus::default()) + } +} diff --git a/crates/mount/tests/integration.rs b/crates/mount/tests/integration.rs new file mode 100644 index 0000000000..ea974454ce --- /dev/null +++ b/crates/mount/tests/integration.rs @@ -0,0 +1,38 @@ +// #![cfg(feature = "integration_test")] + +use deltalake_test::read::read_table_paths; +use deltalake_test::{test_read_tables, IntegrationContext, TestResult}; +use serial_test::serial; + +mod context; +use context::*; + +static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"]; + +#[tokio::test] +#[serial] +async fn test_integration_local() -> TestResult { + let context = IntegrationContext::new(Box::::default())?; + + test_read_tables(&context).await?; + + for prefix in TEST_PREFIXES { + read_table_paths(&context, prefix, prefix).await?; + } + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn test_integration_dbfs() -> TestResult { + let context = 
IntegrationContext::new(Box::::default())?; + + test_read_tables(&context).await?; + + for prefix in TEST_PREFIXES { + read_table_paths(&context, prefix, prefix).await?; + } + + Ok(()) +} From ac59b7e5ae74922dad1a73df37d1feae06ad10b8 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Mon, 11 Mar 2024 04:36:12 +0000 Subject: [PATCH 2/5] fix test case and python --- crates/mount/src/lib.rs | 3 +-- crates/mount/tests/context.rs | 10 ++++++++-- crates/mount/tests/integration.rs | 2 +- python/Cargo.toml | 2 +- python/src/lib.rs | 1 + 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/mount/src/lib.rs b/crates/mount/src/lib.rs index 10c3596ce8..2decb92b6c 100644 --- a/crates/mount/src/lib.rs +++ b/crates/mount/src/lib.rs @@ -48,8 +48,6 @@ impl ObjectStoreFactory for MountFactory { .get(&config::MountConfigKey::AllowUnsafeRename) .unwrap_or(&String::new()), ); - println!("allow_unsafe_rename: {}", allow_unsafe_rename); - println!("url: {}", url); match url.scheme() { "dbfs" => { @@ -57,6 +55,7 @@ impl ObjectStoreFactory for MountFactory { // Just let the user know that they need to set the allow_unsafe_rename option return Err(error::Error::AllowUnsafeRenameNotSpecified.into()); } + // We need to convert the dbfs url to a file url let new_url = Url::parse(&format!("file:///dbfs{}", url.path())).unwrap(); let store = Arc::new(file::MountFileStorageBackend::try_new( new_url.to_file_path().unwrap(), diff --git a/crates/mount/tests/context.rs b/crates/mount/tests/context.rs index 227c89f1cd..d7977b36de 100644 --- a/crates/mount/tests/context.rs +++ b/crates/mount/tests/context.rs @@ -49,7 +49,7 @@ impl Default for DbfsIntegration { fn default() -> Self { register_handlers(None); Self { - tmp_dir: TempDir::with_prefix("/dbfs/").expect("Failed to make temp dir"), + tmp_dir: tempdir().expect("Failed to make temp dir"), } } } @@ -61,6 +61,8 @@ impl StorageIntegration for DbfsIntegration { fn prepare_env(&self) { set_env_if_not_set("MOUNT_ALLOW_UNSAFE_RENAME", "true"); 
+ std::fs::create_dir_all(format!("/dbfs{}", self.tmp_dir.as_ref().to_str().unwrap())) + .expect("Failed to create dir"); } fn bucket_name(&self) -> String { self.tmp_dir.as_ref().to_str().unwrap().to_owned() @@ -71,7 +73,11 @@ impl StorageIntegration for DbfsIntegration { fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result { let mut options = CopyOptions::new(); options.content_only = true; - let dest_path = self.tmp_dir.path().join(destination); + let dest_path = format!( + "/dbfs{}/{}", + self.tmp_dir.as_ref().to_str().unwrap(), + destination + ); std::fs::create_dir_all(&dest_path)?; copy(source, &dest_path, &options).expect("Failed to copy"); Ok(ExitStatus::default()) diff --git a/crates/mount/tests/integration.rs b/crates/mount/tests/integration.rs index ea974454ce..85744ab7e7 100644 --- a/crates/mount/tests/integration.rs +++ b/crates/mount/tests/integration.rs @@ -1,4 +1,4 @@ -// #![cfg(feature = "integration_test")] +#![cfg(feature = "integration_test")] use deltalake_test::read::read_table_paths; use deltalake_test::{test_read_tables, IntegrationContext, TestResult}; diff --git a/python/Cargo.toml b/python/Cargo.toml index c7fa5ca2b1..486f37d695 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -45,7 +45,7 @@ features = ["extension-module", "abi3", "abi3-py38"] [dependencies.deltalake] path = "../crates/deltalake" version = "0" -features = ["azure", "gcs", "python", "datafusion", "unity-experimental"] +features = ["azure", "gcs", "python", "datafusion", "unity-experimental", "mount"] [features] default = ["rustls"] diff --git a/python/src/lib.rs b/python/src/lib.rs index 03326a1671..445ef921ea 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1674,6 +1674,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { deltalake::aws::register_handlers(None); deltalake::azure::register_handlers(None); deltalake::gcp::register_handlers(None); + deltalake::mount::register_handlers(None); m.add("DeltaError", 
py.get_type::())?; m.add("CommitFailedError", py.get_type::())?; From a893e5e008d28f9f5e1bc709bf5c5c6e2e4fb404 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Wed, 13 Mar 2024 08:32:42 +0800 Subject: [PATCH 3/5] fix ci --- crates/deltalake/Cargo.toml | 4 ++-- crates/deltalake/src/lib.rs | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 92dcdfab9d..ba3e26cbf9 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true [package.metadata.docs.rs] # We cannot use all_features because TLS features are mutually exclusive. # We cannot use hdfs feature because it requires Java to be installed. -features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] +features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3", "unity-experimental"] [dependencies] deltalake-core = { version = "0.17.1", path = "../core" } @@ -35,11 +35,11 @@ gcs = ["deltalake-gcp"] glue = ["deltalake-catalog-glue"] hdfs = [] json = ["deltalake-core/json"] +mount = ["deltalake-mount"] python = ["deltalake-core/python"] s3-native-tls = ["deltalake-aws/native-tls"] s3 = ["deltalake-aws/rustls"] unity-experimental = ["deltalake-core/unity-experimental"] -mount = ["deltalake-mount"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/deltalake/src/lib.rs b/crates/deltalake/src/lib.rs index 38dc5d52dc..c72a72e8bf 100644 --- a/crates/deltalake/src/lib.rs +++ b/crates/deltalake/src/lib.rs @@ -9,3 +9,5 @@ pub use deltalake_aws as aws; pub use deltalake_azure as azure; #[cfg(feature = "gcs")] pub use deltalake_gcp as gcp; +#[cfg(feature = "mount")] +pub use deltalake_mount as mount; From 306f7bc7b8b2b54d6a72000f17e4463b541ce100 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Fri, 15 Mar 2024 07:14:29 +0800 Subject: [PATCH 4/5] Add test case for python --- 
python/tests/test_writer.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 42f0cd825e..dfd124a73d 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -322,6 +322,30 @@ def test_local_path( assert table == sample_data +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_local_path_with_unsafe_rename( + tmp_path: pathlib.Path, + sample_data: pa.Table, + monkeypatch, + engine: Literal["pyarrow", "rust"], +): + monkeypatch.chdir(tmp_path) # Make tmp_path the working directory + (tmp_path / "path/to/table").mkdir(parents=True) + + local_path = "./path/to/table" + storage_opts = { + "allow_unsafe_rename": "true", + } + write_deltalake( + local_path, sample_data, storage_options=storage_opts, engine=engine + ) + delta_table = DeltaTable(local_path, storage_options=storage_opts) + assert delta_table.schema().to_pyarrow() == sample_data.schema + + table = delta_table.to_pyarrow_table() + assert table == sample_data + + @pytest.mark.parametrize("engine", ["pyarrow", "rust"]) def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engine): write_deltalake( From 2459a68efc4a96cdbe3896e09a985cc8dc0f64ae Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Fri, 15 Mar 2024 16:11:22 +0800 Subject: [PATCH 5/5] add ignore flag for dbfs test --- crates/mount/tests/integration.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/mount/tests/integration.rs b/crates/mount/tests/integration.rs index 85744ab7e7..14fcbcdc95 100644 --- a/crates/mount/tests/integration.rs +++ b/crates/mount/tests/integration.rs @@ -25,6 +25,7 @@ async fn test_integration_local() -> TestResult { #[tokio::test] #[serial] +#[ignore = "The DBFS tests currently hang due to CI pipeline cannot write to /dbfs"] async fn test_integration_dbfs() -> TestResult { let context = IntegrationContext::new(Box::::default())?;