Skip to content

Commit

Permalink
fix datafile path
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Jan 23, 2024
1 parent a809c0d commit f553031
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
9 changes: 7 additions & 2 deletions iceberg-rust/src/arrow/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use iceberg_rust_spec::{
use parquet::arrow::AsyncArrowWriter;
use uuid::Uuid;

use crate::{error::Error, file_format::parquet::parquet_to_datafile, table::Table};
use crate::{
catalog::bucket::parse_bucket, error::Error, file_format::parquet::parquet_to_datafile,
table::Table,
};

use super::partition::partition_record_batches;

Expand Down Expand Up @@ -93,6 +96,7 @@ async fn write_parquet_files(
batches: impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<DataFile>, ArrowError> {
let bucket = parse_bucket(location)?;
let current_writer = Arc::new(Mutex::new(
create_arrow_writer(location, arrow_schema, object_store.clone()).await?,
));
Expand Down Expand Up @@ -145,6 +149,7 @@ async fn write_parquet_files(
writer_reciever
.then(|writer| {
let object_store = object_store.clone();
let bucket = bucket.to_string();
async move {
let metadata = writer.1.close().await?;
let size = object_store
Expand All @@ -153,7 +158,7 @@ async fn write_parquet_files(
.map_err(|err| ArrowError::from_external_error(err.into()))?
.size;
Ok(parquet_to_datafile(
&writer.0,
&(bucket + &writer.0),
size,
&metadata,
schema,
Expand Down
12 changes: 11 additions & 1 deletion iceberg-rust/src/catalog/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Defining the [Bucket] enum for specifying buckets for the ObjectStore.
*/

use std::sync::Arc;
use std::{fmt::Display, sync::Arc};

use object_store::{
aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
Expand All @@ -21,6 +21,16 @@ pub enum Bucket<'s> {
Local,
}

/// Renders a [Bucket] as the URL prefix identifying it.
///
/// * `Bucket::S3(name)` is written as `s3://` followed by `name`.
/// * `Bucket::GCS(name)` is written as `gcs://` followed by `name`.
/// * `Bucket::Local` writes nothing, yielding an empty string.
impl<'s> Display for Bucket<'s> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the scheme prefix and bucket name up front; a local bucket
        // has neither, so there is nothing to emit for it.
        let (scheme, name) = match self {
            Bucket::S3(name) => ("s3://", name),
            Bucket::GCS(name) => ("gcs://", name),
            Bucket::Local => return Ok(()),
        };
        write!(f, "{}{}", scheme, name)
    }
}

/// Get the bucket and cloud provider from the location string
pub fn parse_bucket(path: &str) -> Result<Bucket, Error> {
if path.starts_with("s3://") {
Expand Down

0 comments on commit f553031

Please sign in to comment.