
Commit

some cleanup and proper asserts for first test
sh-rp committed Jan 31, 2024
1 parent 8dca5b2 commit c2968c8
Showing 4 changed files with 77 additions and 24 deletions.
2 changes: 2 additions & 0 deletions dlt/common/destination/reference.py
@@ -114,6 +114,8 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
     """How to handle replace disposition for this destination, can be classic or staging"""
     merge_strategy: TLoaderMergeStrategy = "merge"
     """How to handle merging, can be classic merge on primary key or merge keys, or scd2"""
+    load_timestamp: str = None
+    """Configurable timestamp for load strategies that record validity dates"""
 
     def normalize_dataset_name(self, schema: Schema) -> str:
         """Builds full db dataset (schema) name out of configured dataset name and schema name: {dataset_name}_{schema.name}. The resulting name is normalized.
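Note on the new field: load_timestamp lets a caller pin the timestamp that validity dates are derived from, instead of taking the current time when the merge job is created. A minimal sketch of supplying it through environment-based configuration, mirroring what the test below does (DESTINATION__LOAD_TIMESTAMP is the variable the test sets for this field):

import os
import pendulum

# pin the validity timestamp for this run; scd2 rows written by the load
# will use this value instead of pendulum.now()
load_time = pendulum.now()
os.environ["DESTINATION__LOAD_TIMESTAMP"] = load_time.to_iso8601_string()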
2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -60,7 +60,7 @@
     "merge_key",
 ]
 """Known hints of a column used to declare hint regexes."""
-TWriteDisposition = Literal["skip", "append", "replace", "merge", "scd2"]
+TWriteDisposition = Literal["skip", "append", "replace", "merge"]
 TTableFormat = Literal["iceberg"]
 TTypeDetections = Literal[
     "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
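With "scd2" removed from TWriteDisposition, resources stay on write_disposition="merge" and the scd2 behavior is selected through the merge_strategy setting instead (see the merge_strategy field above). A trimmed-down sketch of that combination, following the configuration used in the test below:

import os
import dlt

# scd2 is a merge strategy now, not a write disposition
os.environ["DESTINATION__MERGE_STRATEGY"] = "scd2"

@dlt.resource(name="items", write_disposition="merge")
def items():
    yield {"id": 1, "name": "one"}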
2 changes: 1 addition & 1 deletion dlt/destinations/job_client_impl.py
@@ -224,7 +224,7 @@ def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> L
         return []
 
     def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
-        now = pendulum.now()
+        now = self.config.load_timestamp or pendulum.now().to_iso8601_string()
        return [
             SqlMergeJob.from_table_chain(
                 table_chain,
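Note on the change above: an explicitly configured load_timestamp takes precedence, otherwise the current time is serialized to ISO 8601, so downstream merge jobs always receive a string. The same fallback in isolation (resolve_load_timestamp is a hypothetical helper used only for illustration, not part of the change):

import pendulum

def resolve_load_timestamp(configured: str = None) -> str:
    # a configured value wins; otherwise use "now" as an ISO 8601 string
    return configured or pendulum.now().to_iso8601_string()

assert resolve_load_timestamp("2024-01-31T00:00:00+00:00") == "2024-01-31T00:00:00+00:00"
assert resolve_load_timestamp(None) is not None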
95 changes: 73 additions & 22 deletions tests/load/pipeline/test_scd2_disposition.py
@@ -1,6 +1,9 @@
-import pytest, dlt, os
+import pytest, dlt, os, pendulum
 
 from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
+from tests.load.pipeline.utils import (
+    load_tables_to_dicts,
+)
 
 
 @pytest.mark.parametrize(
@@ -10,7 +13,9 @@
 )
 def test_simple_scd2_load(destination_config: DestinationTestConfiguration) -> None:
     # use scd2
+    first_load = pendulum.now()
     os.environ["DESTINATION__MERGE_STRATEGY"] = "scd2"
+    os.environ["DESTINATION__LOAD_TIMESTAMP"] = first_load.to_iso8601_string()
 
     @dlt.resource(name="items", write_disposition="merge")
     def load_items():
@@ -46,6 +51,7 @@ def load_items():
 
     # new version of item 1
     # item 3 deleted
+    # item 2 has a new child
     @dlt.resource(name="items", write_disposition="merge")
     def load_items_2():
         yield from [
@@ -65,26 +71,71 @@ def load_items_2():
             },
         ]
 
+    second_load = pendulum.now()
+    os.environ["DESTINATION__LOAD_TIMESTAMP"] = second_load.to_iso8601_string()
+
     p.run(load_items_2())
 
-    with p.sql_client() as c:
-        with c.execute_query(
-            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'items' ORDER"
-            " BY ORDINAL_POSITION"
-        ) as cur:
-            print(cur.fetchall())
-        with c.execute_query("SELECT * FROM items") as cur:
-            for row in cur.fetchall():
-                print(row)
-
-    with p.sql_client() as c:
-        with c.execute_query(
-            "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME ="
-            " 'items__children' ORDER BY ORDINAL_POSITION"
-        ) as cur:
-            print(cur.fetchall())
-        with c.execute_query("SELECT * FROM items__children") as cur:
-            for row in cur.fetchall():
-                print(row)
-    # print(p.default_schema.to_pretty_yaml())
-    assert False
+    tables = load_tables_to_dicts(p, "items", "items__children")
+    # we should have 4 items in total (3 from the first load and an update of item 1 from the second load)
+    assert len(tables["items"]) == 4
+    # 2 should be active (1 and 2)
+    active_items = [item for item in tables["items"] if item["_dlt_valid_until"] is None]
+    inactive_items = [item for item in tables["items"] if item["_dlt_valid_until"] is not None]
+    active_items.sort(key=lambda i: i["id"])
+    inactive_items.sort(key=lambda i: i["id"])
+    assert len(active_items) == 2
+
+    # changed in the second load
+    assert active_items[0]["id"] == 1
+    assert active_items[0]["name"] == "one_new"
+    assert active_items[0]["_dlt_valid_from"] == second_load
+
+    # did not change in the second load
+    assert active_items[1]["id"] == 2
+    assert active_items[1]["name"] == "two"
+    assert active_items[1]["_dlt_valid_from"] == first_load
+
+    # was valid between first and second load
+    assert inactive_items[0]["id"] == 1
+    assert inactive_items[0]["name"] == "one"
+    assert inactive_items[0]["_dlt_valid_from"] == first_load
+    assert inactive_items[0]["_dlt_valid_until"] == second_load
+
+    # was valid between first and second load
+    assert inactive_items[1]["id"] == 3
+    assert inactive_items[1]["name"] == "three"
+    assert inactive_items[1]["_dlt_valid_from"] == first_load
+    assert inactive_items[1]["_dlt_valid_until"] == second_load
+
+    # child tables
+    assert len(tables["items__children"]) == 3
+    active_child_items = [
+        item for item in tables["items__children"] if item["_dlt_valid_until"] is None
+    ]
+    inactive_child_items = [
+        item for item in tables["items__children"] if item["_dlt_valid_until"] is not None
+    ]
+    active_child_items.sort(key=lambda i: i["id_of"])
+    inactive_child_items.sort(key=lambda i: i["id_of"])
+
+    assert len(active_child_items) == 1
+
+    # the one active child item should be linked to the right parent, created during the second load
+    assert active_child_items[0]["id_of"] == 2
+    assert active_child_items[0]["name"] == "child2_new"
+    assert active_child_items[0]["_dlt_parent_id"] == active_items[1]["_dlt_id"]
+    assert active_child_items[0]["_dlt_valid_from"] == second_load
+
+    # check inactive child items
+    assert inactive_child_items[0]["id_of"] == 2
+    assert inactive_child_items[0]["name"] == "child2"
+    assert inactive_child_items[0]["_dlt_parent_id"] == active_items[1]["_dlt_id"]
+    assert inactive_child_items[0]["_dlt_valid_from"] == first_load
+    assert inactive_child_items[0]["_dlt_valid_until"] == second_load
+
+    assert inactive_child_items[1]["id_of"] == 3
+    assert inactive_child_items[1]["name"] == "child3"
+    assert inactive_child_items[1]["_dlt_parent_id"] == inactive_items[1]["_dlt_id"]
+    assert inactive_child_items[1]["_dlt_valid_from"] == first_load
+    assert inactive_child_items[1]["_dlt_valid_until"] == second_load
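For reference, the item 1 history the asserts above expect, written out as plain data (a sketch reusing the test's first_load / second_load timestamps, not additional test code):

# item 1 is rewritten in the second load, so it ends up with two versions:
# the old row is closed at second_load, the new row stays open
expected_item_1_history = [
    {"id": 1, "name": "one", "_dlt_valid_from": first_load, "_dlt_valid_until": second_load},
    {"id": 1, "name": "one_new", "_dlt_valid_from": second_load, "_dlt_valid_until": None},
]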
