diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs
index dfdbac97d6..a14a89c33e 100644
--- a/crates/deltalake-core/src/lib.rs
+++ b/crates/deltalake-core/src/lib.rs
@@ -71,6 +71,7 @@
 #![deny(warnings)]
 #![deny(missing_docs)]
 #![allow(rustdoc::invalid_html_tags)]
+#![allow(clippy::nonminimal_bool)]
 
 #[cfg(all(feature = "parquet", feature = "parquet2"))]
 compile_error!(
@@ -135,18 +136,22 @@ pub mod test_utils;
 
 /// Creates and loads a DeltaTable from the given path with current metadata.
 /// Infers the storage backend to use from the scheme in the given table path.
+///
+/// Will fail fast if the specified `table_uri` is a local path that doesn't exist.
 pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
-    let table = DeltaTableBuilder::from_uri(table_uri).load().await?;
+    let table = DeltaTableBuilder::from_valid_uri(table_uri)?.load().await?;
     Ok(table)
 }
 
 /// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
 /// `StorageService`.
+///
+/// Will fail fast if the specified `table_uri` is a local path that doesn't exist.
 pub async fn open_table_with_storage_options(
     table_uri: impl AsRef<str>,
     storage_options: HashMap<String, String>,
 ) -> Result<DeltaTable, DeltaTableError> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
+    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
         .with_storage_options(storage_options)
         .load()
         .await?;
@@ -155,11 +160,13 @@ pub async fn open_table_with_storage_options(
 
 /// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
 /// Infers the storage backend to use from the scheme in the given table path.
+///
+/// Will fail fast if the specified `table_uri` is a local path that doesn't exist.
 pub async fn open_table_with_version(
     table_uri: impl AsRef<str>,
     version: i64,
 ) -> Result<DeltaTable, DeltaTableError> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
+    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
        .with_version(version)
        .load()
        .await?;
@@ -169,11 +176,13 @@ pub async fn open_table_with_version(
 /// Creates a DeltaTable from the given path.
 /// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
 /// Infers the storage backend to use from the scheme in the given table path.
+///
+/// Will fail fast if the specified `table_uri` is a local path that doesn't exist.
 pub async fn open_table_with_ds(
     table_uri: impl AsRef<str>,
     ds: impl AsRef<str>,
 ) -> Result<DeltaTable, DeltaTableError> {
-    let table = DeltaTableBuilder::from_uri(table_uri)
+    let table = DeltaTableBuilder::from_valid_uri(table_uri)?
         .with_datestring(ds)?
.load() .await?; @@ -680,4 +689,49 @@ mod tests { ),] ); } + + #[tokio::test()] + async fn test_version_zero_table_load() { + let path = "./tests/data/COVID-19_NYT"; + let mut latest_table: DeltaTable = crate::open_table(path).await.unwrap(); + + let mut version_0_table = crate::open_table_with_version(path, 0).await.unwrap(); + + let version_0_history = version_0_table + .history(None) + .await + .expect("Cannot get table history"); + let latest_table_history = latest_table + .history(None) + .await + .expect("Cannot get table history"); + + assert_eq!(latest_table_history, version_0_history); + } + + #[tokio::test()] + #[should_panic(expected = "does not exist or you don't have access!")] + async fn test_fail_fast_on_not_existing_path() { + use std::path::Path as FolderPath; + + let path_str = "./tests/data/folder_doesnt_exist"; + + // Check that there is no such path at the beginning + let path_doesnt_exist = !FolderPath::new(path_str).exists(); + assert!(path_doesnt_exist); + + match crate::open_table(path_str).await { + Ok(table) => Ok(table), + Err(e) => { + let path_still_doesnt_exist = !FolderPath::new(path_str).exists(); + assert!( + path_still_doesnt_exist, + "Path exists for some reason, but it shouldn't" + ); + + Err(e) + } + } + .unwrap(); + } } diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index b6c94f423b..7ee70a3a63 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -172,6 +172,7 @@ async fn excute_non_empty_expr( None, writer_properties, false, + false, ) .await?; @@ -274,6 +275,8 @@ impl std::future::IntoFuture for DeleteBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.check_append_only(&this.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; let state = this.state.unwrap_or_else(|| { diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 8b0dd56708..061c2eb912 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -1013,6 +1013,7 @@ async fn execute( None, writer_properties, safe_cast, + false, ) .await?; diff --git a/crates/deltalake-core/src/operations/mod.rs b/crates/deltalake-core/src/operations/mod.rs index a81e16578f..473ef1451b 100644 --- a/crates/deltalake-core/src/operations/mod.rs +++ b/crates/deltalake-core/src/operations/mod.rs @@ -13,6 +13,7 @@ use self::vacuum::VacuumBuilder; use crate::errors::{DeltaResult, DeltaTableError}; use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; +use std::collections::HashMap; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod convert_to_delta; @@ -73,6 +74,22 @@ impl DeltaOps { } } + /// try from uri with storage options + pub async fn try_from_uri_with_storage_options( + uri: impl AsRef, + storage_options: HashMap, + ) -> DeltaResult { + let mut table = DeltaTableBuilder::from_uri(uri) + .with_storage_options(storage_options) + .build()?; + // We allow for uninitialized locations, since we may want to create the table + match table.load().await { + Ok(_) => Ok(table.into()), + Err(DeltaTableError::NotATable(_)) => Ok(table.into()), + Err(err) => Err(err), + } + } + /// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table /// /// Using this will not persist any changes beyond the lifetime of the table object. 
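Note: the new `DeltaOps::try_from_uri_with_storage_options` helper above deliberately tolerates a `NotATable` location so that a subsequent write can create the table. In the Python bindings further down in this diff, the `engine="rust"` writer path goes through this helper. A minimal sketch of exercising it from Python, assuming a placeholder S3 bucket and credentials:

```python
import pyarrow as pa

from deltalake import write_deltalake

# Placeholder bucket and credentials -- substitute real values.
storage_options = {
    "AWS_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": "<key id>",
    "AWS_SECRET_ACCESS_KEY": "<secret>",
}

data = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# The uninitialized location is accepted because this write creates the table.
write_deltalake(
    "s3://my-bucket/new-table",
    data,
    mode="append",
    engine="rust",
    storage_options=storage_options,
)
```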
diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index 47e4d0a41a..a0ce1df64e 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -68,6 +68,14 @@ impl ProtocolChecker { 2 } + /// Check append-only at the high level (operation level) + pub fn check_append_only(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { + if snapshot.table_config().append_only() { + return Err(TransactionError::DeltaTableAppendOnly); + } + Ok(()) + } + /// Check if delta-rs can read form the given delta table. pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { let required_features: Option<&HashSet> = diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 907dec5998..45d0306697 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -363,6 +363,7 @@ async fn execute( None, writer_properties, safe_cast, + false, ) .await?; @@ -427,6 +428,8 @@ impl std::future::IntoFuture for UpdateBuilder { let mut this = self; Box::pin(async move { + PROTOCOL.check_append_only(&this.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; let state = this.state.unwrap_or_else(|| { diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index cb68b72bb2..7f61e46c5c 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig}; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, Remove, StructType}; +use crate::kernel::{Action, Add, Metadata, Remove, StructType}; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; @@ -103,12 +103,20 @@ pub struct WriteBuilder { write_batch_size: Option, /// RecordBatches to be written into the table batches: Option>, + /// whether to overwrite the schema + overwrite_schema: bool, /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) safe_cast: bool, /// Parquet writer properties writer_properties: Option, /// Additional metadata to be added to commit app_metadata: Option>, + /// Name of the table, only used when table doesn't exist yet + name: Option, + /// Description of the table, only used when table doesn't exist yet + description: Option, + /// Configurations of the delta table, only used when table doesn't exist + configuration: HashMap>, } impl WriteBuilder { @@ -126,8 +134,12 @@ impl WriteBuilder { write_batch_size: None, batches: None, safe_cast: false, + overwrite_schema: false, writer_properties: None, app_metadata: None, + name: None, + description: None, + configuration: Default::default(), } } @@ -137,6 +149,12 @@ impl WriteBuilder { self } + /// Add overwrite_schema + pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self { + self.overwrite_schema = overwrite_schema; + self + } + /// When using `Overwrite` mode, replace data that matches a predicate pub fn with_replace_where(mut self, predicate: impl Into) -> Self { self.predicate = Some(predicate.into()); @@ -205,6 +223,31 @@ impl WriteBuilder { self } + /// Specify the table name. 
Optionally qualified with + /// a database name [database_name.] table_name. + pub fn with_table_name(mut self, name: impl Into) -> Self { + self.name = Some(name.into()); + self + } + + /// Comment to describe the table. + pub fn with_description(mut self, description: impl Into) -> Self { + self.description = Some(description.into()); + self + } + + /// Set configuration on created table + pub fn with_configuration( + mut self, + configuration: impl IntoIterator, Option>)>, + ) -> Self { + self.configuration = configuration + .into_iter() + .map(|(k, v)| (k.into(), v.map(|s| s.into()))) + .collect(); + self + } + async fn check_preconditions(&self) -> DeltaResult> { match self.log_store.is_delta_table_location().await? { true => { @@ -229,10 +272,20 @@ impl WriteBuilder { }?; let mut builder = CreateBuilder::new() .with_log_store(self.log_store.clone()) - .with_columns(schema.fields().clone()); + .with_columns(schema.fields().clone()) + .with_configuration(self.configuration.clone()); if let Some(partition_columns) = self.partition_columns.as_ref() { builder = builder.with_partition_columns(partition_columns.clone()) } + + if let Some(name) = self.name.as_ref() { + builder = builder.with_table_name(name.clone()); + }; + + if let Some(desc) = self.description.as_ref() { + builder = builder.with_comment(desc.clone()); + }; + let (_, actions, _) = builder.into_table_and_actions()?; Ok(actions) } @@ -251,6 +304,7 @@ pub(crate) async fn write_execution_plan( write_batch_size: Option, writer_properties: Option, safe_cast: bool, + overwrite_schema: bool, ) -> DeltaResult> { let invariants = snapshot .current_metadata() @@ -258,7 +312,11 @@ pub(crate) async fn write_execution_plan( .unwrap_or_default(); // Use input schema to prevent wrapping partitions columns into a dictionary. 
- let schema = snapshot.input_schema().unwrap_or(plan.schema()); + let schema: ArrowSchemaRef = if overwrite_schema { + plan.schema() + } else { + snapshot.input_schema().unwrap_or(plan.schema()) + }; let checker = DeltaDataChecker::new(invariants); @@ -313,6 +371,10 @@ impl std::future::IntoFuture for WriteBuilder { let mut this = self; Box::pin(async move { + if this.mode == SaveMode::Overwrite { + PROTOCOL.check_append_only(&this.snapshot)?; + } + // Create table actions to initialize table in case it does not yet exist and should be created let mut actions = this.check_preconditions().await?; @@ -339,13 +401,14 @@ impl std::future::IntoFuture for WriteBuilder { Ok(this.partition_columns.unwrap_or_default()) }?; + let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into(); let plan = if let Some(plan) = this.input { Ok(plan) } else if let Some(batches) = this.batches { if batches.is_empty() { Err(WriteError::MissingData) } else { - let schema = batches[0].schema(); + schema = batches[0].schema(); let table_schema = this .snapshot .physical_arrow_schema(this.log_store.object_store().clone()) @@ -353,9 +416,11 @@ impl std::future::IntoFuture for WriteBuilder { .or_else(|_| this.snapshot.arrow_schema()) .unwrap_or(schema.clone()); - if !can_cast_batch(schema.fields(), table_schema.fields()) { + if !can_cast_batch(schema.fields(), table_schema.fields()) + && !(this.overwrite_schema && matches!(this.mode, SaveMode::Overwrite)) + { return Err(DeltaTableError::Generic( - "Updating table schema not yet implemented".to_string(), + "Schema of data does not match table schema".to_string(), )); }; @@ -390,7 +455,7 @@ impl std::future::IntoFuture for WriteBuilder { vec![batches] }; - Ok(Arc::new(MemoryExec::try_new(&data, schema, None)?) + Ok(Arc::new(MemoryExec::try_new(&data, schema.clone(), None)?) as Arc) } } else { @@ -415,12 +480,31 @@ impl std::future::IntoFuture for WriteBuilder { this.write_batch_size, this.writer_properties, this.safe_cast, + this.overwrite_schema, ) .await?; actions.extend(add_actions.into_iter().map(Action::Add)); // Collect remove actions if we are overwriting the table if matches!(this.mode, SaveMode::Overwrite) { + // Update metadata with new schema + let table_schema = this + .snapshot + .physical_arrow_schema(this.log_store.object_store().clone()) + .await + .or_else(|_| this.snapshot.arrow_schema()) + .unwrap_or(schema.clone()); + + if schema != table_schema { + let mut metadata = this + .snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)? 
+ .clone(); + metadata.schema = schema.clone().try_into()?; + let metadata_action = Metadata::try_from(metadata)?; + actions.push(Action::Metadata(metadata_action)); + } // This should never error, since now() will always be larger than UNIX_EPOCH let deletion_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -445,7 +529,10 @@ impl std::future::IntoFuture for WriteBuilder { match this.predicate { Some(_pred) => { - todo!("Overwriting data based on predicate is not yet implemented") + return Err(DeltaTableError::Generic( + "Overwriting data based on predicate is not yet implemented" + .to_string(), + )); } _ => { let remove_actions = this diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index 837483c35c..ef521159d9 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -468,22 +468,19 @@ fn apply_stats_conversion( data_type: &DataType, ) { if path.len() == 1 { - match data_type { - DataType::Primitive(PrimitiveType::Timestamp) => { - let v = context.get_mut(&path[0]); - - if let Some(v) = v { - let ts = v - .as_str() - .and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok()) - .map(|n| Value::Number(serde_json::Number::from(n))); - - if let Some(ts) = ts { - *v = ts; - } + if let DataType::Primitive(PrimitiveType::Timestamp) = data_type { + let v = context.get_mut(&path[0]); + + if let Some(v) = v { + let ts = v + .as_str() + .and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok()) + .map(|n| Value::Number(serde_json::Number::from(n))); + + if let Some(ts) = ts { + *v = ts; } } - _ => { /* noop */ } } } else { let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut()); diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 264a45eb8b..56edde7f78 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -578,7 +578,7 @@ impl DeltaOperation { } /// The SaveMode used when performing a DeltaOperation -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub enum SaveMode { /// Files will be appended to the target location. 
Append, diff --git a/crates/deltalake-core/src/schema/partitions.rs b/crates/deltalake-core/src/schema/partitions.rs index bc69d019fd..4e8830596c 100644 --- a/crates/deltalake-core/src/schema/partitions.rs +++ b/crates/deltalake-core/src/schema/partitions.rs @@ -2,6 +2,8 @@ use std::convert::TryFrom; +use chrono::{NaiveDateTime, ParseResult}; + use crate::errors::DeltaTableError; use crate::kernel::{DataType, PrimitiveType}; use std::cmp::Ordering; @@ -40,6 +42,13 @@ pub struct PartitionFilter { pub value: PartitionValue, } +fn parse_timestamp(timestamp_str: &str) -> ParseResult { + // Timestamp format as per https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization + let format = "%Y-%m-%d %H:%M:%S%.f"; + + NaiveDateTime::parse_from_str(timestamp_str, format) +} + fn compare_typed_value( partition_value: &str, filter_value: &str, @@ -64,6 +73,13 @@ fn compare_typed_value( } _ => None, }, + PrimitiveType::Timestamp => match parse_timestamp(filter_value) { + Ok(parsed_filter_value) => { + let parsed_partition_value = parse_timestamp(partition_value).unwrap(); + parsed_partition_value.partial_cmp(&parsed_filter_value) + } + _ => None, + }, _ => partition_value.partial_cmp(filter_value), }, _ => partition_value.partial_cmp(filter_value), @@ -79,8 +95,24 @@ impl PartitionFilter { } match &self.value { - PartitionValue::Equal(value) => value == &partition.value, - PartitionValue::NotEqual(value) => value != &partition.value, + PartitionValue::Equal(value) => { + if let DataType::Primitive(PrimitiveType::Timestamp) = data_type { + compare_typed_value(&partition.value, value, data_type) + .map(|x| x.is_eq()) + .unwrap_or(false) + } else { + value == &partition.value + } + } + PartitionValue::NotEqual(value) => { + if let DataType::Primitive(PrimitiveType::Timestamp) = data_type { + compare_typed_value(&partition.value, value, data_type) + .map(|x| !x.is_eq()) + .unwrap_or(false) + } else { + value != &partition.value + } + } PartitionValue::GreaterThan(value) => { compare_typed_value(&partition.value, value, data_type) .map(|x| x.is_gt()) diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 2a4f8aca41..89962ed518 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -135,6 +135,11 @@ impl DeltaTableLoadOptions { } } +enum UriType { + LocalPath(PathBuf), + Url(Url), +} + /// builder for configuring a delta table load. #[derive(Debug)] pub struct DeltaTableBuilder { @@ -154,6 +159,19 @@ impl DeltaTableBuilder { } } + /// Creates `DeltaTableBuilder` from verified table uri. + /// Will fail fast if specified `table_uri` is a local path but doesn't exist. + pub fn from_valid_uri(table_uri: impl AsRef) -> DeltaResult { + let table_uri = table_uri.as_ref(); + + if let UriType::LocalPath(path) = resolve_uri_type(table_uri)? { + if !path.exists() { + panic!("Path \"{table_uri}\" does not exist or you don't have access!"); + } + } + + Ok(DeltaTableBuilder::from_uri(table_uri)) + } /// Sets `require_tombstones=false` to the builder pub fn without_tombstones(mut self) -> Self { self.options.require_tombstones = false; @@ -391,6 +409,30 @@ lazy_static::lazy_static! { ]); } +/// Utility function to figure out whether string representation of the path +/// is either local path or some kind or URL. +/// +/// Will return an error if the path is not valid. 
+fn resolve_uri_type(table_uri: impl AsRef) -> DeltaResult { + let table_uri = table_uri.as_ref(); + + if let Ok(url) = Url::parse(table_uri) { + if url.scheme() == "file" { + Ok(UriType::LocalPath(url.to_file_path().map_err(|err| { + let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); + DeltaTableError::InvalidTableLocation(msg) + })?)) + // NOTE this check is required to support absolute windows paths which may properly parse as url + } else if KNOWN_SCHEMES.contains(&url.scheme()) { + Ok(UriType::Url(url)) + } else { + Ok(UriType::LocalPath(PathBuf::from(table_uri))) + } + } else { + Ok(UriType::LocalPath(PathBuf::from(table_uri))) + } +} + /// Attempt to create a Url from given table location. /// /// The location could be: @@ -405,25 +447,7 @@ lazy_static::lazy_static! { pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { let table_uri = table_uri.as_ref(); - enum UriType { - LocalPath(PathBuf), - Url(Url), - } - let uri_type: UriType = if let Ok(url) = Url::parse(table_uri) { - if url.scheme() == "file" { - UriType::LocalPath(url.to_file_path().map_err(|err| { - let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); - DeltaTableError::InvalidTableLocation(msg) - })?) - // NOTE this check is required to support absolute windows paths which may properly parse as url - } else if KNOWN_SCHEMES.contains(&url.scheme()) { - UriType::Url(url) - } else { - UriType::LocalPath(PathBuf::from(table_uri)) - } - } else { - UriType::LocalPath(PathBuf::from(table_uri)) - }; + let uri_type: UriType = resolve_uri_type(table_uri)?; // If it is a local path, we need to create it if it does not exist. let mut url = match uri_type { @@ -465,7 +489,7 @@ mod tests { #[test] fn test_ensure_table_uri() { - // parse an exisiting relative directory + // parse an existing relative directory let uri = ensure_table_uri("."); assert!(uri.is_ok()); let _uri = ensure_table_uri("./nonexistent"); diff --git a/crates/deltalake-core/src/writer/utils.rs b/crates/deltalake-core/src/writer/utils.rs index 49c3c6bfee..173340f368 100644 --- a/crates/deltalake-core/src/writer/utils.rs +++ b/crates/deltalake-core/src/writer/utils.rs @@ -5,13 +5,14 @@ use std::io::Write; use std::sync::Arc; use arrow::array::{ - as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array, + as_boolean_array, as_generic_binary_array, as_largestring_array, as_primitive_array, + as_string_array, Array, }; use arrow::datatypes::{ - DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type, - Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, + DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, + Int8Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow::json::ReaderBuilder; use arrow::record_batch::*; @@ -184,7 +185,10 @@ pub(crate) fn stringified_partition_value( DataType::UInt16 => as_primitive_array::(arr).value(0).to_string(), DataType::UInt32 => as_primitive_array::(arr).value(0).to_string(), DataType::UInt64 => as_primitive_array::(arr).value(0).to_string(), + DataType::Float32 => as_primitive_array::(arr).value(0).to_string(), + DataType::Float64 => 
as_primitive_array::(arr).value(0).to_string(), DataType::Utf8 => as_string_array(arr).value(0).to_string(), + DataType::LargeUtf8 => as_largestring_array(arr).value(0).to_string(), DataType::Boolean => as_boolean_array(arr).value(0).to_string(), DataType::Date32 => as_primitive_array::(arr) .value_as_date(0) diff --git a/crates/deltalake-core/tests/read_delta_partitions_test.rs b/crates/deltalake-core/tests/read_delta_partitions_test.rs index 514cdefde8..182502fb13 100644 --- a/crates/deltalake-core/tests/read_delta_partitions_test.rs +++ b/crates/deltalake-core/tests/read_delta_partitions_test.rs @@ -56,6 +56,23 @@ fn test_match_partition() { assert!(partition_year_2020_filter.match_partition(&partition_2020, &string_type)); assert!(!partition_year_2020_filter.match_partition(&partition_2019, &string_type)); assert!(!partition_month_12_filter.match_partition(&partition_2019, &string_type)); + + let partition_2020_12_31_23_59_59 = deltalake_core::DeltaTablePartition { + key: "time".to_string(), + value: "2020-12-31 23:59:59".to_string(), + }; + + let partition_time_2020_12_31_23_59_59_filter = deltalake_core::PartitionFilter { + key: "time".to_string(), + value: deltalake_core::PartitionValue::Equal("2020-12-31 23:59:59.000000".to_string()), + }; + + assert!(partition_time_2020_12_31_23_59_59_filter.match_partition( + &partition_2020_12_31_23_59_59, + &DataType::Primitive(PrimitiveType::Timestamp) + )); + assert!(!partition_time_2020_12_31_23_59_59_filter + .match_partition(&partition_2020_12_31_23_59_59, &string_type)); } #[test] diff --git a/docs/integrations/delta-lake-arrow.md b/docs/integrations/delta-lake-arrow.md new file mode 100644 index 0000000000..6da4d5fcc2 --- /dev/null +++ b/docs/integrations/delta-lake-arrow.md @@ -0,0 +1,108 @@ +# Delta Lake Arrow Integrations + +Delta Lake tables can be exposed as Arrow tables and Arrow datasets, which allows for interoperability with a variety of query engines. + +This page shows you how to convert Delta tables to Arrow data structures and teaches you the difference between Arrow tables and Arrow datasets. + +## Delta Lake to Arrow Dataset + +Delta tables can easily be exposed as Arrow datasets. This makes it easy for any query engine that can read Arrow datasets to read a Delta table. + +Let's take a look at the h2o groupby dataset that contains 9 columns of data. 
Here are three representative rows of data: + +``` ++-------+-------+--------------+-------+-------+--------+------+------+---------+ +| id1 | id2 | id3 | id4 | id5 | id6 | v1 | v2 | v3 | +|-------+-------+--------------+-------+-------+--------+------+------+---------| +| id016 | id046 | id0000109363 | 88 | 13 | 146094 | 4 | 6 | 18.8377 | +| id039 | id087 | id0000466766 | 14 | 30 | 111330 | 4 | 14 | 46.7973 | +| id047 | id098 | id0000307804 | 85 | 23 | 187639 | 3 | 5 | 47.5773 | ++-------+-------+--------------+-------+-------+--------+------+------+---------+ +``` + +Here's how to expose the Delta table as a PyArrow dataset and run a query with DuckDB: + +```python +import duckdb +from deltalake import DeltaTable + +table = DeltaTable("delta/G1_1e9_1e2_0_0") +dataset = table.to_pyarrow_dataset() +quack = duckdb.arrow(dataset) +quack.filter("id1 = 'id016' and v2 > 10") +``` + +Here's the result: + +``` +┌─────────┬─────────┬──────────────┬───────┬───────┬─────────┬───────┬───────┬───────────┐ +│ id1 │ id2 │ id3 │ id4 │ id5 │ id6 │ v1 │ v2 │ v3 │ +│ varchar │ varchar │ varchar │ int32 │ int32 │ int32 │ int32 │ int32 │ double │ +├─────────┼─────────┼──────────────┼───────┼───────┼─────────┼───────┼───────┼───────────┤ +│ id016 │ id054 │ id0002309114 │ 62 │ 95 │ 7180859 │ 4 │ 13 │ 7.750173 │ +│ id016 │ id044 │ id0003968533 │ 63 │ 98 │ 2356363 │ 4 │ 14 │ 3.942417 │ +│ id016 │ id034 │ id0001082839 │ 58 │ 73 │ 8039808 │ 5 │ 12 │ 76.820135 │ +├─────────┴─────────┴──────────────┴───────┴───────┴─────────┴───────┴───────┴───────────┤ +│ ? rows (>9999 rows, 3 shown) 9 columns │ +└────────────────────────────────────────────────────────────────────────────────────────┘ +``` + +Arrow datasets allow for the predicates to get pushed down to the query engine, so the query is executed quickly. + +## Delta Lake to Arrow Table + +You can also run the same query with DuckDB on an Arrow table: + +```python +quack = duckdb.arrow(table.to_pyarrow_table()) +quack.filter("id1 = 'id016' and v2 > 10") +``` + +This returns the same result, but it runs slower. + +## Difference between Arrow Dataset and Arrow Table + +Arrow Datasets are lazy and allow for full predicate pushdown unlike Arrow tables which are eagerly loaded into memory. + +The previous DuckDB queries were run on a 1 billion row dataset that's roughly 50 GB when stored as an uncompressed CSV file. Here are the runtimes when the data is stored in a Delta table and the queries are executed on a 2021 Macbook M1 with 64 GB of RAM: + +* Arrow table: 17.1 seconds +* Arrow dataset: 0.01 seconds + +The query runs much faster on an Arrow dataset because the predicates can be pushed down to the query engine and lots of data can be skipped. + +Arrow tables are eagerly materialized in memory and don't allow for the same amount of data skipping. 
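+You can also push filters down through the PyArrow dataset API directly, without a query engine in between. Here's a small sketch using the same `table` and column names as above; PyArrow can skip files and row groups whose statistics rule out any matches:
+
+```python
+import pyarrow.dataset as ds
+
+dataset = table.to_pyarrow_dataset()
+
+# Only data that can satisfy the filter is read from storage.
+result = dataset.to_table(filter=(ds.field("id1") == "id016") & (ds.field("v2") > 10))
+```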
+ +## Multiple query engines can query Arrow Datasets + +Other query engines like DataFusion can also query Arrow datasets, see the following example: + +```python +from datafusion import SessionContext + +ctx = SessionContext() +ctx.register_dataset("my_dataset", table.to_pyarrow_dataset()) +ctx.sql("select * from my_dataset where v2 > 5") +``` + +Here's the result: + +``` ++-------+-------+--------------+-----+-----+--------+----+----+-----------+ +| id1 | id2 | id3 | id4 | id5 | id6 | v1 | v2 | v3 | ++-------+-------+--------------+-----+-----+--------+----+----+-----------+ +| id082 | id049 | id0000022715 | 97 | 55 | 756924 | 2 | 11 | 74.161136 | +| id053 | id052 | id0000113549 | 19 | 56 | 139048 | 1 | 10 | 95.178444 | +| id090 | id043 | id0000637409 | 94 | 50 | 12448 | 3 | 12 | 60.21896 | ++-------+-------+--------------+-----+-----+--------+----+----+-----------+ +``` + +Any query engine that's capable of reading an Arrow table/dataset can read a Delta table. + +## Conclusion + +Delta tables can easily be exposed as Arrow tables/datasets. + +Therefore any query engine that can read an Arrow table/dataset can also read a Delta table. + +Arrow datasets allow for more predicates to be pushed down to the query engine, so they can perform better performance than Arrow tables. diff --git a/mkdocs.yml b/mkdocs.yml index 7b3e2ccee6..e5f24ced2c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -14,6 +14,8 @@ theme: - navigation.tabs.sticky - navigation.footer - content.tabs.link + - content.code.annotation + - content.code.copy nav: - Home: index.md - Usage: @@ -35,6 +37,7 @@ nav: - api/schema.md - api/storage.md - Integrations: + - Arrow: integrations/delta-lake-arrow.md - pandas: integrations/delta-lake-pandas.md not_in_nav: | /_build/ @@ -80,6 +83,11 @@ plugins: on_page_markdown: 'docs._build.hooks:on_page_markdown' markdown_extensions: + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.inlinehilite - admonition - pymdownx.details - attr_list diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index f751afa36f..642a85b96a 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -140,6 +140,19 @@ def write_new_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], ) -> None: ... +def write_to_deltalake( + table_uri: str, + data: pyarrow.RecordBatchReader, + partition_by: Optional[List[str]], + mode: str, + max_rows_per_group: int, + overwrite_schema: bool, + predicate: Optional[str], + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], +) -> None: ... def convert_to_deltalake( uri: str, partition_by: Optional[pyarrow.Schema], @@ -150,6 +163,16 @@ def convert_to_deltalake( storage_options: Optional[Dict[str, str]], custom_metadata: Optional[Dict[str, str]], ) -> None: ... +def create_deltalake( + table_uri: str, + schema: pyarrow.Schema, + partition_by: List[str], + mode: str, + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], +) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... 
# Can't implement inheritance (see note in src/schema.rs), so this is next diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b238af7929..1973254730 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -12,6 +12,8 @@ Generator, Iterable, List, + Literal, + Mapping, NamedTuple, Optional, Tuple, @@ -35,11 +37,12 @@ from deltalake._internal import DeltaDataChecker as _DeltaDataChecker from deltalake._internal import RawDeltaTable +from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog from deltalake.exceptions import DeltaProtocolError from deltalake.fs import DeltaStorageHandler -from deltalake.schema import Schema +from deltalake.schema import Schema as DeltaSchema MAX_SUPPORTED_READER_VERSION = 1 MAX_SUPPORTED_WRITER_VERSION = 2 @@ -295,6 +298,73 @@ def from_data_catalog( table_uri=table_uri, version=version, log_buffer_size=log_buffer_size ) + @classmethod + def create( + cls, + table_uri: Union[str, Path], + schema: Union[pyarrow.Schema, DeltaSchema], + mode: Literal["error", "append", "overwrite", "ignore"] = "error", + partition_by: Optional[Union[List[str], str]] = None, + name: Optional[str] = None, + description: Optional[str] = None, + configuration: Optional[Mapping[str, Optional[str]]] = None, + storage_options: Optional[Dict[str, str]] = None, + ) -> "DeltaTable": + """`CREATE` or `CREATE_OR_REPLACE` a delta table given a table_uri. + + Args: + table_uri: URI of a table + schema: Table schema + mode: How to handle existing data. Default is to error if table already exists. + If 'append', returns not support error if table exists. + If 'overwrite', will `CREATE_OR_REPLACE` table. + If 'ignore', will not do anything if table already exists. Defaults to "error". + partition_by: List of columns to partition the table by. + name: User-provided identifier for this table. + description: User-provided description for this table. + configuration: A map containing configuration options for the metadata action. + storage_options: options passed to the object store crate. + + Returns: + DeltaTable: created delta table + + **Examples:** + ``` .py + import pyarrow as pa + + from deltalake import DeltaTable + + dt = DeltaTable.create( + table_uri="my_local_table", + schema=pa.schema( + [pa.field("foo", pa.string()), pa.field("bar", pa.string())] + ), + mode="error", + partition_by="bar", + ) + ``` + """ + if isinstance(schema, DeltaSchema): + schema = schema.to_pyarrow() + if isinstance(partition_by, str): + partition_by = [partition_by] + + if isinstance(table_uri, Path): + table_uri = str(table_uri) + + _create_deltalake( + table_uri, + schema, + partition_by or [], + mode, + name, + description, + configuration, + storage_options, + ) + + return cls(table_uri=table_uri, storage_options=storage_options) + def version(self) -> int: """ Get the version of the DeltaTable. @@ -410,7 +480,7 @@ def load_with_datetime(self, datetime_string: str) -> None: def table_uri(self) -> str: return self._table.table_uri() - def schema(self) -> Schema: + def schema(self) -> DeltaSchema: """ Get the current schema of the DeltaTable. 
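Note: a compact sketch of how the new `DeltaTable.create` classmethod behaves across save modes (it mirrors the `test_create_modes` test added later in this diff; the local path and schema here are only illustrative):

```python
import pyarrow as pa

from deltalake import DeltaTable
from deltalake.exceptions import DeltaError

schema = pa.schema([pa.field("foo", pa.string()), pa.field("bar", pa.int64())])

dt = DeltaTable.create("my_local_table", schema)  # version 0, operation "CREATE TABLE"

try:
    DeltaTable.create("my_local_table", schema, mode="error")  # table already exists
except DeltaError:
    pass

dt = DeltaTable.create("my_local_table", schema, mode="ignore")     # no-op, still version 0
dt = DeltaTable.create("my_local_table", schema, mode="overwrite")  # "CREATE OR REPLACE TABLE"
assert dt.version() == 1
```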
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 065803f5c7..2b4814f98b 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -17,9 +17,11 @@ Optional, Tuple, Union, + overload, ) from urllib.parse import unquote +from deltalake import Schema from deltalake.fs import DeltaStorageHandler from ._util import encode_partition_value @@ -37,7 +39,8 @@ from ._internal import DeltaDataChecker as _DeltaDataChecker from ._internal import batch_distinct from ._internal import convert_to_deltalake as _convert_to_deltalake -from ._internal import write_new_deltalake as _write_new_deltalake +from ._internal import write_new_deltalake as write_deltalake_pyarrow +from ._internal import write_to_deltalake as write_deltalake_rust from .exceptions import DeltaProtocolError, TableNotFoundError from .schema import ( convert_pyarrow_dataset, @@ -67,6 +70,7 @@ class AddAction: stats: str +@overload def write_deltalake( table_or_uri: Union[str, Path, DeltaTable], data: Union[ @@ -78,7 +82,68 @@ def write_deltalake( RecordBatchReader, ], *, - schema: Optional[pa.Schema] = None, + schema: Optional[Union[pa.Schema, Schema]] = ..., + partition_by: Optional[Union[List[str], str]] = ..., + filesystem: Optional[pa_fs.FileSystem] = None, + mode: Literal["error", "append", "overwrite", "ignore"] = ..., + file_options: Optional[ds.ParquetFileWriteOptions] = ..., + max_partitions: Optional[int] = ..., + max_open_files: int = ..., + max_rows_per_file: int = ..., + min_rows_per_group: int = ..., + max_rows_per_group: int = ..., + name: Optional[str] = ..., + description: Optional[str] = ..., + configuration: Optional[Mapping[str, Optional[str]]] = ..., + overwrite_schema: bool = ..., + storage_options: Optional[Dict[str, str]] = ..., + partition_filters: Optional[List[Tuple[str, str, Any]]] = ..., + large_dtypes: bool = ..., + engine: Literal["pyarrow"] = ..., +) -> None: + ... + + +@overload +def write_deltalake( + table_or_uri: Union[str, Path, DeltaTable], + data: Union[ + "pd.DataFrame", + ds.Dataset, + pa.Table, + pa.RecordBatch, + Iterable[pa.RecordBatch], + RecordBatchReader, + ], + *, + schema: Optional[Union[pa.Schema, Schema]] = ..., + partition_by: Optional[Union[List[str], str]] = ..., + mode: Literal["error", "append", "overwrite", "ignore"] = ..., + max_rows_per_group: int = ..., + name: Optional[str] = ..., + description: Optional[str] = ..., + configuration: Optional[Mapping[str, Optional[str]]] = ..., + overwrite_schema: bool = ..., + storage_options: Optional[Dict[str, str]] = ..., + predicate: Optional[str] = ..., + large_dtypes: bool = ..., + engine: Literal["rust"], +) -> None: + ... + + +def write_deltalake( + table_or_uri: Union[str, Path, DeltaTable], + data: Union[ + "pd.DataFrame", + ds.Dataset, + pa.Table, + pa.RecordBatch, + Iterable[pa.RecordBatch], + RecordBatchReader, + ], + *, + schema: Optional[Union[pa.Schema, Schema]] = None, partition_by: Optional[Union[List[str], str]] = None, filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = "error", @@ -94,7 +159,9 @@ def write_deltalake( overwrite_schema: bool = False, storage_options: Optional[Dict[str, str]] = None, partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + predicate: Optional[str] = None, large_dtypes: bool = False, + engine: Literal["pyarrow", "rust"] = "pyarrow", ) -> None: """Write to a Delta Lake table @@ -140,20 +207,20 @@ def write_deltalake( file_options: Optional write options for Parquet (ParquetFileWriteOptions). 
Can be provided with defaults using ParquetFileWriteOptions().make_write_options(). Please refer to https://github.com/apache/arrow/blob/master/python/pyarrow/_dataset_parquet.pyx#L492-L533 - for the list of available options - max_partitions: the maximum number of partitions that will be used. + for the list of available options. Only used in pyarrow engine. + max_partitions: the maximum number of partitions that will be used. Only used in pyarrow engine. max_open_files: Limits the maximum number of files that can be left open while writing. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your - data into many small files. + data into many small files. Only used in pyarrow engine. max_rows_per_file: Maximum number of rows per file. If greater than 0 then this will limit how many rows are placed in any single file. Otherwise there will be no limit and one file will be created in each output directory unless files need to be closed to respect max_open_files min_rows_per_group: Minimum number of rows per group. When the value is set, the dataset writer will batch incoming data and only write the row groups to the disk - when sufficient rows have accumulated. + when sufficient rows have accumulated. Only used in pyarrow engine. max_rows_per_group: Maximum number of rows per group. If the value is set, then the dataset writer may split up large incoming batches into multiple row groups. If this value is set, then min_rows_per_group should also be set. @@ -162,16 +229,25 @@ def write_deltalake( configuration: A map containing configuration options for the metadata action. overwrite_schema: If True, allows updating the schema of the table. storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined. - partition_filters: the partition filters that will be used for partition overwrite. + predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine. + partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine. 
large_dtypes: If True, the data schema is kept in large_dtypes, has no effect on pandas dataframe input """ - table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) + if table is not None: + storage_options = table._storage_options or {} + storage_options.update(storage_options or {}) - # We need to write against the latest table version - if table: table.update_incremental() + __enforce_append_only(table=table, configuration=configuration, mode=mode) + + if isinstance(partition_by, str): + partition_by = [partition_by] + + if isinstance(schema, Schema): + schema = schema.to_pyarrow() + if isinstance(data, RecordBatchReader): data = convert_pyarrow_recordbatchreader(data, large_dtypes) elif isinstance(data, pa.RecordBatch): @@ -182,9 +258,13 @@ def write_deltalake( data = convert_pyarrow_dataset(data, large_dtypes) elif _has_pandas and isinstance(data, pd.DataFrame): if schema is not None: - data = pa.Table.from_pandas(data, schema=schema) + data = convert_pyarrow_table( + pa.Table.from_pandas(data, schema=schema), large_dtypes=large_dtypes + ) else: - data = convert_pyarrow_table(pa.Table.from_pandas(data), False) + data = convert_pyarrow_table( + pa.Table.from_pandas(data), large_dtypes=large_dtypes + ) elif isinstance(data, Iterable): if schema is None: raise ValueError("You must provide schema if data is Iterable") @@ -196,204 +276,227 @@ def write_deltalake( if schema is None: schema = data.schema - if filesystem is not None: - raise NotImplementedError("Filesystem support is not yet implemented. #570") + if engine == "rust": + if table is not None and mode == "ignore": + return - if table is not None: - storage_options = table._storage_options or {} - storage_options.update(storage_options or {}) + data = RecordBatchReader.from_batches(schema, (batch for batch in data)) + write_deltalake_rust( + table_uri=table_uri, + data=data, + partition_by=partition_by, + mode=mode, + max_rows_per_group=max_rows_per_group, + overwrite_schema=overwrite_schema, + predicate=predicate, + name=name, + description=description, + configuration=configuration, + storage_options=storage_options, + ) + if table: + table.update_incremental() + + elif engine == "pyarrow": + # We need to write against the latest table version + if filesystem is not None: + raise NotImplementedError( + "Filesystem support is not yet implemented. 
#570" + ) - filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) + filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) - __enforce_append_only(table=table, configuration=configuration, mode=mode) + if table: # already exists + if schema != table.schema().to_pyarrow( + as_large_types=large_dtypes + ) and not (mode == "overwrite" and overwrite_schema): + raise ValueError( + "Schema of data does not match table schema\n" + f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}" + ) + if mode == "error": + raise AssertionError("DeltaTable already exists.") + elif mode == "ignore": + return - if isinstance(partition_by, str): - partition_by = [partition_by] + current_version = table.version() - if table: # already exists - if schema != table.schema().to_pyarrow(as_large_types=large_dtypes) and not ( - mode == "overwrite" and overwrite_schema - ): - raise ValueError( - "Schema of data does not match table schema\n" - f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}" - ) + if partition_by: + assert partition_by == table.metadata().partition_columns + else: + partition_by = table.metadata().partition_columns - if mode == "error": - raise AssertionError("DeltaTable already exists.") - elif mode == "ignore": - return + else: # creating a new table + current_version = -1 - current_version = table.version() + dtype_map = { + pa.large_string(): pa.string(), + } + + def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType: + try: + return dtype_map[dtype] + except KeyError: + return dtype if partition_by: - assert partition_by == table.metadata().partition_columns + table_schema: pa.Schema = schema + if PYARROW_MAJOR_VERSION < 12: + partition_schema = pa.schema( + [ + pa.field( + name, _large_to_normal_dtype(table_schema.field(name).type) + ) + for name in partition_by + ] + ) + else: + partition_schema = pa.schema( + [table_schema.field(name) for name in partition_by] + ) + partitioning = ds.partitioning(partition_schema, flavor="hive") else: - partition_by = table.metadata().partition_columns - - if table.protocol().min_writer_version > MAX_SUPPORTED_WRITER_VERSION: - raise DeltaProtocolError( - "This table's min_writer_version is " - f"{table.protocol().min_writer_version}, " - "but this method only supports version 2." + partitioning = None + + add_actions: List[AddAction] = [] + + def visitor(written_file: Any) -> None: + path, partition_values = get_partitions_from_path(written_file.path) + stats = get_file_stats_from_metadata(written_file.metadata) + + # PyArrow added support for written_file.size in 9.0.0 + if PYARROW_MAJOR_VERSION >= 9: + size = written_file.size + elif filesystem is not None: + size = filesystem.get_file_info([path])[0].size + else: + size = 0 + + add_actions.append( + AddAction( + path, + size, + partition_values, + int(datetime.now().timestamp() * 1000), + True, + json.dumps(stats, cls=DeltaJSONEncoder), + ) ) - else: # creating a new table - current_version = -1 - dtype_map = { - pa.large_string(): pa.string(), - } + if table is not None: + # We don't currently provide a way to set invariants + # (and maybe never will), so only enforce if already exist. + if table.protocol().min_writer_version > MAX_SUPPORTED_WRITER_VERSION: + raise DeltaProtocolError( + "This table's min_writer_version is " + f"{table.protocol().min_writer_version}, " + "but this method only supports version 2." 
+ ) - def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType: - try: - return dtype_map[dtype] - except KeyError: - return dtype - - if partition_by: - if PYARROW_MAJOR_VERSION < 12: - partition_schema = pa.schema( - [ - pa.field(name, _large_to_normal_dtype(schema.field(name).type)) - for name in partition_by - ] + invariants = table.schema().invariants + checker = _DeltaDataChecker(invariants) + + def check_data_is_aligned_with_partition_filtering( + batch: pa.RecordBatch, + ) -> None: + if table is None: + return + existed_partitions: FrozenSet[ + FrozenSet[Tuple[str, Optional[str]]] + ] = table._table.get_active_partitions() + allowed_partitions: FrozenSet[ + FrozenSet[Tuple[str, Optional[str]]] + ] = table._table.get_active_partitions(partition_filters) + partition_values = pa.RecordBatch.from_arrays( + [ + batch.column(column_name) + for column_name in table.metadata().partition_columns + ], + table.metadata().partition_columns, + ) + partition_values = batch_distinct(partition_values) + for i in range(partition_values.num_rows): + # Map will maintain order of partition_columns + partition_map = { + column_name: encode_partition_value( + batch.column(column_name)[i].as_py() + ) + for column_name in table.metadata().partition_columns + } + partition = frozenset(partition_map.items()) + if ( + partition not in allowed_partitions + and partition in existed_partitions + ): + partition_repr = " ".join( + f"{key}={value}" for key, value in partition_map.items() + ) + raise ValueError( + f"Data should be aligned with partitioning. " + f"Data contained values for partition {partition_repr}" + ) + + def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: + checker.check_batch(batch) + + if mode == "overwrite" and partition_filters: + check_data_is_aligned_with_partition_filtering(batch) + + return batch + + data = RecordBatchReader.from_batches( + schema, (validate_batch(batch) for batch in data) ) - else: - partition_schema = pa.schema([schema.field(name) for name in partition_by]) - partitioning = ds.partitioning(partition_schema, flavor="hive") - else: - partitioning = None - - add_actions: List[AddAction] = [] - - def visitor(written_file: Any) -> None: - path, partition_values = get_partitions_from_path(written_file.path) - stats = get_file_stats_from_metadata(written_file.metadata) - # PyArrow added support for written_file.size in 9.0.0 - if PYARROW_MAJOR_VERSION >= 9: - size = written_file.size - elif filesystem is not None: - size = filesystem.get_file_info([path])[0].size + if file_options is not None: + file_options.update(use_compliant_nested_type=False) else: - size = 0 - - add_actions.append( - AddAction( - path, - size, - partition_values, - int(datetime.now().timestamp() * 1000), - True, - json.dumps(stats, cls=DeltaJSONEncoder), - ) - ) - - if table is not None: - # We don't currently provide a way to set invariants - # (and maybe never will), so only enforce if already exist. 
- invariants = table.schema().invariants - checker = _DeltaDataChecker(invariants) - - def check_data_is_aligned_with_partition_filtering( - batch: pa.RecordBatch, - ) -> None: - if table is None: - return - existed_partitions: FrozenSet[ - FrozenSet[Tuple[str, Optional[str]]] - ] = table._table.get_active_partitions() - allowed_partitions: FrozenSet[ - FrozenSet[Tuple[str, Optional[str]]] - ] = table._table.get_active_partitions(partition_filters) - partition_values = pa.RecordBatch.from_arrays( - [ - batch.column(column_name) - for column_name in table.metadata().partition_columns - ], - table.metadata().partition_columns, + file_options = ds.ParquetFileFormat().make_write_options( + use_compliant_nested_type=False ) - partition_values = batch_distinct(partition_values) - for i in range(partition_values.num_rows): - # Map will maintain order of partition_columns - partition_map = { - column_name: encode_partition_value( - batch.column(column_name)[i].as_py() - ) - for column_name in table.metadata().partition_columns - } - partition = frozenset(partition_map.items()) - if ( - partition not in allowed_partitions - and partition in existed_partitions - ): - partition_repr = " ".join( - f"{key}={value}" for key, value in partition_map.items() - ) - raise ValueError( - f"Data should be aligned with partitioning. " - f"Data contained values for partition {partition_repr}" - ) - - def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: - checker.check_batch(batch) - - if mode == "overwrite" and partition_filters: - check_data_is_aligned_with_partition_filtering(batch) - - return batch - - data = RecordBatchReader.from_batches( - schema, (validate_batch(batch) for batch in data) - ) - if file_options is not None: - file_options.update(use_compliant_nested_type=False) - else: - file_options = ds.ParquetFileFormat().make_write_options( - use_compliant_nested_type=False + ds.write_dataset( + data, + base_dir="/", + basename_template=f"{current_version + 1}-{uuid.uuid4()}-{{i}}.parquet", + format="parquet", + partitioning=partitioning, + # It will not accept a schema if using a RBR + schema=schema if not isinstance(data, RecordBatchReader) else None, + file_visitor=visitor, + existing_data_behavior="overwrite_or_ignore", + file_options=file_options, + max_open_files=max_open_files, + max_rows_per_file=max_rows_per_file, + min_rows_per_group=min_rows_per_group, + max_rows_per_group=max_rows_per_group, + filesystem=filesystem, + max_partitions=max_partitions, ) - ds.write_dataset( - data, - base_dir="/", - basename_template=f"{current_version + 1}-{uuid.uuid4()}-{{i}}.parquet", - format="parquet", - partitioning=partitioning, - # It will not accept a schema if using a RBR - schema=schema if not isinstance(data, RecordBatchReader) else None, - file_visitor=visitor, - existing_data_behavior="overwrite_or_ignore", - file_options=file_options, - max_open_files=max_open_files, - max_rows_per_file=max_rows_per_file, - min_rows_per_group=min_rows_per_group, - max_rows_per_group=max_rows_per_group, - filesystem=filesystem, - max_partitions=max_partitions, - ) - - if table is None: - _write_new_deltalake( - table_uri, - schema, - add_actions, - mode, - partition_by or [], - name, - description, - configuration, - storage_options, - ) + if table is None: + write_deltalake_pyarrow( + table_uri, + schema, + add_actions, + mode, + partition_by or [], + name, + description, + configuration, + storage_options, + ) + else: + table._table.create_write_transaction( + add_actions, + mode, + partition_by or [], + 
schema, + partition_filters, + ) + table.update_incremental() else: - table._table.create_write_transaction( - add_actions, - mode, - partition_by or [], - schema, - partition_filters, - ) - table.update_incremental() + raise ValueError("Only `pyarrow` or `rust` are valid inputs for the engine.") def convert_to_deltalake( diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index a8bfb6668a..b50f738bec 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -13,6 +13,8 @@ use tokio::runtime::Runtime; use crate::error::PythonError; use crate::utils::{delete_dir, rt, walk_tree}; +const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; + #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct FsConfig { pub(crate) root_url: String, @@ -279,12 +281,20 @@ impl DeltaFileSystemHandler { #[allow(unused)] metadata: Option>, ) -> PyResult { let path = Self::parse_path(&path); + let max_buffer_size = self + .config + .options + .get("max_buffer_size") + .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| { + v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) + }); let file = self .rt .block_on(ObjectOutputStream::try_new( Arc::clone(&self.rt), self.inner.clone(), path, + max_buffer_size, )) .map_err(PythonError::from)?; Ok(file) @@ -492,6 +502,8 @@ pub struct ObjectOutputStream { closed: bool, #[pyo3(get)] mode: String, + max_buffer_size: i64, + buffer_size: i64, } impl ObjectOutputStream { @@ -499,6 +511,7 @@ impl ObjectOutputStream { rt: Arc, store: Arc, path: Path, + max_buffer_size: i64, ) -> Result { let (multipart_id, writer) = store.put_multipart(&path).await?; Ok(Self { @@ -510,6 +523,8 @@ impl ObjectOutputStream { pos: 0, closed: false, mode: "wb".into(), + max_buffer_size, + buffer_size: 0, }) } @@ -582,7 +597,7 @@ impl ObjectOutputStream { let len = data.as_bytes().len() as i64; let py = data.py(); let data = data.as_bytes(); - py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { + let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { Ok(_) => Ok(len), Err(err) => { self.rt @@ -590,7 +605,13 @@ impl ObjectOutputStream { .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } - }) + })?; + self.buffer_size += len; + if self.buffer_size >= self.max_buffer_size { + let _ = self.flush(py); + self.buffer_size = 0; + } + Ok(res) } fn flush(&mut self, py: Python<'_>) -> PyResult<()> { diff --git a/python/src/lib.rs b/python/src/lib.rs index 69195e866d..cacef9f06b 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1126,6 +1126,109 @@ impl From<&PyAddAction> for Add { } } +#[pyfunction] +#[allow(clippy::too_many_arguments)] +fn create_deltalake( + table_uri: String, + schema: PyArrowType, + partition_by: Vec, + mode: String, + name: Option, + description: Option, + configuration: Option>>, + storage_options: Option>, +) -> PyResult<()> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_storage_options(storage_options.unwrap_or_default()) + .build() + .map_err(PythonError::from)?; + + let mode = mode.parse().map_err(PythonError::from)?; + let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?; + + let mut builder = DeltaOps(table) + .create() + .with_columns(schema.fields().clone()) + .with_save_mode(mode) + .with_partition_columns(partition_by); + + if let Some(name) = &name { + builder = builder.with_table_name(name); + }; + + if let Some(description) = &description { + builder = builder.with_comment(description); + }; + + if let Some(config) = configuration { + builder = 
builder.with_configuration(config); + }; + + rt()? + .block_on(builder.into_future()) + .map_err(PythonError::from)?; + + Ok(()) +} + +#[pyfunction] +#[allow(clippy::too_many_arguments)] +fn write_to_deltalake( + table_uri: String, + data: PyArrowType, + mode: String, + max_rows_per_group: i64, + overwrite_schema: bool, + partition_by: Option>, + predicate: Option, + name: Option, + description: Option, + configuration: Option>>, + storage_options: Option>, +) -> PyResult<()> { + let batches = data.0.map(|batch| batch.unwrap()).collect::>(); + let save_mode = mode.parse().map_err(PythonError::from)?; + + let options = storage_options.clone().unwrap_or_default(); + let table = rt()? + .block_on(DeltaOps::try_from_uri_with_storage_options( + &table_uri, options, + )) + .map_err(PythonError::from)?; + + let mut builder = table + .write(batches) + .with_save_mode(save_mode) + .with_overwrite_schema(overwrite_schema) + .with_write_batch_size(max_rows_per_group as usize); + + if let Some(partition_columns) = partition_by { + builder = builder.with_partition_columns(partition_columns); + } + + if let Some(name) = &name { + builder = builder.with_table_name(name); + }; + + if let Some(description) = &description { + builder = builder.with_description(description); + }; + + if let Some(predicate) = &predicate { + builder = builder.with_replace_where(predicate); + }; + + if let Some(config) = configuration { + builder = builder.with_configuration(config); + }; + + rt()? + .block_on(builder.into_future()) + .map_err(PythonError::from)?; + + Ok(()) +} + #[pyfunction] #[allow(clippy::too_many_arguments)] fn write_new_deltalake( @@ -1268,7 +1371,9 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(create_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?; m.add_class::()?; m.add_class::()?; diff --git a/python/tests/test_benchmark.py b/python/tests/test_benchmark.py index fd32a7e4e6..d7299ca684 100644 --- a/python/tests/test_benchmark.py +++ b/python/tests/test_benchmark.py @@ -24,9 +24,12 @@ def sample_table() -> pa.Table: return tab +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @pytest.mark.benchmark(group="write") -def test_benchmark_write(benchmark, sample_table, tmp_path): - benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite") +def test_benchmark_write(benchmark, sample_table, tmp_path, engine): + benchmark( + write_deltalake, str(tmp_path), sample_table, mode="overwrite", engine=engine + ) dt = DeltaTable(str(tmp_path)) assert dt.to_pyarrow_table().sort_by("i") == sample_table diff --git a/python/tests/test_create.py b/python/tests/test_create.py new file mode 100644 index 0000000000..a618d741a1 --- /dev/null +++ b/python/tests/test_create.py @@ -0,0 +1,54 @@ +import pathlib + +import pyarrow as pa +import pytest + +from deltalake import DeltaTable +from deltalake.exceptions import DeltaError + + +def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + name="test_name", + description="test_desc", + configuration={"delta.appendOnly": "false", "foo": "bar"}, + ) + + metadata = dt.metadata() + + 
assert metadata.name == "test_name" + assert metadata.description == "test_desc" + assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} + + +def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + last_action = dt.history(1)[0] + + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + + assert last_action["operation"] == "CREATE TABLE" + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="append") + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="ignore") + assert dt.version() == 0 + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="overwrite") + assert dt.version() == 1 + + last_action = dt.history(1)[0] + + assert last_action["operation"] == "CREATE OR REPLACE TABLE" + + +def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + ) + + assert dt.schema().to_pyarrow() == sample_data.schema diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 4330489e4a..49177782ff 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -6,7 +6,7 @@ import threading from datetime import date, datetime from math import inf -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Literal from unittest.mock import Mock import pyarrow as pa @@ -16,8 +16,8 @@ from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions from pyarrow.lib import RecordBatchReader -from deltalake import DeltaTable, write_deltalake -from deltalake.exceptions import CommitFailedError, DeltaProtocolError +from deltalake import DeltaTable, Schema, write_deltalake +from deltalake.exceptions import CommitFailedError, DeltaError, DeltaProtocolError from deltalake.table import ProtocolVersions from deltalake.writer import try_get_table_and_table_uri @@ -29,24 +29,30 @@ _has_pandas = True +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @pytest.mark.skip(reason="Waiting on #570") -def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): +def test_handle_existing( + tmp_path: pathlib.Path, sample_data: pa.Table, engine: Literal["pyarrow", "rust"] +): # if uri points to a non-empty directory that isn't a delta table, error tmp_path p = tmp_path / "hello.txt" p.write_text("hello") with pytest.raises(OSError) as exception: - write_deltalake(tmp_path, sample_data, mode="overwrite") + write_deltalake(tmp_path, sample_data, mode="overwrite", engine=engine) assert "directory is not empty" in str(exception) -def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data: pa.Table): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_roundtrip_basic( + tmp_path: pathlib.Path, sample_data: pa.Table, engine: Literal["pyarrow", "rust"] +): # Check we can create the subdirectory tmp_path = tmp_path / "path" / "to" / "table" start_time = datetime.now().timestamp() - write_deltalake(tmp_path, sample_data) + write_deltalake(tmp_path, sample_data, engine=engine) end_time = datetime.now().timestamp() assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") @@ -71,7 +77,8 @@ def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data: pa.Table): assert modification_time < end_time -def test_roundtrip_nulls(tmp_path: pathlib.Path): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_roundtrip_nulls(tmp_path: pathlib.Path, 
engine: Literal["pyarrow", "rust"]): data = pa.table({"x": pa.array([None, None, 1, 2], type=pa.int64())}) # One row group will have values, one will be all nulls. # The first will have None in min and max stats, so we need to handle that. @@ -91,6 +98,7 @@ def test_roundtrip_nulls(tmp_path: pathlib.Path): min_rows_per_group=2, max_rows_per_group=2, mode="overwrite", + engine=engine, ) delta_table = DeltaTable(tmp_path) @@ -105,11 +113,23 @@ def test_enforce_schema(existing_table: DeltaTable, mode: str): bad_data = pa.table({"x": pa.array([1, 2, 3])}) with pytest.raises(ValueError): - write_deltalake(existing_table, bad_data, mode=mode) + write_deltalake(existing_table, bad_data, mode=mode, engine="pyarrow") table_uri = existing_table._table.table_uri() with pytest.raises(ValueError): - write_deltalake(table_uri, bad_data, mode=mode) + write_deltalake(table_uri, bad_data, mode=mode, engine="pyarrow") + + +@pytest.mark.parametrize("mode", ["append", "overwrite"]) +def test_enforce_schema_rust_writer(existing_table: DeltaTable, mode: str): + bad_data = pa.table({"x": pa.array([1, 2, 3])}) + + with pytest.raises(DeltaError): + write_deltalake(existing_table, bad_data, mode=mode, engine="rust") + + table_uri = existing_table._table.table_uri() + with pytest.raises(DeltaError): + write_deltalake(table_uri, bad_data, mode=mode, engine="rust") def test_update_schema(existing_table: DeltaTable): @@ -125,12 +145,59 @@ def test_update_schema(existing_table: DeltaTable): assert existing_table.schema().to_pyarrow() == new_data.schema -def test_local_path(tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch): +def test_update_schema_rust_writer(existing_table: DeltaTable): + new_data = pa.table({"x": pa.array([1, 2, 3])}) + + with pytest.raises(DeltaError): + write_deltalake( + existing_table, + new_data, + mode="append", + overwrite_schema=True, + engine="rust", + ) + with pytest.raises(DeltaError): + write_deltalake( + existing_table, + new_data, + mode="overwrite", + overwrite_schema=False, + engine="rust", + ) + with pytest.raises(DeltaError): + write_deltalake( + existing_table, + new_data, + mode="append", + overwrite_schema=False, + engine="rust", + ) + # TODO(ion): Remove this once we add schema overwrite support + write_deltalake( + existing_table, + new_data, + mode="overwrite", + overwrite_schema=True, + engine="rust", + ) + + read_data = existing_table.to_pyarrow_table() + assert new_data == read_data + assert existing_table.schema().to_pyarrow() == new_data.schema + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_local_path( + tmp_path: pathlib.Path, + sample_data: pa.Table, + monkeypatch, + engine: Literal["pyarrow", "rust"], +): monkeypatch.chdir(tmp_path) # Make tmp_path the working directory (tmp_path / "path/to/table").mkdir(parents=True) local_path = "./path/to/table" - write_deltalake(local_path, sample_data) + write_deltalake(local_path, sample_data, engine=engine) delta_table = DeltaTable(local_path) assert delta_table.schema().to_pyarrow() == sample_data.schema @@ -138,13 +205,15 @@ def test_local_path(tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch): assert table == sample_data -def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table, engine): write_deltalake( tmp_path, sample_data, name="test_name", description="test_desc", configuration={"delta.appendOnly": "false", "foo": "bar"}, + 
engine=engine, ) delta_table = DeltaTable(tmp_path) @@ -156,6 +225,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @pytest.mark.parametrize( "column", [ @@ -173,9 +243,9 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): ], ) def test_roundtrip_partitioned( - tmp_path: pathlib.Path, sample_data: pa.Table, column: str + tmp_path: pathlib.Path, sample_data: pa.Table, column: str, engine ): - write_deltalake(tmp_path, sample_data, partition_by=column) + write_deltalake(tmp_path, sample_data, partition_by=column, engine=engine) delta_table = DeltaTable(tmp_path) assert delta_table.schema().to_pyarrow() == sample_data.schema @@ -189,11 +259,16 @@ def test_roundtrip_partitioned( assert add_path.count("/") == 1 -def test_roundtrip_null_partition(tmp_path: pathlib.Path, sample_data: pa.Table): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_roundtrip_null_partition( + tmp_path: pathlib.Path, sample_data: pa.Table, engine +): sample_data = sample_data.add_column( 0, "utf8_with_nulls", pa.array(["a"] * 4 + [None]) ) - write_deltalake(tmp_path, sample_data, partition_by=["utf8_with_nulls"]) + write_deltalake( + tmp_path, sample_data, partition_by=["utf8_with_nulls"], engine=engine + ) delta_table = DeltaTable(tmp_path) assert delta_table.schema().to_pyarrow() == sample_data.schema @@ -203,8 +278,13 @@ def test_roundtrip_null_partition(tmp_path: pathlib.Path, sample_data: pa.Table) assert table == sample_data -def test_roundtrip_multi_partitioned(tmp_path: pathlib.Path, sample_data: pa.Table): - write_deltalake(tmp_path, sample_data, partition_by=["int32", "bool"]) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_roundtrip_multi_partitioned( + tmp_path: pathlib.Path, sample_data: pa.Table, engine +): + write_deltalake( + tmp_path, sample_data, partition_by=["int32", "bool"], engine=engine + ) delta_table = DeltaTable(tmp_path) assert delta_table.schema().to_pyarrow() == sample_data.schema @@ -218,33 +298,41 @@ def test_roundtrip_multi_partitioned(tmp_path: pathlib.Path, sample_data: pa.Tab assert add_path.count("/") == 2 -def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table): - write_deltalake(tmp_path, sample_data) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table, engine): + write_deltalake(tmp_path, sample_data, engine=engine) assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data - with pytest.raises(AssertionError): - write_deltalake(tmp_path, sample_data, mode="error") + if engine == "pyarrow": + with pytest.raises(AssertionError): + write_deltalake(tmp_path, sample_data, mode="error") + elif engine == "rust": + with pytest.raises(DeltaError): + write_deltalake(tmp_path, sample_data, mode="error", engine="rust") - write_deltalake(tmp_path, sample_data, mode="ignore") + write_deltalake(tmp_path, sample_data, mode="ignore", engine="rust") assert ("0" * 19 + "1.json") not in os.listdir(tmp_path / "_delta_log") - write_deltalake(tmp_path, sample_data, mode="append") + write_deltalake(tmp_path, sample_data, mode="append", engine="rust") expected = pa.concat_tables([sample_data, sample_data]) assert DeltaTable(tmp_path).to_pyarrow_table() == expected - write_deltalake(tmp_path, sample_data, mode="overwrite") + write_deltalake(tmp_path, sample_data, mode="overwrite", 
engine="rust") assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data -def test_append_only_should_append_only_with_the_overwrite_mode( - tmp_path: pathlib.Path, sample_data: pa.Table +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_append_only_should_append_only_with_the_overwrite_mode( # Create rust equivalent rust + tmp_path: pathlib.Path, sample_data: pa.Table, engine ): config = {"delta.appendOnly": "true"} - write_deltalake(tmp_path, sample_data, mode="append", configuration=config) + write_deltalake( + tmp_path, sample_data, mode="append", configuration=config, engine=engine + ) table = DeltaTable(tmp_path) - write_deltalake(table, sample_data, mode="append") + write_deltalake(table, sample_data, mode="append", engine=engine) data_store_types = [tmp_path, table] fail_modes = ["overwrite", "ignore", "error"] @@ -257,7 +345,7 @@ def test_append_only_should_append_only_with_the_overwrite_mode( f" 'append'. Mode is currently {mode}" ), ): - write_deltalake(data_store_type, sample_data, mode=mode) + write_deltalake(data_store_type, sample_data, mode=mode, engine=engine) expected = pa.concat_tables([sample_data, sample_data]) @@ -265,21 +353,45 @@ def test_append_only_should_append_only_with_the_overwrite_mode( assert table.version() == 1 -def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table): - write_deltalake(existing_table, sample_data, mode="overwrite") +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table, engine): + write_deltalake(existing_table, sample_data, mode="overwrite", engine=engine) assert existing_table.to_pyarrow_table() == sample_data -def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Table): - with pytest.raises(AssertionError): - write_deltalake( - existing_table, sample_data, mode="append", partition_by="int32" - ) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_fails_wrong_partitioning( + existing_table: DeltaTable, sample_data: pa.Table, engine +): + if engine == "pyarrow": + with pytest.raises(AssertionError): + write_deltalake( + existing_table, + sample_data, + mode="append", + partition_by="int32", + engine=engine, + ) + elif engine == "rust": + with pytest.raises( + DeltaError, + match='Generic error: Specified table partitioning does not match table partitioning: expected: [], got: ["int32"]', + ): + write_deltalake( + existing_table, + sample_data, + mode="append", + partition_by="int32", + engine=engine, + ) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @pytest.mark.pandas @pytest.mark.parametrize("schema_provided", [True, False]) -def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table, schema_provided): +def test_write_pandas( + tmp_path: pathlib.Path, sample_data: pa.Table, schema_provided, engine +): # When timestamp is converted to Pandas, it gets casted to ns resolution, # but Delta Lake schemas only support us resolution. 
sample_pandas = sample_data.to_pandas() @@ -287,23 +399,27 @@ def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table, schema_prov schema = sample_data.schema else: schema = None - write_deltalake(tmp_path, sample_pandas, schema=schema) + write_deltalake(tmp_path, sample_pandas, schema=schema, engine=engine) delta_table = DeltaTable(tmp_path) df = delta_table.to_pandas() assert_frame_equal(df, sample_pandas) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) def test_write_iterator( - tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table + tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table, engine ): batches = existing_table.to_pyarrow_dataset().to_batches() with pytest.raises(ValueError): - write_deltalake(tmp_path, batches, mode="overwrite") + write_deltalake(tmp_path, batches, mode="overwrite", engine=engine) - write_deltalake(tmp_path, batches, schema=sample_data.schema, mode="overwrite") + write_deltalake( + tmp_path, batches, schema=sample_data.schema, mode="overwrite", engine=engine + ) assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) @pytest.mark.parametrize("large_dtypes", [True, False]) @pytest.mark.parametrize( "constructor", @@ -317,38 +433,48 @@ def test_write_dataset_table_recordbatch( tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table, + engine: str, large_dtypes: bool, constructor, ): dataset = constructor(existing_table) - write_deltalake(tmp_path, dataset, mode="overwrite", large_dtypes=large_dtypes) + write_deltalake( + tmp_path, dataset, mode="overwrite", large_dtypes=large_dtypes, engine=engine + ) assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data @pytest.mark.parametrize("large_dtypes", [True, False]) +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) def test_write_recordbatchreader( tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table, large_dtypes: bool, + engine: Literal["pyarrow", "rust"], ): batches = existing_table.to_pyarrow_dataset().to_batches() reader = RecordBatchReader.from_batches( existing_table.to_pyarrow_dataset().schema, batches ) - write_deltalake(tmp_path, reader, mode="overwrite", large_dtypes=large_dtypes) + write_deltalake( + tmp_path, reader, mode="overwrite", large_dtypes=large_dtypes, engine=engine + ) assert DeltaTable(tmp_path).to_pyarrow_table() == sample_data -def test_writer_partitioning(tmp_path: pathlib.Path): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_writer_partitioning( + tmp_path: pathlib.Path, engine: Literal["pyarrow", "rust"] +): test_strings = ["a=b", "hello world", "hello%20world"] data = pa.table( {"p": pa.array(test_strings), "x": pa.array(range(len(test_strings)))} ) - write_deltalake(tmp_path, data) + write_deltalake(tmp_path, data, engine=engine) assert DeltaTable(tmp_path).to_pyarrow_table() == data @@ -437,7 +563,8 @@ def test_writer_stats(existing_table: DeltaTable, sample_data: pa.Table): assert stats["maxValues"] == expected_maxs -def test_writer_null_stats(tmp_path: pathlib.Path): +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_writer_null_stats(tmp_path: pathlib.Path, engine: Literal["pyarrow", "rust"]): data = pa.table( { "int32": pa.array([1, None, 2, None], pa.int32()), @@ -445,7 +572,7 @@ def test_writer_null_stats(tmp_path: pathlib.Path): "str": pa.array([None] * 4, pa.string()), } ) - write_deltalake(tmp_path, data) + write_deltalake(tmp_path, data, engine=engine) table = 
DeltaTable(tmp_path) stats = get_stats(table) @@ -454,10 +581,15 @@ def test_writer_null_stats(tmp_path: pathlib.Path): assert stats["nullCount"] == expected_nulls -def test_writer_fails_on_protocol(existing_table: DeltaTable, sample_data: pa.Table): +@pytest.mark.parametrize("engine", ["pyarrow"]) +def test_writer_fails_on_protocol( + existing_table: DeltaTable, + sample_data: pa.Table, + engine: Literal["pyarrow", "rust"], +): existing_table.protocol = Mock(return_value=ProtocolVersions(1, 3)) with pytest.raises(DeltaProtocolError): - write_deltalake(existing_table, sample_data, mode="overwrite") + write_deltalake(existing_table, sample_data, mode="overwrite", engine=engine) @pytest.mark.parametrize( @@ -722,6 +854,74 @@ def test_partition_overwrite_unfiltered_data_fails( ) +@pytest.mark.parametrize( + "value_1,value_2,value_type,filter_string", + [ + (1, 2, pa.int64(), "1"), + (False, True, pa.bool_(), "false"), + (date(2022, 1, 1), date(2022, 1, 2), pa.date32(), "2022-01-01"), + ], +) +def test_replace_where_overwrite( + tmp_path: pathlib.Path, + value_1: Any, + value_2: Any, + value_type: pa.DataType, + filter_string: str, +): + sample_data = pa.table( + { + "p1": pa.array(["1", "1", "2", "2"], pa.string()), + "p2": pa.array([value_1, value_2, value_1, value_2], value_type), + "val": pa.array([1, 1, 1, 1], pa.int64()), + } + ) + write_deltalake(tmp_path, sample_data, mode="overwrite", partition_by=["p1", "p2"]) + + delta_table = DeltaTable(tmp_path) + assert ( + delta_table.to_pyarrow_table().sort_by( + [("p1", "ascending"), ("p2", "ascending")] + ) + == sample_data + ) + + sample_data = pa.table( + { + "p1": pa.array(["1", "1"], pa.string()), + "p2": pa.array([value_2, value_1], value_type), + "val": pa.array([2, 2], pa.int64()), + } + ) + expected_data = pa.table( + { + "p1": pa.array(["1", "1", "2", "2"], pa.string()), + "p2": pa.array([value_1, value_2, value_1, value_2], value_type), + "val": pa.array([2, 2, 1, 1], pa.int64()), + } + ) + + with pytest.raises( + DeltaError, + match="Generic DeltaTable error: Overwriting data based on predicate is not yet implemented", + ): + write_deltalake( + tmp_path, + sample_data, + mode="overwrite", + predicate="`p1` = 1", + engine="rust", + ) + + delta_table.update_incremental() + assert ( + delta_table.to_pyarrow_table().sort_by( + [("p1", "ascending"), ("p2", "ascending")] + ) + == expected_data + ) + + def test_partition_overwrite_with_new_partition( tmp_path: pathlib.Path, sample_data_for_partitioning: pa.Table ): @@ -976,3 +1176,11 @@ def test_float_values(tmp_path: pathlib.Path): assert actions["min"].field("x2")[0].as_py() is None assert actions["max"].field("x2")[0].as_py() == 1.0 assert actions["null_count"].field("x2")[0].as_py() == 1 + + +def test_with_deltalake_schema(tmp_path: pathlib.Path, sample_data: pa.Table): + write_deltalake( + tmp_path, sample_data, schema=Schema.from_pyarrow(sample_data.schema) + ) + delta_table = DeltaTable(tmp_path) + assert delta_table.schema().to_pyarrow() == sample_data.schema
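
A short usage sketch of the new `engine="rust"` write path added above (the `write_to_deltalake` binding plus the engine dispatch in writer.py). The local path and sample data are placeholders, and only keyword arguments exercised by the new tests are shown; `predicate` exists on the binding but predicate-based overwrite is still rejected, and `overwrite_schema` is only honored together with `mode="overwrite"`.

# Illustrative usage of write_deltalake(..., engine="rust"); "./tmp/events" is a placeholder path.
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

data = pa.table(
    {
        "id": pa.array([1, 2, 3], pa.int64()),
        "part": pa.array(["a", "a", "b"], pa.string()),
    }
)

# Routed through the Rust writer instead of the pyarrow dataset writer.
write_deltalake(
    "./tmp/events",
    data,
    mode="overwrite",
    partition_by=["part"],
    configuration={"delta.appendOnly": "false"},
    engine="rust",
)

# Anything other than "pyarrow" or "rust" raises
# ValueError("Only `pyarrow` or `rust` are valid inputs for the engine.")
assert DeltaTable("./tmp/events").to_pyarrow_table().num_rows == 3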
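
The parametrized tests also pin down an asymmetry in error types: the pyarrow engine keeps raising AssertionError/ValueError for mode and schema violations, while the rust engine surfaces DeltaError from deltalake.exceptions. A hedged sketch of what calling code can expect (paths and tables are placeholders):

# Mirrors test_write_modes and test_enforce_schema_rust_writer; not part of the change itself.
import pyarrow as pa
from deltalake import write_deltalake
from deltalake.exceptions import DeltaError

data = pa.table({"x": pa.array([1, 2, 3], pa.int64())})
write_deltalake("./tmp/existing", data)  # version 0, default pyarrow engine

try:
    # On an existing table, mode="error" maps to DeltaError on the rust path
    # (the pyarrow path raises AssertionError instead).
    write_deltalake("./tmp/existing", data, mode="error", engine="rust")
except DeltaError as exc:
    print(f"rust writer refused to recreate the table: {exc}")

try:
    # A schema mismatch on append is also a DeltaError on the rust path,
    # where the pyarrow path raises ValueError before committing anything.
    bad = pa.table({"y": pa.array(["not", "the", "schema"], pa.string())})
    write_deltalake("./tmp/existing", bad, mode="append", engine="rust")
except DeltaError as exc:
    print(f"schema enforcement: {exc}")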
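
test_create.py drives the new create_deltalake binding through DeltaTable.create. A minimal sketch of that surface, assuming a local placeholder path and a hand-built pyarrow schema:

# Mirrors test_create_roundtrip_metadata and test_create_modes; names are placeholders.
import pyarrow as pa
from deltalake import DeltaTable
from deltalake.exceptions import DeltaError

schema = pa.schema([("id", pa.int64()), ("value", pa.string())])

dt = DeltaTable.create(
    "./tmp/created_table",
    schema,
    mode="error",  # one of "error", "append", "ignore", "overwrite"
    name="test_name",
    description="test_desc",
    configuration={"delta.appendOnly": "false"},
)
assert dt.history(1)[0]["operation"] == "CREATE TABLE"

# "error" and "append" fail once the table exists, "ignore" is a no-op,
# and "overwrite" commits a new version as "CREATE OR REPLACE TABLE".
try:
    DeltaTable.create("./tmp/created_table", schema, mode="error")
except DeltaError:
    pass

dt = DeltaTable.create("./tmp/created_table", schema, mode="overwrite")
assert dt.version() == 1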
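
The max_buffer_size change in filesystem.rs is easier to follow outside the diff: each successful write adds the chunk length to buffer_size, and once the running total reaches max_buffer_size (read as a string from the handler's storage options, falling back to 4 * 1024 * 1024 when missing or unparsable) the stream flushes and resets the counter. A rough Python paraphrase of that rule, not the actual implementation:

# DEFAULT_MAX_BUFFER_SIZE and the "max_buffer_size" option name come from the diff;
# the class below is only a sketch of the buffering rule in ObjectOutputStream.
DEFAULT_MAX_BUFFER_SIZE = 4 * 1024 * 1024


class BufferedOutputSketch:
    def __init__(self, options: dict):
        raw = options.get("max_buffer_size")
        try:
            self.max_buffer_size = int(raw) if raw is not None else DEFAULT_MAX_BUFFER_SIZE
        except ValueError:  # parse failure falls back, like unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
            self.max_buffer_size = DEFAULT_MAX_BUFFER_SIZE
        self.buffer_size = 0

    def write(self, data: bytes) -> int:
        # The real code calls writer.write_all(data) here and only counts on success.
        self.buffer_size += len(data)
        if self.buffer_size >= self.max_buffer_size:
            self.flush()
            self.buffer_size = 0
        return len(data)

    def flush(self) -> None:
        # In the Rust code this flushes the underlying multipart writer.
        pass


stream = BufferedOutputSketch({"max_buffer_size": str(8 * 1024 * 1024)})
stream.write(b"x" * (8 * 1024 * 1024))  # reaches the threshold and triggers a flush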