From 9829729be79e65c92e8292111ad30e10786ec8a2 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 19 Nov 2023 16:11:33 +0100 Subject: [PATCH] feat(python): expose `convert_to_deltalake` (#1842) Exposes added `convert to delta` functionality by @junjunjd to Python API. - closes #1767 --------- Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> --- .../src/operations/convert_to_delta.rs | 16 ++- crates/deltalake-core/src/protocol/mod.rs | 20 +++- python/deltalake/__init__.py | 1 + python/deltalake/_internal.pyi | 10 ++ python/deltalake/writer.py | 55 +++++++++++ python/src/lib.rs | 68 ++++++++++--- python/tests/test_convert_to_delta.py | 97 +++++++++++++++++++ 7 files changed, 254 insertions(+), 13 deletions(-) create mode 100644 python/tests/test_convert_to_delta.py diff --git a/crates/deltalake-core/src/operations/convert_to_delta.rs b/crates/deltalake-core/src/operations/convert_to_delta.rs index 84fffa1578..644591727c 100644 --- a/crates/deltalake-core/src/operations/convert_to_delta.rs +++ b/crates/deltalake-core/src/operations/convert_to_delta.rs @@ -27,7 +27,7 @@ use serde_json::{Map, Value}; use std::{ collections::{HashMap, HashSet}, num::TryFromIntError, - str::Utf8Error, + str::{FromStr, Utf8Error}, sync::Arc, }; @@ -82,6 +82,20 @@ pub enum PartitionStrategy { Hive, } +impl FromStr for PartitionStrategy { + type Err = DeltaTableError; + + fn from_str(s: &str) -> DeltaResult { + match s.to_ascii_lowercase().as_str() { + "hive" => Ok(PartitionStrategy::Hive), + _ => Err(DeltaTableError::Generic(format!( + "Invalid partition strategy provided {}", + s + ))), + } + } +} + /// Build an operation to convert a Parquet table to a [`DeltaTable`] in place pub struct ConvertToDeltaBuilder { log_store: Option, diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 8a5cd9f858..e2add9b529 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -23,8 +23,9 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::mem::take; +use std::str::FromStr; -use crate::errors::DeltaResult; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove}; use crate::logstore::LogStore; use crate::table::CheckPoint; @@ -589,6 +590,23 @@ pub enum SaveMode { Ignore, } +impl FromStr for SaveMode { + type Err = DeltaTableError; + + fn from_str(s: &str) -> DeltaResult { + match s.to_ascii_lowercase().as_str() { + "append" => Ok(SaveMode::Append), + "overwrite" => Ok(SaveMode::Overwrite), + "error" => Ok(SaveMode::ErrorIfExists), + "ignore" => Ok(SaveMode::Ignore), + _ => Err(DeltaTableError::Generic(format!( + "Invalid save mode provided: {}, only these are supported: ['append', 'overwrite', 'error', 'ignore']", + s + ))), + } + } +} + /// The OutputMode used in streaming operations. #[derive(Serialize, Deserialize, Debug, Clone)] pub enum OutputMode { diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index 129eaff1cf..b10a708309 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -6,4 +6,5 @@ from .schema import Schema as Schema from .table import DeltaTable as DeltaTable from .table import Metadata as Metadata +from .writer import convert_to_deltalake as convert_to_deltalake from .writer import write_deltalake as write_deltalake diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 2a4f77993c..854d96d4a8 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -152,6 +152,16 @@ def write_to_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], ) -> None: ... +def convert_to_deltalake( + uri: str, + partition_by: Optional[pyarrow.Schema], + partition_strategy: Optional[Literal["hive"]], + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], + custom_metadata: Optional[Dict[str, str]], +) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... # Can't implement inheritance (see note in src/schema.rs), so this is next diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index cfbe57ce86..358f7fed41 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -38,6 +38,7 @@ from ._internal import DeltaDataChecker as _DeltaDataChecker from ._internal import batch_distinct +from ._internal import convert_to_deltalake as _convert_to_deltalake from ._internal import write_new_deltalake as write_deltalake_pyarrow from ._internal import write_to_deltalake as write_deltalake_rust from .exceptions import DeltaProtocolError, TableNotFoundError @@ -438,6 +439,60 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: raise ValueError("Only `pyarrow` or `rust` are valid inputs for the engine.") +def convert_to_deltalake( + uri: Union[str, Path], + mode: Literal["error", "ignore"] = "error", + partition_by: Optional[pa.Schema] = None, + partition_strategy: Optional[Literal["hive"]] = None, + name: Optional[str] = None, + description: Optional[str] = None, + configuration: Optional[Mapping[str, Optional[str]]] = None, + storage_options: Optional[Dict[str, str]] = None, + custom_metadata: Optional[Dict[str, str]] = None, +) -> None: + """ + `Convert` parquet tables `to delta` tables. + + Currently only HIVE partitioned tables are supported. `Convert to delta` creates + a transaction log commit with add actions, and additional properties provided such + as configuration, name, and description. + + Args: + uri: URI of a table. + partition_by: Optional partitioning schema if table is partitioned. + partition_strategy: Optional partition strategy to read and convert + mode: How to handle existing data. Default is to error if table already exists. + If 'ignore', will not convert anything if table already exists. + name: User-provided identifier for this table. + description: User-provided description for this table. + configuration: A map containing configuration options for the metadata action. + storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined. + custom_metadata: custom metadata that will be added to the transaction commit + """ + if partition_by is not None and partition_strategy is None: + raise ValueError("Partition strategy has to be provided with partition_by.") + + if partition_strategy is not None and partition_strategy != "hive": + raise ValueError( + "Currently only `hive` partition strategy is supported to be converted." + ) + + if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None: + return + + _convert_to_deltalake( + str(uri), + partition_by, + partition_strategy, + name, + description, + configuration, + storage_options, + custom_metadata, + ) + return + + def __enforce_append_only( table: Optional[DeltaTable], configuration: Optional[Mapping[str, Optional[str]]], diff --git a/python/src/lib.rs b/python/src/lib.rs index eddd742951..b2dfc7b134 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -27,6 +27,7 @@ use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::kernel::{Action, Add, Invariant, Metadata, Remove, StructType}; +use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; use deltalake::operations::delete::DeleteBuilder; use deltalake::operations::filesystem_check::FileSystemCheckBuilder; use deltalake::operations::merge::MergeBuilder; @@ -43,6 +44,7 @@ use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{PyFrozenSet, PyType}; +use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; use crate::error::PythonError; @@ -758,7 +760,8 @@ impl RawDeltaTable { schema: PyArrowType, partitions_filters: Option>, ) -> PyResult<()> { - let mode = save_mode_from_str(mode)?; + let mode = mode.parse().map_err(PythonError::from)?; + let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?; let existing_schema = self._table.get_schema().map_err(PythonError::from)?; @@ -1088,16 +1091,6 @@ fn batch_distinct(batch: PyArrowType) -> PyResult PyResult { - match value { - "append" => Ok(SaveMode::Append), - "overwrite" => Ok(SaveMode::Overwrite), - "error" => Ok(SaveMode::ErrorIfExists), - "ignore" => Ok(SaveMode::Ignore), - _ => Err(PyValueError::new_err("Invalid save mode")), - } -} - fn current_timestamp() -> i64 { let start = SystemTime::now(); let since_the_epoch = start @@ -1233,6 +1226,58 @@ fn write_new_deltalake( Ok(()) } +#[pyfunction] +#[allow(clippy::too_many_arguments)] +fn convert_to_deltalake( + uri: String, + partition_schema: Option>, + partition_strategy: Option, + name: Option, + description: Option, + configuration: Option>>, + storage_options: Option>, + custom_metadata: Option>, +) -> PyResult<()> { + let mut builder = ConvertToDeltaBuilder::new().with_location(uri); + + if let Some(part_schema) = partition_schema { + let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?; + builder = builder.with_partition_schema(schema.fields().clone()); + } + + if let Some(partition_strategy) = &partition_strategy { + let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?; + builder = builder.with_partition_strategy(strategy); + } + + if let Some(name) = &name { + builder = builder.with_table_name(name); + } + + if let Some(description) = &description { + builder = builder.with_comment(description); + } + + if let Some(config) = configuration { + builder = builder.with_configuration(config); + }; + + if let Some(strg_options) = storage_options { + builder = builder.with_storage_options(strg_options); + }; + + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + builder = builder.with_metadata(json_metadata); + }; + + rt()? + .block_on(builder.into_future()) + .map_err(PythonError::from)?; + Ok(()) +} + #[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")] struct PyDeltaDataChecker { inner: DeltaDataChecker, @@ -1278,6 +1323,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?; m.add_class::()?; diff --git a/python/tests/test_convert_to_delta.py b/python/tests/test_convert_to_delta.py new file mode 100644 index 0000000000..29badf3358 --- /dev/null +++ b/python/tests/test_convert_to_delta.py @@ -0,0 +1,97 @@ +import pathlib + +import pyarrow as pa +import pyarrow.dataset as ds +import pytest + +from deltalake import convert_to_deltalake +from deltalake.exceptions import DeltaError +from deltalake.table import DeltaTable + + +def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table): + ds.write_dataset( + sample_data, + tmp_path, + format="parquet", + existing_data_behavior="overwrite_or_ignore", + ) + + name = "converted_table" + description = "parquet table converted to delta table with delta-rs" + convert_to_deltalake( + tmp_path, + name=name, + description=description, + configuration={"delta.AppendOnly": "True"}, + ) + + dt = DeltaTable(tmp_path) + + assert dt.version() == 0 + assert dt.files() == ["part-0.parquet"] + assert dt.metadata().name == name + assert dt.metadata().description == description + assert dt.metadata().configuration == {"delta.AppendOnly": "True"} + + +def test_convert_delta_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table): + ds.write_dataset( + sample_data, + tmp_path, + format="parquet", + existing_data_behavior="overwrite_or_ignore", + ) + + convert_to_deltalake( + tmp_path, + ) + + with pytest.raises(DeltaError): + convert_to_deltalake( + tmp_path, + ) + + convert_to_deltalake(tmp_path, mode="ignore") + + +def test_convert_delta_with_partitioning(tmp_path: pathlib.Path, sample_data: pa.Table): + ds.write_dataset( + sample_data, + tmp_path, + format="parquet", + existing_data_behavior="overwrite_or_ignore", + partitioning=["utf8"], + partitioning_flavor="hive", + ) + + with pytest.raises( + DeltaError, + match="Generic error: The schema of partition columns must be provided to convert a Parquet table to a Delta table", + ): + convert_to_deltalake( + tmp_path, + ) + with pytest.raises( + ValueError, match="Partition strategy has to be provided with partition_by" + ): + convert_to_deltalake( + tmp_path, + partition_by=pa.schema([pa.field("utf8", pa.string())]), + ) + + with pytest.raises( + ValueError, + match="Currently only `hive` partition strategy is supported to be converted.", + ): + convert_to_deltalake( + tmp_path, + partition_by=pa.schema([pa.field("utf8", pa.string())]), + partition_strategy="directory", + ) + + convert_to_deltalake( + tmp_path, + partition_by=pa.schema([pa.field("utf8", pa.string())]), + partition_strategy="hive", + )