From 16e87a6998d14fbf6b06ffadbc862f8dcefc929b Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Sat, 18 Nov 2023 11:42:14 +0100
Subject: [PATCH] finalize and add tests

---
 python/deltalake/_internal.pyi        |  3 +-
 python/deltalake/writer.py            | 34 ++++++++--
 python/src/lib.rs                     | 12 ++--
 python/tests/test_convert_to_delta.py | 97 +++++++++++++++++++++++++++
 4 files changed, 131 insertions(+), 15 deletions(-)
 create mode 100644 python/tests/test_convert_to_delta.py

diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 252f16d361..f751afa36f 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -142,8 +142,7 @@ def write_new_deltalake(
 ) -> None: ...
 def convert_to_deltalake(
     uri: str,
-    mode: Literal["error", "append", "overwrite", "ignore"],
-    partition_by: Optional[Dict[str, str]],
+    partition_by: Optional[pyarrow.Schema],
     partition_strategy: Optional[Literal["hive"]],
     name: Optional[str],
     description: Optional[str],
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index ab2059d075..dd0d350eb4 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -394,8 +394,8 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
 
 def convert_to_deltalake(
     uri: Union[str, Path],
-    mode: Literal["error", "append", "overwrite", "ignore"] = "error",
-    partition_by: Optional[Dict[str, str]] = None,
+    mode: Literal["error", "ignore"] = "error",
+    partition_by: Optional[pa.Schema] = None,
     partition_strategy: Optional[Literal["hive"]] = None,
     name: Optional[str] = None,
     description: Optional[str] = None,
@@ -403,15 +403,38 @@ def convert_to_deltalake(
     storage_options: Optional[Dict[str, str]] = None,
     custom_metadata: Optional[Dict[str, str]] = None,
 ) -> None:
-    """Currently only parquet is supported. Converts parquet dataset to delta table."""
-    if partition_strategy != "hive":
+    """
+    Convert a Parquet table to a Delta table.
+
+    Currently only Hive-partitioned tables are supported. The conversion creates
+    a transaction log commit with add actions and the additional properties
+    provided, such as configuration, name, and description.
+
+    Args:
+        uri: URI of the table.
+        partition_by: Optional partitioning schema, required if the table is partitioned.
+        partition_strategy: Optional partition strategy used to read and convert the table.
+        mode: How to handle an existing table. Default is to error if the table already exists;
+            if 'ignore', nothing is converted if the table already exists.
+        name: User-provided identifier for this table.
+        description: User-provided description for this table.
+        configuration: A map containing configuration options for the metadata action.
+        storage_options: Options passed to the native delta filesystem.
+        custom_metadata: Custom metadata that will be added to the transaction commit.
+    """
+    if partition_by is not None and partition_strategy is None:
+        raise ValueError("Partition strategy has to be provided with partition_by.")
+
+    if partition_strategy is not None and partition_strategy != "hive":
         raise ValueError(
             "Currently only `hive` partition strategy is supported to be converted."
         )
 
+    if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None:
+        return
+
     _convert_to_deltalake(
         str(uri),
-        mode,
         partition_by,
         partition_strategy,
         name,
@@ -420,7 +443,6 @@
         storage_options,
         custom_metadata,
     )
-    return
 
 
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 9e7d63f8d3..65c92ea197 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1193,8 +1193,7 @@ fn write_new_deltalake(
 #[allow(clippy::too_many_arguments)]
 fn convert_to_deltalake(
     uri: String,
-    // mode: &str,
-    _partition_schema: Option<HashMap<String, String>>,
+    partition_schema: Option<PyArrowType<ArrowSchema>>,
     partition_strategy: Option<String>,
     name: Option<String>,
     description: Option<String>,
@@ -1202,13 +1201,12 @@
     storage_options: Option<HashMap<String, String>>,
     custom_metadata: Option<HashMap<String, String>>,
 ) -> PyResult<()> {
-    // let mode = save_mode_from_str(mode)?;
     let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
-    // .with_save_mode(mode);
 
-    // if let Some(part_schema) = partition_schema {
-    //     builder = builder.with_partition_schema(part_schema); // Convert type properly in struct field
-    // }
+    if let Some(part_schema) = partition_schema {
+        let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
+        builder = builder.with_partition_schema(schema.fields().clone());
+    }
 
     if let Some(partition_strategy) = &partition_strategy {
         let strategy = partition_strategy_from_str(partition_strategy)?;
diff --git a/python/tests/test_convert_to_delta.py b/python/tests/test_convert_to_delta.py
new file mode 100644
index 0000000000..29badf3358
--- /dev/null
+++ b/python/tests/test_convert_to_delta.py
@@ -0,0 +1,97 @@
+import pathlib
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+import pytest
+
+from deltalake import convert_to_deltalake
+from deltalake.exceptions import DeltaError
+from deltalake.table import DeltaTable
+
+
+def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table):
+    ds.write_dataset(
+        sample_data,
+        tmp_path,
+        format="parquet",
+        existing_data_behavior="overwrite_or_ignore",
+    )
+
+    name = "converted_table"
+    description = "parquet table converted to delta table with delta-rs"
+    convert_to_deltalake(
+        tmp_path,
+        name=name,
+        description=description,
+        configuration={"delta.AppendOnly": "True"},
+    )
+
+    dt = DeltaTable(tmp_path)
+
+    assert dt.version() == 0
+    assert dt.files() == ["part-0.parquet"]
+    assert dt.metadata().name == name
+    assert dt.metadata().description == description
+    assert dt.metadata().configuration == {"delta.AppendOnly": "True"}
+
+
+def test_convert_delta_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
+    ds.write_dataset(
+        sample_data,
+        tmp_path,
+        format="parquet",
+        existing_data_behavior="overwrite_or_ignore",
+    )
+
+    convert_to_deltalake(
+        tmp_path,
+    )
+
+    with pytest.raises(DeltaError):
+        convert_to_deltalake(
+            tmp_path,
+        )
+
+    convert_to_deltalake(tmp_path, mode="ignore")
+
+
+def test_convert_delta_with_partitioning(tmp_path: pathlib.Path, sample_data: pa.Table):
+    ds.write_dataset(
+        sample_data,
+        tmp_path,
+        format="parquet",
+        existing_data_behavior="overwrite_or_ignore",
+        partitioning=["utf8"],
+        partitioning_flavor="hive",
+    )
+
+    with pytest.raises(
+        DeltaError,
+        match="Generic error: The schema of partition columns must be provided to convert a Parquet table to a Delta table",
+    ):
+        convert_to_deltalake(
+            tmp_path,
+        )
+    with pytest.raises(
+        ValueError, match="Partition strategy has to be provided with partition_by"
+    ):
+        convert_to_deltalake(
+            tmp_path,
+            partition_by=pa.schema([pa.field("utf8", pa.string())]),
+        )
+
+    with pytest.raises(
+        ValueError,
+        match="Currently only `hive` partition strategy is supported to be converted.",
+    ):
+        convert_to_deltalake(
+            tmp_path,
+            partition_by=pa.schema([pa.field("utf8", pa.string())]),
+            partition_strategy="directory",
+        )
+
+    convert_to_deltalake(
+        tmp_path,
+        partition_by=pa.schema([pa.field("utf8", pa.string())]),
+        partition_strategy="hive",
+    )
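
Usage sketch: a minimal example of calling the API this patch adds, mirroring the
tests above. The local ./my_table path and the "year" partition column are
illustrative only, not part of the patch.

    import pyarrow as pa

    from deltalake import convert_to_deltalake
    from deltalake.table import DeltaTable

    # Convert an existing Hive-partitioned Parquet dataset in place.
    convert_to_deltalake(
        "./my_table",
        mode="ignore",  # no-op if ./my_table is already a Delta table
        partition_by=pa.schema([pa.field("year", pa.string())]),
        partition_strategy="hive",
        name="my_table",
    )

    # A fresh conversion produces version 0 of an ordinary Delta table.
    dt = DeltaTable("./my_table")
    assert dt.version() == 0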