From a358aa3f5be70c694025cfbde22fd42095f079b9 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 26 Nov 2023 13:58:58 +0100 Subject: [PATCH] expose create to deltatable class --- python/deltalake/_internal.pyi | 10 +++++++ python/deltalake/table.py | 40 +++++++++++++++++++++++-- python/src/lib.rs | 46 +++++++++++++++++++++++++++++ python/tests/test_create.py | 54 ++++++++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 python/tests/test_create.py diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index f751afa36f..badfd077e4 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -150,6 +150,16 @@ def convert_to_deltalake( storage_options: Optional[Dict[str, str]], custom_metadata: Optional[Dict[str, str]], ) -> None: ... +def create_deltalake( + table_uri: str, + schema: pyarrow.Schema, + partition_by: List[str], + mode: str, + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], +) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... # Can't implement inheritance (see note in src/schema.rs), so this is next diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b238af7929..45446a9b2a 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -12,6 +12,8 @@ Generator, Iterable, List, + Literal, + Mapping, NamedTuple, Optional, Tuple, @@ -35,11 +37,12 @@ from deltalake._internal import DeltaDataChecker as _DeltaDataChecker from deltalake._internal import RawDeltaTable +from deltalake._internal import create_deltalake as _create_deltalake from deltalake._util import encode_partition_value from deltalake.data_catalog import DataCatalog from deltalake.exceptions import DeltaProtocolError from deltalake.fs import DeltaStorageHandler -from deltalake.schema import Schema +from deltalake.schema import Schema as DeltaSchema MAX_SUPPORTED_READER_VERSION = 1 MAX_SUPPORTED_WRITER_VERSION = 2 @@ -295,6 +298,39 @@ def from_data_catalog( table_uri=table_uri, version=version, log_buffer_size=log_buffer_size ) + @classmethod + def create( + cls, + table_uri: Union[str, Path], + schema: Union[pyarrow.Schema, DeltaSchema], + mode: Literal["error", "append", "overwrite", "ignore"] = "error", + partition_by: Optional[Union[List[str], str]] = None, + name: Optional[str] = None, + description: Optional[str] = None, + configuration: Optional[Mapping[str, Optional[str]]] = None, + storage_options: Optional[Dict[str, str]] = None, + ) -> "DeltaTable": + if isinstance(schema, DeltaSchema): + schema = schema.to_pyarrow() + if isinstance(partition_by, str): + partition_by = [partition_by] + + if isinstance(table_uri, Path): + table_uri = str(table_uri) + + _create_deltalake( + table_uri, + schema, + partition_by or [], + mode, + name, + description, + configuration, + storage_options, + ) + + return cls(table_uri=table_uri, storage_options=storage_options) + def version(self) -> int: """ Get the version of the DeltaTable. @@ -410,7 +446,7 @@ def load_with_datetime(self, datetime_string: str) -> None: def table_uri(self) -> str: return self._table.table_uri() - def schema(self) -> Schema: + def schema(self) -> DeltaSchema: """ Get the current schema of the DeltaTable. diff --git a/python/src/lib.rs b/python/src/lib.rs index 69195e866d..9964a76a64 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1126,6 +1126,51 @@ impl From<&PyAddAction> for Add { } } +#[pyfunction] +#[allow(clippy::too_many_arguments)] +fn create_deltalake( + table_uri: String, + schema: PyArrowType, + partition_by: Vec, + mode: String, + name: Option, + description: Option, + configuration: Option>>, + storage_options: Option>, +) -> PyResult<()> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_storage_options(storage_options.unwrap_or_default()) + .build() + .map_err(PythonError::from)?; + + let mode = mode.parse().map_err(PythonError::from)?; + let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?; + + let mut builder = DeltaOps(table) + .create() + .with_columns(schema.fields().clone()) + .with_save_mode(mode) + .with_partition_columns(partition_by); + + if let Some(name) = &name { + builder = builder.with_table_name(name); + }; + + if let Some(description) = &description { + builder = builder.with_comment(description); + }; + + if let Some(config) = configuration { + builder = builder.with_configuration(config); + }; + + rt()? + .block_on(builder.into_future()) + .map_err(PythonError::from)?; + + Ok(()) +} + #[pyfunction] #[allow(clippy::too_many_arguments)] fn write_new_deltalake( @@ -1269,6 +1314,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(create_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?; m.add_class::()?; m.add_class::()?; diff --git a/python/tests/test_create.py b/python/tests/test_create.py new file mode 100644 index 0000000000..a618d741a1 --- /dev/null +++ b/python/tests/test_create.py @@ -0,0 +1,54 @@ +import pathlib + +import pyarrow as pa +import pytest + +from deltalake import DeltaTable +from deltalake.exceptions import DeltaError + + +def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + name="test_name", + description="test_desc", + configuration={"delta.appendOnly": "false", "foo": "bar"}, + ) + + metadata = dt.metadata() + + assert metadata.name == "test_name" + assert metadata.description == "test_desc" + assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"} + + +def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + last_action = dt.history(1)[0] + + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error") + + assert last_action["operation"] == "CREATE TABLE" + with pytest.raises(DeltaError): + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="append") + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="ignore") + assert dt.version() == 0 + + dt = DeltaTable.create(tmp_path, sample_data.schema, mode="overwrite") + assert dt.version() == 1 + + last_action = dt.history(1)[0] + + assert last_action["operation"] == "CREATE OR REPLACE TABLE" + + +def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table): + dt = DeltaTable.create( + tmp_path, + sample_data.schema, + ) + + assert dt.schema().to_pyarrow() == sample_data.schema