Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable delta partitioning on arrow normalizer load id #2022

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
)


def ensure_delta_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
def ensure_delta_compatible_arrow_schema(
schema: pa.Schema,
partition_by: Optional[Union[List[str], str]] = None,
) -> pa.Schema:
"""Returns Arrow schema compatible with Delta table format.

Casts schema to replace data types not supported by Delta.
Expand All @@ -35,12 +38,24 @@ def ensure_delta_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
pa.types.is_time: pa.string(),
pa.types.is_decimal256: pa.string(), # pyarrow does not allow downcasting to decimal128
}

# partition fields can't be dictionary: https://github.com/delta-io/delta-rs/issues/2969
if partition_by is not None:
if isinstance(partition_by, str):
partition_by = [partition_by]
if any(pa.types.is_dictionary(schema.field(col).type) for col in partition_by):
# cast all dictionary fields to string — this is rogue because
# 1. dictionary value type is disregarded
# 2. any non-partition dictionary fields are cast too
ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP[pa.types.is_dictionary] = pa.string()

# NOTE: also consider calling _convert_pa_schema_to_delta() from delta.schema which casts unsigned types
return cast_arrow_schema_types(schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP)


def ensure_delta_compatible_arrow_data(
data: Union[pa.Table, pa.RecordBatchReader]
data: Union[pa.Table, pa.RecordBatchReader],
partition_by: Optional[Union[List[str], str]] = None,
) -> Union[pa.Table, pa.RecordBatchReader]:
"""Returns Arrow data compatible with Delta table format.

Expand All @@ -53,7 +68,7 @@ def ensure_delta_compatible_arrow_data(
version="17.0.0",
msg="`pyarrow>=17.0.0` is needed for `delta` table format on `filesystem` destination.",
)
schema = ensure_delta_compatible_arrow_schema(data.schema)
schema = ensure_delta_compatible_arrow_schema(data.schema, partition_by)
return data.cast(schema)


Expand Down Expand Up @@ -87,7 +102,7 @@ def write_delta_table(
# is released
write_deltalake( # type: ignore[call-overload]
table_or_uri=table_or_uri,
data=ensure_delta_compatible_arrow_data(data),
data=ensure_delta_compatible_arrow_data(data, partition_by),
partition_by=partition_by,
mode=get_delta_write_mode(write_disposition),
schema_mode="merge", # enable schema evolution (adding new columns)
Expand Down Expand Up @@ -116,9 +131,10 @@ def merge_delta_table(
primary_keys = get_columns_names_with_prop(schema, "primary_key")
predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys])

partition_by = get_columns_names_with_prop(schema, "partition")
qry = (
table.merge(
source=ensure_delta_compatible_arrow_data(data),
source=ensure_delta_compatible_arrow_data(data, partition_by),
predicate=predicate,
source_alias="source",
target_alias="target",
Expand Down
51 changes: 51 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,57 @@ def two_part():
assert dt.metadata().partition_columns == []


@pytest.mark.parametrize(
"destination_config",
destinations_configs(
table_format_filesystem_configs=True,
with_table_format="delta",
bucket_subset=(FILE_BUCKET),
),
ids=lambda x: x.name,
)
def test_delta_table_partitioning_arrow_load_id(
destination_config: DestinationTestConfiguration,
) -> None:
"""Tests partitioning on load id column added by Arrow normalizer.

Case needs special handling because of bug in delta-rs:
https://github.com/delta-io/delta-rs/issues/2969
"""
from dlt.common.libs.pyarrow import pyarrow
from dlt.common.libs.deltalake import get_delta_tables

os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "true"

pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)

# append write disposition
info = pipeline.run(
pyarrow.table({"foo": [1]}),
table_name="delta_table",
columns={"_dlt_load_id": {"partition": True}},
table_format="delta",
)
assert_load_info(info)
dt = get_delta_tables(pipeline, "delta_table")["delta_table"]
assert dt.metadata().partition_columns == ["_dlt_load_id"]
assert load_table_counts(pipeline, "delta_table")["delta_table"] == 1

# merge write disposition
info = pipeline.run(
pyarrow.table({"foo": [1, 2]}),
table_name="delta_table",
write_disposition={"disposition": "merge", "strategy": "upsert"},
columns={"_dlt_load_id": {"partition": True}},
primary_key="foo",
table_format="delta",
)
assert_load_info(info)
dt = get_delta_tables(pipeline, "delta_table")["delta_table"]
assert dt.metadata().partition_columns == ["_dlt_load_id"]
assert load_table_counts(pipeline, "delta_table")["delta_table"] == 2


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
Expand Down
Loading