diff --git a/Cargo.toml b/Cargo.toml index 2b436a8..4a8e313 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ aws-credential-types = "1.1" chrono = "0.4" clap = { version = "4.5", features = ["derive"] } datafusion = { version = "35", features = ["avro"] } -deltalake = { version = "0.17", default-features = false, features = ["datafusion-ext", "s3"] } +deltalake = { version = "0.17", default-features = false, features = ["datafusion-ext", "s3", "gcs"] } futures = "0.3" glob = "0.3" object_store = { version = "0.9", features = ["aws", "gcp"] } diff --git a/src/main.rs b/src/main.rs index 570cdd0..49941bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use datafusion::datasource::TableProvider; use datafusion::prelude::*; use deltalake::open_table; use object_store::aws::{AmazonS3, AmazonS3Builder}; +use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; use object_store::path::Path; use object_store::ObjectStore; use regex::Regex; @@ -30,8 +31,6 @@ use url::Url; use crate::args::Args; mod args; -mod globbing_path; -mod globbing_table; mod object_store_util; #[tokio::main] @@ -73,6 +72,20 @@ async fn main() -> Result<()> { data_path }; + let data_path = if data_path.starts_with("gs://") || data_path.starts_with("gcs://") { + let gcs_url = Url::parse(&data_path) + .map_err(|e| DataFusionError::Execution(format!("Failed to parse url, {e}")))?; + let gcs = build_gcs(&gcs_url).await?; + let gcs_arc = Arc::new(gcs); + ctx.runtime_env().register_object_store(&gcs_url, gcs_arc); + + deltalake::gcp::register_handlers(None); + + data_path + } else { + data_path + }; + let table: Arc = if let Ok(mut delta_table) = open_table(&data_path).await { if let Some(at) = args.at { delta_table.load_with_datetime(at).await?; @@ -380,3 +393,17 @@ async fn build_s3(url: &Url, sdk_config: &SdkConfig) -> Result { Ok(s3) } + +async fn build_gcs(gcs_url: &Url) -> Result { + let google_application_credentials = env::var("GOOGLE_APPLICATION_CREDENTIALS") + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let bucket_name = gcs_url.host_str().unwrap(); + + let gcs_builder = GoogleCloudStorageBuilder::new(); + let gcs_builder = gcs_builder.with_bucket_name(bucket_name); + let gcs_builder = gcs_builder.with_service_account_path(google_application_credentials); + let gcs = gcs_builder.build()?; + + Ok(gcs) +}