Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: compatible to write to local file systems that do not support hard link #1868

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/deltalake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ rust-version.workspace = true
[package.metadata.docs.rs]
# We cannot use all_features because TLS features are mutually exclusive.
# We cannot use hdfs feature because it requires Java to be installed.
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]
features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.17.1", path = "../core" }
deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.1.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true }
deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true }
deltalake-mount = { version = "0.1.0", path = "../mount", optional = true }

[features]
# All of these features are just reflected into the core crate until that
Expand All @@ -34,6 +35,7 @@ gcs = ["deltalake-gcp"]
glue = ["deltalake-catalog-glue"]
hdfs = []
json = ["deltalake-core/json"]
mount = ["deltalake-mount"]
python = ["deltalake-core/python"]
s3-native-tls = ["deltalake-aws/native-tls"]
s3 = ["deltalake-aws/rustls"]
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pub use deltalake_aws as aws;
pub use deltalake_azure as azure;
#[cfg(feature = "gcs")]
pub use deltalake_gcp as gcp;
#[cfg(feature = "mount")]
pub use deltalake_mount as mount;
43 changes: 43 additions & 0 deletions crates/mount/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "deltalake-mount"
version = "0.1.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
edition.workspace = true
homepage.workspace = true
description.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.17.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
errno = "0.3"

# workspace depenndecies
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../test" }
pretty_env_logger = "0.5.0"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"
fs_extra = "1.3.0"

[features]
integration_test = []
80 changes: 80 additions & 0 deletions crates/mount/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! Auxiliary module for generating a valig Mount configuration.
use std::collections::{hash_map::Entry, HashMap};
use std::str::FromStr;

use crate::error::{Error, Result};

/// Typed property keys that can be defined on a mounted path
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
#[non_exhaustive]
pub enum MountConfigKey {
/// If set to "true", allows creating commits without concurrent writer protection.
/// Only safe if there is one writer to a given table.
AllowUnsafeRename,
}

impl AsRef<str> for MountConfigKey {
fn as_ref(&self) -> &str {
match self {
Self::AllowUnsafeRename => "mount_allow_unsafe_rename",
}
}
}

impl FromStr for MountConfigKey {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"mount_allow_unsafe_rename" | "allow_unsafe_rename" => Ok(Self::AllowUnsafeRename),
_ => Err(Error::UnknownConfigKey(s.to_string())),
}
}
}

/// Helper struct to create full configuration from passed options and environment
pub(crate) struct MountConfigHelper {
config: HashMap<MountConfigKey, String>,
env_config: HashMap<MountConfigKey, String>,
}

impl MountConfigHelper {
/// Create a new [`ConfigHelper`]
pub fn try_new(
config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
) -> Result<Self> {
let mut env_config = HashMap::new();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if key.starts_with("MOUNT_") {
if let Ok(config_key) = MountConfigKey::from_str(&key.to_ascii_lowercase()) {
env_config.insert(config_key, value.to_string());
}
}
}
}

Ok(Self {
config: config
.into_iter()
.map(|(key, value)| Ok((MountConfigKey::from_str(key.as_ref())?, value.into())))
.collect::<Result<_, Error>>()?,
env_config,
})
}

/// Generate a cofiguration augmented with options from the environment
pub fn build(mut self) -> Result<HashMap<MountConfigKey, String>> {
// Add keys from the environment to the configuration, as e.g. client configuration options.
// NOTE We have to specifically configure omitting keys, since workload identity can
// work purely using defaults, but partial config may be present in the environment.
// Preference of conflicting configs (e.g. msi resource id vs. client id is handled in object store)
for key in self.env_config.keys() {
if let Entry::Vacant(e) = self.config.entry(*key) {
e.insert(self.env_config.get(key).unwrap().to_owned());
}
}

Ok(self.config)
}
}
33 changes: 33 additions & 0 deletions crates/mount/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use deltalake_core::errors::DeltaTableError;

pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[allow(dead_code)]
#[error("failed to parse config: {0}")]
Parse(String),

/// Unknown configuration key
#[error("Unknown configuration key: {0}")]
UnknownConfigKey(String),

#[error("The `allow_unsafe_rename` parameter must be specified")]
AllowUnsafeRenameNotSpecified,

#[error(transparent)]
ObjectStore(#[from] object_store::Error),
}

impl From<Error> for DeltaTableError {
fn from(e: Error) -> Self {
match e {
Error::Parse(msg) => DeltaTableError::Generic(msg),
Error::UnknownConfigKey(msg) => DeltaTableError::Generic(msg),
Error::AllowUnsafeRenameNotSpecified => DeltaTableError::Generic(
"The `allow_unsafe_rename` parameter must be specified".to_string(),
),
Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
}
}
}
Loading
Loading