Skip to content

Commit

Permalink
expose create to deltatable class
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Nov 26, 2023
1 parent 8ca8d65 commit a358aa3
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 2 deletions.
10 changes: 10 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ def convert_to_deltalake(
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def create_deltalake(
table_uri: str,
schema: pyarrow.Schema,
partition_by: List[str],
mode: str,
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

# Can't implement inheritance (see note in src/schema.rs), so this is next
Expand Down
40 changes: 38 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Generator,
Iterable,
List,
Literal,
Mapping,
NamedTuple,
Optional,
Tuple,
Expand All @@ -35,11 +37,12 @@

from deltalake._internal import DeltaDataChecker as _DeltaDataChecker
from deltalake._internal import RawDeltaTable
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
from deltalake.exceptions import DeltaProtocolError
from deltalake.fs import DeltaStorageHandler
from deltalake.schema import Schema
from deltalake.schema import Schema as DeltaSchema

MAX_SUPPORTED_READER_VERSION = 1
MAX_SUPPORTED_WRITER_VERSION = 2
Expand Down Expand Up @@ -295,6 +298,39 @@ def from_data_catalog(
table_uri=table_uri, version=version, log_buffer_size=log_buffer_size
)

@classmethod
def create(
cls,
table_uri: Union[str, Path],
schema: Union[pyarrow.Schema, DeltaSchema],
mode: Literal["error", "append", "overwrite", "ignore"] = "error",
partition_by: Optional[Union[List[str], str]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> "DeltaTable":
if isinstance(schema, DeltaSchema):
schema = schema.to_pyarrow()
if isinstance(partition_by, str):
partition_by = [partition_by]

if isinstance(table_uri, Path):
table_uri = str(table_uri)

_create_deltalake(
table_uri,
schema,
partition_by or [],
mode,
name,
description,
configuration,
storage_options,
)

return cls(table_uri=table_uri, storage_options=storage_options)

def version(self) -> int:
"""
Get the version of the DeltaTable.
Expand Down Expand Up @@ -410,7 +446,7 @@ def load_with_datetime(self, datetime_string: str) -> None:
def table_uri(self) -> str:
return self._table.table_uri()

def schema(self) -> Schema:
def schema(self) -> DeltaSchema:
"""
Get the current schema of the DeltaTable.
Expand Down
46 changes: 46 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,51 @@ impl From<&PyAddAction> for Add {
}
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn create_deltalake(
table_uri: String,
schema: PyArrowType<ArrowSchema>,
partition_by: Vec<String>,
mode: String,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build()
.map_err(PythonError::from)?;

let mode = mode.parse().map_err(PythonError::from)?;
let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let mut builder = DeltaOps(table)
.create()
.with_columns(schema.fields().clone())
.with_save_mode(mode)
.with_partition_columns(partition_by);

if let Some(name) = &name {
builder = builder.with_table_name(name);
};

if let Some(description) = &description {
builder = builder.with_comment(description);
};

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn write_new_deltalake(
Expand Down Expand Up @@ -1269,6 +1314,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(create_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?;
m.add_class::<RawDeltaTable>()?;
m.add_class::<RawDeltaTableMetaData>()?;
Expand Down
54 changes: 54 additions & 0 deletions python/tests/test_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable
from deltalake.exceptions import DeltaError


def test_create_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
dt = DeltaTable.create(
tmp_path,
sample_data.schema,
name="test_name",
description="test_desc",
configuration={"delta.appendOnly": "false", "foo": "bar"},
)

metadata = dt.metadata()

assert metadata.name == "test_name"
assert metadata.description == "test_desc"
assert metadata.configuration == {"delta.appendOnly": "false", "foo": "bar"}


def test_create_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error")
last_action = dt.history(1)[0]

with pytest.raises(DeltaError):
dt = DeltaTable.create(tmp_path, sample_data.schema, mode="error")

assert last_action["operation"] == "CREATE TABLE"
with pytest.raises(DeltaError):
dt = DeltaTable.create(tmp_path, sample_data.schema, mode="append")

dt = DeltaTable.create(tmp_path, sample_data.schema, mode="ignore")
assert dt.version() == 0

dt = DeltaTable.create(tmp_path, sample_data.schema, mode="overwrite")
assert dt.version() == 1

last_action = dt.history(1)[0]

assert last_action["operation"] == "CREATE OR REPLACE TABLE"


def test_create_schema(tmp_path: pathlib.Path, sample_data: pa.Table):
dt = DeltaTable.create(
tmp_path,
sample_data.schema,
)

assert dt.schema().to_pyarrow() == sample_data.schema

0 comments on commit a358aa3

Please sign in to comment.