From 93701f44ea511048fe00b023be50d051ec0440f9 Mon Sep 17 00:00:00 2001
From: Stephen Carman <hntd187@users.noreply.github.com>
Date: Sun, 15 Dec 2024 12:17:45 -0500
Subject: [PATCH] feat: move unity catalog integration into its own crate

Signed-off-by: Stephen Carman <hntd187@users.noreply.github.com>
---
 crates/aws/src/lib.rs                       |  4 ++-
 crates/aws/tests/common.rs                  | 10 +++---
 crates/catalog-unity/src/lib.rs             | 34 +++++++++------------
 crates/core/src/kernel/snapshot/log_data.rs |  6 ++--
 4 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index 981d368aa8..c062e47334 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -771,7 +771,9 @@ mod tests {
         let factory = S3LogStoreFactory::default();
         let store = InMemory::new();
         let url = Url::parse("s3://test-bucket").unwrap();
-        unsafe { std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER); }
+        unsafe {
+            std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
+        }
         let logstore = factory
             .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
             .unwrap();
diff --git a/crates/aws/tests/common.rs b/crates/aws/tests/common.rs
index 8f4adb7523..e32522e2d3 100644
--- a/crates/aws/tests/common.rs
+++ b/crates/aws/tests/common.rs
@@ -46,11 +46,13 @@ impl StorageIntegration for S3Integration {
             format!("delta_log_it_{}", random::<u16>()),
         );
         match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
-            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
-                unsafe { std::env::remove_var(s3_constants::AWS_ENDPOINT_URL) }
-            }
+            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => unsafe {
+                std::env::remove_var(s3_constants::AWS_ENDPOINT_URL)
+            },
             Some(_) => (),
-            None => unsafe { std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566") },
+            None => unsafe {
+                std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566")
+            },
         }
         set_env_if_not_set(s3_constants::AWS_ACCESS_KEY_ID, "deltalake");
         set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust");
diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs
index b8ccd23865..f5b8d1d08a 100644
--- a/crates/catalog-unity/src/lib.rs
+++ b/crates/catalog-unity/src/lib.rs
@@ -19,8 +19,8 @@ pub mod client;
 pub mod credential;
 #[cfg(feature = "datafusion")]
 pub mod datafusion;
-pub mod models;
 pub mod error;
+pub mod models;
 
 /// Possible errors from the unity-catalog/tables API call
 #[derive(thiserror::Error, Debug)]
@@ -330,7 +330,7 @@ impl UnityCatalogBuilder {
     }
 
     /// Hydrate builder from key value pairs
-    pub fn try_with_options<I: IntoIterator<Item=(impl AsRef<str>, impl Into<String>)>>(
+    pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
         mut self,
         options: I,
     ) -> DataCatalogResult<Self> {
@@ -496,7 +496,7 @@ impl UnityCatalog {
     /// all catalogs will be retrieved. Otherwise, only catalogs owned by the caller
     /// (or for which the caller has the USE_CATALOG privilege) will be retrieved.
     /// There is no guarantee of a specific ordering of the elements in the array.
-    pub async fn list_catalogs(&self) -> DataCatalogResult<ListCatalogsResponse> {
+    pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/list
         let resp = self
@@ -504,9 +504,8 @@ impl UnityCatalog {
             .get(format!("{}/catalogs", self.catalog_url()))
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }
 
     /// List all schemas for a catalog in the metastore.
@@ -521,7 +520,7 @@ impl UnityCatalog {
     pub async fn list_schemas(
         &self,
         catalog_name: impl AsRef<str>,
-    ) -> DataCatalogResult<ListSchemasResponse> {
+    ) -> Result<ListSchemasResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/list
         let resp = self
@@ -530,9 +529,8 @@ impl UnityCatalog {
             .header(AUTHORIZATION, token)
             .query(&[("catalog_name", catalog_name.as_ref())])
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }
 
     /// Gets the specified schema within the metastore.#
@@ -543,7 +541,7 @@ impl UnityCatalog {
         &self,
         catalog_name: impl AsRef<str>,
         schema_name: impl AsRef<str>,
-    ) -> DataCatalogResult<GetSchemaResponse> {
+    ) -> Result<GetSchemaResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/get
         let resp = self
@@ -556,9 +554,8 @@ impl UnityCatalog {
             ))
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }
 
     /// Gets an array of summaries for tables for a schema and catalog within the metastore.
@@ -576,7 +573,7 @@ impl UnityCatalog {
         &self,
         catalog_name: impl AsRef<str>,
         schema_name_pattern: impl AsRef<str>,
-    ) -> DataCatalogResult<ListTableSummariesResponse> {
+    ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/tables/listsummaries
         let resp = self
@@ -588,10 +585,9 @@ impl UnityCatalog {
             ])
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
+            .await?;
 
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+        Ok(resp.json().await?)
     }
 
     /// Gets a table from the metastore for a specific catalog and schema.
@@ -649,7 +645,7 @@ impl DataCatalog for UnityCatalog {
                 error_code: err.error_code,
                 message: err.message,
             }
-                .into()),
+            .into()),
         }
     }
 }
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index e9dede10e8..05d1790dc9 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -482,13 +482,13 @@ mod datafusion {
     use std::collections::HashSet;
     use std::sync::Arc;
 
+    use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
+    use ::datafusion::physical_optimizer::pruning::PruningStatistics;
+    use ::datafusion::physical_plan::Accumulator;
     use arrow::compute::concat_batches;
     use arrow_arith::aggregate::sum;
     use arrow_array::{ArrayRef, BooleanArray, Int64Array, UInt64Array};
     use arrow_schema::DataType as ArrowDataType;
-    use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
-    use ::datafusion::physical_optimizer::pruning::PruningStatistics;
-    use ::datafusion::physical_plan::Accumulator;
     use datafusion_common::scalar::ScalarValue;
     use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
     use datafusion_common::Column;