Skip to content

Commit

Permalink
applies NOT NULL when key prop is set when column exists
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 12, 2024
1 parent 2be1400 commit 90fde3c
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 17 deletions.
4 changes: 1 addition & 3 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def has_default_column_prop_value(prop: str, value: Any) -> bool:
if prop == "x-active-record-timestamp":
# None is a valid value so it is not a default
return False
return value is None
return value in (None, False)


def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema:
Expand Down Expand Up @@ -484,8 +484,6 @@ def hint_to_column_prop(h: TColumnHint) -> TColumnProp:
def get_columns_names_with_prop(
table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
) -> List[str]:
# column_prop: TColumnProp = hint_to_column_prop(hint_type)
# default = column_prop != "nullable" # default is true, only for nullable false
return [
c["name"]
for c in table["columns"].values()
Expand Down
1 change: 1 addition & 0 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _compute_and_update_table(
computed_table["x-normalizer"] = {"evolve-columns-once": True} # type: ignore[typeddict-unknown-key]
existing_table = self.schema._schema_tables.get(table_name, None)
if existing_table:
# TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
diff_table = utils.diff_table(existing_table, computed_table)
else:
diff_table = computed_table
Expand Down
4 changes: 3 additions & 1 deletion dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche
keys = [keys]
for key in keys:
if key in partial["columns"]:
merge_column(partial["columns"][key], {hint: True}) # type: ignore
# set nullable to False if not set
nullable = partial["columns"][key].get("nullable", False)
merge_column(partial["columns"][key], {hint: True, "nullable": nullable}) # type: ignore
else:
partial["columns"][key] = new_column(key, nullable=False)
partial["columns"][key][hint] = True
Expand Down
23 changes: 14 additions & 9 deletions tests/common/schema/test_merges.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
"foreign_key": True,
"data_type": "text",
"name": "test",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"nullable": False,
"x-special-bool-true": True,
"x-special-bool": False,
"prop": None,
}

COL_1_HINTS_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key]
COL_1_HINTS_NO_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key]
"foreign_key": True,
"data_type": "text",
"name": "test",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"nullable": False,
"x-special-bool": False,
"x-special-bool-true": True,
}

COL_2_HINTS: TColumnSchema = {"nullable": True, "name": "test_2", "primary_key": False}
Expand All @@ -46,7 +47,7 @@
("nullable", True, True),
("nullable", False, False),
("nullable", None, True),
("x-special", False, False),
("x-special", False, True),
("x-special", True, False),
("x-special", None, True),
("unique", False, True),
Expand Down Expand Up @@ -90,7 +91,7 @@ def test_check_column_with_props(prop: str, value: Any, is_default: bool) -> Non
def test_column_remove_defaults() -> None:
clean = utils.remove_column_defaults(copy(COL_1_HINTS))
# mind that nullable False is not a default (so it is kept) and Nones will be removed
assert clean == COL_1_HINTS_DEFAULTS
assert clean == COL_1_HINTS_NO_DEFAULTS
# check nullable True
assert utils.remove_column_defaults(copy(COL_2_HINTS)) == {"name": "test_2"}

Expand All @@ -101,9 +102,11 @@ def test_column_add_defaults() -> None:
assert full["unique"] is False
# remove defaults from full
clean = utils.remove_column_defaults(copy(full))
assert clean == COL_1_HINTS_DEFAULTS
assert clean == COL_1_HINTS_NO_DEFAULTS
# prop is None and will be removed
del full["prop"] # type: ignore[typeddict-item]
# same for x-special-bool
del full["x-special-bool"] # type: ignore[typeddict-item]
assert utils.add_column_defaults(copy(clean)) == full

# test incomplete
Expand Down Expand Up @@ -182,9 +185,10 @@ def test_merge_columns() -> None:
"cluster": False,
"foreign_key": True,
"data_type": "text",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"x-special-bool": False,
"x-special-bool-true": True,
"prop": None,
}

Expand All @@ -196,9 +200,10 @@ def test_merge_columns() -> None:
"cluster": False,
"foreign_key": True,
"data_type": "text",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"x-special-bool": False,
"x-special-bool-true": True,
"prop": None,
"primary_key": False,
}
Expand Down
65 changes: 65 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dlt.common.exceptions import DictValidationException, PipelineStateNotAvailable
from dlt.common.pipeline import StateInjectableContext, source_state
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnProp, TColumnSchema
from dlt.common.schema import utils
from dlt.common.typing import TDataItems

from dlt.extract import DltResource, DltSource, Incremental
Expand Down Expand Up @@ -1436,6 +1438,69 @@ def empty_gen():
)


