feat(python): expose rust writer as additional engine #1872

Closed
17 changes: 17 additions & 0 deletions crates/deltalake-core/src/operations/mod.rs
@@ -13,6 +13,7 @@ use self::vacuum::VacuumBuilder;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::table::builder::DeltaTableBuilder;
use crate::DeltaTable;
use std::collections::HashMap;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod convert_to_delta;
@@ -73,6 +74,22 @@ impl DeltaOps {
}
}

/// Create a new [`DeltaOps`] instance from a table uri, using the provided storage options
pub async fn try_from_uri_with_storage_options(
uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> DeltaResult<Self> {
let mut table = DeltaTableBuilder::from_uri(uri)
.with_storage_options(storage_options)
.build()?;
// We allow for uninitialized locations, since we may want to create the table
match table.load().await {
Ok(_) => Ok(table.into()),
Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
Err(err) => Err(err),
}
}

/// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
///
/// Using this will not persist any changes beyond the lifetime of the table object.
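For context, a minimal sketch of how the new constructor might be called from downstream code; the uri, the storage-option key, and the surrounding function are illustrative placeholders, not part of this PR:

```rust
use std::collections::HashMap;

use deltalake_core::errors::DeltaResult;
use deltalake_core::operations::DeltaOps;

// Hypothetical caller; the uri and storage-option key below are placeholders.
async fn open_table_ops() -> DeltaResult<DeltaOps> {
    let mut storage_options = HashMap::new();
    storage_options.insert("AWS_REGION".to_string(), "us-east-1".to_string());

    // Succeeds even if the location is not yet a Delta table, so a
    // subsequent create/write operation can initialize it.
    DeltaOps::try_from_uri_with_storage_options("s3://my-bucket/my-table", storage_options).await
}
```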
61 changes: 58 additions & 3 deletions crates/deltalake-core/src/operations/write.rs
@@ -103,12 +103,20 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema
overwrite_schema: bool,
Collaborator
is this meant for schema evolution? If so, I'd recommend moving that to a follow-up PR as it would likely blow up this PR quite a bit.

Collaborator
+1. I think it's fine if we let that return NotImplementedError for now.

Collaborator Author
It was a quick attempt at schema evolution. I was able to write, except it didn't write the columns that were not part of the original schema, so I need to dig through the code more.

Ok, let's do this as an improvement in another update.

Collaborator
We would also need to update all read paths to always add null columns for columns that don't exist in older parquet files. I haven't looked into it, but this would likely require some larger refactoring, particularly in the datafusion DeltaScan. That said, we likely need to validate that added columns are always nullable as well.

Collaborator Author
I think that would be schema evolution for appends as well.

The PyArrow writer can do schema evolution but only combined with an overwrite mode.

I think that's purely a metadata action then. Would this be doable with the existing deltalake-core crate?

Collaborator
Not sure, we may end up with unreadable tables if we do this... if we replace the whole table this might work.

Collaborator Author
With PyArrow it only works together with overwrite, so it should be safe. Is there a way to adjust the commit that's written during a write?

/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
writer_properties: Option<WriterProperties>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
/// Name of the table, only used when table doesn't exist yet
name: Option<String>,
/// Description of the table, only used when table doesn't exist yet
description: Option<String>,
/// Configurations of the delta table, only used when table doesn't exist
configuration: HashMap<String, Option<String>>,
}

impl WriteBuilder {
@@ -126,8 +134,12 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
overwrite_schema: false,
writer_properties: None,
app_metadata: None,
name: None,
description: None,
configuration: Default::default(),
}
}

@@ -137,6 +149,12 @@
self
}

/// Set whether to overwrite the existing table schema with the schema of the data being written
pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
self.overwrite_schema = overwrite_schema;
self
}

/// When using `Overwrite` mode, replace data that matches a predicate
pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
self.predicate = Some(predicate.into());
@@ -205,6 +223,31 @@ impl WriteBuilder {
self
}

/// Specify the table name. Optionally qualified with
/// a database name [database_name.] table_name.
pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}

/// Comment to describe the table.
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}

/// Set configuration on created table
pub fn with_configuration(
mut self,
configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
) -> Self {
self.configuration = configuration
.into_iter()
.map(|(k, v)| (k.into(), v.map(|s| s.into())))
.collect();
self
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.log_store.is_delta_table_location().await? {
true => {
@@ -229,10 +272,20 @@ impl WriteBuilder {
}?;
let mut builder = CreateBuilder::new()
.with_log_store(self.log_store.clone())
.with_columns(schema.fields().clone());
.with_columns(schema.fields().clone())
.with_configuration(self.configuration.clone());
if let Some(partition_columns) = self.partition_columns.as_ref() {
builder = builder.with_partition_columns(partition_columns.clone())
}

if let Some(name) = self.name.as_ref() {
builder = builder.with_table_name(name.clone());
};

if let Some(desc) = self.description.as_ref() {
builder = builder.with_comment(desc.clone());
};

let (_, actions, _) = builder.into_table_and_actions()?;
Ok(actions)
}
@@ -353,9 +406,11 @@ impl std::future::IntoFuture for WriteBuilder {
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

if !can_cast_batch(schema.fields(), table_schema.fields()) {
if !can_cast_batch(schema.fields(), table_schema.fields())
&& !this.overwrite_schema
{
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
"Schema of data does not match table schema".to_string(),
));
};

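A hedged sketch of how the new builder options could be combined on a write, assuming a `RecordBatch` is already in hand and using the crate's `DeltaOps` entry point; the table path, name, description, and configuration value are illustrative:

```rust
use std::collections::HashMap;

use arrow::record_batch::RecordBatch;
use deltalake_core::errors::DeltaResult;
use deltalake_core::operations::DeltaOps;

// `batch` is assumed to exist; the path and metadata values are placeholders.
async fn write_with_table_metadata(batch: RecordBatch) -> DeltaResult<()> {
    let ops = DeltaOps::try_from_uri("./data/example-table").await?;
    let _table = ops
        .write(vec![batch])
        .with_table_name("example_table")
        .with_description("table created through the rust writer")
        .with_configuration(HashMap::from([(
            "delta.appendOnly".to_string(),
            Some("false".to_string()),
        )]))
        // leave the schema check in place; set to true only when intentionally replacing the schema
        .with_overwrite_schema(false)
        .await?;
    Ok(())
}
```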
14 changes: 9 additions & 5 deletions crates/deltalake-core/src/writer/utils.rs
@@ -5,13 +5,14 @@ use std::io::Write;
use std::sync::Arc;

use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
as_boolean_array, as_generic_binary_array, as_largestring_array, as_primitive_array,
as_string_array, Array,
};
use arrow::datatypes::{
DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
@@ -184,7 +185,10 @@ pub(crate) fn stringified_partition_value(
DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Float32 => as_primitive_array::<Float32Type>(arr).value(0).to_string(),
DataType::Float64 => as_primitive_array::<Float64Type>(arr).value(0).to_string(),
DataType::Utf8 => as_string_array(arr).value(0).to_string(),
DataType::LargeUtf8 => as_largestring_array(arr).value(0).to_string(),
DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
DataType::Date32 => as_primitive_array::<Date32Type>(arr)
.value_as_date(0)
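The new match arms follow the existing pattern of pulling the first value out of a single-row arrow array and formatting it. A small standalone sketch of the same conversions; since the function itself is crate-private, this uses arrow directly, and the values are made up:

```rust
use std::sync::Arc;

use arrow::array::{
    as_largestring_array, as_primitive_array, ArrayRef, Float64Array, LargeStringArray,
};
use arrow::datatypes::Float64Type;

fn main() {
    // Single-value arrays, as the writer sees a partition column for one partition.
    let float_col: ArrayRef = Arc::new(Float64Array::from(vec![3.5_f64]));
    let large_utf8_col: ArrayRef = Arc::new(LargeStringArray::from(vec!["2023-11-20"]));

    // Mirrors the new Float64 and LargeUtf8 branches in stringified_partition_value.
    let float_value = as_primitive_array::<Float64Type>(&float_col).value(0).to_string();
    let utf8_value = as_largestring_array(&large_utf8_col).value(0).to_string();

    assert_eq!(float_value, "3.5");
    assert_eq!(utf8_value, "2023-11-20");
}
```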
12 changes: 12 additions & 0 deletions python/deltalake/_internal.pyi
@@ -140,6 +140,18 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def write_to_deltalake(
table_uri: str,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
max_rows_per_group: int,
overwrite_schema: bool,
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],