Skip to content

Commit

Permalink
feat: move unity catalog integration into its own crate
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Carman <[email protected]>
  • Loading branch information
hntd187 committed Dec 15, 2024
1 parent 8f52fdc commit cfc6b30
Show file tree
Hide file tree
Showing 48 changed files with 841 additions and 940 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ resolver = "2"

[workspace.package]
authors = ["Qingping Hou <[email protected]>"]
rust-version = "1.85"
rust-version = "1.80"
keywords = ["deltalake", "delta", "datalake"]
readme = "README.md"
edition = "2021"
Expand All @@ -26,7 +26,7 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
delta_kernel = { version = "0.5.0", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
Expand Down
1 change: 1 addition & 0 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ fn extract_version_from_filename(name: &str) -> Option<i64> {
mod tests {
use super::*;
use aws_sdk_sts::config::ProvideCredentials;

use object_store::memory::InMemory;
use serial_test::serial;

Expand Down
114 changes: 55 additions & 59 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,30 +529,30 @@ mod tests {
fn storage_options_default_test() {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
unsafe {
std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
std::env::set_var(constants::AWS_REGION, "us-west-1");
std::env::set_var(constants::AWS_PROFILE, "default");
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "default_key_id");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
std::env::set_var(
constants::AWS_IAM_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ROLE_SESSION_NAME,
"session_name",
);
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");
}

std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
std::env::set_var(constants::AWS_REGION, "us-west-1");
std::env::set_var(constants::AWS_PROFILE, "default");
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "default_key_id");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
std::env::set_var(
constants::AWS_IAM_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ROLE_SESSION_NAME,
"session_name",
);
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");

let options = S3StorageOptions::try_default().unwrap();
assert_eq!(
S3StorageOptions {
Expand Down Expand Up @@ -585,7 +585,8 @@ mod tests {
fn storage_options_with_only_region_and_credentials() {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
unsafe { std::env::remove_var(constants::AWS_ENDPOINT_URL); }
std::env::remove_var(constants::AWS_ENDPOINT_URL);

let options = S3StorageOptions::from_map(&hashmap! {
constants::AWS_REGION.to_string() => "eu-west-1".to_string(),
constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(),
Expand Down Expand Up @@ -676,28 +677,26 @@ mod tests {
fn storage_options_mixed_test() {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
unsafe {
std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
std::env::set_var(
constants::AWS_ENDPOINT_URL_DYNAMODB,
"http://localhost:dynamodb",
);
std::env::set_var(constants::AWS_REGION, "us-west-1");
std::env::set_var(constants::AWS_PROFILE, "default");
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "wrong_key_id");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
std::env::set_var(
constants::AWS_IAM_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");

std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1");
std::env::set_var(constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2");
std::env::set_var(constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3");
}
std::env::set_var(constants::AWS_ENDPOINT_URL, "http://localhost");
std::env::set_var(
constants::AWS_ENDPOINT_URL_DYNAMODB,
"http://localhost:dynamodb",
);
std::env::set_var(constants::AWS_REGION, "us-west-1");
std::env::set_var(constants::AWS_PROFILE, "default");
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "wrong_key_id");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
std::env::set_var(
constants::AWS_IAM_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");

std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1");
std::env::set_var(constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2");
std::env::set_var(constants::AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES, "3");
let options = S3StorageOptions::from_map(&hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(),
Expand Down Expand Up @@ -769,12 +768,10 @@ mod tests {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
let raw_options = hashmap! {};
unsafe {
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key");
std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key");
std::env::set_var(constants::AWS_REGION, "env_key");
}
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key");
std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key");
std::env::set_var(constants::AWS_REGION, "env_key");
let combined_options =
S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options));

Expand All @@ -797,12 +794,11 @@ mod tests {
"AWS_SECRET_ACCESS_KEY".to_string() => "options_key".to_string(),
"AWS_REGION".to_string() => "options_key".to_string()
};
unsafe {
std::env::set_var("aws_access_key_id", "env_key");
std::env::set_var("aws_endpoint", "env_key");
std::env::set_var("aws_secret_access_key", "env_key");
std::env::set_var("aws_region", "env_key");
}
std::env::set_var("aws_access_key_id", "env_key");
std::env::set_var("aws_endpoint", "env_key");
std::env::set_var("aws_secret_access_key", "env_key");
std::env::set_var("aws_region", "env_key");

let combined_options =
S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options));

Expand Down
2 changes: 1 addition & 1 deletion crates/aws/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use deltalake_aws::constants;
use deltalake_aws::register_handlers;
use deltalake_aws::storage::*;
use deltalake_test::utils::*;
use rand::{random, Rng};
use rand::random;
use std::process::{Command, ExitStatus, Stdio};

#[derive(Clone, Debug)]
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog-glue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const PLACEHOLDER_SUFFIX: &str = "-__PLACEHOLDER__";

#[async_trait::async_trait]
impl DataCatalog for GlueDataCatalog {
type Error = DataCatalogError;

/// Get the table storage location from the Glue Data Catalog
async fn get_table_storage_location(
&self,
Expand Down
16 changes: 9 additions & 7 deletions crates/catalog-unity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
async-trait = { workspace = true }
deltalake-core = { version = "0.22", path = "../core", features = ["unity-experimental"] }
thiserror = { workspace = true }
async-trait.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
deltalake-core = { version = "0.22", path = "../core" }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
reqwest-retry = "0.7"
reqwest-middleware = "0.4.0"
rand = "0.8"
futures = "0.3"
chrono = "0.4"
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
dashmap = "6"
tracing = "0.1"
datafusion = { version = "43", optional = true }
Expand All @@ -30,7 +32,7 @@ datafusion-common = { version = "43", optional = true }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tempfile = "3"
httpmock = { version = "0.8.0-alpha.1", features = [] }
httpmock = { version = "0.8.0-alpha.1" }

[features]
default = []
Expand Down
94 changes: 0 additions & 94 deletions crates/catalog-unity/src/client/mock_server.rs
Original file line number Diff line number Diff line change
@@ -1,94 +0,0 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use parking_lot::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;

/// A mock server
pub struct MockServer {
responses: Arc<Mutex<VecDeque<ResponseFn>>>,
shutdown: oneshot::Sender<()>,
handle: JoinHandle<()>,
url: String,
}

impl Default for MockServer {
fn default() -> Self {
Self::new()
}
}

impl MockServer {
pub fn new() -> Self {
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(10)));

let r = Arc::clone(&responses);
let make_service = make_service_fn(move |_conn| {
let r = Arc::clone(&r);
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let r = Arc::clone(&r);
async move {
Ok::<_, Infallible>(match r.lock().pop_front() {
Some(r) => r(req),
None => Response::new(Body::from("Hello World")),
})
}
}))
}
});

let (shutdown, rx) = oneshot::channel::<()>();
let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);

let url = format!("http://{}", server.local_addr());

let handle = tokio::spawn(async move {
server
.with_graceful_shutdown(async {
rx.await.ok();
})
.await
.unwrap()
});

Self {
responses,
shutdown,
handle,
url,
}
}

/// The url of the mock server
pub fn url(&self) -> &str {
&self.url
}

/// Add a response
pub fn push(&self, response: Response<Body>) {
self.push_fn(|_| response)
}

/// Add a response function
pub fn push_fn<F>(&self, f: F)
where
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
{
self.responses.lock().push_back(Box::new(f))
}

/// Shutdown the mock server
pub async fn shutdown(self) {
let _ = self.shutdown.send(());
self.handle.await.unwrap()
}
}
31 changes: 25 additions & 6 deletions crates/catalog-unity/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! Generic utilities reqwest based Catalog implementations
pub mod backoff;
// #[cfg(test)]
// pub mod mock_server;
#[allow(unused)]
pub mod pagination;
pub mod retry;
pub mod token;

use crate::client::retry::RetryConfig;
use crate::UnityCatalogError;
use deltalake_core::data_catalog::DataCatalogResult;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder, Proxy};
use reqwest::{ClientBuilder, Proxy};
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::time::Duration;

fn map_client_error(e: reqwest::Error) -> super::DataCatalogError {
Expand Down Expand Up @@ -38,6 +41,7 @@ pub struct ClientOptions {
http2_keep_alive_while_idle: bool,
http1_only: bool,
http2_only: bool,
retry_config: Option<RetryConfig>,
}

impl ClientOptions {
Expand Down Expand Up @@ -164,7 +168,12 @@ impl ClientOptions {
self
}

pub(crate) fn client(&self) -> super::DataCatalogResult<Client> {
pub fn with_retry_config(mut self, cfg: RetryConfig) -> Self {
self.retry_config = Some(cfg);
self
}

pub(crate) fn client(&self) -> DataCatalogResult<ClientWithMiddleware> {
let mut builder = ClientBuilder::new();

match &self.user_agent {
Expand Down Expand Up @@ -221,9 +230,19 @@ impl ClientOptions {
builder = builder.danger_accept_invalid_certs(self.allow_insecure)
}

builder
let inner_client = builder
.https_only(!self.allow_http)
.build()
.map_err(map_client_error)
.map_err(UnityCatalogError::from)?;
let retry_policy = self
.retry_config
.as_ref()
.map(|retry| retry.into())
.unwrap_or(ExponentialBackoff::builder().build_with_max_retries(3));

let middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
Ok(reqwest_middleware::ClientBuilder::new(inner_client)
.with(middleware)
.build())
}
}
Loading

0 comments on commit cfc6b30

Please sign in to comment.