Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Oct 7, 2023
1 parent de56cec commit 70dc65a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
4 changes: 2 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ def merge(
self,
source: Union[pyarrow.Table, pyarrow.RecordBatch],
predicate: str,
source_alias: str = 'source',
target_alias: str = 'target',
source_alias: str = "source",
target_alias: str = "target",
strict_cast: bool = True,
) -> "TableMerger":
"""Pass the source data which you want to merge on the target delta table, providing a
Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::merge::MergeBuilder;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::merge::MergeBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
Expand Down
70 changes: 46 additions & 24 deletions python/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ def test_merge_when_matched_delete_wo_predicate(
)

dt.merge(
source=source_table, predicate="t.id = s.id", source_alias="s", target_alias='t',
source=source_table,
predicate="t.id = s.id",
source_alias="s",
target_alias="t",
).when_matched_delete().execute()

nrows = 4
Expand All @@ -34,7 +37,7 @@ def test_merge_when_matched_delete_wo_predicate(
"deleted": pa.array([False] * nrows),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -58,7 +61,10 @@ def test_merge_when_matched_delete_with_predicate(
)

dt.merge(
source=source_table, predicate="t.id = s.id", source_alias="s", target_alias='t',
source=source_table,
predicate="t.id = s.id",
source_alias="s",
target_alias="t",
).when_matched_delete("s.deleted = True").execute()

nrows = 4
Expand All @@ -70,7 +76,7 @@ def test_merge_when_matched_delete_with_predicate(
"deleted": pa.array([False] * nrows),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -94,7 +100,10 @@ def test_merge_when_matched_update_wo_predicate(
)

dt.merge(
source=source_table, predicate="t.id = s.id", source_alias="s", target_alias='t',
source=source_table,
predicate="t.id = s.id",
source_alias="s",
target_alias="t",
).when_matched_update({"price": "s.price", "sold": "s.sold"}).execute()

expected = pa.table(
Expand All @@ -105,7 +114,7 @@ def test_merge_when_matched_update_wo_predicate(
"deleted": pa.array([False] * 5),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -129,7 +138,10 @@ def test_merge_when_matched_update_with_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id",
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_matched_update(
updates={"price": "source.price", "sold": "source.sold"},
predicate="source.deleted = False",
Expand All @@ -143,7 +155,7 @@ def test_merge_when_matched_update_with_predicate(
"deleted": pa.array([False] * 5),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -167,7 +179,10 @@ def test_merge_when_not_matched_insert_wo_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id"
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_not_matched_insert(
updates={
"id": "source.id",
Expand All @@ -185,7 +200,7 @@ def test_merge_when_not_matched_insert_wo_predicate(
"deleted": pa.array([False] * 6),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -209,7 +224,10 @@ def test_merge_when_not_matched_insert_with_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id"
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_not_matched_insert(
updates={
"id": "source.id",
Expand All @@ -228,7 +246,7 @@ def test_merge_when_not_matched_insert_with_predicate(
"deleted": pa.array([False] * 6),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -252,7 +270,10 @@ def test_merge_when_not_matched_by_source_update_wo_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id"
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_not_matched_by_source_update(
updates={
"sold": "int'10'",
Expand All @@ -267,7 +288,7 @@ def test_merge_when_not_matched_by_source_update_wo_predicate(
"deleted": pa.array([False] * 5),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -291,7 +312,10 @@ def test_merge_when_not_matched_by_source_update_with_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id"
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_not_matched_by_source_update(
updates={
"sold": "int'10'",
Expand All @@ -307,7 +331,7 @@ def test_merge_when_not_matched_by_source_update_with_predicate(
"deleted": pa.array([False] * 5),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand All @@ -331,26 +355,24 @@ def test_merge_when_not_matched_by_source_delete_with_predicate(
)

dt.merge(
source=source_table, source_alias="source", target_alias='target', predicate="target.id = source.id"
source=source_table,
source_alias="source",
target_alias="target",
predicate="target.id = source.id",
).when_not_matched_by_source_delete(predicate="target.price > bigint'3'").execute()

expected = pa.table(
{
"id": pa.array(["1", "2", "3", "4"]),
"price": pa.array(
[
0,
1,
2,
3
],
[0, 1, 2, 3],
pa.int64(),
),
"sold": pa.array([0, 1, 2, 3], pa.int32()),
"deleted": pa.array([False] * 4),
}
)
result = dt.to_pyarrow_table().sort_by([('id','ascending')])
result = dt.to_pyarrow_table().sort_by([("id", "ascending")])
last_action = dt.history(1)[0]

assert last_action["operation"] == "MERGE"
Expand Down

0 comments on commit 70dc65a

Please sign in to comment.