From 27ec1a30eec206ec756ef32db7455f720b469bf5 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 12 May 2024 22:24:08 +0200 Subject: [PATCH] applies NOT NULL when key prop is set when column exists --- dlt/common/schema/utils.py | 4 +- dlt/extract/extractors.py | 1 + dlt/extract/hints.py | 4 +- tests/common/schema/test_merges.py | 23 ++++--- tests/extract/test_sources.py | 66 +++++++++++++++++++ tests/load/pipeline/test_merge_disposition.py | 14 ++-- 6 files changed, 95 insertions(+), 17 deletions(-) diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index bd56a3aa74..10b7607a95 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -127,7 +127,7 @@ def has_default_column_prop_value(prop: str, value: Any) -> bool: if prop == "x-active-record-timestamp": # None is a valid value so it is not a default return False - return value is None + return value in (None, False) def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema: @@ -484,8 +484,6 @@ def hint_to_column_prop(h: TColumnHint) -> TColumnProp: def get_columns_names_with_prop( table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False ) -> List[str]: - # column_prop: TColumnProp = hint_to_column_prop(hint_type) - # default = column_prop != "nullable" # default is true, only for nullable false return [ c["name"] for c in table["columns"].values() diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py index b4afc5b1f8..bf4879ea4a 100644 --- a/dlt/extract/extractors.py +++ b/dlt/extract/extractors.py @@ -176,6 +176,7 @@ def _compute_and_update_table( computed_table["x-normalizer"] = {"evolve-columns-once": True} # type: ignore[typeddict-unknown-key] existing_table = self.schema._schema_tables.get(table_name, None) if existing_table: + # TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely diff_table = utils.diff_table(existing_table, computed_table) else: diff_table = computed_table diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 75ad02e3fe..3f17fd64f4 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -392,7 +392,9 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche keys = [keys] for key in keys: if key in partial["columns"]: - merge_column(partial["columns"][key], {hint: True}) # type: ignore + # set nullable to False if not set + nullable = partial["columns"][key].get("nullable", False) + merge_column(partial["columns"][key], {hint: True, "nullable": nullable}) # type: ignore else: partial["columns"][key] = new_column(key, nullable=False) partial["columns"][key][hint] = True diff --git a/tests/common/schema/test_merges.py b/tests/common/schema/test_merges.py index d59dbe1ce1..1159e1a126 100644 --- a/tests/common/schema/test_merges.py +++ b/tests/common/schema/test_merges.py @@ -16,21 +16,22 @@ "foreign_key": True, "data_type": "text", "name": "test", - "x-special": True, + "x-special": "value", "x-special-int": 100, "nullable": False, + "x-special-bool-true": True, "x-special-bool": False, "prop": None, } -COL_1_HINTS_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key] +COL_1_HINTS_NO_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key] "foreign_key": True, "data_type": "text", "name": "test", - "x-special": True, + "x-special": "value", "x-special-int": 100, "nullable": False, - "x-special-bool": False, + "x-special-bool-true": True, } COL_2_HINTS: TColumnSchema = {"nullable": True, "name": "test_2", "primary_key": False} @@ -46,7 +47,7 @@ ("nullable", True, True), ("nullable", False, False), ("nullable", None, True), - ("x-special", False, False), + ("x-special", False, True), ("x-special", True, False), ("x-special", None, True), ("unique", False, True), @@ -90,7 +91,7 @@ def test_check_column_with_props(prop: str, value: Any, is_default: bool) -> Non def test_column_remove_defaults() -> None: clean = utils.remove_column_defaults(copy(COL_1_HINTS)) # mind that nullable default is False and Nones will be removed - assert clean == COL_1_HINTS_DEFAULTS + assert clean == COL_1_HINTS_NO_DEFAULTS # check nullable True assert utils.remove_column_defaults(copy(COL_2_HINTS)) == {"name": "test_2"} @@ -101,9 +102,11 @@ def test_column_add_defaults() -> None: assert full["unique"] is False # remove defaults from full clean = utils.remove_column_defaults(copy(full)) - assert clean == COL_1_HINTS_DEFAULTS + assert clean == COL_1_HINTS_NO_DEFAULTS # prop is None and will be removed del full["prop"] # type: ignore[typeddict-item] + # same for x-special-bool + del full["x-special-bool"] # type: ignore[typeddict-item] assert utils.add_column_defaults(copy(clean)) == full # test incomplete @@ -182,9 +185,10 @@ def test_merge_columns() -> None: "cluster": False, "foreign_key": True, "data_type": "text", - "x-special": True, + "x-special": "value", "x-special-int": 100, "x-special-bool": False, + "x-special-bool-true": True, "prop": None, } @@ -196,9 +200,10 @@ def test_merge_columns() -> None: "cluster": False, "foreign_key": True, "data_type": "text", - "x-special": True, + "x-special": "value", "x-special-int": 100, "x-special-bool": False, + "x-special-bool-true": True, "prop": None, "primary_key": False, } diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 5dd3d6c3ca..308b65bd37 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -9,6 +9,8 @@ from dlt.common.exceptions import DictValidationException, PipelineStateNotAvailable from dlt.common.pipeline import StateInjectableContext, source_state from dlt.common.schema import Schema +from dlt.common.schema.typing import TColumnProp, TColumnSchema +from dlt.common.schema import utils from dlt.common.typing import TDataItems from dlt.extract import DltResource, DltSource, Incremental @@ -1314,6 +1316,7 @@ def empty_gen(): assert empty_r.compute_table_schema()["columns"]["tags"] == { "data_type": "complex", "name": "tags", + "nullable": False, # NOT NULL because `tags` do not define it "primary_key": True, "merge_key": True, } @@ -1436,6 +1439,69 @@ def empty_gen(): ) +@pytest.mark.parametrize("key_prop", ("primary_key", "merge_key")) +def test_apply_hints_keys(key_prop: TColumnProp) -> None: + def empty_gen(): + yield [1, 2, 3] + + key_columns = ["id_1", "id_2"] + + empty = DltResource.from_data(empty_gen) + # apply compound key + empty.apply_hints(**{key_prop: key_columns}) # type: ignore + table = empty.compute_table_schema() + actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True) + assert actual_keys == key_columns + # nullable is false + actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True) + assert actual_keys == key_columns + + # apply new key + key_columns_2 = ["id_1", "id_3"] + empty.apply_hints(**{key_prop: key_columns_2}) # type: ignore + table = empty.compute_table_schema() + actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True) + assert actual_keys == key_columns_2 + actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True) + assert actual_keys == key_columns_2 + + # if column is present for a key, it get merged and nullable should be preserved + id_2_col: TColumnSchema = { + "name": "id_2", + "data_type": "bigint", + } + + empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col]) # type: ignore + table = empty.compute_table_schema() + actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True) + assert set(actual_keys) == set(key_columns) + # nullable not set in id_2_col so NOT NULL is set + actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True) + assert set(actual_keys) == set(key_columns) + + id_2_col["nullable"] = True + empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col]) # type: ignore + table = empty.compute_table_schema() + actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True) + assert set(actual_keys) == set(key_columns) + # id_2 set to NULL + actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True) + assert set(actual_keys) == {"id_1"} + + # apply key via schema + key_columns_3 = ["id_2", "id_1", "id_3"] + id_2_col[key_prop] = True + + empty = DltResource.from_data(empty_gen) + empty.apply_hints(**{key_prop: key_columns_2}, columns=[id_2_col]) # type: ignore + table = empty.compute_table_schema() + # all 3 columns have the compound key. we do not prevent setting keys via schema + actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True) + assert actual_keys == key_columns_3 + actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True) + assert actual_keys == key_columns_2 + + def test_resource_no_template() -> None: empty = DltResource.from_data([1, 2, 3], name="table") assert empty.write_disposition == "append" diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index 1785ae5882..f4e039ee81 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -820,7 +820,10 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No name=table_name, write_disposition="merge", primary_key="id", # sort hints only have effect when a primary key is provided - columns={"sequence": {"dedup_sort": "desc"}, "id": {"dedup_sort": None}}, + columns={ + "sequence": {"dedup_sort": "desc", "nullable": False}, + "val": {"dedup_sort": None}, + }, ) def data_resource(data): yield data @@ -848,7 +851,7 @@ def data_resource(data): assert sorted(observed, key=lambda d: d["id"]) == expected # now test "asc" sorting - data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc"}}) + data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc", "nullable": False}}) info = p.run(data_resource(data), loader_file_format=destination_config.file_format) assert_load_info(info) @@ -867,7 +870,7 @@ def data_resource(data): table_name = "test_dedup_sort_hint_complex" data_resource.apply_hints( table_name=table_name, - columns={"sequence": {"dedup_sort": "desc"}}, + columns={"sequence": {"dedup_sort": "desc", "nullable": False}}, ) # three records with same primary key @@ -891,7 +894,10 @@ def data_resource(data): table_name = "test_dedup_sort_hint_with_hard_delete" data_resource.apply_hints( table_name=table_name, - columns={"sequence": {"dedup_sort": "desc"}, "deleted": {"hard_delete": True}}, + columns={ + "sequence": {"dedup_sort": "desc", "nullable": False}, + "deleted": {"hard_delete": True}, + }, ) # three records with same primary key