Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move unity catalog integration to it's own crate #3044

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,9 @@ mod tests {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
unsafe {
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
}
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
Expand Down
4 changes: 1 addition & 3 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ mod tests {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
std::env::remove_var(constants::AWS_ENDPOINT_URL);

let options = S3StorageOptions::from_map(&hashmap! {
constants::AWS_REGION.to_string() => "eu-west-1".to_string(),
constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(),
Expand Down Expand Up @@ -767,12 +768,10 @@ mod tests {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
let raw_options = hashmap! {};

std::env::set_var(constants::AWS_ACCESS_KEY_ID, "env_key");
std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key");
std::env::set_var(constants::AWS_REGION, "env_key");

let combined_options =
S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options));

Expand All @@ -795,7 +794,6 @@ mod tests {
"AWS_SECRET_ACCESS_KEY".to_string() => "options_key".to_string(),
"AWS_REGION".to_string() => "options_key".to_string()
};

std::env::set_var("aws_access_key_id", "env_key");
std::env::set_var("aws_endpoint", "env_key");
std::env::set_var("aws_secret_access_key", "env_key");
Expand Down
12 changes: 7 additions & 5 deletions crates/aws/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use deltalake_aws::constants;
use deltalake_aws::register_handlers;
use deltalake_aws::storage::*;
use deltalake_test::utils::*;
use rand::Rng;
use rand::random;
use std::process::{Command, ExitStatus, Stdio};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -43,14 +43,16 @@ impl StorageIntegration for S3Integration {
fn prepare_env(&self) {
set_env_if_not_set(
constants::LOCK_TABLE_KEY_NAME,
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
format!("delta_log_it_{}", random::<u16>()),
);
match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => unsafe {
std::env::remove_var(s3_constants::AWS_ENDPOINT_URL)
}
},
Some(_) => (),
None => std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566"),
None => unsafe {
std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566")
},
}
set_env_if_not_set(s3_constants::AWS_ACCESS_KEY_ID, "deltalake");
set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust");
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog-glue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const PLACEHOLDER_SUFFIX: &str = "-__PLACEHOLDER__";

#[async_trait::async_trait]
impl DataCatalog for GlueDataCatalog {
type Error = DataCatalogError;

/// Get the table storage location from the Glue Data Catalog
async fn get_table_storage_location(
&self,
Expand Down
40 changes: 40 additions & 0 deletions crates/catalog-unity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "deltalake-catalog-unity"
version = "0.6.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
edition.workspace = true
homepage.workspace = true
description.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
async-trait.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
deltalake-core = { version = "0.22", path = "../core" }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
reqwest-retry = "0.7"
reqwest-middleware = "0.4.0"
rand = "0.8"
futures = "0.3"
chrono = "0.4"
dashmap = "6"
tracing = "0.1"
datafusion = { version = "43", optional = true }
datafusion-common = { version = "43", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tempfile = "3"
httpmock = { version = "0.8.0-alpha.1" }

[features]
default = []
datafusion = ["dep:datafusion", "datafusion-common"]

Empty file.
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! Generic utilities reqwest based Catalog implementations

pub mod backoff;
#[cfg(test)]
pub mod mock_server;
#[allow(unused)]
pub mod pagination;
pub mod retry;
pub mod token;

use crate::client::retry::RetryConfig;
use crate::UnityCatalogError;
use deltalake_core::data_catalog::DataCatalogResult;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder, Proxy};
use reqwest::{ClientBuilder, Proxy};
use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::time::Duration;

fn map_client_error(e: reqwest::Error) -> super::DataCatalogError {
Expand Down Expand Up @@ -38,6 +41,7 @@ pub struct ClientOptions {
http2_keep_alive_while_idle: bool,
http1_only: bool,
http2_only: bool,
retry_config: Option<RetryConfig>,
}

impl ClientOptions {
Expand Down Expand Up @@ -164,7 +168,12 @@ impl ClientOptions {
self
}

pub(crate) fn client(&self) -> super::DataCatalogResult<Client> {
pub fn with_retry_config(mut self, cfg: RetryConfig) -> Self {
self.retry_config = Some(cfg);
self
}

pub(crate) fn client(&self) -> DataCatalogResult<ClientWithMiddleware> {
let mut builder = ClientBuilder::new();

match &self.user_agent {
Expand Down Expand Up @@ -221,9 +230,19 @@ impl ClientOptions {
builder = builder.danger_accept_invalid_certs(self.allow_insecure)
}

builder
let inner_client = builder
.https_only(!self.allow_http)
.build()
.map_err(map_client_error)
.map_err(UnityCatalogError::from)?;
let retry_policy = self
.retry_config
.as_ref()
.map(|retry| retry.into())
.unwrap_or(ExponentialBackoff::builder().build_with_max_retries(3));

let middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
Ok(reqwest_middleware::ClientBuilder::new(inner_client)
.with(middleware)
.build())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::future::Future;

use futures::Stream;

use crate::data_catalog::DataCatalogResult;
use deltalake_core::data_catalog::DataCatalogResult;

/// Takes a paginated operation `op` that when called with:
///
Expand Down
118 changes: 118 additions & 0 deletions crates/catalog-unity/src/client/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//! A shared HTTP client implementation incorporating retries

use super::backoff::BackoffConfig;
use deltalake_core::DataCatalogError;
use reqwest::StatusCode;
use reqwest_retry::policies::ExponentialBackoff;
use std::time::Duration;

/// Retry request error
#[derive(Debug)]
pub struct RetryError {
retries: usize,
message: String,
source: Option<reqwest::Error>,
}

impl std::fmt::Display for RetryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"response error \"{}\", after {} retries",
self.message, self.retries
)?;
if let Some(source) = &self.source {
write!(f, ": {source}")?;
}
Ok(())
}
}

impl std::error::Error for RetryError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|e| e as _)
}
}

impl RetryError {
/// Returns the status code associated with this error if any
pub fn status(&self) -> Option<StatusCode> {
self.source.as_ref().and_then(|e| e.status())
}
}

impl From<RetryError> for std::io::Error {
fn from(err: RetryError) -> Self {
use std::io::ErrorKind;
match (&err.source, err.status()) {
(Some(source), _) if source.is_builder() || source.is_request() => {
Self::new(ErrorKind::InvalidInput, err)
}
(_, Some(StatusCode::NOT_FOUND)) => Self::new(ErrorKind::NotFound, err),
(_, Some(StatusCode::BAD_REQUEST)) => Self::new(ErrorKind::InvalidInput, err),
(Some(source), None) if source.is_timeout() => Self::new(ErrorKind::TimedOut, err),
(Some(source), None) if source.is_connect() => Self::new(ErrorKind::NotConnected, err),
_ => Self::new(ErrorKind::Other, err),
}
}
}

impl From<RetryError> for DataCatalogError {
fn from(value: RetryError) -> Self {
DataCatalogError::Generic {
catalog: "",
source: Box::new(value),
}
}
}

/// Error retrying http requests
pub type Result<T, E = RetryError> = std::result::Result<T, E>;

/// Contains the configuration for how to respond to server errors
///
/// By default, they will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information
///
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// The backoff configuration
pub backoff: BackoffConfig,

/// The maximum number of times to retry a request
///
/// Set to 0 to disable retries
pub max_retries: usize,

/// The maximum length of time from the initial request
/// after which no further retries will be attempted
///
/// This not only bounds the length of time before a server
/// error will be surfaced to the application, but also bounds
/// the length of time a request's credentials must remain valid.
///
/// As requests are retried without renewing credentials or
/// regenerating request payloads, this number should be kept
/// below 5 minutes to avoid errors due to expired credentials
/// and/or request payloads
pub retry_timeout: Duration,
}

impl Default for RetryConfig {
fn default() -> Self {
Self {
backoff: Default::default(),
max_retries: 10,
retry_timeout: Duration::from_secs(3 * 60),
}
}
}

impl From<&RetryConfig> for ExponentialBackoff {
fn from(val: &RetryConfig) -> ExponentialBackoff {
ExponentialBackoff::builder()
.retry_bounds(val.backoff.init_backoff, val.backoff.max_backoff)
.base(val.backoff.base as u32)
.build_with_max_retries(val.max_retries as u32)
}
}
Loading
Loading