@pytest.mark.parametrize("key_prop", ("primary_key", "merge_key"))
def test_apply_hints_keys(key_prop: TColumnProp) -> None:
    """Check key hints set via `apply_hints` and how they interact with `nullable`.

    Runs once per key kind (`primary_key` and `merge_key`). After each `apply_hints`
    call the table schema is recomputed and two listings are compared with the
    expected column names: columns reported for `key_prop` and columns reported for
    `nullable` (going by the expectations below, the NOT NULL ones).
    """

    def empty_gen():
        yield [1, 2, 3]

    def columns_with(table_schema, prop):
        # single place for the lookup used by every assertion in this test
        return utils.get_columns_names_with_prop(table_schema, prop, include_incomplete=True)

    resource = DltResource.from_data(empty_gen)

    # step 1: compound key, no column definitions passed
    first_key = ["id_1", "id_2"]
    resource.apply_hints(**{key_prop: first_key})  # type: ignore
    schema = resource.compute_table_schema()
    assert columns_with(schema, key_prop) == first_key
    # the same columns are reported for the nullable prop
    assert columns_with(schema, "nullable") == first_key

    # step 2: a different key is applied on the same resource, only its columns are expected
    second_key = ["id_1", "id_3"]
    resource.apply_hints(**{key_prop: second_key})  # type: ignore
    schema = resource.compute_table_schema()
    assert columns_with(schema, key_prop) == second_key
    assert columns_with(schema, "nullable") == second_key

    # step 3: a key column that is also passed in `columns` gets merged with the key hint
    id_2_col: TColumnSchema = {"name": "id_2", "data_type": "bigint"}
    resource.apply_hints(**{key_prop: first_key}, columns=[id_2_col])  # type: ignore
    schema = resource.compute_table_schema()
    assert set(columns_with(schema, key_prop)) == set(first_key)
    # id_2_col does not say anything about nullable, so id_2 is expected to be NOT NULL
    assert set(columns_with(schema, "nullable")) == set(first_key)

    # step 4: an explicit nullable=True on the column must survive the key hint
    id_2_col["nullable"] = True
    resource.apply_hints(**{key_prop: first_key}, columns=[id_2_col])  # type: ignore
    schema = resource.compute_table_schema()
    assert set(columns_with(schema, key_prop)) == set(first_key)
    # id_2 stays nullable, only id_1 is reported
    assert set(columns_with(schema, "nullable")) == {"id_1"}

    # step 5: the key is additionally declared on the column schema itself
    third_key = ["id_2", "id_1", "id_3"]
    id_2_col[key_prop] = True

    resource = DltResource.from_data(empty_gen)
    resource.apply_hints(**{key_prop: second_key}, columns=[id_2_col])  # type: ignore
    schema = resource.compute_table_schema()
    # all 3 columns carry the compound key: setting keys via column schema is not prevented
    assert columns_with(schema, key_prop) == third_key
    # id_2 is still nullable=True, so only the columns from the key hint are reported
    assert columns_with(schema, "nullable") == second_key


def test_resource_no_template() -> None:
empty = DltResource.from_data([1, 2, 3], name="table")
assert empty.write_disposition == "append"
Expand Down
14 changes: 10 additions & 4 deletions tests/load/pipeline/test_merge_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,10 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No
name=table_name,
write_disposition="merge",
primary_key="id", # sort hints only have effect when a primary key is provided
columns={"sequence": {"dedup_sort": "desc"}, "id": {"dedup_sort": None}},
columns={
"sequence": {"dedup_sort": "desc", "nullable": False},
"val": {"dedup_sort": None},
},
)
def data_resource(data):
yield data
Expand Down Expand Up @@ -848,7 +851,7 @@ def data_resource(data):
assert sorted(observed, key=lambda d: d["id"]) == expected

# now test "asc" sorting
data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc"}})
data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc", "nullable": False}})

info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
assert_load_info(info)
Expand All @@ -867,7 +870,7 @@ def data_resource(data):
table_name = "test_dedup_sort_hint_complex"
data_resource.apply_hints(
table_name=table_name,
columns={"sequence": {"dedup_sort": "desc"}},
columns={"sequence": {"dedup_sort": "desc", "nullable": False}},
)

# three records with same primary key
Expand All @@ -891,7 +894,10 @@ def data_resource(data):
table_name = "test_dedup_sort_hint_with_hard_delete"
data_resource.apply_hints(
table_name=table_name,
columns={"sequence": {"dedup_sort": "desc"}, "deleted": {"hard_delete": True}},
columns={
"sequence": {"dedup_sort": "desc", "nullable": False},
"deleted": {"hard_delete": True},
},
)

# three records with same primary key
Expand Down

0 comments on commit 90fde3c

Please sign in to comment.