Skip to content

Commit

Permalink
add try from uri with storage options
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Nov 15, 2023
1 parent cd9dd74 commit 537d8a9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
13 changes: 13 additions & 0 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream],
//! if the operation returns data as well.
use std::collections::{HashMap};
use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
use self::vacuum::VacuumBuilder;
Expand Down Expand Up @@ -77,6 +78,18 @@ impl DeltaOps {
Err(err) => Err(err),
}
}


/// try from uri with storage options
pub async fn try_from_uri_with_storage_options(uri: impl AsRef<str>, storage_options: HashMap<String, String>) -> DeltaResult<Self> {
let mut table = DeltaTableBuilder::from_uri(uri).with_storage_options(storage_options).build()?;
// We allow for uninitialized locations, since we may want to create the table
match table.load().await {
Ok(_) => Ok(table.into()),
Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
Err(err) => Err(err),
}
}

/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
Expand Down
7 changes: 2 additions & 5 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,13 +1157,10 @@ fn write_to_deltalake(

// let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;


let options = storage_options.clone().unwrap_or_default();
let table = DeltaTableBuilder::from_uri(table_uri).with_storage_options(options);

let table = table.build().map_err(PythonError::from)?;
let table = rt()?.block_on(DeltaOps::try_from_uri_with_storage_options(&table_uri, options)).map_err(PythonError::from)?;

let builder = DeltaOps(table)
let builder = table
.write(batches)
.with_save_mode(mode)
.with_write_batch_size(max_rows_per_group as usize)
Expand Down

0 comments on commit 537d8a9

Please sign in to comment.