diff --git a/iceberg-rust/src/arrow/write.rs b/iceberg-rust/src/arrow/write.rs
index bb549992..e941f92b 100644
--- a/iceberg-rust/src/arrow/write.rs
+++ b/iceberg-rust/src/arrow/write.rs
@@ -23,7 +23,10 @@ use iceberg_rust_spec::{
 use parquet::arrow::AsyncArrowWriter;
 use uuid::Uuid;
 
-use crate::{error::Error, file_format::parquet::parquet_to_datafile, table::Table};
+use crate::{
+    catalog::bucket::parse_bucket, error::Error, file_format::parquet::parquet_to_datafile,
+    table::Table,
+};
 
 use super::partition::partition_record_batches;
 
@@ -93,6 +96,7 @@ async fn write_parquet_files(
     batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
     object_store: Arc<dyn ObjectStore>,
 ) -> Result<Vec<DataFile>, ArrowError> {
+    let bucket = parse_bucket(location)?;
     let current_writer = Arc::new(Mutex::new(
         create_arrow_writer(location, arrow_schema, object_store.clone()).await?,
     ));
@@ -145,6 +149,7 @@ async fn write_parquet_files(
     writer_reciever
         .then(|writer| {
             let object_store = object_store.clone();
+            let bucket = bucket.to_string();
             async move {
                 let metadata = writer.1.close().await?;
                 let size = object_store
@@ -153,7 +158,7 @@ async fn write_parquet_files(
                     .map_err(|err| ArrowError::from_external_error(err.into()))?
                     .size;
                 Ok(parquet_to_datafile(
-                    &writer.0,
+                    &(bucket + &writer.0),
                     size,
                     &metadata,
                     schema,
diff --git a/iceberg-rust/src/catalog/bucket.rs b/iceberg-rust/src/catalog/bucket.rs
index 54ae88ef..e159de62 100644
--- a/iceberg-rust/src/catalog/bucket.rs
+++ b/iceberg-rust/src/catalog/bucket.rs
@@ -2,7 +2,7 @@
 Defining the [Bucket] struct for specifying buckets for the ObjectStore.
 */
 
-use std::sync::Arc;
+use std::{fmt::Display, sync::Arc};
 
 use object_store::{
     aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
@@ -21,6 +21,16 @@ pub enum Bucket<'s> {
     Local,
 }
 
+impl<'s> Display for Bucket<'s> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Bucket::S3(s) => write!(f, "s3://{}", s),
+            Bucket::GCS(s) => write!(f, "gcs://{}", s),
+            Bucket::Local => write!(f, ""),
+        }
+    }
+}
+
 /// Get the bucket and coud provider from the location string
 pub fn parse_bucket(path: &str) -> Result<Bucket, Error> {
     if path.starts_with("s3://") {