feat: move unity catalog integration into its own crate
Signed-off-by: Stephen Carman <[email protected]>
hntd187 committed Dec 15, 2024
1 parent cb3fb18 commit 93701f4
Showing 4 changed files with 27 additions and 27 deletions.
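
With the Unity Catalog client now living in its own crate under crates/catalog-unity, downstream code depends on it directly rather than reaching through the core crate. Below is a minimal sketch of what consuming the split-out crate might look like; it only exercises methods visible in the diff further down (list_catalogs, list_schemas, list_table_summaries). The crate/module name deltalake_catalog_unity, the Debug impls on the response models, and the "*" schema-name pattern are assumptions, and construction of the UnityCatalog value via UnityCatalogBuilder is left to the caller because the builder's full API is not shown here.

use deltalake_catalog_unity::{UnityCatalog, UnityCatalogError};

/// Walk the endpoints touched in this commit: catalogs, then schemas,
/// then table summaries for one catalog. All three methods now return
/// Result<_, UnityCatalogError> rather than the DataCatalogResult alias.
async fn dump_metastore(
    catalog: &UnityCatalog,
    catalog_name: &str,
) -> Result<(), UnityCatalogError> {
    // Catalogs visible to (or owned by) the caller.
    let catalogs = catalog.list_catalogs().await?;
    println!("catalogs: {catalogs:?}");

    // Schemas within the chosen catalog.
    let schemas = catalog.list_schemas(catalog_name).await?;
    println!("schemas in {catalog_name}: {schemas:?}");

    // Table summaries for every schema in the catalog ("*" is a placeholder pattern).
    let tables = catalog.list_table_summaries(catalog_name, "*").await?;
    println!("table summaries: {tables:?}");

    Ok(())
}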
crates/aws/src/lib.rs (4 changes: 3 additions & 1 deletion)
@@ -771,7 +771,9 @@ mod tests {
         let factory = S3LogStoreFactory::default();
         let store = InMemory::new();
         let url = Url::parse("s3://test-bucket").unwrap();
-        unsafe { std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER); }
+        unsafe {
+            std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
+        }
         let logstore = factory
             .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
             .unwrap();
crates/aws/tests/common.rs (10 changes: 6 additions & 4 deletions)
@@ -46,11 +46,13 @@ impl StorageIntegration for S3Integration {
             format!("delta_log_it_{}", random::<u16>()),
         );
         match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
-            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
-                unsafe { std::env::remove_var(s3_constants::AWS_ENDPOINT_URL) }
-            }
+            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => unsafe {
+                std::env::remove_var(s3_constants::AWS_ENDPOINT_URL)
+            },
             Some(_) => (),
-            None => unsafe { std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566") },
+            None => unsafe {
+                std::env::set_var(s3_constants::AWS_ENDPOINT_URL, "http://localhost:4566")
+            },
         }
         set_env_if_not_set(s3_constants::AWS_ACCESS_KEY_ID, "deltalake");
         set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust");
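
The hunks in both AWS files wrap std::env::remove_var / set_var in unsafe blocks. This matches the Rust 2024 edition, where these functions are unsafe to call: the process environment is global mutable state, and mutating it can race with other threads reading it through the C runtime. A small sketch of what a helper like the set_env_if_not_set used above could look like; this is a guess at its shape, not the repository's actual implementation.

use std::env;

/// Set an environment variable only when it is not already present.
/// `env::set_var` is unsafe in the 2024 edition, so the mutation is
/// confined to an unsafe block and should only happen in single-threaded
/// setup code (e.g. before a test spawns anything).
fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
    if env::var(key.as_ref()).is_err() {
        unsafe { env::set_var(key.as_ref(), value.as_ref()) }
    }
}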
crates/catalog-unity/src/lib.rs (34 changes: 15 additions & 19 deletions)
@@ -19,8 +19,8 @@ pub mod client;
 pub mod credential;
 #[cfg(feature = "datafusion")]
 pub mod datafusion;
-pub mod models;
 pub mod error;
+pub mod models;

 /// Possible errors from the unity-catalog/tables API call
 #[derive(thiserror::Error, Debug)]
@@ -330,7 +330,7 @@ impl UnityCatalogBuilder {
     }

     /// Hydrate builder from key value pairs
-    pub fn try_with_options<I: IntoIterator<Item=(impl AsRef<str>, impl Into<String>)>>(
+    pub fn try_with_options<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
         mut self,
         options: I,
     ) -> DataCatalogResult<Self> {
@@ -496,17 +496,16 @@ impl UnityCatalog {
     /// all catalogs will be retrieved. Otherwise, only catalogs owned by the caller
     /// (or for which the caller has the USE_CATALOG privilege) will be retrieved.
     /// There is no guarantee of a specific ordering of the elements in the array.
-    pub async fn list_catalogs(&self) -> DataCatalogResult<ListCatalogsResponse> {
+    pub async fn list_catalogs(&self) -> Result<ListCatalogsResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/list
         let resp = self
             .client
             .get(format!("{}/catalogs", self.catalog_url()))
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }

     /// List all schemas for a catalog in the metastore.
@@ -521,7 +520,7 @@ impl UnityCatalog {
     pub async fn list_schemas(
         &self,
         catalog_name: impl AsRef<str>,
-    ) -> DataCatalogResult<ListSchemasResponse> {
+    ) -> Result<ListSchemasResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/list
         let resp = self
@@ -530,9 +529,8 @@ impl UnityCatalog {
             .header(AUTHORIZATION, token)
             .query(&[("catalog_name", catalog_name.as_ref())])
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }

     /// Gets the specified schema within the metastore.#
@@ -543,7 +541,7 @@
         &self,
         catalog_name: impl AsRef<str>,
         schema_name: impl AsRef<str>,
-    ) -> DataCatalogResult<GetSchemaResponse> {
+    ) -> Result<GetSchemaResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/schemas/get
         let resp = self
@@ -556,9 +554,8 @@ impl UnityCatalog {
             ))
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+            .await?;
+        Ok(resp.json().await?)
     }

     /// Gets an array of summaries for tables for a schema and catalog within the metastore.
@@ -576,7 +573,7 @@
         &self,
         catalog_name: impl AsRef<str>,
         schema_name_pattern: impl AsRef<str>,
-    ) -> DataCatalogResult<ListTableSummariesResponse> {
+    ) -> Result<ListTableSummariesResponse, UnityCatalogError> {
         let token = self.get_credential().await?;
         // https://docs.databricks.com/api-explorer/workspace/tables/listsummaries
         let resp = self
@@ -588,10 +585,9 @@
             ])
             .header(AUTHORIZATION, token)
             .send()
-            .await
-            .map_err(UnityCatalogError::from)?;
+            .await?;

-        Ok(resp.json().await.map_err(UnityCatalogError::from)?)
+        Ok(resp.json().await?)
     }

     /// Gets a table from the metastore for a specific catalog and schema.
@@ -649,7 +645,7 @@ impl DataCatalog for UnityCatalog {
                 error_code: err.error_code,
                 message: err.message,
             }
-                .into()),
+            .into()),
         }
     }
}
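
The recurring change in this file is mechanical: each method's return type moves from the DataCatalogResult alias to Result<_, UnityCatalogError>, and the explicit .map_err(UnityCatalogError::from) calls disappear because the ? operator now performs that conversion, provided UnityCatalogError implements From<reqwest::Error> (typically generated by thiserror's #[from]); the DataCatalog trait impl at the end still converts into DataCatalogError via .into(). A minimal sketch of the mechanism follows, using a stand-in error type and a simplified endpoint rather than the crate's actual definitions.

// [dependencies] for the sketch only:
// reqwest = { version = "0.12", features = ["json"] }
// thiserror = "2"
// serde_json = "1"

use thiserror::Error;

/// Stand-in for UnityCatalogError; the real enum has more variants.
/// `#[from]` generates the From<reqwest::Error> impl that lets `?`
/// convert transport errors without an explicit map_err.
#[derive(Debug, Error)]
enum CatalogError {
    #[error("request failed: {0}")]
    Request(#[from] reqwest::Error),
}

async fn list_catalogs_raw(base_url: &str, token: &str) -> Result<serde_json::Value, CatalogError> {
    let resp = reqwest::Client::new()
        .get(format!("{base_url}/catalogs"))
        .bearer_auth(token)
        .send()
        .await?; // reqwest::Error -> CatalogError via #[from]
    Ok(resp.json().await?) // same conversion for the body-decoding error
}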
crates/core/src/kernel/snapshot/log_data.rs (6 changes: 3 additions & 3 deletions)
@@ -482,13 +482,13 @@ mod datafusion {
     use std::collections::HashSet;
     use std::sync::Arc;

+    use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
+    use ::datafusion::physical_optimizer::pruning::PruningStatistics;
+    use ::datafusion::physical_plan::Accumulator;
     use arrow::compute::concat_batches;
     use arrow_arith::aggregate::sum;
     use arrow_array::{ArrayRef, BooleanArray, Int64Array, UInt64Array};
     use arrow_schema::DataType as ArrowDataType;
-    use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
-    use ::datafusion::physical_optimizer::pruning::PruningStatistics;
-    use ::datafusion::physical_plan::Accumulator;
     use datafusion_common::scalar::ScalarValue;
     use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
     use datafusion_common::Column;
