diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 1742ade224..441f81c7fb 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -120,6 +120,7 @@ def _add_dynamic_table(resource: DltResource, data_item: TDataItem = None, table """ Computes new table and does contract checks """ + # TODO: We have to normalize table identifiers here table = resource.compute_table_schema(data_item) if table_name: table["name"] = table_name @@ -142,8 +143,8 @@ def _add_dynamic_table(resource: DltResource, data_item: TDataItem = None, table if not checked_table: disallowed_tables.add(table["name"]) return False - - dynamic_tables[table_name] = [checked_table] + + dynamic_tables[checked_table["name"]] = [checked_table] return True # yield from all selected pipes diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 1c8289a74b..0c75bc15d4 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -863,8 +863,11 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para # if source schema does not exist in the pipeline if source_schema.name not in self._schema_storage: - # save new schema into the pipeline - self._schema_storage.save_schema(Schema(source_schema.name)) + # TODO: here we should create a new schema but copy hints and possibly other settings + # over from the schema table. Is this the right way? + new_schema = Schema(source_schema.name) + new_schema._settings = source_schema._settings + self._schema_storage.save_schema(new_schema) # and set as default if this is first schema in pipeline if not self.default_schema_name: @@ -879,7 +882,9 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para extract_id = extract_with_schema(storage, source, pipeline_schema, self.collector, max_parallel_items, workers) + # initialize import with fully discovered schema + # TODO: is this the right location for this? 
self._schema_storage.save_import_schema_if_not_exists(source_schema) # update the pipeline schema with all tables and contract settings diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v7.yml b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml index 5a8db47163..f8645d78ae 100644 --- a/tests/common/cases/schemas/eth/ethereum_schema_v7.yml +++ b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml @@ -1,5 +1,5 @@ -version: 14 -version_hash: VuzNqiLOk7XuPxYLndMFMPHTDVItKU5ayiy70nQLdus= +version: 15 +version_hash: yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE= engine_version: 7 name: ethereum tables: diff --git a/tests/common/schema/test_contract_mode_functions.py b/tests/common/schema/test_contract_mode_functions.py index 155ad918e2..6091c57e6f 100644 --- a/tests/common/schema/test_contract_mode_functions.py +++ b/tests/common/schema/test_contract_mode_functions.py @@ -2,6 +2,7 @@ import copy from dlt.common.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE +from dlt.common.schema.schema import resolve_contract_settings_for_table from dlt.common.schema.exceptions import SchemaFrozenException @@ -59,18 +60,18 @@ def test_resolve_contract_settings() -> None: # defaults schema = get_schema() - assert schema.resolve_contract_settings_for_table(None, "tables") == DEFAULT_SCHEMA_CONTRACT_MODE - assert schema.resolve_contract_settings_for_table("tables", "child_table") == DEFAULT_SCHEMA_CONTRACT_MODE + assert resolve_contract_settings_for_table(None, "tables", schema) == DEFAULT_SCHEMA_CONTRACT_MODE + assert resolve_contract_settings_for_table("tables", "child_table", schema) == DEFAULT_SCHEMA_CONTRACT_MODE # table specific full setting schema = get_schema() schema.tables["tables"]["schema_contract"] = "freeze" - assert schema.resolve_contract_settings_for_table(None, "tables") == { + assert resolve_contract_settings_for_table(None, "tables", schema) == { "tables": "freeze", "columns": "freeze", "data_type": "freeze" } - assert 
schema.resolve_contract_settings_for_table("tables", "child_table") == { + assert resolve_contract_settings_for_table("tables", "child_table", schema) == { "tables": "freeze", "columns": "freeze", "data_type": "freeze" @@ -82,12 +83,12 @@ def test_resolve_contract_settings() -> None: "tables": "freeze", "columns": "discard_value", } - assert schema.resolve_contract_settings_for_table(None, "tables") == { + assert resolve_contract_settings_for_table(None, "tables", schema) == { "tables": "freeze", "columns": "discard_value", "data_type": "evolve" } - assert schema.resolve_contract_settings_for_table("tables", "child_table") == { + assert resolve_contract_settings_for_table("tables", "child_table", schema) == { "tables": "freeze", "columns": "discard_value", "data_type": "evolve" @@ -96,12 +97,12 @@ def test_resolve_contract_settings() -> None: # schema specific full setting schema = get_schema() schema._settings["schema_contract"] = "freeze" - assert schema.resolve_contract_settings_for_table(None, "tables") == { + assert resolve_contract_settings_for_table(None, "tables", schema) == { "tables": "freeze", "columns": "freeze", "data_type": "freeze" } - assert schema.resolve_contract_settings_for_table("tables", "child_table") == { + assert resolve_contract_settings_for_table("tables", "child_table", schema) == { "tables": "freeze", "columns": "freeze", "data_type": "freeze" @@ -113,35 +114,79 @@ def test_resolve_contract_settings() -> None: "tables": "freeze", "columns": "discard_value", } - assert schema.resolve_contract_settings_for_table(None, "tables") == { + assert resolve_contract_settings_for_table(None, "tables", schema) == { "tables": "freeze", "columns": "discard_value", "data_type": "evolve" } - assert schema.resolve_contract_settings_for_table("tables", "child_table") == { + assert resolve_contract_settings_for_table("tables", "child_table", schema) == { "tables": "freeze", "columns": "discard_value", "data_type": "evolve" } - # mixed settings + # mixed 
settings: table setting always prevails schema = get_schema() schema._settings["schema_contract"] = "freeze" schema.tables["tables"]["schema_contract"] = { "tables": "evolve", "columns": "discard_value", } - assert schema.resolve_contract_settings_for_table(None, "tables") == { + assert resolve_contract_settings_for_table(None, "tables", schema) == { "tables": "evolve", "columns": "discard_value", - "data_type": "freeze" + "data_type": "evolve" } - assert schema.resolve_contract_settings_for_table("tables", "child_table") == { + assert resolve_contract_settings_for_table("tables", "child_table", schema) == { "tables": "evolve", "columns": "discard_value", + "data_type": "evolve" + } + + # current and incoming schema + current_schema = get_schema() + current_schema._settings["schema_contract"] = "discard_value" + incoming_schema = get_schema() + incoming_schema._settings["schema_contract"] = "discard_row" + incoming_table = {"name": "incomplete_table", "schema_contract": "freeze"} + + + # incoming schema overrides + assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == { + "tables": "discard_row", + "columns": "discard_row", + "data_type": "discard_row" + } + + # direct incoming table overrides + assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema, incoming_table) == { + "tables": "freeze", + "columns": "freeze", "data_type": "freeze" } + # table defined on existing schema overrides incoming schema setting + current_schema.tables["tables"]["schema_contract"] = "discard_value" + assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == { + "tables": "discard_value", + "columns": "discard_value", + "data_type": "discard_value" + } + + # but table on incoming schema overrides again + incoming_schema.tables["tables"]["schema_contract"] = "discard_row" + assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == { + 
"tables": "discard_row", + "columns": "discard_row", + "data_type": "discard_row" + } + + # incoming table still overrides all + assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema, incoming_table) == { + "tables": "freeze", + "columns": "freeze", + "data_type": "freeze" + } # ensure other settings do not interfere with the main setting we are testing base_settings = [{ @@ -178,12 +223,12 @@ def test_check_adding_table(base_settings) -> None: # # check adding new table # - assert schema.apply_schema_contract({**base_settings, **{"tables": "evolve"}}, "new_table", data, new_table, False) == (data, new_table) - assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_row"}}, "new_table", data, new_table, False) == (None, None) - assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_value"}}, "new_table", data, new_table, False) == (None, None) + assert schema.apply_schema_contract({**base_settings, **{"tables": "evolve"}}, "new_table", data, new_table) == (data, new_table) + assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_row"}}, "new_table", data, new_table) == (None, None) + assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_value"}}, "new_table", data, new_table) == (None, None) with pytest.raises(SchemaFrozenException): - schema.apply_schema_contract({**base_settings, **{"tables": "freeze"}}, "new_table", data, new_table, False) + schema.apply_schema_contract({**base_settings, **{"tables": "freeze"}}, "new_table", data, new_table) @pytest.mark.parametrize("base_settings", base_settings) @@ -213,12 +258,12 @@ def test_check_adding_new_columns(base_settings) -> None: popped_table_update = copy.deepcopy(table_update) popped_table_update["columns"].pop("new_column") - assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (data_with_new_row, 
table_update) - assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (None, None) - assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (data, popped_table_update) + assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update) + assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (None, None) + assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update) with pytest.raises(SchemaFrozenException): - schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) + schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), table_update) # @@ -245,12 +290,12 @@ def test_check_adding_new_columns(base_settings) -> None: popped_table_update["columns"].pop("incomplete_column_1") # incomplete columns should be treated like new columns - assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (data_with_new_row, table_update) - assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (None, None) - assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (data, popped_table_update) + assert schema.apply_schema_contract({**base_settings, 
**{"columns": "evolve"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update) + assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (None, None) + assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update) with pytest.raises(SchemaFrozenException): - schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) + schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) @@ -281,16 +326,16 @@ def test_check_adding_new_variant() -> None: popped_table_update = copy.deepcopy(table_update) popped_table_update["columns"].pop("column_2_variant") - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data_with_new_row, table_update) - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (None, None) - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data, popped_table_update) + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update) + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, 
None) + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update) with pytest.raises(SchemaFrozenException): - schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) + schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) # check interaction with new columns settings, variants are new columns.. with pytest.raises(SchemaFrozenException): - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data_with_new_row, table_update) + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update) - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (None, None) - assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data, popped_table_update) \ No newline at end of file + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None) + assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": 
"discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update) \ No newline at end of file diff --git a/tests/common/utils.py b/tests/common/utils.py index 4c68e32bf3..c80981df37 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -15,7 +15,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema version -IMPORTED_VERSION_HASH_ETH_V7 = "VuzNqiLOk7XuPxYLndMFMPHTDVItKU5ayiy70nQLdus=" +IMPORTED_VERSION_HASH_ETH_V7 = "yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 530a089f1c..b35de90c1a 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -3,7 +3,7 @@ from dlt.common.storages import NormalizeStorageConfiguration from dlt.extract.extract import ExtractorStorage, extract from dlt.extract.source import DltResource, DltSource - +from dlt.common.schema import Schema from tests.utils import clean_test_storage from tests.extract.utils import expect_extracted_file @@ -18,7 +18,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() - schema_update = extract(extract_id, source, storage) + schema_update = extract(extract_id, source, storage, pipeline_schema=Schema("some_schema")) # odd and even tables assert len(schema_update) == 2 assert "odd_table" in schema_update @@ -42,7 +42,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: source = source.with_resources(resource.name) source.selected_resources[resource.name].bind(10).select_tables("odd_table") extract_id = storage.create_extract_id() - schema_update = extract(extract_id, source, storage) + schema_update = 
extract(extract_id, source, storage, pipeline_schema=Schema("some_schema")) assert len(schema_update) == 1 assert "odd_table" in schema_update for partials in schema_update.values(): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 3160a2a1ee..8a5ea84982 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -404,7 +404,6 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): @dlt.resource def standalone_some_data(now=None, last_timestamp=dlt.sources.incremental("item.timestamp")): for i in range(-10, 10): - print(i) yield {"delta": i, "item": {"timestamp": (now or pendulum.now()).add(days=i).timestamp()}} diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index c668d81073..0bcaa2b70b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -740,11 +740,13 @@ def some_source(): p = dlt.pipeline(pipeline_name=uniq_id(), destination='dummy') p.run(source) - assert source.schema.tables['some_table']['resource'] == 'static_data' - assert source.schema.tables['dynamic_func_table']['resource'] == 'dynamic_func_data' - assert source.schema.tables['dynamic_mark_table']['resource'] == 'dynamic_mark_data' - assert source.schema.tables['parent_table']['resource'] == 'nested_data' - assert 'resource' not in source.schema.tables['parent_table__items'] + schema = p.default_schema + + assert schema.tables['some_table']['resource'] == 'static_data' + assert schema.tables['dynamic_func_table']['resource'] == 'dynamic_func_data' + assert schema.tables['dynamic_mark_table']['resource'] == 'dynamic_mark_data' + assert schema.tables['parent_table']['resource'] == 'nested_data' + assert 'resource' not in schema.tables['parent_table__items'] def test_preserve_fields_order() -> None: