From 44e217fad50691cc4ff5b1bc34e13499b664e1a4 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Wed, 15 Nov 2023 20:41:18 +0100
Subject: [PATCH] Start to enable overwrite_schema

---
 crates/deltalake-core/src/operations/write.rs | 13 +++++++--
 python/deltalake/_internal.pyi                |  1 +
 python/deltalake/writer.py                    | 28 ++++++++++---------
 python/src/lib.rs                             |  3 +-
 4 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs
index dec4b7ced7..d9b5f80d62 100644
--- a/crates/deltalake-core/src/operations/write.rs
+++ b/crates/deltalake-core/src/operations/write.rs
@@ -108,6 +108,8 @@ pub struct WriteBuilder {
     write_batch_size: Option<usize>,
     /// RecordBatches to be written into the table
     batches: Option<Vec<RecordBatch>>,
+    /// whether to overwrite the schema
+    overwrite_schema: bool,
     /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
     safe_cast: bool,
     /// Parquet writer properties
@@ -131,6 +133,7 @@ impl WriteBuilder {
             write_batch_size: None,
             batches: None,
             safe_cast: false,
+            overwrite_schema: false,
             writer_properties: None,
             app_metadata: None,
         }
@@ -142,6 +145,12 @@ impl WriteBuilder {
         self
     }
 
+    /// Add overwrite_schema
+    pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
+        self.overwrite_schema = overwrite_schema;
+        self
+    }
+
     /// When using `Overwrite` mode, replace data that matches a predicate
     pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
         self.predicate = Some(predicate.into());
@@ -362,9 +371,9 @@ impl std::future::IntoFuture for WriteBuilder {
                         .or_else(|_| this.snapshot.arrow_schema())
                         .unwrap_or(schema.clone());
 
-                    if !can_cast_batch(schema.fields(), table_schema.fields()) {
+                    if !can_cast_batch(schema.fields(), table_schema.fields()) && !this.overwrite_schema {
                         return Err(DeltaTableError::Generic(
-                            "Updating table schema not yet implemented".to_string(),
+                            "Schema of data does not match table schema".to_string(),
                         ));
                     };
 
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 4035bc61b3..0b7875e220 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -145,6 +145,7 @@ def write_to_deltalake(
     partition_by: List[str],
     mode: str,
     max_rows_per_group: int,
+    overwrite_schema: bool,
     storage_options: Optional[Dict[str, str]],
 ) -> None: ...
 def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index fa8e1f4b2f..b58287e10d 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -91,7 +91,7 @@ def write_deltalake(
     storage_options: Optional[Dict[str, str]] = None,
     partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
     large_dtypes: bool = False,
-    engine: Literal['pyarrow', 'rust'] = 'pyarrow',
+    engine: Literal["pyarrow", "rust"] = "pyarrow",
 ) -> None:
     """Write to a Delta Lake table
 
@@ -167,9 +167,7 @@ def write_deltalake(
         storage_options = table._storage_options or {}
         storage_options.update(storage_options or {})
 
-
-    if engine == 'pyarrow':
-
+    if engine == "pyarrow":
         if _has_pandas and isinstance(data, pd.DataFrame):
             if schema is not None:
                 data = pa.Table.from_pandas(data, schema=schema)
@@ -189,9 +187,9 @@ def write_deltalake(
             schema = data.schema
 
         if filesystem is not None:
-            raise NotImplementedError("Filesystem support is not yet implemented. #570")
-
-
+            raise NotImplementedError(
+                "Filesystem support is not yet implemented. #570"
+            )
 
         filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
 
@@ -201,9 +199,9 @@ def write_deltalake(
             partition_by = [partition_by]
 
         if table:  # already exists
-            if schema != table.schema().to_pyarrow(as_large_types=large_dtypes) and not (
-                mode == "overwrite" and overwrite_schema
-            ):
+            if schema != table.schema().to_pyarrow(
+                as_large_types=large_dtypes
+            ) and not (mode == "overwrite" and overwrite_schema):
                 raise ValueError(
                     "Schema of data does not match table schema\n"
                     f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}"
                 )
@@ -249,7 +247,9 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType:
                     ]
                 )
             else:
-                partition_schema = pa.schema([schema.field(name) for name in partition_by])
+                partition_schema = pa.schema(
+                    [schema.field(name) for name in partition_by]
+                )
             partitioning = ds.partitioning(partition_schema, flavor="hive")
         else:
             partitioning = None
@@ -402,8 +402,10 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
            partition_by=partition_by,
            mode=mode,
            max_rows_per_group=max_rows_per_group,
-           storage_options=storage_options
-        )
+           overwrite_schema=overwrite_schema,
+           storage_options=storage_options,
+        )
+
 
 def __enforce_append_only(
     table: Optional[DeltaTable],
diff --git a/python/src/lib.rs b/python/src/lib.rs
index f3619da144..dff380decf 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1142,7 +1142,7 @@ fn write_to_deltalake(
     // name: Option<String>,
     // description: Option<String>,
     // configuration: Option<HashMap<String, Option<String>>>,
-    // overwrite_schema: bool,
+    overwrite_schema: bool,
     storage_options: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
@@ -1163,6 +1163,7 @@ fn write_to_deltalake(
     let builder = table
         .write(batches)
         .with_save_mode(mode)
+        .with_overwrite_schema(overwrite_schema)
         .with_write_batch_size(max_rows_per_group as usize)
         .with_partition_columns(partition_by);