Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): expose create to DeltaTable class #1912

Closed
wants to merge 11 commits into from
62 changes: 58 additions & 4 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#![deny(warnings)]
#![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
#![allow(clippy::nonminimal_bool)]

#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
Expand Down Expand Up @@ -135,18 +136,22 @@ pub mod test_utils;

/// Creates and loads a DeltaTable from the given path with current metadata.
/// Infers the storage backend to use from the scheme in the given table path.
///
/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
///
/// # Errors
///
/// Returns a [`DeltaTableError`] when the builder cannot be created from
/// `table_uri` or when loading the table fails; both failures are propagated
/// unchanged with `?`.
pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
// NOTE(review): this listing looks like a rendered diff without +/- markers; the
// `from_uri` line directly below is presumably the pre-change form of the
// `from_valid_uri` line that follows it — confirm against the real file.
let table = DeltaTableBuilder::from_uri(table_uri).load().await?;
let table = DeltaTableBuilder::from_valid_uri(table_uri)?.load().await?;
Ok(table)
}

/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
/// `StorageService`.
///
/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
pub async fn open_table_with_storage_options(
table_uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_valid_uri(table_uri)?
.with_storage_options(storage_options)
.load()
.await?;
Expand All @@ -155,11 +160,13 @@ pub async fn open_table_with_storage_options(

/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
/// Infers the storage backend to use from the scheme in the given table path.
///
/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
pub async fn open_table_with_version(
table_uri: impl AsRef<str>,
version: i64,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_valid_uri(table_uri)?
.with_version(version)
.load()
.await?;
Expand All @@ -169,11 +176,13 @@ pub async fn open_table_with_version(
/// Creates a DeltaTable from the given path.
/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
/// Infers the storage backend to use from the scheme in the given table path.
///
/// Will fail fast if specified `table_uri` is a local path but doesn't exist.
pub async fn open_table_with_ds(
table_uri: impl AsRef<str>,
ds: impl AsRef<str>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_valid_uri(table_uri)?
.with_datestring(ds)?
.load()
.await?;
Expand Down Expand Up @@ -680,4 +689,49 @@ mod tests {
),]
);
}

/// Opens the same table twice, once at its latest version and once pinned to
/// version 0, and asserts that both report the same commit history.
#[tokio::test()]
async fn test_version_zero_table_load() {
    let table_uri = "./tests/data/COVID-19_NYT";

    let mut newest: DeltaTable = crate::open_table(table_uri).await.unwrap();
    let mut pinned = crate::open_table_with_version(table_uri, 0).await.unwrap();

    // History is read from the version-0 table first, then from the latest one.
    let pinned_history = pinned.history(None).await.expect("Cannot get table history");
    let newest_history = newest.history(None).await.expect("Cannot get table history");

    assert_eq!(newest_history, pinned_history);
}

/// Opening a local path that does not exist must fail, and the failed attempt
/// must not create the directory as a side effect.
///
/// The test passes only when the final `unwrap` panics with the expected
/// "does not exist" message.
#[tokio::test()]
#[should_panic(expected = "does not exist or you don't have access!")]
async fn test_fail_fast_on_not_existing_path() {
    use std::path::Path as FolderPath;

    let path_str = "./tests/data/folder_doesnt_exist";

    // Check that there is no such path at the beginning
    let missing_before = !FolderPath::new(path_str).exists();
    assert!(missing_before);

    let outcome = crate::open_table(path_str).await;

    // Only the failure case is inspected: the failed open must not have
    // created the folder.
    if outcome.is_err() {
        let missing_after = !FolderPath::new(path_str).exists();
        assert!(
            missing_after,
            "Path exists for some reason, but it shouldn't"
        );
    }

    // Panics with the error's debug output, which `should_panic` matches against.
    outcome.unwrap();
}
}
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn excute_non_empty_expr(
None,
writer_properties,
false,
false,
)
.await?;

Expand Down Expand Up @@ -274,6 +275,8 @@ impl std::future::IntoFuture for DeleteBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.check_append_only(&this.snapshot)?;

PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down
17 changes: 17 additions & 0 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use self::vacuum::VacuumBuilder;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;
use std::collections::HashMap;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod convert_to_delta;
Expand Down Expand Up @@ -73,6 +74,22 @@ impl DeltaOps {
}
}

/// Create a new [`DeltaOps`] instance for the table at `uri`, passing
/// `storage_options` to the table builder.
///
/// The table is built and then loaded. A location that does not hold a table
/// yet is accepted, so the returned instance can be used to create one.
///
/// # Errors
///
/// Returns an error if the table cannot be built, or if loading fails with
/// anything other than [`DeltaTableError::NotATable`].
pub async fn try_from_uri_with_storage_options(
    uri: impl AsRef<str>,
    storage_options: HashMap<String, String>,
) -> DeltaResult<Self> {
    let builder = DeltaTableBuilder::from_uri(uri).with_storage_options(storage_options);
    let mut table = builder.build()?;

    // We allow for uninitialized locations, since we may want to create the table
    match table.load().await {
        Ok(_) | Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
        Err(err) => Err(err),
    }
}

/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ impl ProtocolChecker {
2
}

/// Operation-level guard for append-only tables.
///
/// Returns [`TransactionError::DeltaTableAppendOnly`] when the snapshot's
/// table configuration reports the table as append-only, and `Ok(())`
/// otherwise.
pub fn check_append_only(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
    let append_only = snapshot.table_config().append_only();
    if append_only {
        Err(TransactionError::DeltaTableAppendOnly)
    } else {
        Ok(())
    }
}

/// Check if delta-rs can read from the given delta table.
pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
let required_features: Option<&HashSet<ReaderFeatures>> =
Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ async fn execute(
None,
writer_properties,
safe_cast,
false,
)
.await?;

Expand Down Expand Up @@ -427,6 +428,8 @@ impl std::future::IntoFuture for UpdateBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.check_append_only(&this.snapshot)?;

PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
Expand Down
Loading