-
Notifications
You must be signed in to change notification settings - Fork 500
chore: break Glue support into its own crate without rusoto #1825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
[package] | ||
name = "deltalake-catalog-glue" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
[dependencies] | ||
async-trait = { workspace = true } | ||
aws-config = "0.57.1" | ||
aws-sdk-glue = "0.35.0" | ||
deltalake-core = { path = "../deltalake-core" } | ||
# This can depend on a lowest common denominator of core once that's released | ||
# deltalake_core = { version = "0.17.0" } | ||
log = "0.4" | ||
thiserror = { workspace = true } | ||
|
||
[dev-dependencies] | ||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } | ||
|
||
[features] | ||
default = [] | ||
native-tls = [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
|
||
.PHONY: help | ||
help: ## Show this help | ||
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' | ||
|
||
.PHONY: all build check test clean | ||
all: check build test ## Perform all the checks builds and testing | ||
|
||
check: ## Ensure that the crate meets the basic formatting and structure | ||
cargo fmt --check | ||
cargo clippy | ||
cargo clippy --features native-tls --no-default-features | ||
|
||
build: ## Build the crate with each set of features | ||
cargo build | ||
cargo build --features native-tls --no-default-features | ||
|
||
test: ## Run the crate's tests with each set of features | ||
cargo test | ||
cargo test --features native-tls --no-default-features | ||
|
||
clean: ## Clean up resources from build | ||
cargo clean |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
use deltalake_catalog_glue::*; | ||
use deltalake_core::*; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
println!("Reading a table"); | ||
|
||
let catalog = GlueDataCatalog::from_env() | ||
.await | ||
.expect("Failed to load catalog from the environment"); | ||
println!("catalog: {catalog:?}"); | ||
|
||
println!( | ||
"read: {:?}", | ||
catalog | ||
.get_table_storage_location(None, "database", "table") | ||
.await | ||
.expect("Failed") | ||
); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
//! Glue Data Catalog. | ||
//! | ||
use aws_config::SdkConfig; | ||
use deltalake_core::data_catalog::{DataCatalog, DataCatalogError}; | ||
use log::*; | ||
|
||
#[derive(thiserror::Error, Debug)] | ||
pub enum GlueError { | ||
/// Missing metadata in the catalog | ||
#[error("Missing Metadata {metadata} in the Data Catalog ")] | ||
MissingMetadata { | ||
/// The missing metadata property | ||
metadata: String, | ||
}, | ||
|
||
/// Error calling the AWS SDK | ||
#[error("Failed in an AWS SDK call")] | ||
AWSError { | ||
#[from] | ||
source: aws_sdk_glue::Error, | ||
}, | ||
} | ||
|
||
impl From<GlueError> for DataCatalogError { | ||
fn from(val: GlueError) -> Self { | ||
DataCatalogError::Generic { | ||
catalog: "glue", | ||
source: Box::new(val), | ||
} | ||
} | ||
} | ||
|
||
/// A Glue Data Catalog implement of the `Catalog` trait | ||
pub struct GlueDataCatalog { | ||
client: aws_sdk_glue::Client, | ||
} | ||
|
||
impl GlueDataCatalog { | ||
/// Creates a new GlueDataCatalog with environmental configuration | ||
pub async fn from_env() -> Result<Self, GlueError> { | ||
let config = aws_config::load_from_env().await; | ||
let client = aws_sdk_glue::Client::new(&config); | ||
Ok(Self { client }) | ||
} | ||
|
||
/// Create a new [GlueDataCatalog] with the given [aws_config::SdkConfig] | ||
pub fn with_config(config: &SdkConfig) -> Self { | ||
let client = aws_sdk_glue::Client::new(config); | ||
Self { client } | ||
} | ||
} | ||
|
||
impl std::fmt::Debug for GlueDataCatalog { | ||
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { | ||
write!(fmt, "GlueDataCatalog") | ||
} | ||
} | ||
|
||
// Placeholder suffix created by Spark in the Glue Data Catalog Location | ||
const PLACEHOLDER_SUFFIX: &str = "-__PLACEHOLDER__"; | ||
|
||
#[async_trait::async_trait] | ||
impl DataCatalog for GlueDataCatalog { | ||
/// Get the table storage location from the Glue Data Catalog | ||
async fn get_table_storage_location( | ||
&self, | ||
catalog_id: Option<String>, | ||
database_name: &str, | ||
table_name: &str, | ||
) -> Result<String, DataCatalogError> { | ||
let mut builder = self | ||
.client | ||
.get_table() | ||
.database_name(database_name) | ||
.name(table_name); | ||
|
||
if let Some(catalog) = catalog_id { | ||
builder = builder.catalog_id(catalog); | ||
} | ||
|
||
let response = builder | ||
.send() | ||
.await | ||
.map_err(|e| GlueError::AWSError { source: e.into() }) | ||
.map_err(<GlueError as Into<DataCatalogError>>::into)?; | ||
|
||
let location = response | ||
.table | ||
.ok_or(GlueError::MissingMetadata { | ||
metadata: "Table".to_string(), | ||
}) | ||
.map_err(<GlueError as Into<DataCatalogError>>::into)? | ||
.storage_descriptor | ||
.ok_or(GlueError::MissingMetadata { | ||
metadata: "Storage Descriptor".to_string(), | ||
}) | ||
.map_err(<GlueError as Into<DataCatalogError>>::into)? | ||
.location | ||
.map(|l| l.replace("s3a", "s3")) | ||
.ok_or(GlueError::MissingMetadata { | ||
metadata: "Location".to_string(), | ||
}); | ||
|
||
match location { | ||
Ok(location) => { | ||
if location.ends_with(PLACEHOLDER_SUFFIX) { | ||
Ok(location[..location.len() - PLACEHOLDER_SUFFIX.len()].to_string()) | ||
} else { | ||
Ok(location) | ||
} | ||
} | ||
Err(err) => Err(err.into()), | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.