From 514c2869e896ca7311348167eddff384970ba737 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Fri, 13 Dec 2024 14:13:30 -0500 Subject: [PATCH] Added an error type to the DataCatalog trait to not require all the conversion between error types. Signed-off-by: Stephen Carman --- crates/aws/src/storage.rs | 4 +- crates/catalog-glue/src/lib.rs | 2 + crates/catalog-unity/Cargo.toml | 10 ++--- crates/catalog-unity/src/credential.rs | 10 ++--- crates/catalog-unity/src/lib.rs | 49 ++++++++------------- crates/core/src/data_catalog/mod.rs | 4 +- crates/core/src/data_catalog/storage/mod.rs | 4 +- 7 files changed, 38 insertions(+), 45 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 143c5c4aab..cec1e2e833 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -273,7 +273,9 @@ impl S3StorageOptions { fn ensure_env_var(map: &HashMap, key: &str) { if let Some(val) = str_option(map, key) { - std::env::set_var(key, val); + unsafe { + std::env::set_var(key, val); + } } } diff --git a/crates/catalog-glue/src/lib.rs b/crates/catalog-glue/src/lib.rs index e9ef449be2..089ce56ce2 100644 --- a/crates/catalog-glue/src/lib.rs +++ b/crates/catalog-glue/src/lib.rs @@ -60,6 +60,8 @@ const PLACEHOLDER_SUFFIX: &str = "-__PLACEHOLDER__"; #[async_trait::async_trait] impl DataCatalog for GlueDataCatalog { + type Error = DataCatalogError; + /// Get the table storage location from the Glue Data Catalog async fn get_table_storage_location( &self, diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index 6ce6e689f5..8a0827386b 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -12,18 +12,18 @@ repository.workspace = true rust-version.workspace = true [dependencies] -async-trait = { workspace = true } +async-trait.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true deltalake-core = { version = "0.22", path = "../core" } -thiserror = { workspace = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] } reqwest-retry = "0.7" reqwest-middleware = "0.4.0" rand = "0.8" futures = "0.3" chrono = "0.4" -tokio.workspace = true -serde.workspace = true -serde_json.workspace = true dashmap = "6" tracing = "0.1" datafusion = { version = "43", optional = true } diff --git a/crates/catalog-unity/src/credential.rs b/crates/catalog-unity/src/credential.rs index 9f0d08686a..c586af1bb0 100644 --- a/crates/catalog-unity/src/credential.rs +++ b/crates/catalog-unity/src/credential.rs @@ -38,7 +38,7 @@ pub trait TokenCredential: std::fmt::Debug + Send + Sync + 'static { async fn fetch_token( &self, client: &ClientWithMiddleware, - ) -> DataCatalogResult>; + ) -> Result, UnityCatalogError>; } /// Provides credentials for use when signing requests @@ -95,7 +95,7 @@ impl TokenCredential for ClientSecretOAuthProvider { async fn fetch_token( &self, client: &ClientWithMiddleware, - ) -> DataCatalogResult> { + ) -> Result, UnityCatalogError> { let response: TokenResponse = client .request(Method::POST, &self.token_url) .header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON)) @@ -168,7 +168,7 @@ impl TokenCredential for AzureCliCredential { async fn fetch_token( &self, _client: &ClientWithMiddleware, - ) -> DataCatalogResult> { + ) -> Result, UnityCatalogError> { // on window az is a cmd and it should be called like this // see https://doc.rust-lang.org/nightly/std/process/struct.Command.html let program = if cfg!(target_os = "windows") { @@ -281,7 +281,7 @@ impl TokenCredential for WorkloadIdentityOAuthProvider { async fn fetch_token( &self, client: &ClientWithMiddleware, - ) -> DataCatalogResult> { + ) -> Result, UnityCatalogError> { let token_str = std::fs::read_to_string(&self.federated_token_file) .map_err(|_| UnityCatalogError::FederatedTokenFile)?; @@ -371,7 +371,7 @@ impl TokenCredential for ImdsManagedIdentityOAuthProvider { async fn fetch_token( &self, _client: &ClientWithMiddleware, - ) -> DataCatalogResult> { + ) -> Result, UnityCatalogError> { let resource_scope = format!("{}/.default", DATABRICKS_RESOURCE_SCOPE); let mut query_items = vec![ ("api-version", MSI_API_VERSION), diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index d03d08702c..2b78eb5a92 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -1,9 +1,7 @@ //! Databricks Unity Catalog. -//! -//! This module is gated behind the "unity-experimental" feature. use std::str::FromStr; -use reqwest::header::{HeaderValue, AUTHORIZATION}; +use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION}; use crate::credential::{AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider}; use crate::models::{ @@ -25,7 +23,7 @@ pub mod models; /// Possible errors from the unity-catalog/tables API call #[derive(thiserror::Error, Debug)] -enum UnityCatalogError { +pub enum UnityCatalogError { #[error("GET request error: {source}")] /// Error from reqwest library RequestError { @@ -50,9 +48,11 @@ enum UnityCatalogError { message: String, }, - /// Unknown configuration key - #[error("Unknown configuration key: {catalog} in catalog: {key}")] - UnknownConfigKey { catalog: &'static str, key: String }, + #[error("Invalid token for auth header: {header_error}")] + InvalidHeader { + #[from] + header_error: InvalidHeaderValue, + }, /// Unknown configuration key #[error("Missing configuration key: {0}")] @@ -75,9 +75,6 @@ enum UnityCatalogError { impl From for DataCatalogError { fn from(value: UnityCatalogError) -> Self { match value { - UnityCatalogError::UnknownConfigKey { catalog, key } => { - DataCatalogError::UnknownConfigKey { catalog, key } - } _ => DataCatalogError::Generic { catalog: "Unity", source: Box::new(value), @@ -227,7 +224,7 @@ impl FromStr for UnityCatalogConfigKey { Ok(UnityCatalogConfigKey::WorkspaceUrl) } _ => Err(DataCatalogError::UnknownConfigKey { - catalog: "", + catalog: "unity", key: s.to_string(), }), } @@ -471,31 +468,21 @@ pub struct UnityCatalog { } impl UnityCatalog { - async fn get_credential(&self) -> DataCatalogResult { + async fn get_credential(&self) -> Result { match &self.credential { CredentialProvider::BearerToken(token) => { - // we do the conversion to a HeaderValue here, since it is fallible + // we do the conversion to a HeaderValue here, since it is fallible, // and we want to use it in an infallible function - HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| { - DataCatalogError::Generic { - catalog: "Unity", - source: Box::new(err), - } - }) + Ok(HeaderValue::from_str(&format!("Bearer {token}"))?) } CredentialProvider::TokenCredential(cache, cred) => { let token = cache .get_or_insert_with(|| cred.fetch_token(&self.client)) .await?; - // we do the conversion to a HeaderValue here, since it is fallible + // we do the conversion to a HeaderValue here, since it is fallible, // and we want to use it in an infallible function - HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| { - DataCatalogError::Generic { - catalog: "Unity", - source: Box::new(err), - } - }) + Ok(HeaderValue::from_str(&format!("Bearer {token}"))?) } } } @@ -618,7 +605,7 @@ impl UnityCatalog { catalog_id: impl AsRef, database_name: impl AsRef, table_name: impl AsRef, - ) -> DataCatalogResult { + ) -> Result { let token = self.get_credential().await?; // https://docs.databricks.com/api-explorer/workspace/tables/get let resp = self @@ -632,22 +619,22 @@ impl UnityCatalog { )) .header(AUTHORIZATION, token) .send() - .await - .map_err(UnityCatalogError::from)?; + .await?; - Ok(resp.json().await.map_err(UnityCatalogError::from)?) + Ok(resp.json().await?) } } #[async_trait::async_trait] impl DataCatalog for UnityCatalog { + type Error = UnityCatalogError; /// Get the table storage location from the UnityCatalog async fn get_table_storage_location( &self, catalog_id: Option, database_name: &str, table_name: &str, - ) -> Result { + ) -> Result { match self .get_table( catalog_id.unwrap_or("main".into()), diff --git a/crates/core/src/data_catalog/mod.rs b/crates/core/src/data_catalog/mod.rs index 8a8fb4548a..fbb44d95c1 100644 --- a/crates/core/src/data_catalog/mod.rs +++ b/crates/core/src/data_catalog/mod.rs @@ -46,11 +46,13 @@ pub enum DataCatalogError { /// Abstractions for data catalog for the Delta table. To add support for new cloud, simply implement this trait. #[async_trait::async_trait] pub trait DataCatalog: Send + Sync + Debug { + type Error; + /// Get the table storage location from the Data Catalog async fn get_table_storage_location( &self, catalog_id: Option, database_name: &str, table_name: &str, - ) -> Result; + ) -> Result; } diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 110e4aa075..236caf79a8 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -88,9 +88,9 @@ impl ListingSchemaProvider { } } -// noramalizes a path fragment to be a valida table name in datafusion +// normalizes a path fragment to be a valida table name in datafusion // - removes some reserved characters (-, +, ., " ") -// - lowecase ascii +// - lowercase ascii fn normalize_table_name(path: &Path) -> Result { Ok(path .file_name()