Skip to content

Commit

Permalink
rebase to latest changes to write_deltalake
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Ulmasov <[email protected]>
  • Loading branch information
r3stl355 committed Nov 29, 2023
1 parent c0635c5 commit cde9887
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
37 changes: 20 additions & 17 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ def write_deltalake(
if isinstance(partition_by, str):
partition_by = [partition_by]

if isinstance(schema, Schema):
schema = schema.to_pyarrow()

if isinstance(data, RecordBatchReader):
data = convert_pyarrow_recordbatchreader(data, large_dtypes)
elif isinstance(data, pa.RecordBatch):
Expand Down Expand Up @@ -336,24 +339,24 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType:
except KeyError:
return dtype

if partition_by:
table_schema: pa.Schema = schema
if PYARROW_MAJOR_VERSION < 12:
partition_schema = pa.schema(
[
pa.field(
name, _large_to_normal_dtype(table_schema.field(name).type)
)
for name in partition_by
]
)
if partition_by:
table_schema: pa.Schema = schema
if PYARROW_MAJOR_VERSION < 12:
partition_schema = pa.schema(
[
pa.field(
name, _large_to_normal_dtype(table_schema.field(name).type)
)
for name in partition_by
]
)
else:
partition_schema = pa.schema(
[table_schema.field(name) for name in partition_by]
)
partitioning = ds.partitioning(partition_schema, flavor="hive")
else:
partition_schema = pa.schema(
[table_schema.field(name) for name in partition_by]
)
partitioning = ds.partitioning(partition_schema, flavor="hive")
else:
partitioning = None
partitioning = None

add_actions: List[AddAction] = []

Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pyarrow.lib import RecordBatchReader

from deltalake import DeltaTable, Schema, write_deltalake
from deltalake.exceptions import CommitFailedError, DeltaProtocolError
from deltalake.exceptions import CommitFailedError, DeltaError, DeltaProtocolError
from deltalake.table import ProtocolVersions
from deltalake.writer import try_get_table_and_table_uri

Expand Down

0 comments on commit cde9887

Please sign in to comment.