Skip to content

Commit

Permalink
feature: Add support for gcs back
Browse files Browse the repository at this point in the history
  • Loading branch information
timvw committed Mar 30, 2024
1 parent f56674c commit f1f789c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ aws-credential-types = "1.1"
chrono = "0.4"
clap = { version = "4.5", features = ["derive"] }
datafusion = { version = "35", features = ["avro"] }
deltalake = { version = "0.17", default-features = false, features = ["datafusion-ext", "s3"] }
deltalake = { version = "0.17", default-features = false, features = ["datafusion-ext", "s3", "gcs"] }
futures = "0.3"
glob = "0.3"
object_store = { version = "0.9", features = ["aws", "gcp"] }
Expand Down
31 changes: 29 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use deltalake::open_table;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::path::Path;
use object_store::ObjectStore;
use regex::Regex;
Expand All @@ -30,8 +31,6 @@ use url::Url;
use crate::args::Args;

mod args;
mod globbing_path;
mod globbing_table;
mod object_store_util;

#[tokio::main]
Expand Down Expand Up @@ -73,6 +72,20 @@ async fn main() -> Result<()> {
data_path
};

let data_path = if data_path.starts_with("gs://") || data_path.starts_with("gcs://") {
let gcs_url = Url::parse(&data_path)
.map_err(|e| DataFusionError::Execution(format!("Failed to parse url, {e}")))?;
let gcs = build_gcs(&gcs_url).await?;
let gcs_arc = Arc::new(gcs);
ctx.runtime_env().register_object_store(&gcs_url, gcs_arc);

deltalake::gcp::register_handlers(None);

data_path
} else {
data_path
};

let table: Arc<dyn TableProvider> = if let Ok(mut delta_table) = open_table(&data_path).await {
if let Some(at) = args.at {
delta_table.load_with_datetime(at).await?;
Expand Down Expand Up @@ -380,3 +393,17 @@ async fn build_s3(url: &Url, sdk_config: &SdkConfig) -> Result<AmazonS3> {

Ok(s3)
}

/// Builds a [`GoogleCloudStorage`] object store for the bucket named in `gcs_url`.
///
/// The bucket name is taken from the host component of `gcs_url`, and the
/// service account key file path is read from the
/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
///
/// # Errors
///
/// Returns an error when:
/// - `GOOGLE_APPLICATION_CREDENTIALS` is unset or not valid unicode
///   (wrapped as `DataFusionError::External`);
/// - `gcs_url` has no (or an empty) host component, i.e. no bucket name
///   (`DataFusionError::Execution`);
/// - the underlying builder rejects the configuration.
async fn build_gcs(gcs_url: &Url) -> Result<GoogleCloudStorage> {
    let google_application_credentials = env::var("GOOGLE_APPLICATION_CREDENTIALS")
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    // The URL is derived from user input, so a missing bucket is a
    // recoverable error that must be reported rather than a panic.
    let bucket_name = gcs_url
        .host_str()
        .filter(|host| !host.is_empty())
        .ok_or_else(|| {
            DataFusionError::Execution(format!("Missing bucket name in url, {gcs_url}"))
        })?;

    let gcs = GoogleCloudStorageBuilder::new()
        .with_bucket_name(bucket_name)
        .with_service_account_path(google_application_credentials)
        .build()?;

    Ok(gcs)
}

0 comments on commit f1f789c

Please sign in to comment.