Skip to content

Commit

Permalink
handle delta arrow load id partition column merge disposition
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Nov 4, 2024
1 parent 7389bbd commit 73a8f8e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
3 changes: 2 additions & 1 deletion dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ def merge_delta_table(
primary_keys = get_columns_names_with_prop(schema, "primary_key")
predicate = " AND ".join([f"target.{c} = source.{c}" for c in primary_keys])

partition_by = get_columns_names_with_prop(schema, "partition")
qry = (
table.merge(
source=ensure_delta_compatible_arrow_data(data),
source=ensure_delta_compatible_arrow_data(data, partition_by),
predicate=predicate,
source_alias="source",
target_alias="target",
Expand Down
15 changes: 15 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ def test_delta_table_partitioning_arrow_load_id(

pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)

# append write disposition
info = pipeline.run(
pyarrow.table({"foo": [1]}),
table_name="delta_table",
Expand All @@ -621,6 +622,20 @@ def test_delta_table_partitioning_arrow_load_id(
assert dt.metadata().partition_columns == ["_dlt_load_id"]
assert load_table_counts(pipeline, "delta_table")["delta_table"] == 1

# merge write disposition
info = pipeline.run(
pyarrow.table({"foo": [1, 2]}),
table_name="delta_table",
write_disposition={"disposition": "merge", "strategy": "upsert"},
columns={"_dlt_load_id": {"partition": True}},
primary_key="foo",
table_format="delta",
)
assert_load_info(info)
dt = get_delta_tables(pipeline, "delta_table")["delta_table"]
assert dt.metadata().partition_columns == ["_dlt_load_id"]
assert load_table_counts(pipeline, "delta_table")["delta_table"] == 2


@pytest.mark.essential
@pytest.mark.parametrize(
Expand Down

0 comments on commit 73a8f8e

Please sign in to comment.