Start to enable overwrite_schema
ion-elgreco committed Nov 15, 2023
1 parent 537d8a9 commit 44e217f
Showing 4 changed files with 29 additions and 16 deletions.
13 changes: 11 additions & 2 deletions crates/deltalake-core/src/operations/write.rs
@@ -108,6 +108,8 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
+ /// whether to overwrite the schema
+ overwrite_schema: bool,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
@@ -131,6 +133,7 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
+ overwrite_schema: false,
writer_properties: None,
app_metadata: None,
}
@@ -142,6 +145,12 @@ impl WriteBuilder {
self
}

+ /// Add overwrite_schema
+ pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
+ self.overwrite_schema = overwrite_schema;
+ self
+ }

/// When using `Overwrite` mode, replace data that matches a predicate
pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
self.predicate = Some(predicate.into());
@@ -362,9 +371,9 @@ impl std::future::IntoFuture for WriteBuilder {
.or_else(|_| this.snapshot.arrow_schema())
.unwrap_or(schema.clone());

- if !can_cast_batch(schema.fields(), table_schema.fields()) {
+ if !can_cast_batch(schema.fields(), table_schema.fields()) && !this.overwrite_schema {
return Err(DeltaTableError::Generic(
"Updating table schema not yet implemented".to_string(),
"Schema of data does not match table schema".to_string(),
));
};

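The net effect of this hunk: the Rust writer now rejects incoming data only when the batches cannot be cast to the table schema *and* `overwrite_schema` is unset, with an error message that says so. A minimal sketch of the same gating as seen from the Python API (the default pyarrow engine enforces the equivalent check, see `writer.py` below); the path and column names are illustrative:

```python
import pyarrow as pa
from deltalake import write_deltalake

# Create a table with a single integer column.
write_deltalake("/tmp/demo_table", pa.table({"a": [1, 2, 3]}))

# Data whose schema does not match the table schema...
mismatched = pa.table({"b": ["x", "y"]})

try:
    # ...is rejected when the flag is unset.
    write_deltalake("/tmp/demo_table", mismatched, mode="overwrite")
except ValueError as err:
    print(err)  # Schema of data does not match table schema

# With mode="overwrite" and overwrite_schema=True the write proceeds
# and the table schema is replaced.
write_deltalake(
    "/tmp/demo_table", mismatched, mode="overwrite", overwrite_schema=True
)
```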
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
@@ -145,6 +145,7 @@ def write_to_deltalake(
partition_by: List[str],
mode: str,
max_rows_per_group: int,
+ overwrite_schema: bool,
storage_options: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...
28 changes: 15 additions & 13 deletions python/deltalake/writer.py
Expand Up @@ -91,7 +91,7 @@ def write_deltalake(
storage_options: Optional[Dict[str, str]] = None,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
large_dtypes: bool = False,
- engine: Literal['pyarrow', 'rust'] = 'pyarrow',
+ engine: Literal["pyarrow", "rust"] = "pyarrow",
) -> None:
"""Write to a Delta Lake table
@@ -167,9 +167,7 @@ def write_deltalake(
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})


- if engine == 'pyarrow':
+ if engine == "pyarrow":
if _has_pandas and isinstance(data, pd.DataFrame):
if schema is not None:
data = pa.Table.from_pandas(data, schema=schema)
@@ -189,9 +187,9 @@ def write_deltalake(
schema = data.schema

if filesystem is not None:
raise NotImplementedError("Filesystem support is not yet implemented. #570")


raise NotImplementedError(
"Filesystem support is not yet implemented. #570"
)

filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

@@ -201,9 +199,9 @@ def write_deltalake(
partition_by = [partition_by]

if table: # already exists
- if schema != table.schema().to_pyarrow(as_large_types=large_dtypes) and not (
- mode == "overwrite" and overwrite_schema
- ):
+ if schema != table.schema().to_pyarrow(
+ as_large_types=large_dtypes
+ ) and not (mode == "overwrite" and overwrite_schema):
raise ValueError(
"Schema of data does not match table schema\n"
f"Data schema:\n{schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=large_dtypes)}"
@@ -249,7 +247,9 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType:
]
)
else:
- partition_schema = pa.schema([schema.field(name) for name in partition_by])
+ partition_schema = pa.schema(
+ [schema.field(name) for name in partition_by]
+ )
partitioning = ds.partitioning(partition_schema, flavor="hive")
else:
partitioning = None
@@ -402,8 +402,10 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
partition_by=partition_by,
mode=mode,
max_rows_per_group=max_rows_per_group,
- storage_options=storage_options
- )
+ overwrite_schema=overwrite_schema,
+ storage_options=storage_options,
+ )


def __enforce_append_only(
table: Optional[DeltaTable],
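`write_deltalake` now threads `overwrite_schema` through to the Rust binding alongside the other write options. A sketch of end-to-end schema evolution through the `engine="rust"` path, assuming the flag behaves as wired here; since the commit message says support is only being started, the Rust engine may not honor the schema replacement fully yet, and the path and column names are illustrative:

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

path = "/tmp/evolving_table"

# Initial table: columns (id, name).
write_deltalake(path, pa.table({"id": [1, 2], "name": ["a", "b"]}))

# Overwrite with a different schema, routed through the Rust writer.
write_deltalake(
    path,
    pa.table({"id": [3], "score": [0.5]}),
    mode="overwrite",
    overwrite_schema=True,
    engine="rust",
)

# Once fully supported, the table schema reflects the new columns.
print(DeltaTable(path).schema().to_pyarrow())
```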
3 changes: 2 additions & 1 deletion python/src/lib.rs
@@ -1142,7 +1142,7 @@ fn write_to_deltalake(
// name: Option<String>,
// description: Option<String>,
// configuration: Option<HashMap<String, Option<String>>>,
- // overwrite_schema: bool,
+ overwrite_schema: bool,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {

Expand All @@ -1163,6 +1163,7 @@ fn write_to_deltalake(
let builder = table
.write(batches)
.with_save_mode(mode)
+ .with_overwrite_schema(overwrite_schema)
.with_write_batch_size(max_rows_per_group as usize)
.with_partition_columns(partition_by);

Expand Down

0 comments on commit 44e217f

Please sign in to comment.