Skip to content

Commit

Permalink
finalize and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Nov 18, 2023
1 parent b7563c4 commit ee9e515
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 15 deletions.
3 changes: 1 addition & 2 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ def write_new_deltalake(
) -> None: ...
def convert_to_deltalake(
uri: str,
mode: Literal["error", "append", "overwrite", "ignore"],
partition_by: Optional[Dict[str, str]],
partition_by: Optional[pyarrow.Schema],
partition_strategy: Optional[Literal["hive"]],
name: Optional[str],
description: Optional[str],
Expand Down
34 changes: 28 additions & 6 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,24 +394,47 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:

def convert_to_deltalake(
uri: Union[str, Path],
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
partition_by: Optional[Dict[str, str]] = None,
mode: Literal["error", "ignore"] = "error",
partition_by: Optional[pa.Schema] = None,
partition_strategy: Optional[Literal["hive"]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
storage_options: Optional[Dict[str, str]] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""Currently only parquet is supported. Converts parquet dataset to delta table."""
if partition_strategy != "hive":
"""
`Convert` parquet tables `to delta` tables.
Currently only HIVE partitioned tables are supported. `Convert to delta` creates
a transaction log commit with add actions, and additional properties provided such
as configuration, name, and description.
Args:
uri: URI of a table.
partition_by: Optional partitioning schema if table is partitioned.
partition_strategy: Optional partition strategy to read and convert
mode: How to handle existing data. Default is to error if table already exists.
If 'ignore', will not convert anything if table already exists.
name: User-provided identifier for this table.
description: User-provided description for this table.
configuration: A map containing configuration options for the metadata action.
storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
custom_metadata: custom metadata that will be added to the transaction commit
"""
if partition_by is not None and partition_strategy is None:
raise ValueError("Partition strategy has to be provided with partition_by.")

if partition_strategy is not None and partition_strategy != "hive":
raise ValueError(
"Currently only `hive` partition strategy is supported to be converted."
)

if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None:
return

_convert_to_deltalake(
str(uri),
mode,
partition_by,
partition_strategy,
name,
Expand All @@ -420,7 +443,6 @@ def convert_to_deltalake(
storage_options,
custom_metadata,
)

return


Expand Down
12 changes: 5 additions & 7 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1193,22 +1193,20 @@ fn write_new_deltalake(
#[allow(clippy::too_many_arguments)]
fn convert_to_deltalake(
uri: String,
// mode: &str,
_partition_schema: Option<Vec<String>>,
partition_schema: Option<PyArrowType<ArrowSchema>>,
partition_strategy: Option<String>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
// let mode = save_mode_from_str(mode)?;
let mut builder = ConvertToDeltaBuilder::new().with_location(uri);
// .with_save_mode(mode);

// if let Some(part_schema) = partition_schema {
// builder = builder.with_partition_schema(part_schema); // Convert type properly in struct field
// }
if let Some(part_schema) = partition_schema {
let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
builder = builder.with_partition_schema(schema.fields().clone());
}

if let Some(partition_strategy) = &partition_strategy {
let strategy = partition_strategy_from_str(partition_strategy)?;
Expand Down
97 changes: 97 additions & 0 deletions python/tests/test_convert_to_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pathlib

import pyarrow as pa
import pyarrow.dataset as ds
import pytest

from deltalake import convert_to_deltalake
from deltalake.exceptions import DeltaError
from deltalake.table import DeltaTable


def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Convert an unpartitioned parquet dataset and verify version, file list, and metadata."""
    ds.write_dataset(
        sample_data,
        tmp_path,
        format="parquet",
        existing_data_behavior="overwrite_or_ignore",
    )

    table_name = "converted_table"
    table_description = "parquet table converted to delta table with delta-rs"
    table_configuration = {"delta.AppendOnly": "True"}

    convert_to_deltalake(
        tmp_path,
        name=table_name,
        description=table_description,
        configuration=table_configuration,
    )

    converted = DeltaTable(tmp_path)

    # Conversion creates the initial commit (version 0) over the existing file.
    assert converted.version() == 0
    assert converted.files() == ["part-0.parquet"]

    metadata = converted.metadata()
    assert metadata.name == table_name
    assert metadata.description == table_description
    assert metadata.configuration == table_configuration


def test_convert_delta_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Converting twice errors by default; mode='ignore' makes the second call a no-op."""
    ds.write_dataset(
        sample_data,
        tmp_path,
        format="parquet",
        existing_data_behavior="overwrite_or_ignore",
    )

    # First conversion succeeds on the fresh parquet dataset.
    convert_to_deltalake(tmp_path)

    # Default mode refuses to convert a path that is already a delta table.
    with pytest.raises(DeltaError):
        convert_to_deltalake(tmp_path)

    # "ignore" returns silently when the table already exists.
    convert_to_deltalake(tmp_path, mode="ignore")


def test_convert_delta_with_partitioning(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Partitioned conversion needs both partition_by and the 'hive' strategy."""
    ds.write_dataset(
        sample_data,
        tmp_path,
        format="parquet",
        existing_data_behavior="overwrite_or_ignore",
        partitioning=["utf8"],
        partitioning_flavor="hive",
    )

    partition_schema = pa.schema([pa.field("utf8", pa.string())])

    # Without a partition schema the conversion cannot type the partition columns.
    with pytest.raises(
        DeltaError,
        match="Generic error: The schema of partition columns must be provided to convert a Parquet table to a Delta table",
    ):
        convert_to_deltalake(tmp_path)

    # partition_by alone is rejected by the Python-side validation.
    with pytest.raises(
        ValueError, match="Partition strategy has to be provided with partition_by"
    ):
        convert_to_deltalake(tmp_path, partition_by=partition_schema)

    # Any strategy other than "hive" is rejected.
    with pytest.raises(
        ValueError,
        match="Currently only `hive` partition strategy is supported to be converted.",
    ):
        convert_to_deltalake(
            tmp_path,
            partition_by=partition_schema,
            partition_strategy="directory",
        )

    # Schema plus "hive" strategy converts successfully.
    convert_to_deltalake(
        tmp_path,
        partition_by=partition_schema,
        partition_strategy="hive",
    )

0 comments on commit ee9e515

Please sign in to comment.